Skip to content

Commit e295aa5

Browse files
author
Brian Chen
authored
feat: add backoff to BulkWriter (#600)
1 parent fd565a1 commit e295aa5

File tree

4 files changed

+282
-15
lines changed

4 files changed

+282
-15
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkCommitBatch.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
/** Used to represent a batch that contains scheduled BulkWriterOperations. */
4040
class BulkCommitBatch extends UpdateBuilder<ApiFuture<WriteResult>> {
4141

42-
private final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
42+
final List<BulkWriterOperation> pendingOperations = new ArrayList<>();
4343
private final Set<DocumentReference> documents = new CopyOnWriteArraySet<>();
4444
private final Executor executor;
4545

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616

1717
package com.google.cloud.firestore;
1818

19+
import static com.google.cloud.firestore.BulkWriterOperation.DEFAULT_BACKOFF_MAX_DELAY_MS;
20+
1921
import com.google.api.core.ApiAsyncFunction;
2022
import com.google.api.core.ApiFunction;
2123
import com.google.api.core.ApiFuture;
@@ -108,6 +110,12 @@ enum OperationType {
108110
*/
109111
private static final int RATE_LIMITER_MULTIPLIER_MILLIS = 5 * 60 * 1000;
110112

113+
/**
114+
* The default jitter to apply to the exponential backoff used in retries. For example, a factor
115+
* of 0.3 means a 30% jitter is applied.
116+
*/
117+
static final double DEFAULT_JITTER_FACTOR = 0.3;
118+
111119
private static final WriteResultCallback DEFAULT_SUCCESS_LISTENER =
112120
new WriteResultCallback() {
113121
public void onResult(DocumentReference documentReference, WriteResult result) {}
@@ -681,7 +689,7 @@ public ApiFuture<Void> flush() {
681689

682690
private ApiFuture<Void> flushLocked() {
683691
verifyNotClosedLocked();
684-
sendCurrentBatchLocked(/* flush= */ true);
692+
scheduleCurrentBatchLocked(/* flush= */ true);
685693
return lastOperation;
686694
}
687695

@@ -846,37 +854,61 @@ public void addWriteErrorListener(@Nonnull Executor executor, WriteErrorCallback
846854
* This allows retries to resolve as part of a {@link BulkWriter#flush()} or {@link
847855
* BulkWriter#close()} call.
848856
*/
849-
private void sendCurrentBatchLocked(final boolean flush) {
857+
private void scheduleCurrentBatchLocked(final boolean flush) {
850858
if (bulkCommitBatch.getMutationsSize() == 0) return;
851-
BulkCommitBatch pendingBatch = bulkCommitBatch;
859+
860+
final BulkCommitBatch pendingBatch = bulkCommitBatch;
852861
bulkCommitBatch = new BulkCommitBatch(firestore, bulkWriterExecutor);
853862

854-
// Send the batch if it is under the rate limit, or schedule another attempt after the
863+
// Use the write with the longest backoff duration when determining backoff.
864+
int highestBackoffDuration = 0;
865+
for (BulkWriterOperation op : pendingBatch.pendingOperations) {
866+
if (op.getBackoffDuration() > highestBackoffDuration) {
867+
highestBackoffDuration = op.getBackoffDuration();
868+
}
869+
}
870+
final int backoffMsWithJitter = applyJitter(highestBackoffDuration);
871+
872+
bulkWriterExecutor.schedule(
873+
new Runnable() {
874+
@Override
875+
public void run() {
876+
synchronized (lock) {
877+
sendBatchLocked(pendingBatch, flush);
878+
}
879+
}
880+
},
881+
backoffMsWithJitter,
882+
TimeUnit.MILLISECONDS);
883+
}
884+
885+
/** Sends the provided batch once the rate limiter does not require any delay. */
886+
private void sendBatchLocked(final BulkCommitBatch batch, final boolean flush) {
887+
// Send the batch if it is does not require any delay, or schedule another attempt after the
855888
// appropriate timeout.
856-
boolean underRateLimit = rateLimiter.tryMakeRequest(pendingBatch.getMutationsSize());
889+
boolean underRateLimit = rateLimiter.tryMakeRequest(batch.getMutationsSize());
857890
if (underRateLimit) {
858-
pendingBatch
891+
batch
859892
.bulkCommit()
860893
.addListener(
861894
new Runnable() {
862895
@Override
863896
public void run() {
864897
synchronized (lock) {
865-
sendCurrentBatchLocked(flush);
898+
scheduleCurrentBatchLocked(flush);
866899
}
867900
}
868901
},
869902
bulkWriterExecutor);
870-
871903
} else {
872-
long delayMs = rateLimiter.getNextRequestDelayMs(pendingBatch.getMutationsSize());
904+
long delayMs = rateLimiter.getNextRequestDelayMs(batch.getMutationsSize());
873905
logger.log(Level.FINE, String.format("Backing off for %d seconds", delayMs / 1000));
874906
bulkWriterExecutor.schedule(
875907
new Runnable() {
876908
@Override
877909
public void run() {
878910
synchronized (lock) {
879-
sendCurrentBatchLocked(flush);
911+
sendBatchLocked(batch, flush);
880912
}
881913
}
882914
},
@@ -905,7 +937,7 @@ private void sendOperationLocked(
905937
if (bulkCommitBatch.has(op.getDocumentReference())) {
906938
// Create a new batch since the backend doesn't support batches with two writes to the same
907939
// document.
908-
sendCurrentBatchLocked(/* flush= */ false);
940+
scheduleCurrentBatchLocked(/* flush= */ false);
909941
}
910942

911943
// Run the operation on the current batch and advance the `lastOperation` pointer. This
@@ -926,7 +958,7 @@ public ApiFuture<Void> apply(Void aVoid) {
926958
MoreExecutors.directExecutor());
927959

928960
if (bulkCommitBatch.getMutationsSize() == maxBatchSize) {
929-
sendCurrentBatchLocked(/* flush= */ false);
961+
scheduleCurrentBatchLocked(/* flush= */ false);
930962
}
931963
}
932964

@@ -989,4 +1021,11 @@ public void onSuccess(T writeResult) {
9891021
MoreExecutors.directExecutor());
9901022
return flushCallback;
9911023
}
1024+
1025+
private int applyJitter(int backoffMs) {
1026+
if (backoffMs == 0) return 0;
1027+
// Random value in [-0.3, 0.3].
1028+
double jitter = DEFAULT_JITTER_FACTOR * (Math.random() * 2 - 1);
1029+
return (int) Math.min(DEFAULT_BACKOFF_MAX_DELAY_MS, backoffMs + jitter * backoffMs);
1030+
}
9921031
}

google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriterOperation.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.core.ApiFutures;
2323
import com.google.api.core.SettableApiFuture;
2424
import com.google.common.util.concurrent.MoreExecutors;
25+
import io.grpc.Status;
2526

2627
/**
2728
* Represents a single write for BulkWriter, encapsulating operation dispatch and error handling.
@@ -34,7 +35,22 @@ class BulkWriterOperation {
3435
private final ApiFunction<WriteResult, ApiFuture<Void>> successListener;
3536
private final ApiFunction<BulkWriterException, ApiFuture<Boolean>> errorListener;
3637

38+
/**
39+
* The default initial backoff time in milliseconds after an error. Set to 1s according to
40+
* https://cloud.google.com/apis/design/errors.
41+
*/
42+
public static final int DEFAULT_BACKOFF_INITIAL_DELAY_MS = 1000;
43+
44+
/** The default maximum backoff time in milliseconds when retrying an operation. */
45+
public static final int DEFAULT_BACKOFF_MAX_DELAY_MS = 60 * 1000;
46+
47+
/** The default factor to increase the backup by after each failed attempt. */
48+
public static final double DEFAULT_BACKOFF_FACTOR = 1.5;
49+
3750
private int failedAttempts = 0;
51+
private Status lastStatus;
52+
53+
private int backoffDuration = 0;
3854

3955
/**
4056
* @param documentReference The document reference being written to.
@@ -68,6 +84,10 @@ public DocumentReference getDocumentReference() {
6884
return documentReference;
6985
}
7086

87+
public int getBackoffDuration() {
88+
return backoffDuration;
89+
}
90+
7191
/** Callback invoked when an operation attempt fails. */
7292
public ApiFuture<Void> onException(FirestoreException exception) {
7393
++failedAttempts;
@@ -94,6 +114,8 @@ public void onFailure(Throwable throwable) {
94114
@Override
95115
public void onSuccess(Boolean shouldRetry) {
96116
if (shouldRetry) {
117+
lastStatus = bulkWriterException.getStatus();
118+
updateBackoffDuration();
97119
scheduleWriteCallback.apply(BulkWriterOperation.this);
98120
} else {
99121
operationFuture.setException(bulkWriterException);
@@ -106,6 +128,16 @@ public void onSuccess(Boolean shouldRetry) {
106128
return callbackFuture;
107129
}
108130

131+
private void updateBackoffDuration() {
132+
if (lastStatus == Status.RESOURCE_EXHAUSTED) {
133+
backoffDuration = DEFAULT_BACKOFF_MAX_DELAY_MS;
134+
} else if (backoffDuration == 0) {
135+
backoffDuration = DEFAULT_BACKOFF_INITIAL_DELAY_MS;
136+
} else {
137+
backoffDuration *= DEFAULT_BACKOFF_FACTOR;
138+
}
139+
}
140+
109141
/** Callback invoked when the operation succeeds. */
110142
public ApiFuture<Void> onSuccess(final WriteResult result) {
111143
final SettableApiFuture<Void> callbackFuture = SettableApiFuture.create();

0 commit comments

Comments
 (0)