From a2aa2607fdbcd6b4ddfb5ce10dbd121b60a0f6de Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 11 Oct 2024 09:31:42 +0000 Subject: [PATCH 1/7] chore(spanner): tracking precommit token from response --- .../cloud/spanner/AbstractReadContext.java | 8 ++++ .../cloud/spanner/AbstractResultSet.java | 7 +++ .../google/cloud/spanner/GrpcResultSet.java | 2 +- .../cloud/spanner/GrpcValueIterator.java | 9 +++- .../cloud/spanner/TransactionRunnerImpl.java | 45 +++++++++++++++++++ 5 files changed, 69 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index caf0e06379e..a89090e34d9 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -48,6 +48,7 @@ import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.RequestOptions; @@ -893,6 +894,13 @@ public void onDone(boolean withBeginTransaction) { this.session.onReadDone(); } + /** + * For transactions other than read-write, the MultiplexedSessionPrecommitToken will not be + * present in the RPC response. In such cases, this method will be a no-op. + */ + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} + private ResultSet readInternal( String table, @Nullable String index, diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java index 2cf93fb92ec..fdc0398d5fe 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractResultSet.java @@ -27,6 +27,7 @@ import com.google.protobuf.ListValue; import com.google.protobuf.ProtocolMessageEnum; import com.google.protobuf.Value.KindCase; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.Transaction; import java.io.IOException; import java.io.Serializable; @@ -57,6 +58,12 @@ void onTransactionMetadata(Transaction transaction, boolean shouldIncludeId) /** Called when the read finishes normally. */ void onDone(boolean withBeginTransaction); + + /** + * Called when the RPC response contains a MultiplexedSessionPrecommitToken. A precommit token + * will be included if the read-write transaction is executed on a multiplexed session. + */ + void onPrecommitToken(MultiplexedSessionPrecommitToken token); } static final class LazyByteArray implements Serializable { diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java index be75c1e5c4e..23c9dd7c2d3 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcResultSet.java @@ -47,7 +47,7 @@ class GrpcResultSet extends AbstractResultSet> implements ProtobufR GrpcResultSet( CloseableIterator iterator, Listener listener, DecodeMode decodeMode) { - this.iterator = new GrpcValueIterator(iterator); + this.iterator = new GrpcValueIterator(iterator, listener); this.listener = listener; this.decodeMode = decodeMode; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java index 0a2e17bd2b5..1a3df8b9123 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/GrpcValueIterator.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; +import com.google.cloud.spanner.AbstractResultSet.Listener; import com.google.common.collect.AbstractIterator; import com.google.protobuf.ListValue; import com.google.protobuf.Value.KindCase; @@ -44,9 +45,11 @@ private enum StreamValue { private PartialResultSet current; private int pos; private ResultSetStats statistics; + private final Listener listener; - GrpcValueIterator(CloseableIterator stream) { + GrpcValueIterator(CloseableIterator stream, Listener listener) { this.stream = stream; + this.listener = listener; } @SuppressWarnings("unchecked") @@ -154,6 +157,10 @@ private boolean ensureReady(StreamValue requiredValue) throws SpannerException { ErrorCode.INTERNAL, "Invalid type metadata: " + e.getMessage(), e); } } + // collect the precommit token from each PartialResultSet + if (current.hasPrecommitToken()) { + listener.onPrecommitToken(current.getPrecommitToken()); + } if (current.hasStats()) { statistics = current.getStats(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index c8bf6dc833b..fa3db708465 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -46,6 +46,7 @@ import com.google.spanner.v1.ExecuteBatchDmlResponse; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.RequestOptions; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.ResultSetStats; @@ -171,6 +172,11 @@ public void removeListener(Runnable listener) { @GuardedBy("committingLock") private volatile boolean committing; + private final Object precommitTokenLock = new Object(); + + @GuardedBy("precommitTokenLock") + private MultiplexedSessionPrecommitToken latestPrecommitToken; + @GuardedBy("lock") private volatile SettableApiFuture finishedAsyncOperations = SettableApiFuture.create(); @@ -625,6 +631,24 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude } } + /** + * In read-write transactions, the precommit token with the highest sequence number from this + * transaction attempt will be tracked and included in the + * [Commit][google.spanner.v1.Spanner.Commit] request for the transaction. + */ + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) { + if (token == null) return; + synchronized (precommitTokenLock) { + if (this.latestPrecommitToken == null + || token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) { + this.latestPrecommitToken = token; + System.out.println("Updating precommit token to " + this.latestPrecommitToken); + txnLogger.log(Level.ALL, "Updating precommit token to " + this.latestPrecommitToken); + } + } + } + @Nullable String getTransactionTag() { if (this.options.hasTag()) { @@ -633,6 +657,13 @@ String getTransactionTag() { return null; } + @Nullable + MultiplexedSessionPrecommitToken getLatestPrecommitToken() { + synchronized (precommitTokenLock) { + return this.latestPrecommitToken; + } + } + @Override public SpannerException onError(SpannerException e, boolean withBeginTransaction) { e = super.onError(e, withBeginTransaction); @@ -811,6 +842,9 @@ private ResultSet internalExecuteUpdate( throw new IllegalArgumentException( "DML response missing stats possibly due to non-DML statement as input"); } + if (resultSet.hasPrecommitToken()) { + onPrecommitToken(resultSet.getPrecommitToken()); + } return resultSet; } catch (Throwable t) { throw onError( @@ -885,6 +919,9 @@ public ApiFuture executeUpdateAsync(Statement statement, UpdateOption... u resultSet.get().getMetadata().getTransaction(), builder.getTransaction().hasBegin()); } + if (resultSet.get().hasPrecommitToken()) { + onPrecommitToken(resultSet.get().getPrecommitToken()); + } } catch (Throwable e) { // Ignore this error here as it is handled by the future that is returned by the // executeUpdateAsync method. @@ -940,6 +977,11 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update } } + // TODO(sriharshach): check if we need to get precommit_token from response.getResultSets + if (response.hasPrecommitToken()) { + onPrecommitToken(response.getPrecommitToken()); + } + // If one of the DML statements was aborted, we should throw an aborted exception. // In all other cases, we should throw a BatchUpdateException. if (response.getStatus().getCode() == Code.ABORTED_VALUE) { @@ -1004,6 +1046,9 @@ public ApiFuture batchUpdateAsync( builder.getTransaction().hasBegin()); } } + if (batchDmlResponse.hasPrecommitToken()) { + onPrecommitToken(batchDmlResponse.getPrecommitToken()); + } // If one of the DML statements was aborted, we should throw an aborted exception. // In all other cases, we should throw a BatchUpdateException. if (batchDmlResponse.getStatus().getCode() == Code.ABORTED_VALUE) { From 3f9e5ec597c8e218107299d09658511580e7cccb Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 14 Oct 2024 11:37:47 +0000 Subject: [PATCH 2/7] chore(spanner): set precommit token in CommitRequest --- .../java/com/google/cloud/spanner/TransactionRunnerImpl.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index fa3db708465..aa230a75937 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -429,6 +429,10 @@ public void run() { } requestBuilder.setRequestOptions(requestOptionsBuilder.build()); } + if (session.getIsMultiplexed() && getLatestPrecommitToken() != null) { + // Set the precommit token in the CommitRequest for multiplexed sessions. + requestBuilder.setPrecommitToken(getLatestPrecommitToken()); + } final CommitRequest commitRequest = requestBuilder.build(); span.addAnnotation("Starting Commit"); final ApiFuture commitFuture; From 9981932f6f576f81e712ee8cadfa635c95b41127 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Tue, 15 Oct 2024 06:27:16 +0000 Subject: [PATCH 3/7] chore(spanner): add precommit token support in mock spanner impl and add mock spanner tests --- .../cloud/spanner/GrpcResultSetTest.java | 4 + .../cloud/spanner/MockSpannerServiceImpl.java | 65 +++++-- ...edSessionDatabaseClientMockServerTest.java | 163 ++++++++++++++++++ .../cloud/spanner/ReadFormatTestRunner.java | 4 + .../cloud/spanner/ResultSetsHelper.java | 4 + 5 files changed, 229 insertions(+), 11 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java index 62336163eaf..59a18a3ab79 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/GrpcResultSetTest.java @@ -36,6 +36,7 @@ import com.google.common.collect.ImmutableMap; import com.google.protobuf.ByteString; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.QueryPlan; import com.google.spanner.v1.ResultSetMetadata; @@ -77,6 +78,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} } @Before diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 9f0a2822d87..1856df1b046 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -54,6 +54,7 @@ import com.google.spanner.v1.GetSessionRequest; import com.google.spanner.v1.ListSessionsRequest; import com.google.spanner.v1.ListSessionsResponse; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Partition; import com.google.spanner.v1.PartitionOptions; @@ -197,10 +198,12 @@ private static class PartialResultSetsIterator implements Iterator resp throw result.getException(); case RESULT_SET: returnResultSet( - result.getResultSet(), transactionId, request.getTransaction(), responseObserver); + result.getResultSet(), + transactionId, + request.getTransaction(), + responseObserver, + session); break; case UPDATE_COUNT: if (isPartitionedDmlTransaction(transactionId)) { @@ -1033,7 +1043,7 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp .build()) .build()); } else { - responseObserver.onNext( + ResultSet.Builder resultSetBuilder = ResultSet.newBuilder() .setStats( ResultSetStats.newBuilder() @@ -1045,8 +1055,11 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp ignoreNextInlineBeginRequest.getAndSet(false) ? Transaction.getDefaultInstance() : Transaction.newBuilder().setId(transactionId).build()) - .build()) - .build()); + .build()); + if (session.getMultiplexed()) { + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken()); + } + responseObserver.onNext(resultSetBuilder.build()); } break; default: @@ -1064,7 +1077,8 @@ private void returnResultSet( ResultSet resultSet, ByteString transactionId, TransactionSelector transactionSelector, - StreamObserver responseObserver) { + StreamObserver responseObserver, + Session session) { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId != null) { metadata = @@ -1080,6 +1094,9 @@ private void returnResultSet( metadata = metadata.toBuilder().setTransaction(transaction).build(); } resultSet = resultSet.toBuilder().setMetadata(metadata).build(); + if (session.getMultiplexed()) { + resultSet = resultSet.toBuilder().setPrecommitToken(getResultSetPrecommitToken()).build(); + } responseObserver.onNext(resultSet); } @@ -1174,6 +1191,9 @@ public void executeBatchDml( .build()); } builder.setStatus(status); + if (session.getMultiplexed()) { + builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken()); + } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); } catch (StatusRuntimeException e) { @@ -1242,7 +1262,8 @@ public void executeStreamingSql( transactionId, request.getTransaction(), responseObserver, - getExecuteStreamingSqlExecutionTime()); + getExecuteStreamingSqlExecutionTime(), + session.getMultiplexed()); break; case UPDATE_COUNT: if (isPartitioned) { @@ -1612,7 +1633,7 @@ public void read(final ReadRequest request, StreamObserver responseOb cols); StatementResult res = getResult(statement); returnResultSet( - res.getResultSet(), transactionId, request.getTransaction(), responseObserver); + res.getResultSet(), transactionId, request.getTransaction(), responseObserver, session); responseObserver.onCompleted(); } catch (StatusRuntimeException e) { responseObserver.onError(e); @@ -1670,7 +1691,8 @@ public void streamingRead( transactionId, request.getTransaction(), responseObserver, - getStreamingReadExecutionTime()); + getStreamingReadExecutionTime(), + session.getMultiplexed()); } catch (StatusRuntimeException e) { responseObserver.onError(e); } catch (Throwable t) { @@ -1683,7 +1705,8 @@ private void returnPartialResultSet( ByteString transactionId, TransactionSelector transactionSelector, StreamObserver responseObserver, - SimulatedExecutionTime executionTime) + SimulatedExecutionTime executionTime, + boolean isMultiplexedSession) throws Exception { ResultSetMetadata metadata = resultSet.getMetadata(); if (transactionId == null) { @@ -1700,7 +1723,8 @@ private void returnPartialResultSet( .build(); } resultSet = resultSet.toBuilder().setMetadata(metadata).build(); - PartialResultSetsIterator iterator = new PartialResultSetsIterator(resultSet); + PartialResultSetsIterator iterator = + new PartialResultSetsIterator(resultSet, isMultiplexedSession); long index = 0L; while (iterator.hasNext()) { SimulatedExecutionTime.checkStreamException( @@ -2447,4 +2471,23 @@ Session getSession(String name) { } return null; } + + static MultiplexedSessionPrecommitToken getResultSetPrecommitToken() { + return getPrecommitToken("ResultSetPrecommitToken", 1); + } + + static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken() { + return getPrecommitToken("PartialResultSetPrecommitToken", 3); + } + + static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken() { + return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", 2); + } + + static MultiplexedSessionPrecommitToken getPrecommitToken(String value, int seqNo) { + return MultiplexedSessionPrecommitToken.newBuilder() + .setPrecommitToken(ByteString.copyFromUtf8(value)) + .setSeqNum(seqNo) + .build(); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 2e412537882..e2b842ed935 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -36,9 +36,11 @@ import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.cloud.spanner.Options.RpcPriority; +import com.google.cloud.spanner.TransactionRunnerImpl.TransactionContextImpl; import com.google.cloud.spanner.connection.RandomResultSetGenerator; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.spanner.v1.CommitRequest; @@ -745,6 +747,167 @@ public void testAsyncRunnerIsNonBlockingWithMultiplexedSession() throws Exceptio assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testPrecommitTokenForResultSet() { + // This test verifies that the precommit token received from the ResultSet is properly tracked + // and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + Long count = + client + .readWriteTransaction() + .run( + transaction -> { + long res = transaction.executeUpdate(UPDATE_STATEMENT); + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ResultSetPrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return res; + }); + + assertNotNull(count); + assertEquals(1, count.longValue()); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testPrecommitTokenForExecuteBatchDmlResponse() { + // This test verifies that the precommit token received from the ExecuteBatchDmlResponse is + // properly tracked and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + long[] count = + client + .readWriteTransaction() + .run( + transaction -> { + long[] res = transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return res; + }); + + assertNotNull(count); + assertEquals(1, count.length); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testPrecommitTokenForPartialResultSet() { + // This test verifies that the precommit token received from the PartialResultSet is properly + // tracked and set in the CommitRequest. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + + // Verify that the latest precommit token is tracked in the transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return null; + }); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + + @Test + public void testTxnTracksPrecommitTokenWithLatestSeqNo() { + // This test ensures that the read-write transaction tracks the precommit token with the + // highest sequence number (in this case, PartialResultSetPrecommitToken) and sets it in the + // CommitRequest. + // ResultSetPrecommitToken -> Seq no 1 + // ExecuteBatchDmlResponsePrecommitToken -> Seq no 2 + // PartialResultSetPrecommitToken -> Seq no 3 + + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + // Returns a ResultSet containing the precommit token (ResultSetPrecommitToken) with + // Seq no 1 + transaction.executeUpdate(UPDATE_STATEMENT); + + // Returns a PartialResultSet containing the precommit token + // (PartialResultSetPrecommitToken) with Seq no 3 + ResultSet resultSet = transaction.executeQuery(STATEMENT); + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + + // Returns a ExecuteBatchDmlResponse containing the precommit token + // (ExecuteBatchDmlResponsePrecommitToken) with Seq no 2 + transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); + + // Verify that the latest precommit token with highest sequence number is tracked in + // the + // transaction context. + TransactionContextImpl impl = (TransactionContextImpl) transaction; + assertNotNull(impl.getLatestPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + impl.getLatestPrecommitToken().getPrecommitToken()); + return null; + }); + + // Verify that the latest precommit token is set in the CommitRequest + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(1, commitRequests.size()); + assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java index c973b7e471e..2a399e6f486 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ReadFormatTestRunner.java @@ -24,6 +24,7 @@ import com.google.cloud.spanner.spi.v1.SpannerRpc; import com.google.common.io.Resources; import com.google.protobuf.util.JsonFormat; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Transaction; import java.math.BigDecimal; @@ -56,6 +57,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} } public ReadFormatTestRunner(Class clazz) throws InitializationError { diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java index fc494c6f3ff..404973336ba 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ResultSetsHelper.java @@ -19,6 +19,7 @@ import com.google.cloud.spanner.AbstractResultSet.CloseableIterator; import com.google.cloud.spanner.AbstractResultSet.Listener; import com.google.protobuf.ListValue; +import com.google.spanner.v1.MultiplexedSessionPrecommitToken; import com.google.spanner.v1.PartialResultSet; import com.google.spanner.v1.Transaction; import java.util.Iterator; @@ -82,6 +83,9 @@ public SpannerException onError(SpannerException e, boolean withBeginTransaction @Override public void onDone(boolean withBeginTransaction) {} + + @Override + public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} }); } } From 741f016c769ca6f8bf0d6c7b53be33f7a878d745 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 16 Oct 2024 05:21:23 +0000 Subject: [PATCH 4/7] chore(spanner): review comments --- .../com/google/cloud/spanner/TransactionRunnerImpl.java | 8 ++++---- .../com/google/cloud/spanner/MockSpannerServiceImpl.java | 6 ++++-- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index a12ceed3195..c7280258977 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -642,13 +642,14 @@ public void onTransactionMetadata(Transaction transaction, boolean shouldInclude */ @Override public void onPrecommitToken(MultiplexedSessionPrecommitToken token) { - if (token == null) return; + if (token == null) { + return; + } synchronized (precommitTokenLock) { if (this.latestPrecommitToken == null || token.getSeqNum() > this.latestPrecommitToken.getSeqNum()) { this.latestPrecommitToken = token; - System.out.println("Updating precommit token to " + this.latestPrecommitToken); - txnLogger.log(Level.ALL, "Updating precommit token to " + this.latestPrecommitToken); + txnLogger.log(Level.FINE, "Updating precommit token to " + this.latestPrecommitToken); } } } @@ -981,7 +982,6 @@ public long[] batchUpdate(Iterable statements, UpdateOption... update } } - // TODO(sriharshach): check if we need to get precommit_token from response.getResultSets if (response.hasPrecommitToken()) { onPrecommitToken(response.getPrecommitToken()); } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 1856df1b046..44ccf8182d9 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -1093,10 +1093,12 @@ private void returnResultSet( Transaction transaction = getTemporaryTransactionOrNull(transactionSelector); metadata = metadata.toBuilder().setTransaction(transaction).build(); } - resultSet = resultSet.toBuilder().setMetadata(metadata).build(); + ResultSet.Builder resultSetBuilder = resultSet.toBuilder(); + resultSetBuilder.setMetadata(metadata); if (session.getMultiplexed()) { - resultSet = resultSet.toBuilder().setPrecommitToken(getResultSetPrecommitToken()).build(); + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken()); } + resultSet = resultSetBuilder.build(); responseObserver.onNext(resultSet); } From 12b7c044014f8ded2e195e0b223817d42830c40d Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 16 Oct 2024 07:27:06 +0000 Subject: [PATCH 5/7] chore(spanner): update precommit token logic in mock spanner --- .../cloud/spanner/MockSpannerServiceImpl.java | 63 +++++++++++++------ ...edSessionDatabaseClientMockServerTest.java | 25 +++----- 2 files changed, 53 insertions(+), 35 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 44ccf8182d9..5986f928808 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -198,12 +198,15 @@ private static class PartialResultSetsIterator implements Iterator transactionCounters = new ConcurrentHashMap<>(); private ConcurrentMap> partitionTokens = new ConcurrentHashMap<>(); private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>(); + + // Stores latest sequence number required for precommit token. + private static ConcurrentMap transactionSequenceNo = + new ConcurrentHashMap<>(); private int maxNumSessionsInOneBatch = 100; private int maxTotalSessions = Integer.MAX_VALUE; private Iterable batchWriteResult = new ArrayList<>(); @@ -1056,8 +1063,8 @@ public void executeSql(ExecuteSqlRequest request, StreamObserver resp ? Transaction.getDefaultInstance() : Transaction.newBuilder().setId(transactionId).build()) .build()); - if (session.getMultiplexed()) { - resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken()); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId)); } responseObserver.onNext(resultSetBuilder.build()); } @@ -1095,8 +1102,8 @@ private void returnResultSet( } ResultSet.Builder resultSetBuilder = resultSet.toBuilder(); resultSetBuilder.setMetadata(metadata); - if (session.getMultiplexed()) { - resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken()); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + resultSetBuilder.setPrecommitToken(getResultSetPrecommitToken(transactionId)); } resultSet = resultSetBuilder.build(); responseObserver.onNext(resultSet); @@ -1193,8 +1200,8 @@ public void executeBatchDml( .build()); } builder.setStatus(status); - if (session.getMultiplexed()) { - builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken()); + if (session.getMultiplexed() && isReadWriteTransaction(transactionId)) { + builder.setPrecommitToken(getExecuteBatchDmlResponsePrecommitToken(transactionId)); } responseObserver.onNext(builder.build()); responseObserver.onCompleted(); @@ -1726,7 +1733,10 @@ private void returnPartialResultSet( } resultSet = resultSet.toBuilder().setMetadata(metadata).build(); PartialResultSetsIterator iterator = - new PartialResultSetsIterator(resultSet, isMultiplexedSession); + new PartialResultSetsIterator( + resultSet, + isMultiplexedSession && isReadWriteTransaction(transactionId), + transactionId); long index = 0L; while (iterator.hasNext()) { SimulatedExecutionTime.checkStreamException( @@ -2060,6 +2070,7 @@ private void commitTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } @Override @@ -2091,6 +2102,7 @@ void rollbackTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } void markAbortedTransaction(ByteString transactionId) { @@ -2098,6 +2110,7 @@ void markAbortedTransaction(ByteString transactionId) { transactions.remove(transactionId); isPartitionedDmlTransaction.remove(transactionId); transactionLastUsed.remove(transactionId); + transactionSequenceNo.remove(transactionId); } @Override @@ -2302,6 +2315,7 @@ public void reset() { transactionCounters = new ConcurrentHashMap<>(); partitionTokens = new ConcurrentHashMap<>(); transactionLastUsed = new ConcurrentHashMap<>(); + transactionSequenceNo = new ConcurrentHashMap<>(); numSessionsCreated.set(0); stickyGlobalExceptions = false; @@ -2474,22 +2488,31 @@ Session getSession(String name) { return null; } - static MultiplexedSessionPrecommitToken getResultSetPrecommitToken() { - return getPrecommitToken("ResultSetPrecommitToken", 1); + static MultiplexedSessionPrecommitToken getResultSetPrecommitToken(ByteString transactionId) { + return getPrecommitToken("ResultSetPrecommitToken", transactionId); } - static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken() { - return getPrecommitToken("PartialResultSetPrecommitToken", 3); + static MultiplexedSessionPrecommitToken getPartialResultSetPrecommitToken( + ByteString transactionId) { + return getPrecommitToken("PartialResultSetPrecommitToken", transactionId); } - static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken() { - return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", 2); + static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken( + ByteString transactionId) { + return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId); } - static MultiplexedSessionPrecommitToken getPrecommitToken(String value, int seqNo) { + static MultiplexedSessionPrecommitToken getPrecommitToken( + String value, ByteString transactionId) { + if (!transactionSequenceNo.containsKey(transactionId)) { + transactionSequenceNo.put(transactionId, new AtomicInteger(0)); + } + + // Generates an incrementing sequence number + int seqNum = transactionSequenceNo.get(transactionId).addAndGet(1); return MultiplexedSessionPrecommitToken.newBuilder() .setPrecommitToken(ByteString.copyFromUtf8(value)) - .setSeqNum(seqNo) + .setSeqNum(seqNum) .build(); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index e2b842ed935..be45f3cb53f 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -858,12 +858,7 @@ public void testPrecommitTokenForPartialResultSet() { @Test public void testTxnTracksPrecommitTokenWithLatestSeqNo() { // This test ensures that the read-write transaction tracks the precommit token with the - // highest sequence number (in this case, PartialResultSetPrecommitToken) and sets it in the - // CommitRequest. - // ResultSetPrecommitToken -> Seq no 1 - // ExecuteBatchDmlResponsePrecommitToken -> Seq no 2 - // PartialResultSetPrecommitToken -> Seq no 3 - + // highest sequence number and sets it in the CommitRequest. DatabaseClientImpl client = (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); @@ -871,29 +866,29 @@ public void testTxnTracksPrecommitTokenWithLatestSeqNo() { .readWriteTransaction() .run( transaction -> { - // Returns a ResultSet containing the precommit token (ResultSetPrecommitToken) with - // Seq no 1 + // Returns a ResultSet containing the precommit token (ResultSetPrecommitToken) transaction.executeUpdate(UPDATE_STATEMENT); // Returns a PartialResultSet containing the precommit token - // (PartialResultSetPrecommitToken) with Seq no 3 + // (PartialResultSetPrecommitToken) ResultSet resultSet = transaction.executeQuery(STATEMENT); //noinspection StatementWithEmptyBody while (resultSet.next()) { // ignore } - // Returns a ExecuteBatchDmlResponse containing the precommit token - // (ExecuteBatchDmlResponsePrecommitToken) with Seq no 2 + // Returns an ExecuteBatchDmlResponse containing the precommit token + // (ExecuteBatchDmlResponsePrecommitToken). + // Since this is the last request received by the mock Spanner, it should be the most + // recent precommit token tracked by the transaction context. transaction.batchUpdate(Lists.newArrayList(UPDATE_STATEMENT)); // Verify that the latest precommit token with highest sequence number is tracked in - // the - // transaction context. + // the transaction context. TransactionContextImpl impl = (TransactionContextImpl) transaction; assertNotNull(impl.getLatestPrecommitToken()); assertEquals( - ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), impl.getLatestPrecommitToken().getPrecommitToken()); return null; }); @@ -904,7 +899,7 @@ public void testTxnTracksPrecommitTokenWithLatestSeqNo() { assertTrue(mockSpanner.getSession(commitRequests.get(0).getSession()).getMultiplexed()); assertNotNull(commitRequests.get(0).getPrecommitToken()); assertEquals( - ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + ByteString.copyFromUtf8("ExecuteBatchDmlResponsePrecommitToken"), commitRequests.get(0).getPrecommitToken().getPrecommitToken()); } From d80fda0f7d42855a9e2110ce06ffb796d06082c4 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Wed, 16 Oct 2024 07:35:16 +0000 Subject: [PATCH 6/7] chore(spanner): lint fix --- .../java/com/google/cloud/spanner/MockSpannerServiceImpl.java | 4 +++- .../MultiplexedSessionDatabaseClientMockServerTest.java | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 5986f928808..290583be1ab 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -610,7 +610,9 @@ private static void checkStreamException( private ConcurrentMap> partitionTokens = new ConcurrentHashMap<>(); private ConcurrentMap transactionLastUsed = new ConcurrentHashMap<>(); - // Stores latest sequence number required for precommit token. + // Stores the latest sequence number needed for the precommit token. + // The transaction entry is created only if the transaction is read-write and executed on a + // multiplexed session. private static ConcurrentMap transactionSequenceNo = new ConcurrentHashMap<>(); private int maxNumSessionsInOneBatch = 100; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 7cb383a2d44..c7d7b697d64 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -926,7 +926,7 @@ public void testAbortedReadWriteTxnUsesPreviousTxnIdOnRetryWithExplicitBegin() { .getReadWrite() .getMultiplexedSessionPreviousTransactionId()); } - + @Test public void testPrecommitTokenForResultSet() { // This test verifies that the precommit token received from the ResultSet is properly tracked From 3942a7a75b2db02af18ab9eda35da1ea5da7116a Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Fri, 18 Oct 2024 07:03:01 +0000 Subject: [PATCH 7/7] chore(spanner): review comments --- .../com/google/cloud/spanner/MockSpannerServiceImpl.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index 290583be1ab..014ab7e94e3 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -2506,12 +2506,10 @@ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken static MultiplexedSessionPrecommitToken getPrecommitToken( String value, ByteString transactionId) { - if (!transactionSequenceNo.containsKey(transactionId)) { - transactionSequenceNo.put(transactionId, new AtomicInteger(0)); - } + transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0)); // Generates an incrementing sequence number - int seqNum = transactionSequenceNo.get(transactionId).addAndGet(1); + int seqNum = transactionSequenceNo.get(transactionId).incrementAndGet(); return MultiplexedSessionPrecommitToken.newBuilder() .setPrecommitToken(ByteString.copyFromUtf8(value)) .setSeqNum(seqNum)