New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: add retry on ABORTED errors #286
Conversation
Codecov Report
@@ Coverage Diff @@
## bc/bulk-master #286 +/- ##
====================================================
+ Coverage 72.94% 74.02% +1.07%
- Complexity 1015 1123 +108
====================================================
Files 63 67 +4
Lines 5407 6109 +702
Branches 617 672 +55
====================================================
+ Hits 3944 4522 +578
- Misses 1266 1373 +107
- Partials 197 214 +17
Continue to review full report at Codecov.
|
}); | ||
} | ||
|
||
private void bulkCommit(BulkCommitBatch batch) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think about having the retry logic live in BulkWriter
instead of inside UpdateBuilder
? I originally wanted to take care of everything in UpdateBuilder
, but then I realized that doing so would prohibit creating a new BulkCommitBatch
via constructor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This matches Node so I am fully supportive.
null, | ||
((FirestoreException) e).getStatus(), | ||
e.getMessage())); | ||
} else { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wasn't sure how to handle the case where a non-FirestoreException was thrown. I added an additional field for an Exception in the event that the Exception thrown does not have a Status
, but I wonder if this could be cleaner.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code in bulkCommit
should wrap all exceptions in FirestoreException.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Post conversation: We can't wrap some of the java exceptions that getting Futures will throw. However, the exceptions will still be surfaced to the user accordingly.
* Removes all operations not specified in documentsToRetry from the batch. Marks the batch as | ||
* READY_TO_SEND in order to allow the next batch to be sent. | ||
*/ | ||
void sliceBatchForRetry(final Set<String> documentsToRetry) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh man. I really tried to use the BulkCommitBatch constructor directly to properly imitate Node. However, since the BulkCommitBatch
logic lives inside UpdateBuilder
, it got pretty ugly. I had to add a bunch of getters, and managing the state on a BulkCommitBatch
becomes a tricky problem / potential for future bugs. I think slicing the batch here is the lesser of the two evils, and I'm open to separating out the batch state override.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about https://gist.github.com/schmidt-sebastian/48ed97084b74bf7e3dd5936eddbfc001, or is this only addressing half of the problem?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ran into two main issues: first, my currently implementation requires the batch in bulkCommit()
to be marked final
, which means it can't be changed directly, though I guess we could put it in an array. The second is that the completeFuture
used to track awaitBulkCommit()
will never resolve, unless we pipe it through somehow (is that possible?).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: Ended up updating to use BulkCommitBatch as constructor instead of slicing, pending successfulAsList
google-cloud-firestore/src/main/java/com/google/cloud/firestore/UpdateBuilder.java
Show resolved
Hide resolved
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BatchWriteResult.java
Show resolved
Hide resolved
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BatchWriteResult.java
Outdated
Show resolved
Hide resolved
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
Show resolved
Hide resolved
.execute( | ||
new Runnable() { | ||
public void run() { | ||
bulkCommit(batch); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We previously waited for bulkCommit
before removing the batch from the batch queue. Do we not need to do this anymore?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we are still waiting on bulkCommit
to finish before removing the batch. We're waiting for attemptBulkCommit
in bulkCommit()
, which means that bulkCommit()
should finish its logic before we remove batches from the batch queue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update: N/A now that we are chaining with ApiFutures.
batch.processResults(commitFuture.get(), null); | ||
} catch (Exception e) { | ||
batch.processResults(new ArrayList<BatchWriteResult>(), e); | ||
MoreExecutors.directExecutor() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
MoreExecutors.directExecutor(). execute(...)
seems equivalent to just running the code in the Runnable. Let me know if I am mistaken, maybe by adding a comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it was always good practice to execute runnables via an executor, as I haven't seen any Runnables being executed directly. Done.
google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java
Show resolved
Hide resolved
} catch (Exception e) { | ||
assertTrue(e.getMessage().contains("Mock batchWrite failed in test")); | ||
++opCount; | ||
} | ||
assertEquals(1, opCount); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
} catch (Exception e) { | |
assertTrue(e.getMessage().contains("Mock batchWrite failed in test")); | |
++opCount; | |
} | |
assertEquals(1, opCount); | |
Assert.fail(...); | |
} catch (Exception e) { | |
assertTrue(e.getMessage().contains("Mock batchWrite failed in test")); | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
|
||
@Test | ||
public void failsWritesAfterAllRetryAttemptsFail() throws Exception { | ||
doThrow( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the mocking layer allow us to specify that we want this function to be called 10 times?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed to use doAnswer.
} | ||
Set<Code> codes = FirestoreSettings.newBuilder().batchWriteSettings().getRetryableCodes(); | ||
for (Code code : codes) { | ||
if (code.toString().equals(status.getCode().toString())) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (code.toString().equals(status.getCode().toString())) { | |
if (code.equals(Code.valueOf(status.getCode().name()))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
that's cleaner, thanks.
* Removes all operations not specified in documentsToRetry from the batch. Marks the batch as | ||
* READY_TO_SEND in order to allow the next batch to be sent. | ||
*/ | ||
void sliceBatchForRetry(final Set<String> documentsToRetry) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about https://gist.github.com/schmidt-sebastian/48ed97084b74bf7e3dd5936eddbfc001, or is this only addressing half of the problem?
60dcfbd
to
95c099f
Compare
This change should be cleaned up after com.google.com:google-cloud-shared-dependencies v0.8.4 is available
|
||
BatchWriteResult(@Nullable Timestamp timestamp, Status status, String message) { | ||
BatchWriteResult( | ||
DocumentReference documentReference, @Nullable Timestamp timestamp, Exception exception) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Exception should be @Nullable
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
Show resolved
Hide resolved
commitFuture.addListener( | ||
new Runnable() { | ||
public void run() { | ||
System.out.println("removing batch from batch queue. size: " + batchQueue.size()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please remove.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops :(
} | ||
|
||
private ApiFuture<List<BatchWriteResult>> invokeBulkCommit(final BulkCommitBatch batch) { | ||
return batch.bulkCommit(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be inlined.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
private ApiFuture<Void> bulkCommit(final BulkCommitBatch batch, final int attempt) { | ||
final SettableApiFuture<Void> backoffFuture = SettableApiFuture.create(); | ||
|
||
class ProcessBulkCommitCallback implements ApiAsyncFunction<List<BatchWriteResult>, Void> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you move these inner classes out of this method? Static classes within methods feel strange. :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I originally wanted to avoid adding class members to the callback classes, but readability/style is more important. Thanks!
@Override | ||
public ApiFuture<Void> apply(Void ignored) { | ||
|
||
// If the BatchWrite RPC fails, map the exception to each individual result. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move comment closer to the code that does this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
new BatchWriteResult( | ||
writes.get(i).documentReference, | ||
updateTime, | ||
updateTime == null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be based on the Status?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updateTime
is null if the Status is not OK from above, but I can see how using status again is more readable. Changed to an if/else block that hopefully increases readability.
} | ||
|
||
// Mark the batch as ready to send in order to allow the batch to be retried again. | ||
state = BatchState.READY_TO_SEND; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this now unused?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
removed.
batch.processResults(results); | ||
if (batch.getPendingOperationCount() > 0) { | ||
logger.log( | ||
Level.WARNING, | ||
String.format( | ||
"Current batch failed at retry #%d. Num failures: %d", | ||
attempt, batch.getPendingOperationCount())); | ||
|
||
if (attempt < MAX_RETRY_ATTEMPTS) { | ||
nextAttempt = backoff.createNextAttempt(nextAttempt); | ||
BulkCommitBatch newBatch = | ||
new BulkCommitBatch(firestore, batch, batch.getPendingDocuments()); | ||
return bulkCommit(newBatch, attempt + 1); | ||
} else { | ||
batch.failRemainingOperations(results); | ||
} | ||
} | ||
return ApiFutures.immediateFuture(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This confused me during the review. There is a strange interaction model going on here.
batch.processResults
changes the internal state, which reduces the number of pending operations (this is not obvious from the callsite). new BulkCommitBatch
then uses this filtered list of pending operations, but also uses the unfiltered list of writes.
While this matches what Node does, a lot of the back and forth in Node happens in the same codepath, and here these interactions all cross into and out of UpdateBuilder.
One improvement suggestion:
batch.processResults(results); | |
if (batch.getPendingOperationCount() > 0) { | |
logger.log( | |
Level.WARNING, | |
String.format( | |
"Current batch failed at retry #%d. Num failures: %d", | |
attempt, batch.getPendingOperationCount())); | |
if (attempt < MAX_RETRY_ATTEMPTS) { | |
nextAttempt = backoff.createNextAttempt(nextAttempt); | |
BulkCommitBatch newBatch = | |
new BulkCommitBatch(firestore, batch, batch.getPendingDocuments()); | |
return bulkCommit(newBatch, attempt + 1); | |
} else { | |
batch.failRemainingOperations(results); | |
} | |
} | |
return ApiFutures.immediateFuture(null); | |
batch.processResults(results); | |
List<DocumentReference> pendingOps = batch.getRemainingOperations(); | |
if (!pendingOps.isEmpty()) { | |
logger.log( | |
Level.WARNING, | |
String.format( | |
"Current batch failed at retry #%d. Num failures: %d", | |
attempt,pendingOps.size()))); | |
if (attempt < MAX_RETRY_ATTEMPTS) { | |
nextAttempt = backoff.createNextAttempt(nextAttempt); | |
BulkCommitBatch newBatch = | |
new BulkCommitBatch(firestore, batch, pendingOps); | |
return bulkCommit(newBatch, attempt + 1); | |
} else { | |
batch.failRemainingOperations(results); | |
} | |
} | |
return ApiFutures.immediateFuture(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for taking the time to write it out and explain it. I remember thinking the code was kind of tricky and had a smell to it, but I couldn't pinpoint what it was. Changed and updated comments for processResults
.
google-cloud-firestore/src/main/java/com/google/cloud/firestore/BulkWriter.java
Show resolved
Hide resolved
/** | ||
* A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent | ||
* modification errors. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
/** | |
* A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent | |
* modification errors. | |
*/ | |
/** | |
* A queue of batches to be written. Use a synchronized list to avoid multi-thread concurrent | |
* modification errors (as this list is modified from both the user thread and the network thread). | |
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done. Thanks for the clarification
batch.processResults(results); | ||
Set<DocumentReference> pendingOps = batch.getPendingDocuments(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic is much cleaner now, but still not self-explanatory. I think this can be solved by naming though:
batch.processResults(results);
Set<DocumentReference> remainingOp = batch.getRemainingOperations();
Or:
batch.processResultsAndRemoveSuccesfulResultsFromPendingOperationsBernie2016(results);
Set<DocumentReference> remainingOp = batch. getPendingDocuments();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sad reacts only for 2020
@Nullable Timestamp updateTime; | ||
@Nullable Exception exception; | ||
if (code == Status.OK) { | ||
updateTime = Timestamp.fromProto(writeResult.getUpdateTime()); | ||
exception = null; | ||
} else { | ||
updateTime = null; | ||
exception = FirestoreException.serverRejected(code, status.getMessage()); | ||
} | ||
result.add( | ||
new BatchWriteResult(writes.get(i).documentReference, updateTime, exception)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Nullable Timestamp updateTime; | |
@Nullable Exception exception; | |
if (code == Status.OK) { | |
updateTime = Timestamp.fromProto(writeResult.getUpdateTime()); | |
exception = null; | |
} else { | |
updateTime = null; | |
exception = FirestoreException.serverRejected(code, status.getMessage()); | |
} | |
result.add( | |
new BatchWriteResult(writes.get(i).documentReference, updateTime, exception)); | |
Timestamp updateTime = null; | |
Exception exception = null; | |
if (Status.OK.equals(code)) { | |
updateTime = Timestamp.fromProto(writeResult.getUpdateTime()); | |
} else { | |
exception = FirestoreException.serverRejected(code, status.getMessage()); | |
} | |
result.add( | |
new BatchWriteResult(writes.get(i).documentReference, updateTime, exception)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops :(
convertBatchWriteResult(results.get(i), future); | ||
} else { | ||
resultsMap.get(i).setException(error); | ||
/** |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing empty line.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
} | ||
|
||
@Test | ||
public void allWritesCompleteWhenFlushCompletes() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this more "flushCompletesWhenallWritesComplete"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks
assertTrue(e.getMessage().contains("Mock batchWrite failed in test")); | ||
assertEquals(retryAttempts[0], BulkWriter.MAX_RETRY_ATTEMPTS + 1); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we add a test that:
- Adds two writes
- Calls flush
- Fails one write with retryable error, let's the other succeed
- Retries write which succeeds
- Test verifies that flush() only finishes now
I am not 100% convinced that this works right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added test.
* fix: add retry on ABORTED errors * WIP: test pass individually, concurrency error * WIP: tests passing but hidden concurrency bug * use BulkCommitBatch constructor, remove completeFuture * use firestoreExecutor and update rate limiter test * add todo for successfulAsList * Update BulkWriterTest to check number of retry attempts * build: manually bump com.google.api:api-common to v1.10.0 This change should be cleaned up after com.google.com:google-cloud-shared-dependencies v0.8.4 is available * test: add junit timeout rule to BulkWriterTest (#312) * update to use successfulAsList * add test to verify successfulAsList * lint * resolve comments, fix concurrency issue? * resolve comments, add additional test for flush Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
* fix: add retry on ABORTED errors * WIP: test pass individually, concurrency error * WIP: tests passing but hidden concurrency bug * use BulkCommitBatch constructor, remove completeFuture * use firestoreExecutor and update rate limiter test * add todo for successfulAsList * Update BulkWriterTest to check number of retry attempts * build: manually bump com.google.api:api-common to v1.10.0 This change should be cleaned up after com.google.com:google-cloud-shared-dependencies v0.8.4 is available * test: add junit timeout rule to BulkWriterTest (#312) * update to use successfulAsList * add test to verify successfulAsList * lint * resolve comments, fix concurrency issue? * resolve comments, add additional test for flush Co-authored-by: BenWhitehead <BenWhitehead@users.noreply.github.com>
Porting over from node.