Skip to content
Permalink
Browse files
fix: keep track of any BeginTransaction option for a Read (#1485)
If a StreamingReadRequest that included a BeginTransaction option was retried as a result
of a transient error (UNAVAILABLE), the fact that the BeginTransaction option was included
would not be registered for the retried request. This could cause a transaction to fail if
the retried request returned an Aborted error, and that Aborted error was caught by the
application.
  • Loading branch information
olavloite committed Oct 6, 2021
1 parent bbe7603 commit 757d6ecfcceea58e0db7623778dde6f3e5f4b865
@@ -790,7 +790,7 @@ CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken
SpannerRpc.StreamingCall call =
rpc.read(builder.build(), stream.consumer(), session.getOptions());
call.request(prefetchChunks);
stream.setCall(call, selector != null && selector.hasBegin());
stream.setCall(call, /* withBeginTransaction = */ builder.getTransaction().hasBegin());
return stream;
}
};
@@ -605,7 +605,7 @@ public void testInlinedBeginFirstReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
Long value =
client
.readWriteTransaction()
.run(
@@ -625,6 +625,381 @@ public void testInlinedBeginFirstReadReturnsUnavailable() {
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
if (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
return transaction.executeUpdate(UPDATE_STATEMENT);
});
assertThat(value).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (AbortedException e) {
// Ignore the AbortedException and let the commit handle it.
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try {
return transaction.executeUpdate(UPDATE_STATEMENT);
} catch (AbortedException e) {
// Ignore the AbortedException and let the commit handle it.
}
return 0L;
});
assertThat(value).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstDmlReturnsUnavailableRetryReturnsAborted_WithCatchAll() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofExceptions(
Arrays.asList(
Status.UNAVAILABLE.asRuntimeException(), Status.ABORTED.asRuntimeException())));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
// The second attempt will return ABORTED and should cause the transaction to
// retry.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (AbortedException e) {
// Ignore the AbortedException and let the commit handle it.
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatch() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.CANCELLED) {
// Ignore and let the transaction continue.
// Also make sure that the next read operation will return Aborted.
mockSpanner.abortNextTransaction();
} else if (e.getErrorCode() == ErrorCode.ABORTED) {
// Ignore Aborted errors. This will cause the transaction to try to commit.
} else {
// Propagate any other errors (there should not be any in this test case).
throw e;
}
}
return 0L;
});

assertThat(value).isEqualTo(1L);
// 1. The initial attempt will inline the BeginTransaction option.
// 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction
// RPC.
// 3. The ABORTED error during the second attempt will NOT cause the next retry to use an
// explicit BeginTransaction RPC, because the previous attempt did return a transaction ID
// (the ID that was returned by the BeginTransaction RPC of that attempt).
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
// There will be 3 attempts to read:
// 1. The first will return CANCELLED.
// 2. The second will return ABORTED.
// 3. The third will return the results.
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
// There are two attempts to commit:
// 1. The initial attempt will NOT try to commit, because the initial Read operation did not
// return a transaction ID.
// 2. The second attempt will try to commit, because the BeginTransaction RPC did return a
// transaction ID, and the Aborted error that was returned by the Read operation was caught
// by the application. This means that the TransactionRunner does not know that the
// transaction was aborted. The Commit RPC will return an Aborted error.
// 3. The third attempt will commit, as the Read operation succeeded and returned a
// transaction ID.
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithoutCatch()
throws InterruptedException, ExecutionException {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
// The CANCELLED error is not caught by the application, so it will bubble up and cause the
// transaction to fail.
assertThrows(
SpannerException.class,
() ->
client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet rs =
transaction.read(
"FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.CANCELLED) {
// Make sure that the next read operation will return Aborted.
mockSpanner.abortNextTransaction();
}
// Always propagate the error to the TransactionRunner.
throw e;
}
return 0L;
}));

// The initial attempt will inline the BeginTransaction option.
// There is no second attempt as the CANCELLED error is not caught.
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(0);
// The CANCELLED error means that there is no transaction ID returned by the Read operation.
// So there is also no transaction to rollback.
assertThat(countRequests(RollbackRequest.class)).isEqualTo(0);
}

@Test
public void testInlinedBeginFirstReadCancelledSecondReadAborted_WithCatchForCancelled()
throws InterruptedException, ExecutionException {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofException(Status.CANCELLED.asRuntimeException()));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
} catch (SpannerException e) {
if (e.getErrorCode() == ErrorCode.CANCELLED) {
// Do not propagate the CANCELLED error.
// Make sure that the next read operation will return Aborted.
mockSpanner.abortNextTransaction();
} else {
// Propagate all other errors to the TransactionRunner.
throw e;
}
}
return 0L;
});

assertThat(value).isEqualTo(1L);
// 1. The initial attempt will inline the BeginTransaction option.
// 2. The CANCELLED error during the first attempt will cause a retry with a BeginTransaction
// RPC, because the error was returned by the first statement in the transaction.
// 3. The ABORTED error during the second attempt will NOT cause the next retry to use an
// explicit BeginTransaction RPC, because the previous attempt did return a transaction ID
// (the ID that was returned by the BeginTransaction RPC of that attempt).
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
// There will be 3 attempts to read:
// 1. The first will return CANCELLED.
// 2. The second will return ABORTED.
// 3. The third will return the results.
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
// There is only one attempt to commit:
// 1. The initial attempt will NOT try to commit, because the initial Read operation did not
// return a transaction ID.
// 2. The second attempt will NOT try to commit, because the Aborted error from the Read
// operation is propagated to the TransactionRunner. This means that the TransactionRunner
// knows that the transaction was aborted, and will automatically initiate a retry without
// first trying to commit the transaction.
// 3. The third attempt will commit, as the Read operation succeeded and returned a
// transaction ID.
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginCommitAfterReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setCommitExecutionTime(
SimulatedExecutionTime.ofException(Status.UNAVAILABLE.asRuntimeException()));
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(1);
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailableAndCommitAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
final AtomicBoolean firstAttempt = new AtomicBoolean(true);
Long value =
client
.readWriteTransaction()
.run(
transaction -> {
long res = 0L;
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Collections.singletonList("ID"))) {
if (rs.next()) {
res = rs.getLong(0);
}
}
if (firstAttempt.compareAndSet(true, false)) {
mockSpanner.abortTransaction(transaction);
}
return res;
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(3);
assertThat(countRequests(CommitRequest.class)).isEqualTo(2);
}

@Test
public void testInlinedBeginTxWithQuery() {
DatabaseClient client =

0 comments on commit 757d6ec

Please sign in to comment.