Skip to content

Commit 146b21d

Browse files
author
Brian Chen
authored
fix: lower batch size on BulkWriter retry (#688)
1 parent 6532099 commit 146b21d

File tree

3 files changed

+71
-4
lines changed

3 files changed

+71
-4
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java

Lines changed: 14 additions & 1 deletion
```diff
@@ -42,10 +42,23 @@ class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
   final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
   private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
   private final Executor executor;
+  private int maxBatchSize;

-  BulkCommitBatch(FirestoreImpl firestore, Executor executor) {
+  BulkCommitBatch(FirestoreImpl firestore, Executor executor, int maxBatchSize) {
     super(firestore);
     this.executor = executor;
+    this.maxBatchSize = maxBatchSize;
+  }
+
+  int getMaxBatchSize() {
+    return maxBatchSize;
+  }
+
+  void setMaxBatchSize(int size) {
+    Preconditions.checkState(
+        getMutationsSize() <= size,
+        "New batch size cannot be less than the number of enqueued writes");
+    this.maxBatchSize = size;
   }

   ApiFuture<WriteResult> wrapResult(int writeIndex) {
```

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 20 additions & 3 deletions
```diff
@@ -29,6 +29,7 @@
 import com.google.api.gax.rpc.StatusCode.Code;
 import com.google.cloud.firestore.v1.FirestoreSettings;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.MoreExecutors;
 import java.util.ArrayList;
 import java.util.List;
@@ -82,6 +83,9 @@ enum OperationType {
   /** The maximum number of writes that can be in a single batch. */
   public static final int MAX_BATCH_SIZE = 20;

+  /** The maximum number of writes that can be in a batch containing retries. */
+  public static final int RETRY_MAX_BATCH_SIZE = 10;
+
   /**
    * The maximum number of retries that will be attempted with backoff before stopping all retry
    * attempts.
@@ -237,7 +241,7 @@ public boolean onError(BulkWriterException error) {
         : Executors.newSingleThreadScheduledExecutor();
     this.successExecutor = MoreExecutors.directExecutor();
     this.errorExecutor = MoreExecutors.directExecutor();
-    this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);
+    this.bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);

     if (!options.getThrottlingEnabled()) {
       this.rateLimiter =
@@ -962,7 +966,7 @@ private void scheduleCurrentBatchLocked(final boolean flush) {
     if (bulkCommitBatch.getMutationsSize() == 0) return;

     final BulkCommitBatch pendingBatch = bulkCommitBatch;
-    bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);
+    bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, maxBatchSize);

     // Use the write with the longest backoff duration when determining backoff.
     int highestBackoffDuration = 0;
@@ -1025,7 +1029,10 @@ public void run() {

   @VisibleForTesting
   void setMaxBatchSize(int size) {
+    Preconditions.checkState(
+        bulkCommitBatch.getMutationsSize() == 0, "BulkCommitBatch should be empty");
     maxBatchSize = size;
+    bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor, size);
   }

   @VisibleForTesting
@@ -1050,6 +1057,16 @@ void setMaxPendingOpCount(int newMax) {
   private void sendOperationLocked(
       ApiFunction<BulkCommitBatch, ApiFuture<WriteResult>> enqueueOperationOnBatchCallback,
       final BulkWriterOperation op) {
+    // A backoff duration greater than 0 implies that this batch is a retry.
+    // Retried writes are sent with a batch size of 10 in order to guarantee
+    // that the batch is under the 10MiB limit.
+    if (op.getBackoffDuration() > 0) {
+      if (bulkCommitBatch.getMutationsSize() >= RETRY_MAX_BATCH_SIZE) {
+        scheduleCurrentBatchLocked(/* flush= */ false);
+      }
+      bulkCommitBatch.setMaxBatchSize(RETRY_MAX_BATCH_SIZE);
+    }
+
     if (bulkCommitBatch.has(op.getDocumentReference())) {
       // Create a new batch since the backend doesn't support batches with two writes to the same
       // document.
@@ -1062,7 +1079,7 @@ private void sendOperationLocked(
     bulkCommitBatch.enqueueOperation(op);
     enqueueOperationOnBatchCallback.apply(bulkCommitBatch);

-    if (bulkCommitBatch.getMutationsSize() == maxBatchSize) {
+    if (bulkCommitBatch.getMutationsSize() == bulkCommitBatch.getMaxBatchSize()) {
       scheduleCurrentBatchLocked(/* flush= */ false);
     }
   }
```

google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java

Lines changed: 37 additions & 0 deletions
```diff
@@ -44,6 +44,7 @@
 import com.google.firestore.v1.BatchWriteRequest;
 import com.google.firestore.v1.BatchWriteResponse;
 import com.google.firestore.v1.Value;
+import com.google.firestore.v1.Write;
 import com.google.protobuf.GeneratedMessageV3;
 import com.google.rpc.Code;
 import io.grpc.Status;
@@ -727,6 +728,42 @@ public boolean onError(BulkWriterException error) {
     assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
   }

+  @Test
+  public void retriesWithSmallerBatchSize() throws Exception {
+    final List<Write> writes = new ArrayList<>();
+    final List<ApiFuture<BatchWriteResponse>> successResponses = new ArrayList<>();
+    final List<ApiFuture<BatchWriteResponse>> failedResponses = new ArrayList<>();
+    for (int i = 0; i < 15; i++) {
+      writes.add(set(LocalFirestoreHelper.SINGLE_FIELD_PROTO, "coll/doc" + i));
+      failedResponses.add(failedResponse(Code.ABORTED_VALUE));
+      successResponses.add(successResponse(1));
+    }
+
+    ResponseStubber responseStubber =
+        new ResponseStubber() {
+          {
+            put(
+                batchWrite(writes.toArray(new Write[0])),
+                mergeResponses(failedResponses.toArray(new ApiFuture[0])));
+            put(
+                batchWrite(
+                    writes.subList(0, BulkWriter.RETRY_MAX_BATCH_SIZE).toArray(new Write[0])),
+                mergeResponses(successResponses.subList(0, 10).toArray(new ApiFuture[0])));
+            put(
+                batchWrite(
+                    writes.subList(BulkWriter.RETRY_MAX_BATCH_SIZE, 15).toArray(new Write[0])),
+                mergeResponses(successResponses.subList(10, 15).toArray(new ApiFuture[0])));
+          }
+        };
+    responseStubber.initializeStub(batchWriteCapture, firestoreMock);
+
+    for (int i = 0; i < 15; i++) {
+      bulkWriter.set(firestoreMock.document("coll/doc" + i), LocalFirestoreHelper.SINGLE_FIELD_MAP);
+    }
+    bulkWriter.close();
+  }
+
   @Test
   public void retryResolvesBeforeFlush() throws Exception {
     ResponseStubber responseStubber =
```

0 commit comments

Comments
 (0)