Limit batch deletes to 100, issue several batches if the limit is exceeded
mziccard committed Nov 20, 2015
1 parent bbf3cb6 commit 2d825de
Showing 4 changed files with 91 additions and 3 deletions.
@@ -35,7 +35,6 @@
import com.google.api.client.googleapis.batch.json.JsonBatchCallback;
import com.google.api.client.googleapis.json.GoogleJsonError;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.googleapis.media.MediaHttpDownloader;
import com.google.api.client.http.ByteArrayContent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpHeaders;
@@ -59,16 +58,18 @@
import com.google.api.services.storage.model.Objects;
import com.google.api.services.storage.model.StorageObject;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.google.gcloud.storage.StorageException;
import com.google.gcloud.storage.StorageOptions;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -82,6 +83,7 @@ public class DefaultStorageRpc implements StorageRpc {
// see: https://cloud.google.com/storage/docs/concepts-techniques#practices
private static final Set<Integer> RETRYABLE_CODES = ImmutableSet.of(504, 503, 502, 500, 429, 408);
private static final long MEGABYTE = 1024L * 1024L;
private static final int MAX_BATCH_DELETES = 100;

public DefaultStorageRpc(StorageOptions options) {
HttpTransport transport = options.httpTransportFactory().create();
@@ -361,6 +363,37 @@ public byte[] load(StorageObject from, Map<Option, ?> options)

@Override
public BatchResponse batch(BatchRequest request) throws StorageException {
List<List<Tuple<StorageObject, Map<Option, ?>>>> partitionedToDelete =
Lists.partition(request.toDelete, MAX_BATCH_DELETES);
Map<StorageObject, Tuple<Boolean, StorageException>> deletes =
Maps.newHashMapWithExpectedSize(request.toDelete.size());
Map<StorageObject, Tuple<StorageObject, StorageException>> updates =
Maps.newHashMapWithExpectedSize(request.toUpdate.size());
Map<StorageObject, Tuple<StorageObject, StorageException>> gets =
Maps.newHashMapWithExpectedSize(request.toGet.size());
Iterator<List<Tuple<StorageObject, Map<Option, ?>>>> iterator = partitionedToDelete.iterator();
BatchRequest chunkRequest = new BatchRequest(iterator.hasNext() ? iterator.next() :
ImmutableList.<Tuple<StorageObject, Map<Option, ?>>>of(), request.toUpdate, request.toGet);
BatchResponse chunkResponse = batchChunk(chunkRequest);
mergeBatchResults(deletes, updates, gets, chunkResponse);
while (iterator.hasNext()) {
chunkRequest = new BatchRequest(iterator.next(), null, null);
chunkResponse = batchChunk(chunkRequest);
mergeBatchResults(deletes, updates, gets, chunkResponse);
}
return new BatchResponse(deletes, updates, gets);
}

private static void mergeBatchResults(
Map<StorageObject, Tuple<Boolean, StorageException>> deletes,
Map<StorageObject, Tuple<StorageObject, StorageException>> updates,
Map<StorageObject, Tuple<StorageObject, StorageException>> gets, BatchResponse response) {
deletes.putAll(response.deletes);
updates.putAll(response.updates);
gets.putAll(response.gets);
}

private BatchResponse batchChunk(BatchRequest request) {
com.google.api.client.googleapis.batch.BatchRequest batch = storage.batch();
final Map<StorageObject, Tuple<Boolean, StorageException>> deletes =
Maps.newConcurrentMap();
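
For context, the chunking above relies on Guava's Lists.partition. The following standalone sketch (not part of the commit; a toy Integer list stands in for the StorageObject/option tuples) shows how the delete list is split into chunks of at most MAX_BATCH_DELETES:

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {

  private static final int MAX_BATCH_DELETES = 100;

  public static void main(String... args) {
    // Stand-in for the queued delete operations of a single logical batch.
    List<Integer> deletes = new ArrayList<>();
    for (int i = 0; i < 250; i++) {
      deletes.add(i);
    }
    // Lists.partition returns consecutive sublists of at most MAX_BATCH_DELETES
    // elements: here [0..99], [100..199] and [200..249].
    List<List<Integer>> chunks = Lists.partition(deletes, MAX_BATCH_DELETES);
    System.out.println(chunks.size());        // prints 3
    System.out.println(chunks.get(2).size()); // prints 50
    // batch() above sends the first chunk together with all gets and updates,
    // then issues one additional batch RPC per remaining chunk and merges the
    // per-chunk results into a single BatchResponse via mergeBatchResults().
  }
}
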
@@ -251,6 +251,10 @@ StorageObject patch(StorageObject storageObject, Map<Option, ?> options)

boolean delete(StorageObject object, Map<Option, ?> options) throws StorageException;

/**
 * Sends a batch request. If the {@code request} parameter contains more than 100 deletes, the
* batch request is split into multiple batch RPC calls, each containing no more than 100 deletes.
*/
BatchResponse batch(BatchRequest request) throws StorageException;

StorageObject compose(Iterable<StorageObject> sources, StorageObject target,
@@ -1398,7 +1398,8 @@ private static void checkContentType(BlobInfo blobInfo) throws IllegalArgumentEx
byte[] readAllBytes(BlobId blob, BlobSourceOption... options);

/**
* Send a batch request.
 * Send a batch request. If the {@code batchRequest} parameter contains more than 100 deletes, the
* batch request is split into multiple batch RPC calls, each containing no more than 100 deletes.
*
* @return the batch response
* @throws StorageException upon failure
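
Callers do not need to split anything themselves; a single BatchRequest can now carry any number of deletes. A minimal usage sketch of the public API (not part of this commit; bucket and blob names are hypothetical, and an already-initialized Storage service instance is assumed):

import com.google.gcloud.storage.BatchRequest;
import com.google.gcloud.storage.BatchResponse;
import com.google.gcloud.storage.BlobId;
import com.google.gcloud.storage.Storage;

public class BatchDeleteExample {

  // Deletes 250 blobs in one logical batch; the library splits this into
  // three underlying batch RPCs (100 + 100 + 50) and merges the results.
  static void deleteMany(Storage storage, String bucket) {
    BatchRequest.Builder batch = BatchRequest.builder();
    for (int i = 0; i < 250; i++) {
      batch.delete(BlobId.of(bucket, "blob-" + i));
    }
    BatchResponse response = storage.apply(batch.build());
    for (BatchResponse.Result<Boolean> result : response.deletes()) {
      // Each result reports its own outcome, regardless of which underlying
      // RPC carried the delete.
      System.out.println(result.failed() ? "failed" : "deleted: " + result.get());
    }
  }
}
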
@@ -25,6 +25,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import com.google.api.client.util.Lists;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.gcloud.Page;
@@ -65,6 +66,7 @@ public class ITStorageTest {
private static final String CONTENT_TYPE = "text/plain";
private static final byte[] BLOB_BYTE_CONTENT = {0xD, 0xE, 0xA, 0xD};
private static final String BLOB_STRING_CONTENT = "Hello Google Cloud Storage!";
private static final int MAX_BATCH_DELETES = 100;

@BeforeClass
public static void beforeClass() {
@@ -623,6 +625,54 @@ public void testBatchRequest() {
assertTrue(deleteResponse.deletes().get(1).get());
}

@Test
public void testBatchRequestManyDeletes() {
List<BlobId> blobsToDelete = Lists.newArrayListWithCapacity(2 * MAX_BATCH_DELETES);
for (int i = 0; i < 2 * MAX_BATCH_DELETES; i++) {
blobsToDelete.add(BlobId.of(BUCKET, "test-batch-request-many-deletes-blob-" + i));
}
BatchRequest.Builder builder = BatchRequest.builder();
for (BlobId blob : blobsToDelete) {
builder.delete(blob);
}
String sourceBlobName1 = "test-batch-request-many-deletes-source-blob-1";
String sourceBlobName2 = "test-batch-request-many-deletes-source-blob-2";
BlobInfo sourceBlob1 = BlobInfo.builder(BUCKET, sourceBlobName1).build();
BlobInfo sourceBlob2 = BlobInfo.builder(BUCKET, sourceBlobName2).build();
assertNotNull(storage.create(sourceBlob1));
assertNotNull(storage.create(sourceBlob2));
BlobInfo updatedBlob2 = sourceBlob2.toBuilder().contentType(CONTENT_TYPE).build();

BatchRequest updateRequest = builder
.get(BUCKET, sourceBlobName1)
.update(updatedBlob2)
.build();
BatchResponse response = storage.apply(updateRequest);
assertEquals(2 * MAX_BATCH_DELETES, response.deletes().size());
assertEquals(1, response.updates().size());
assertEquals(1, response.gets().size());

// Check deletes
for (BatchResponse.Result<Boolean> deleteResult : response.deletes()) {
assertFalse(deleteResult.failed());
assertFalse(deleteResult.get());
}

// Check updates
BlobInfo remoteUpdatedBlob2 = response.updates().get(0).get();
assertEquals(sourceBlob2.bucket(), remoteUpdatedBlob2.bucket());
assertEquals(sourceBlob2.name(), remoteUpdatedBlob2.name());
assertEquals(updatedBlob2.contentType(), remoteUpdatedBlob2.contentType());

// Check gets
BlobInfo remoteBlob1 = response.gets().get(0).get();
assertEquals(sourceBlob1.bucket(), remoteBlob1.bucket());
assertEquals(sourceBlob1.name(), remoteBlob1.name());

assertTrue(storage.delete(BUCKET, sourceBlobName1));
assertTrue(storage.delete(BUCKET, sourceBlobName2));
}

@Test
public void testBatchRequestFail() {
String blobName = "test-batch-request-blob-fail";
