@@ -463,13 +463,9 @@ TransactionSelector getTransactionSelector() {
// Aborted error if the call that included the BeginTransaction option fails. The
// Aborted error will cause the entire transaction to be retried, and the retry will use
// a separate BeginTransaction RPC.
if (trackTransactionStarter ) {
TransactionSelector .newBuilder ()
.setId (tx .get (waitForTransactionTimeoutMillis , TimeUnit .MILLISECONDS ))
.build ();
} else {
TransactionSelector .newBuilder ().setId (tx .get ()).build ();
}
TransactionSelector .newBuilder ()
.setId (tx .get (waitForTransactionTimeoutMillis , TimeUnit .MILLISECONDS ))
.build ();
}
} catch (ExecutionException e ) {
if (e .getCause () instanceof AbortedException ) {
@@ -479,11 +475,15 @@ TransactionSelector getTransactionSelector() {
}
throw SpannerExceptionFactory .newSpannerException (e .getCause ());
} catch (TimeoutException e ) {
// Throw an ABORTED exception to force a retry of the transaction if no transaction
// has been returned by the first statement.
SpannerException se =
SpannerExceptionFactory .newSpannerException (
ErrorCode .DEADLINE_EXCEEDED ,
"Timeout while waiting for a transaction to be returned by another statement. "
+ "See the suppressed exception for the stacktrace of the caller that should return a transaction" ,
ErrorCode .ABORTED ,
"Timeout while waiting for a transaction to be returned by another statement."
+ (trackTransactionStarter
? " See the suppressed exception for the stacktrace of the caller that should return a transaction"
: "" ),
e );
if (transactionStarter != null ) {
se .addSuppressed (transactionStarter );
@@ -498,12 +498,20 @@ TransactionSelector getTransactionSelector() {
}
@ Override
public void onTransactionMetadata (Transaction transaction ) {
// A transaction has been returned by a statement that was executed. Set the id of the
// transaction on this instance and release the lock to allow other statements to proceed.
if (this .transactionId == null && transaction != null && transaction .getId () != null ) {
this .transactionId = transaction .getId ();
this .transactionIdFuture .set (transaction .getId ());
public void onTransactionMetadata (Transaction transaction , boolean shouldIncludeId ) {
Preconditions .checkNotNull (transaction );
if (transaction .getId () != ByteString .EMPTY ) {
// A transaction has been returned by a statement that was executed. Set the id of the
// transaction on this instance and release the lock to allow other statements to proceed.
if ((transactionIdFuture == null || !this .transactionIdFuture .isDone ())
&& this .transactionId == null ) {
this .transactionId = transaction .getId ();
this .transactionIdFuture .set (transaction .getId ());
}
} else if (shouldIncludeId ) {
// The statement should have returned a transaction.
throw SpannerExceptionFactory .newSpannerException (
ErrorCode .FAILED_PRECONDITION , AbstractReadContext .NO_TRANSACTION_RETURNED_MSG );
}
}
@@ -580,17 +588,18 @@ public long executeUpdate(Statement statement, UpdateOption... options) {
com .google .spanner .v1 .ResultSet resultSet =
rpc .executeQuery (builder .build (), session .getOptions ());
if (resultSet .getMetadata ().hasTransaction ()) {
onTransactionMetadata (resultSet .getMetadata ().getTransaction ());
onTransactionMetadata (
resultSet .getMetadata ().getTransaction (), builder .getTransaction ().hasBegin ());
}
if (!resultSet .hasStats ()) {
throw new IllegalArgumentException (
"DML response missing stats possibly due to non-DML statement as input" );
}
// For standard DML, using the exact row count.
return resultSet .getStats ().getRowCountExact ();
} catch (SpannerException e ) {
onError (e , builder . hasTransaction () && builder .getTransaction ().hasBegin ());
throw e ;
} catch (Throwable t ) {
onError (SpannerExceptionFactory . asSpannerException ( t ), builder .getTransaction ().hasBegin ());
throw t ;
}
}
@@ -621,6 +630,12 @@ public Long apply(ResultSet input) {
ErrorCode .INVALID_ARGUMENT ,
"DML response missing stats possibly due to non-DML statement as input" );
}
if (builder .getTransaction ().hasBegin ()
&& !(input .getMetadata ().hasTransaction ()
&& input .getMetadata ().getTransaction ().getId () != ByteString .EMPTY )) {
throw SpannerExceptionFactory .newSpannerException (
ErrorCode .FAILED_PRECONDITION , NO_TRANSACTION_RETURNED_MSG );
}
// For standard DML, using the exact row count.
return input .getStats ().getRowCountExact ();
}
@@ -633,8 +648,8 @@ public Long apply(ResultSet input) {
new ApiFunction <Throwable , Long >() {
@ Override
public Long apply (Throwable input ) {
SpannerException e = SpannerExceptionFactory .newSpannerException (input );
onError (e , builder .hasTransaction () && builder . getTransaction ().hasBegin ());
SpannerException e = SpannerExceptionFactory .asSpannerException (input );
onError (e , builder .getTransaction ().hasBegin ());
throw e ;
}
},
@@ -645,9 +660,11 @@ public Long apply(Throwable input) {
public void run () {
try {
if (resultSet .get ().getMetadata ().hasTransaction ()) {
onTransactionMetadata (resultSet .get ().getMetadata ().getTransaction ());
onTransactionMetadata (
resultSet .get ().getMetadata ().getTransaction (),
builder .getTransaction ().hasBegin ());
}
} catch (ExecutionException | InterruptedException e ) {
} catch (Throwable e ) {
// Ignore this error here as it is handled by the future that is returned by the
// executeUpdateAsync method.
}
@@ -670,7 +687,9 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
for (int i = 0 ; i < response .getResultSetsCount (); ++i ) {
results [i ] = response .getResultSets (i ).getStats ().getRowCountExact ();
if (response .getResultSets (i ).getMetadata ().hasTransaction ()) {
onTransactionMetadata (response .getResultSets (i ).getMetadata ().getTransaction ());
onTransactionMetadata (
response .getResultSets (i ).getMetadata ().getTransaction (),
builder .getTransaction ().hasBegin ());
}
}
@@ -686,8 +705,8 @@ public long[] batchUpdate(Iterable<Statement> statements, UpdateOption... option
results );
}
return results ;
} catch (SpannerException e ) {
onError (e , builder . hasTransaction () && builder .getTransaction ().hasBegin ());
} catch (Throwable e ) {
onError (SpannerExceptionFactory . asSpannerException ( e ), builder .getTransaction ().hasBegin ());
throw e ;
}
}
@@ -718,7 +737,9 @@ public long[] apply(ExecuteBatchDmlResponse input) {
for (int i = 0 ; i < input .getResultSetsCount (); ++i ) {
results [i ] = input .getResultSets (i ).getStats ().getRowCountExact ();
if (input .getResultSets (i ).getMetadata ().hasTransaction ()) {
onTransactionMetadata (input .getResultSets (i ).getMetadata ().getTransaction ());
onTransactionMetadata (
input .getResultSets (i ).getMetadata ().getTransaction (),
builder .getTransaction ().hasBegin ());
}
}
// If one of the DML statements was aborted, we should throw an aborted exception.
@@ -743,10 +764,8 @@ public long[] apply(ExecuteBatchDmlResponse input) {
new ApiFunction <Throwable , long []>() {
@ Override
public long [] apply (Throwable input ) {
SpannerException e = SpannerExceptionFactory .newSpannerException (input );
onError (
SpannerExceptionFactory .newSpannerException (e .getCause ()),
builder .hasTransaction () && builder .getTransaction ().hasBegin ());
SpannerException e = SpannerExceptionFactory .asSpannerException (input );
onError (e , builder .getTransaction ().hasBegin ());
throw e ;
}
},