Skip to content

Commit 5905438

Browse files
fix: retry transactions that fail with expired transaction IDs (#447)
1 parent b529245 commit 5905438

File tree

5 files changed

+224
-234
lines changed

5 files changed

+224
-234
lines changed

google-cloud-firestore/src/main/java/com/google/cloud/firestore/Transaction.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,6 @@ public interface AsyncFunction<T> {
6565
}
6666

6767
private final TransactionOptions transactionOptions;
68-
@Nullable private final ByteString previousTransactionId;
6968
private ByteString transactionId;
7069

7170
Transaction(
@@ -74,8 +73,11 @@ public interface AsyncFunction<T> {
7473
@Nullable Transaction previousTransaction) {
7574
super(firestore);
7675
this.transactionOptions = transactionOptions;
77-
this.previousTransactionId =
78-
previousTransaction != null ? previousTransaction.transactionId : null;
76+
this.transactionId = previousTransaction != null ? previousTransaction.transactionId : null;
77+
}
78+
79+
public boolean hasTransactionId() {
80+
return transactionId != null;
7981
}
8082

8183
Transaction wrapResult(ApiFuture<WriteResult> result) {
@@ -89,11 +91,8 @@ ApiFuture<Void> begin() {
8991
beginTransaction.setDatabase(firestore.getDatabaseName());
9092

9193
if (TransactionOptionsType.READ_WRITE.equals(transactionOptions.getType())
92-
&& previousTransactionId != null) {
93-
beginTransaction
94-
.getOptionsBuilder()
95-
.getReadWriteBuilder()
96-
.setRetryTransaction(previousTransactionId);
94+
&& transactionId != null) {
95+
beginTransaction.getOptionsBuilder().getReadWriteBuilder().setRetryTransaction(transactionId);
9796
} else if (TransactionOptionsType.READ_ONLY.equals(transactionOptions.getType())) {
9897
final ReadOnly.Builder readOnlyBuilder = ReadOnly.newBuilder();
9998
if (transactionOptions.getReadTime() != null) {

google-cloud-firestore/src/main/java/com/google/cloud/firestore/TransactionRunner.java

Lines changed: 57 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -105,23 +105,40 @@ ApiFuture<T> run() {
105105
"Start runTransaction",
106106
ImmutableMap.of("attemptsRemaining", AttributeValue.longAttributeValue(attemptsRemaining)));
107107

108-
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
108+
return ApiFutures.catchingAsync(
109+
ApiFutures.transformAsync(
110+
maybeRollback(), new RollbackCallback(), MoreExecutors.directExecutor()),
111+
Throwable.class,
112+
new RestartTransactionCallback(),
113+
MoreExecutors.directExecutor());
114+
}
109115

110-
// Add a backoff delay. At first, this is 0.
111-
this.firestoreExecutor.schedule(
112-
new Runnable() {
113-
@Override
114-
public void run() {
115-
backoff.set(null);
116-
}
117-
},
118-
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
119-
TimeUnit.MILLISECONDS);
116+
private ApiFuture<Void> maybeRollback() {
117+
return transaction.hasTransactionId()
118+
? transaction.rollback()
119+
: ApiFutures.<Void>immediateFuture(null);
120+
}
120121

121-
nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
122+
/** A callback that schedules the backoff delay and then invokes the BackoffCallback. */
123+
private class RollbackCallback implements ApiAsyncFunction<Void, T> {
124+
@Override
125+
public ApiFuture<T> apply(Void input) {
126+
final SettableApiFuture<Void> backoff = SettableApiFuture.create();
127+
// Add a backoff delay. At first, this is 0.
128+
firestoreExecutor.schedule(
129+
new Runnable() {
130+
@Override
131+
public void run() {
132+
backoff.set(null);
133+
}
134+
},
135+
nextBackoffAttempt.getRandomizedRetryDelay().toMillis(),
136+
TimeUnit.MILLISECONDS);
122137

123-
return ApiFutures.transformAsync(
124-
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
138+
nextBackoffAttempt = backoffAlgorithm.createNextAttempt(nextBackoffAttempt);
139+
return ApiFutures.transformAsync(
140+
backoff, new BackoffCallback(), MoreExecutors.directExecutor());
141+
}
125142
}
126143

127144
/**
@@ -138,7 +155,6 @@ public void run() {
138155
new ApiFutureCallback<T>() {
139156
@Override
140157
public void onFailure(Throwable t) {
141-
142158
callbackResult.setException(t);
143159
}
144160

@@ -168,12 +184,8 @@ public ApiFuture<T> apply(Void input) {
168184
*/
169185
private class BeginTransactionCallback implements ApiAsyncFunction<Void, T> {
170186
public ApiFuture<T> apply(Void ignored) {
171-
return ApiFutures.catchingAsync(
172-
ApiFutures.transformAsync(
173-
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor()),
174-
Throwable.class,
175-
new RestartTransactionCallback(),
176-
MoreExecutors.directExecutor());
187+
return ApiFutures.transformAsync(
188+
invokeUserCallback(), new UserFunctionCallback(), MoreExecutors.directExecutor());
177189
}
178190
}
179191

@@ -217,10 +229,10 @@ public ApiFuture<T> apply(Throwable throwable) {
217229
}
218230

219231
ApiException apiException = (ApiException) throwable;
220-
if (isRetryableTransactionError(apiException)) {
232+
if (transaction.hasTransactionId() && isRetryableTransactionError(apiException)) {
221233
if (attemptsRemaining > 0) {
222234
span.addAnnotation("retrying");
223-
return rollbackAndContinue();
235+
return run();
224236
} else {
225237
span.setStatus(TOO_MANY_RETRIES_STATUS);
226238
final FirestoreException firestoreException =
@@ -251,39 +263,36 @@ private boolean isRetryableTransactionError(ApiException exception) {
251263
case UNAUTHENTICATED:
252264
case RESOURCE_EXHAUSTED:
253265
return true;
266+
case INVALID_ARGUMENT:
267+
// The Firestore backend uses "INVALID_ARGUMENT" for transaction IDs that have expired.
268+
// While INVALID_ARGUMENT is generally not retryable, we retry this specific case.
269+
return exception.getMessage().contains("transaction has expired");
254270
default:
255271
return false;
256272
}
257273
}
258274

259-
/** Rolls the transaction back and attempts it again. */
260-
private ApiFuture<T> rollbackAndContinue() {
261-
return ApiFutures.transformAsync(
262-
transaction.rollback(),
263-
new ApiAsyncFunction<Void, T>() {
264-
@Override
265-
public ApiFuture<T> apply(Void input) {
266-
return run();
267-
}
268-
},
269-
MoreExecutors.directExecutor());
270-
}
271-
272275
/** Rolls the transaction back and returns the error. */
273276
private ApiFuture<T> rollbackAndReject(final Throwable throwable) {
274277
final SettableApiFuture<T> failedTransaction = SettableApiFuture.create();
275-
// We use `addListener()` since we want to return the original exception regardless of whether
276-
// rollback() succeeds.
277-
transaction
278-
.rollback()
279-
.addListener(
280-
new Runnable() {
281-
@Override
282-
public void run() {
283-
failedTransaction.setException(throwable);
284-
}
285-
},
286-
MoreExecutors.directExecutor());
278+
279+
if (transaction.hasTransactionId()) {
280+
// We use `addListener()` since we want to return the original exception regardless of
281+
// whether rollback() succeeds.
282+
transaction
283+
.rollback()
284+
.addListener(
285+
new Runnable() {
286+
@Override
287+
public void run() {
288+
failedTransaction.setException(throwable);
289+
}
290+
},
291+
MoreExecutors.directExecutor());
292+
} else {
293+
failedTransaction.setException(throwable);
294+
}
295+
287296
span.end();
288297
return failedTransaction;
289298
}

google-cloud-firestore/src/test/java/com/google/cloud/firestore/BulkWriterTest.java

Lines changed: 10 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636
import com.google.cloud.firestore.spi.v1.FirestoreRpc;
3737
import com.google.firestore.v1.BatchWriteRequest;
3838
import com.google.firestore.v1.BatchWriteResponse;
39-
import com.google.protobuf.GeneratedMessageV3;
4039
import com.google.rpc.Code;
4140
import io.grpc.Status;
4241
import java.util.ArrayList;
@@ -120,13 +119,6 @@ private ApiFuture<BatchWriteResponse> mergeResponses(ApiFuture<BatchWriteRespons
120119
return ApiFutures.immediateFuture(response.build());
121120
}
122121

123-
private void verifyRequests(List<BatchWriteRequest> requests, ResponseStubber responseStubber) {
124-
int index = 0;
125-
for (GeneratedMessageV3 request : responseStubber.keySet()) {
126-
assertEquals(request, requests.get(index++));
127-
}
128-
}
129-
130122
@Before
131123
public void before() {
132124
doReturn(immediateExecutor).when(firestoreRpc).getExecutor();
@@ -150,10 +142,7 @@ public void hasSetMethod() throws Exception {
150142
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
151143
bulkWriter.close();
152144

153-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
154-
assertEquals(responseStubber.size(), requests.size());
155-
156-
verifyRequests(requests, responseStubber);
145+
responseStubber.verifyAllRequestsSent();
157146
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
158147
}
159148

@@ -172,10 +161,7 @@ public void hasUpdateMethod() throws Exception {
172161
ApiFuture<WriteResult> result = bulkWriter.update(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
173162
bulkWriter.close();
174163

175-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
176-
assertEquals(responseStubber.size(), requests.size());
177-
178-
verifyRequests(requests, responseStubber);
164+
responseStubber.verifyAllRequestsSent();
179165
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
180166
}
181167

@@ -192,10 +178,7 @@ public void hasDeleteMethod() throws Exception {
192178
ApiFuture<WriteResult> result = bulkWriter.delete(doc1);
193179
bulkWriter.close();
194180

195-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
196-
assertEquals(responseStubber.size(), requests.size());
197-
198-
verifyRequests(requests, responseStubber);
181+
responseStubber.verifyAllRequestsSent();
199182
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
200183
}
201184

@@ -214,10 +197,7 @@ public void hasCreateMethod() throws Exception {
214197
ApiFuture<WriteResult> result = bulkWriter.create(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
215198
bulkWriter.close();
216199

217-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
218-
assertEquals(responseStubber.size(), requests.size());
219-
220-
verifyRequests(requests, responseStubber);
200+
responseStubber.verifyAllRequestsSent();
221201
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result.get().getUpdateTime());
222202
}
223203

@@ -236,10 +216,7 @@ public void surfacesErrors() throws Exception {
236216
ApiFuture<WriteResult> result = bulkWriter.set(doc1, LocalFirestoreHelper.SINGLE_FIELD_MAP);
237217
bulkWriter.close();
238218

239-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
240-
assertEquals(responseStubber.size(), requests.size());
241-
242-
verifyRequests(requests, responseStubber);
219+
responseStubber.verifyAllRequestsSent();
243220
try {
244221
result.get();
245222
fail("set() should have failed");
@@ -274,10 +251,7 @@ public void addsWritesToNewBatchAfterFlush() throws Exception {
274251
ApiFuture<WriteResult> result2 = bulkWriter.set(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
275252
bulkWriter.close();
276253

277-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
278-
assertEquals(responseStubber.size(), requests.size());
279-
280-
verifyRequests(requests, responseStubber);
254+
responseStubber.verifyAllRequestsSent();
281255
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
282256
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
283257
}
@@ -350,10 +324,7 @@ public void canSendWritesToSameDocInSameBatch() throws Exception {
350324
bulkWriter.update(sameDoc, LocalFirestoreHelper.SINGLE_FIELD_MAP);
351325
bulkWriter.close();
352326

353-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
354-
assertEquals(responseStubber.size(), requests.size());
355-
356-
verifyRequests(requests, responseStubber);
327+
responseStubber.verifyAllRequestsSent();
357328
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
358329
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
359330
}
@@ -376,10 +347,7 @@ public void sendWritesToDifferentDocsInSameBatch() throws Exception {
376347
ApiFuture<WriteResult> result2 = bulkWriter.update(doc2, LocalFirestoreHelper.SINGLE_FIELD_MAP);
377348
bulkWriter.close();
378349

379-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
380-
assertEquals(responseStubber.size(), requests.size());
381-
382-
verifyRequests(requests, responseStubber);
350+
responseStubber.verifyAllRequestsSent();
383351
assertEquals(Timestamp.ofTimeSecondsAndNanos(1, 0), result1.get().getUpdateTime());
384352
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
385353
}
@@ -413,9 +381,7 @@ public void sendBatchesWhenSizeLimitIsReached() throws Exception {
413381
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
414382
assertEquals(Timestamp.ofTimeSecondsAndNanos(3, 0), result3.get().getUpdateTime());
415383

416-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
417-
assertEquals(responseStubber.size(), requests.size());
418-
verifyRequests(requests, responseStubber);
384+
responseStubber.verifyAllRequestsSent();
419385
}
420386

421387
@Test
@@ -462,8 +428,7 @@ public void retriesIndividualWritesThatFailWithAbortedOrUnavailable() throws Exc
462428
assertEquals(Timestamp.ofTimeSecondsAndNanos(2, 0), result2.get().getUpdateTime());
463429
assertEquals(Timestamp.ofTimeSecondsAndNanos(3, 0), result3.get().getUpdateTime());
464430

465-
List<BatchWriteRequest> requests = batchWriteCapture.getAllValues();
466-
assertEquals(responseStubber.size(), requests.size());
431+
responseStubber.verifyAllRequestsSent();
467432
}
468433

469434
@Test

0 commit comments

Comments
 (0)