diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index caf0e06379e..a89090e34d9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -48,6 +48,7 @@ import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RequestOptions; @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) { this.session.onReadDone(); } + /** + * For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be + * present in the RPC response. In such cases, this method will be a no-op. + */ + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} + private ResultSet readInternal( String table, @Nullable String index, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 2cf93fb92ec..fdc0398d5fe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -27,6 +27,7 @@ import com.google.protobuf.ListValue; import com.google.protobuf.ProtocolMessageEnum; import com.google.protobuf.Value.KindCase; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.Transaction; import java.io.IOException; import java.io.Serializable; @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) /** Called when the read finishes normally. */ void onDone(boolean withBeginTransaction); + + /** + * Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token + * will be included if the read-write transaction is executed on a multiplexed session. + */ + void onPrecommitToken(MultiplexedSessionPrecommitToken token); } static final class LazyByteArray implements Serializable { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index be75c1e5c4e..23c9dd7c2d3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet> implements ProtobufR GrpcResultSet( CloseableIterator iterator, Listener listener, DecodeMode decodeMode) { - this.iterator = new GrpcValueIterator(iterator); + this.iterator = new GrpcValueIterator(iterator, listener); this.listener = listener; this.decodeMode = decodeMode; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java index 0a2e17bd2b5..1a3df8b9123 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; +import com.google.cloud.spanner.AbstractResultSet.Listener; import com.google.common.collect.AbstractIterator; import com.google.protobuf.ListValue; import com.google.protobuf.Value.KindCase; @@ -44,9 +45,11 @@ private enum StreamValue { private PartialResultSet current; private int pos; private ResultSetStats statistics; + private final Listener listener; - GrpcValueIterator(CloseableIterator stream) { + GrpcValueIterator(CloseableIterator stream, Listener listener) { this.stream = stream; + this.listener = listener; } @SuppressWarnings("unchecked") @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException { ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e); } } + // collect the precommit token from each PartialResultSet + if (current.hasPrecommitToken()) { + listener.onPrecommitToken(current.getPrecommitToken()); + } if (current.hasStats()) { statistics = current.getStats(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 48affde3558..92d9c50aa9e 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -46,6 +46,7 @@ import com.google.spanner.v1.ExecuteBatchDmlResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.ResultSetStats; @@ -179,6 +180,11 @@ public void removeListener(Runnable listener) { @GuardedBy("committingLock") private volatile boolean committing; + private final Object precommitTokenLock = new Object(); + + @GuardedBy("precommitTokenLock") + private MultiplexedSessionPrecommitToken latestPrecommitToken; + @GuardedBy("lock") private volatile SettableApiFuture finishedAsyncOperations = SettableApiFuture.create(); @@ -439,6 +445,10 @@ public void run() { } requestBuilder.setRequestOptions(requestOptionsBuilder.build()); } + if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) { + // Set the precommit token in the CommitRequest for multiplexed sessions. + requestBuilder.setPrecommitToken(getLatestPrecommitToken()); + } final CommitRequest commitRequest = requestBuilder.build(); span.addAnnotation("Starting Commit"); final ApiFuture commitFuture; @@ -643,6 +653,25 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude } } + /** + * In read-write transactions, the precommit token with the highest sequence number from this + * transaction attempt will be tracked and included in the + * [Commit][google.spanner.v1.Spanner.Commit] request for the transaction. + */ + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) { + if (token == null) { + return; + } + synchronized (precommitTokenLock) { + if (this.latestPrecommitToken == null + || token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) { + this.latestPrecommitToken = token; + txnLogger.log(Level.FINE, "Updating precommit token to " + this.latestPrecommitToken); + } + } + } + @Nullable String getTransactionTag() { if (this.options.hasTag()) { @@ -651,6 +680,13 @@ String getTransactionTag() { return null; } + @Nullable + MultiplexedSessionPrecommitToken getLatestPrecommitToken() { + synchronized (precommitTokenLock) { + return this.latestPrecommitToken; + } + } + @Override public SpannerException onError(SpannerException e, boolean withBeginTransaction) { e = super.onError(e, withBeginTransaction); @@ -829,6 +865,9 @@ private ResultSet internalExecuteUpdate( throw new IllegalArgumentException( "DML response missing stats possibly due to non-DML statement as input"); } + if (resultSet.hasPrecommitToken()) { + onPrecommitToken(resultSet.getPrecommitToken()); + } return resultSet; } catch (Throwable t) { throw onError( @@ -903,6 +942,9 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... u resultSet.get().getMetadata().getTransaction(), builder.getTransaction().hasBegin()); } + if (resultSet.get().hasPrecommitToken()) { + onPrecommitToken(resultSet.get().getPrecommitToken()); + } } catch (Throwable e) { // Ignore this error here as it is handled by the future that is returned by the // executeUpdateAsync method. @@ -958,6 +1000,10 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update } } + if (response.hasPrecommitToken()) { + onPrecommitToken(response.getPrecommitToken()); + } + // If one of the DML statements was aborted, we should throw an aborted exception. // In all other cases, we should throw a BatchUpdateException. if (response.getStatus().getCode() == Code.ABORTED_VALUE) { @@ -1022,6 +1068,9 @@ public ApiFuture batchUpdateAsync( builder.getTransaction().hasBegin()); } } + if (batchDmlResponse.hasPrecommitToken()) { + onPrecommitToken(batchDmlResponse.getPrecommitToken()); + } // If one of the DML statements was aborted, we should throw an aborted exception. // In all other cases, we should throw a BatchUpdateException. if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 62336163eaf..59a18a3ab79 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.QueryPlan; import com.google.spanner.v1.ResultSetMetadata; @@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 9f0a2822d87..014ab7e94e3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -54,6 +54,7 @@ import com.google.spanner.v1.GetSessionRequest; import com.google.spanner.v1.ListSessionsRequest; import com.google.spanner.v1.ListSessionsResponse; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Partition; import com.google.spanner.v1.PartitionOptions; @@ -197,10 +198,15 @@ private static class PartialResultSetsIterator implements Iterator transactionCounters = new ConcurrentHashMap<>(); private ConcurrentMap> partitionTokens = new ConcurrentHashMap<>(); private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>(); + + // Stores the latest sequence number needed for the precommit token. + // The transaction entry is created only if the transaction is read-write and executed on a + // multiplexed session. + private static ConcurrentMap transactionSequenceNo = + new ConcurrentHashMap<>(); private int maxNumSessionsInOneBatch = 100; private int maxTotalSessions = Integer.MAX_VALUE; private Iterable batchWriteResult = new ArrayList<>(); @@ -1020,7 +1035,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp throw result.getException(); case RESULT_SET: returnResultSet( - result.getResultSet(), transactionId, request.getTransaction(), responseObserver); + result.getResultSet(), + transactionId, + request.getTransaction(), + responseObserver, + session); break; case UPDATE_COUNT: if (isPartitionedDmlTransaction(transactionId)) { @@ -1033,7 +1052,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp .build()) .build()); } else { - responseObserver.onNext( + ResultSet.Builder resultSetBuilder = ResultSet.newBuilder() .setStats( ResultSetStats.newBuilder() @@ -1045,8 +1064,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp ignoreNextInlineBeginRequest.getAndSet(false) ? Transaction.getDefaultInstance() : Transaction.newBuilder().setId(transactionId).build()) - .build()) - .build()); + .build()); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId)); + } + responseObserver.onNext(resultSetBuilder.build()); } break; default: @@ -1064,7 +1086,8 @@ private void returnResultSet( ResultSet resultSet, ByteString transactionId, TransactionSelector transactionSelector, - StreamObserver responseObserver) { + StreamObserver responseObserver, + Session session) { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId != null) { metadata = @@ -1079,7 +1102,12 @@ private void returnResultSet( Transaction transaction = getTemporaryTransactionOrNull(transactionSelector); metadata = metadata.toBuilder().setTransaction(transaction).build(); } - resultSet = resultSet.toBuilder().setMetadata(metadata).build(); + ResultSet.Builder resultSetBuilder = resultSet.toBuilder(); + resultSetBuilder.setMetadata(metadata); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId)); + } + resultSet = resultSetBuilder.build(); responseObserver.onNext(resultSet); } @@ -1174,6 +1202,9 @@ public void executeBatchDml( .build()); } builder.setStatus(status); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken(transactionId)); + } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } catch (StatusRuntimeException e) { @@ -1242,7 +1273,8 @@ public void executeStreamingSql( transactionId, request.getTransaction(), responseObserver, - getExecuteStreamingSqlExecutionTime()); + getExecuteStreamingSqlExecutionTime(), + session.getMultiplexed()); break; case UPDATE_COUNT: if (isPartitioned) { @@ -1612,7 +1644,7 @@ public void read(final ReadRequest request, StreamObserver responseOb cols); StatementResult res = getResult(statement); returnResultSet( - res.getResultSet(), transactionId, request.getTransaction(), responseObserver); + res.getResultSet(), transactionId, request.getTransaction(), responseObserver, session); responseObserver.onCompleted(); } catch (StatusRuntimeException e) { responseObserver.onError(e); @@ -1670,7 +1702,8 @@ public void streamingRead( transactionId, request.getTransaction(), responseObserver, - getStreamingReadExecutionTime()); + getStreamingReadExecutionTime(), + session.getMultiplexed()); } catch (StatusRuntimeException e) { responseObserver.onError(e); } catch (Throwable t) { @@ -1683,7 +1716,8 @@ private void returnPartialResultSet( ByteString transactionId, TransactionSelector transactionSelector, StreamObserver responseObserver, - SimulatedExecutionTime executionTime) + SimulatedExecutionTime executionTime, + boolean isMultiplexedSession) throws Exception { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId == null) { @@ -1700,7 +1734,11 @@ private void returnPartialResultSet( .build(); } resultSet = resultSet.toBuilder().setMetadata(metadata).build(); - PartialResultSetsIterator iterator = new PartialResultSetsIterator(resultSet); + PartialResultSetsIterator iterator = + new PartialResultSetsIterator( + resultSet, + isMultiplexedSession && isReadWriteTransaction(transactionId), + transactionId); long index = 0L; while (iterator.hasNext()) { SimulatedExecutionTime.checkStreamException( @@ -2034,6 +2072,7 @@ private void commitTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } @Override @@ -2065,6 +2104,7 @@ void rollbackTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } void markAbortedTransaction(ByteString transactionId) { @@ -2072,6 +2112,7 @@ void markAbortedTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } @Override @@ -2276,6 +2317,7 @@ public void reset() { transactionCounters = new ConcurrentHashMap<>(); partitionTokens = new ConcurrentHashMap<>(); transactionLastUsed = new ConcurrentHashMap<>(); + transactionSequenceNo = new ConcurrentHashMap<>(); numSessionsCreated.set(0); stickyGlobalExceptions = false; @@ -2447,4 +2489,30 @@ Session getSession(String name) { } return null; } + + static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) { + return getPrecommitToken("ResultSetPrecommitToken", transactionId); + } + + static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken( + ByteString transactionId) { + return getPrecommitToken("PartialResultSetPrecommitToken", transactionId); + } + + static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken( + ByteString transactionId) { + return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId); + } + + static MultiplexedSessionPrecommitToken getPrecommitToken( + String value, ByteString transactionId) { + transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0)); + + // Generates an incrementing sequence number + int seqNum = transactionSequenceNo.get(transactionId).incrementAndGet(); + return MultiplexedSessionPrecommitToken.newBuilder() + .setPrecommitToken(ByteString.copyFromUtf8(value)) + .setSeqNum(seqNum) + .build(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index adf7ed2a403..c7d7b697d64 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -41,6 +41,7 @@ import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; @@ -926,6 +927,162 @@ public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithExplicitBegin() { .getMultiplexedSessionPreviousTransactionId()); } + @Test + public void testPrecommitTokenForResultSet() { + // This test verifies that the precommit token received from the ResultSet is properly tracked + // and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + Long count = + client + .readWriteTransaction() + .run( + transaction -> { + long res = transaction.executeUpdate(UPDATE_STATEMENT); + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ResultSetPrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return res; + }); + + assertNotNull(count); + assertEquals(1, count.longValue()); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testPrecommitTokenForExecuteBatchDmlResponse() { + // This test verifies that the precommit token received from the ExecuteBatchDmlResponse is + // properly tracked and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + long[] count = + client + .readWriteTransaction() + .run( + transaction -> { + long[] res = transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return res; + }); + + assertNotNull(count); + assertEquals(1, count.length); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testPrecommitTokenForPartialResultSet() { + // This test verifies that the precommit token received from the PartialResultSet is properly + // tracked and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return null; + }); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testTxnTracksPrecommitTokenWithLatestSeqNo() { + // This test ensures that the read-write transaction tracks the precommit token with the + // highest sequence number and sets it in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + // Returns a ResultSet containing the precommit token (ResultSetPrecommitToken) + transaction.executeUpdate(UPDATE_STATEMENT); + + // Returns a PartialResultSet containing the precommit token + // (PartialResultSetPrecommitToken) + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + + // Returns an ExecuteBatchDmlResponse containing the precommit token + // (ExecuteBatchDmlResponsePrecommitToken). + // Since this is the last request received by the mock Spanner, it should be the most + // recent precommit token tracked by the transaction context. + transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + // Verify that the latest precommit token with highest sequence number is tracked in + // the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return null; + }); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index c973b7e471e..2a399e6f486 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -24,6 +24,7 @@ import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.io.Resources; import com.google.protobuf.util.JsonFormat; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Transaction; import java.math.BigDecimal; @@ -56,6 +57,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} } public ReadFormatTestRunner(Class clazz) throws InitializationError { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java index fc494c6f3ff..404973336ba 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java @@ -19,6 +19,7 @@ import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; import com.google.cloud.spanner.AbstractResultSet.Listener; import com.google.protobuf.ListValue; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Transaction; import java.util.Iterator; @@ -82,6 +83,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} }); } }