Skip to content
Permalink
Browse files
fix: UNAVAILABLE error on first query could cause transaction to get …
…stuck (#807)

If the first query or read operation of a read/write transaction would return UNAVAILABLE for
the first element of the result stream, the transaction could get stuck. This was caused by the
internal retry mechanism that would wait for the initial attempt to return a transaction, which
was never returned as the UNAVAILABLE exception was internally handled by the result stream
iterator.

Fixes #799
  • Loading branch information
olavloite committed Jan 17, 2021
1 parent 557e761 commit c7dc6e6b11af76cb5db1f160c4466a5d75b524b2
@@ -558,7 +558,7 @@ QueryOptions buildQueryOptions(QueryOptions requestOptions) {
}

ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
Statement statement, QueryMode queryMode, Options options) {
Statement statement, QueryMode queryMode, Options options, boolean withTransactionSelector) {
ExecuteSqlRequest.Builder builder =
ExecuteSqlRequest.newBuilder()
.setSql(statement.getSql())
@@ -572,9 +572,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder(
builder.putParamTypes(param.getKey(), param.getValue().getType().toProto());
}
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
if (withTransactionSelector) {
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
}
builder.setSeqno(getSeqNo());
builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions()));
@@ -619,18 +621,26 @@ ResultSet executeQueryInternalWithOptions(
beforeReadOrQuery();
final int prefetchChunks =
options.hasPrefetchChunks() ? options.prefetchChunks() : defaultPrefetchChunks;
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(
statement, queryMode, options, /* withTransactionSelector = */ false);
ResumableStreamIterator stream =
new ResumableStreamIterator(MAX_BUFFERED_CHUNKS, SpannerImpl.QUERY, span) {
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(statement, prefetchChunks);
final ExecuteSqlRequest.Builder request =
getExecuteSqlRequestBuilder(statement, queryMode, options);
if (partitionToken != null) {
request.setPartitionToken(partitionToken);
}
TransactionSelector selector = null;
if (resumeToken != null) {
request.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!request.hasTransaction()) {
selector = getTransactionSelector();
}
if (selector != null) {
request.setTransaction(selector);
}
SpannerRpc.StreamingCall call =
rpc.executeQuery(request.build(), stream.consumer(), session.getOptions());
@@ -738,10 +748,13 @@ ResultSet readInternalWithOptions(
@Override
CloseableIterator<PartialResultSet> startStream(@Nullable ByteString resumeToken) {
GrpcStreamIterator stream = new GrpcStreamIterator(prefetchChunks);
TransactionSelector selector = null;
if (resumeToken != null) {
builder.setResumeToken(resumeToken);
selector = getTransactionSelector();
} else if (!builder.hasTransaction()) {
selector = getTransactionSelector();
}
TransactionSelector selector = getTransactionSelector();
if (selector != null) {
builder.setTransaction(selector);
}
@@ -1080,6 +1080,7 @@ protected PartialResultSet computeNext() {
backoffSleep(context, backOff);
}
}

continue;
}
span.addAnnotation("Stream broken. Not safe to retry");
@@ -583,7 +583,10 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
try {
com.google.spanner.v1.ResultSet resultSet =
rpc.executeQuery(builder.build(), session.getOptions());
@@ -608,7 +611,10 @@ public ApiFuture<Long> executeUpdateAsync(Statement statement, UpdateOption... o
beforeReadOrQuery();
final ExecuteSqlRequest.Builder builder =
getExecuteSqlRequestBuilder(
statement, QueryMode.NORMAL, Options.fromUpdateOptions(options));
statement,
QueryMode.NORMAL,
Options.fromUpdateOptions(options),
/* withTransactionSelector = */ true);
final ApiFuture<com.google.spanner.v1.ResultSet> resultSet;
try {
// Register the update as an async operation that must finish before the transaction may
@@ -90,7 +90,10 @@ public void executeSqlRequestBuilderWithoutQueryOptions() {
ExecuteSqlRequest request =
context
.getExecuteSqlRequestBuilder(
Statement.of("SELECT FOO FROM BAR"), QueryMode.NORMAL, Options.fromQueryOptions())
Statement.of("SELECT FOO FROM BAR"),
QueryMode.NORMAL,
Options.fromQueryOptions(),
true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions()).isEqualTo(defaultQueryOptions);
@@ -105,7 +108,8 @@ public void executeSqlRequestBuilderWithQueryOptions() {
.withQueryOptions(QueryOptions.newBuilder().setOptimizerVersion("2.0").build())
.build(),
QueryMode.NORMAL,
Options.fromQueryOptions())
Options.fromQueryOptions(),
true)
.build();
assertThat(request.getSql()).isEqualTo("SELECT FOO FROM BAR");
assertThat(request.getQueryOptions().getOptimizerVersion()).isEqualTo("2.0");
@@ -257,6 +257,130 @@ public Long run(TransactionContext transaction) throws Exception {
assertThat(countTransactionsStarted()).isEqualTo(2);
}

@Test
public void testInlinedBeginFirstUpdateAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
UPDATE_STATEMENT,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.update(UPDATE_STATEMENT, UPDATE_COUNT));
}
return transaction.executeUpdate(UPDATE_STATEMENT);
}
});
assertThat(updateCount).isEqualTo(UPDATE_COUNT);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryAborts() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
boolean firstAttempt = true;

@Override
public Long run(TransactionContext transaction) throws Exception {
if (firstAttempt) {
firstAttempt = false;
mockSpanner.putStatementResult(
StatementResult.exception(
SELECT1,
mockSpanner.createAbortedException(
ByteString.copyFromUtf8("some-tx"))));
} else {
mockSpanner.putStatementResult(
StatementResult.query(SELECT1, SELECT1_RESULTSET));
}
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(updateCount).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(1);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstQueryReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setExecuteStreamingSqlExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs = transaction.executeQuery(SELECT1)) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ExecuteSqlRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginFirstReadReturnsUnavailable() {
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
mockSpanner.setStreamingReadExecutionTime(
SimulatedExecutionTime.ofStreamException(Status.UNAVAILABLE.asRuntimeException(), 0));
long value =
client
.readWriteTransaction()
.run(
new TransactionCallable<Long>() {
@Override
public Long run(TransactionContext transaction) throws Exception {
// The first attempt will return UNAVAILABLE and retry internally.
try (ResultSet rs =
transaction.read("FOO", KeySet.all(), Arrays.asList("ID"))) {
while (rs.next()) {
return rs.getLong(0);
}
}
return 0L;
}
});
assertThat(value).isEqualTo(1L);
assertThat(countRequests(BeginTransactionRequest.class)).isEqualTo(0);
assertThat(countRequests(ReadRequest.class)).isEqualTo(2);
assertThat(countRequests(CommitRequest.class)).isEqualTo(1);
}

@Test
public void testInlinedBeginTxWithQuery() {
DatabaseClient client =
@@ -285,8 +409,7 @@ public Long run(TransactionContext transaction) throws Exception {

@Test
public void testInlinedBeginTxWithRead() {
DatabaseClient client =
spanner.getDatabaseClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE]"));
DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
long updateCount =
client
.readWriteTransaction()

0 comments on commit c7dc6e6

Please sign in to comment.