Skip to content
Permalink
Browse files
fix: check for timeout in connection after last statement finished (#…
…1086)

The check whether the previous statement timed out in the Connection API was
done when a statement was submitted to the connection, and not when the statement
was executed. That could cause a race condition, as statements are executed using
a separate thread, while submitting a statement is done using the main thread.

This could cause a statement to return an error with code ABORTED instead of
FAILED_PRECONDITION. A statement on a read/write transaction would always return
an error when a/the previous statement timed out, only the error code could
be different depending on whether the race condition occurred or not. This is
now fixed so that the error code is always FAILED_PRECONDITION and the error
indicates that a read/write transaction is no longer usable after a statement
timeout.

Fixes #1077
  • Loading branch information
olavloite committed Apr 25, 2021
1 parent 4a3829f commit 51d753c507e7248132eb5d6ea2c4b735542eda49
Showing with 74 additions and 62 deletions.
  1. +74 −62 google-cloud-spanner/src/main/java/com/google/cloud/spanner/connection/ReadWriteTransaction.java
@@ -201,6 +201,10 @@ private void checkValidState() {
+ "or "
+ UnitOfWorkState.ABORTED
+ " is allowed.");
checkTimedOut();
}

private void checkTimedOut() {
ConnectionPreconditions.checkState(
!timedOutOrCancelled,
"The last statement of this transaction timed out or was cancelled. "
@@ -313,34 +317,35 @@ public ApiFuture<ResultSet> executeQueryAsync(
res =
executeStatementAsync(
statement,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
statement,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
ResultSet delegate =
DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
return createAndAddRetryResultSet(
delegate, statement, analyzeMode, options);
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedQuery(e, statement, analyzeMode, options);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
statement,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
ResultSet delegate =
DirectExecuteResultSet.ofResultSet(
internalExecuteQuery(statement, analyzeMode, options));
return createAndAddRetryResultSet(
delegate, statement, analyzeMode, options);
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedQuery(e, statement, analyzeMode, options);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteStreamingSqlMethod()));
} else {
res = super.executeQueryAsync(statement, analyzeMode, options);
}

ApiFutures.addCallback(
res,
new ApiFutureCallback<ResultSet>() {
@@ -368,26 +373,28 @@ public ApiFuture<Long> executeUpdateAsync(final ParsedStatement update) {
res =
executeStatementAsync(
update,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
update,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long updateCount =
get(txContextFuture).executeUpdate(update.getStatement());
createAndAddRetriableUpdate(update, updateCount);
return updateCount;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedUpdate(e, update);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
update,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long updateCount =
get(txContextFuture).executeUpdate(update.getStatement());
createAndAddRetriableUpdate(update, updateCount);
return updateCount;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedUpdate(e, update);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteSqlMethod()));
@@ -396,6 +403,7 @@ public ApiFuture<Long> executeUpdateAsync(final ParsedStatement update) {
executeStatementAsync(
update,
() -> {
checkTimedOut();
checkAborted();
return get(txContextFuture).executeUpdate(update.getStatement());
},
@@ -449,25 +457,27 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<ParsedStatement> updat
res =
executeStatementAsync(
EXECUTE_BATCH_UPDATE_STATEMENT,
() ->
runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
EXECUTE_BATCH_UPDATE_STATEMENT,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements);
createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
return updateCounts;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedBatchUpdate(e, updateStatements);
throw e;
}
}),
() -> {
checkTimedOut();
return runWithRetry(
() -> {
try {
getStatementExecutor()
.invokeInterceptors(
EXECUTE_BATCH_UPDATE_STATEMENT,
StatementExecutionStep.EXECUTE_STATEMENT,
ReadWriteTransaction.this);
long[] updateCounts = get(txContextFuture).batchUpdate(updateStatements);
createAndAddRetriableBatchUpdate(updateStatements, updateCounts);
return updateCounts;
} catch (AbortedException e) {
throw e;
} catch (SpannerException e) {
createAndAddFailedBatchUpdate(e, updateStatements);
throw e;
}
});
},
// ignore interceptors here as they are invoked in the Callable.
InterceptorsUsage.IGNORE_INTERCEPTORS,
ImmutableList.<MethodDescriptor<?, ?>>of(SpannerGrpc.getExecuteBatchDmlMethod()));
@@ -476,12 +486,12 @@ public ApiFuture<long[]> executeBatchUpdateAsync(Iterable<ParsedStatement> updat
executeStatementAsync(
EXECUTE_BATCH_UPDATE_STATEMENT,
() -> {
checkTimedOut();
checkAborted();
return get(txContextFuture).batchUpdate(updateStatements);
},
SpannerGrpc.getExecuteBatchDmlMethod());
}

ApiFutures.addCallback(
res,
new ApiFutureCallback<long[]>() {
@@ -546,6 +556,7 @@ public ApiFuture<Void> commitAsync() {
executeStatementAsync(
COMMIT_STATEMENT,
() -> {
checkTimedOut();
try {
return runWithRetry(
() -> {
@@ -574,6 +585,7 @@ public ApiFuture<Void> commitAsync() {
executeStatementAsync(
COMMIT_STATEMENT,
() -> {
checkTimedOut();
try {
return commitCallable.call();
} catch (Throwable t) {

0 comments on commit 51d753c

Please sign in to comment.