From d5a7e0c083b9f75ca0f7997c644aa138ccfd0f29 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Thu, 7 Nov 2024 12:37:46 +0000 Subject: [PATCH 1/3] chore(spanner): handle commit retry protocol extension for mux read-write --- .../cloud/spanner/TransactionRunnerImpl.java | 42 +++++++++++++- .../cloud/spanner/MockSpannerServiceImpl.java | 40 ++++++++++--- ...edSessionDatabaseClientMockServerTest.java | 56 +++++++++++++++++++ 3 files changed, 127 insertions(+), 11 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 5eaa54cd050..54253b93c48 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -409,7 +409,9 @@ ApiFuture commitAsync() { } builder.addAllMutations(mutationsProto); finishOps.addListener( - new CommitRunnable(res, finishOps, builder), MoreExecutors.directExecutor()); + new CommitRunnable( + res, finishOps, builder, /* retryAttemptDueToCommitProtocolExtension = */ false), + MoreExecutors.directExecutor()); return res; } @@ -418,14 +420,17 @@ private final class CommitRunnable implements Runnable { private final SettableApiFuture res; private final ApiFuture prev; private final CommitRequest.Builder requestBuilder; + private final boolean retryAttemptDueToCommitProtocolExtension; CommitRunnable( SettableApiFuture res, ApiFuture prev, - CommitRequest.Builder requestBuilder) { + CommitRequest.Builder requestBuilder, + boolean retryAttemptDueToCommitProtocolExtension) { this.res = res; this.prev = prev; this.requestBuilder = requestBuilder; + this.retryAttemptDueToCommitProtocolExtension = retryAttemptDueToCommitProtocolExtension; } @Override @@ -459,6 +464,13 @@ public void run() { // Set the precommit token in the CommitRequest for multiplexed sessions. requestBuilder.setPrecommitToken(getLatestPrecommitToken()); } + if (retryAttemptDueToCommitProtocolExtension) { + // When a retry occurs due to the commit protocol extension, clear all mutations because + // they were already buffered in SpanFE during the previous attempt. + requestBuilder.clearMutations(); + span.addAnnotation( + "Retrying commit operation with a new precommit token obtained from the previous CommitResponse"); + } final CommitRequest commitRequest = requestBuilder.build(); span.addAnnotation("Starting Commit"); final ApiFuture commitFuture; @@ -479,6 +491,32 @@ public void run() { return; } com.google.spanner.v1.CommitResponse proto = commitFuture.get(); + + // If the CommitResponse includes a precommit token, the client will retry the + // commit RPC once with the new token and clear any existing mutations. + // This case is applicable only when the read-write transaction uses multiplexed + // session. + if (proto.hasPrecommitToken() && !retryAttemptDueToCommitProtocolExtension) { + // track the latest pre commit token + onPrecommitToken(proto.getPrecommitToken()); + span.addAnnotation( + "Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field"); + opSpan.addAnnotation( + "Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field"); + opSpan.end(); + + // Retry the commit RPC with the latest precommit token from CommitResponse. + MoreExecutors.directExecutor() + .execute( + new CommitRunnable( + res, + prev, + requestBuilder, + /* retryAttemptDueToCommitProtocolExtension = */ true)); + + // Exit to prevent further processing in this attempt. + return; + } if (!proto.hasCommitTimestamp()) { throw newSpannerException( ErrorCode.INTERNAL, "Missing commitTimestamp:\n" + session.getName()); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java index c0c6bbd9281..39f1ff180fa 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerServiceImpl.java @@ -603,6 +603,7 @@ private static void checkStreamException( private ConcurrentMap isPartitionedDmlTransaction = new ConcurrentHashMap<>(); private ConcurrentMap abortedTransactions = new ConcurrentHashMap<>(); + private ConcurrentMap commitRetryTransactions = new ConcurrentHashMap<>(); private final AtomicBoolean abortNextTransaction = new AtomicBoolean(); private final AtomicBoolean abortNextStatement = new AtomicBoolean(); private final AtomicBoolean ignoreNextInlineBeginRequest = new AtomicBoolean(); @@ -2045,15 +2046,23 @@ public void commit(CommitRequest request, StreamObserver respons return; } simulateAbort(session, request.getTransactionId()); - commitTransaction(transaction.getId()); - CommitResponse.Builder responseBuilder = - CommitResponse.newBuilder().setCommitTimestamp(getCurrentGoogleTimestamp()); - if (request.getReturnCommitStats()) { - responseBuilder.setCommitStats( - com.google.spanner.v1.CommitResponse.CommitStats.newBuilder() - // This is not really always equal, but at least it returns a value. - .setMutationCount(request.getMutationsCount()) - .build()); + CommitResponse.Builder responseBuilder = CommitResponse.newBuilder(); + Optional commitRetry = + Optional.fromNullable(commitRetryTransactions.get(request.getTransactionId())); + if (commitRetry.or(Boolean.FALSE) && session.getMultiplexed()) { + responseBuilder.setPrecommitToken( + getCommitResponsePrecommitToken(request.getTransactionId())); + commitRetryTransactions.remove(request.getTransactionId()); + } else { + commitTransaction(transaction.getId()); + responseBuilder.setCommitTimestamp(getCurrentGoogleTimestamp()); + if (request.getReturnCommitStats()) { + responseBuilder.setCommitStats( + com.google.spanner.v1.CommitResponse.CommitStats.newBuilder() + // This is not really always equal, but at least it returns a value. + .setMutationCount(request.getMutationsCount()) + .build()); + } } responseObserver.onNext(responseBuilder.build()); responseObserver.onCompleted(); @@ -2134,6 +2143,14 @@ void markAbortedTransaction(ByteString transactionId) { transactionSequenceNo.remove(transactionId); } + public void markCommitRetryOnTransaction(ByteString transactionId) { + Transaction transaction = transactions.get(transactionId); + if (transaction == null || !isReadWriteTransaction(transactionId)) { + return; + } + commitRetryTransactions.putIfAbsent(transactionId, Boolean.TRUE); + } + @Override public void partitionQuery( PartitionQueryRequest request, StreamObserver responseObserver) { @@ -2527,6 +2544,11 @@ static MultiplexedSessionPrecommitToken getExecuteBatchDmlResponsePrecommitToken return getPrecommitToken("ExecuteBatchDmlResponsePrecommitToken", transactionId); } + static MultiplexedSessionPrecommitToken getCommitResponsePrecommitToken( + ByteString transactionId) { + return getPrecommitToken("CommitResponsePrecommitToken", transactionId); + } + static MultiplexedSessionPrecommitToken getPrecommitToken( String value, ByteString transactionId) { transactionSequenceNo.putIfAbsent(transactionId, new AtomicInteger(0)); diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 43899e51562..97d47f17fa8 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -1575,6 +1575,62 @@ public void testOtherUnimplementedError_ReadWriteTransactionStillUsesMultiplexed assertEquals(2L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); } + @Test + public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() { + // This test simulates the commit retry protocol extension which occurs when a read-write + // transaction contains read/query + mutation operations. + DatabaseClientImpl client = + (DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); + + client + .readWriteTransaction() + .run( + transaction -> { + try (ResultSet resultSet = transaction.executeQuery(STATEMENT)) { + //noinspection StatementWithEmptyBody + while (resultSet.next()) { + // ignore + } + } + + Mutation mutation = + Mutation.newInsertBuilder("FOO").set("ID").to(1L).set("NAME").to("Bar").build(); + transaction.buffer(mutation); + + TransactionContextImpl impl = (TransactionContextImpl) transaction; + // Force the Commit RPC to return a CommitResponse with MultiplexedSessionRetry field + // set. + // This scenario is only possible when a read-write transaction contains read/query + + // mutation operations. + mockSpanner.markCommitRetryOnTransaction(impl.transactionId); + return null; + }); + + List executeSqlRequests = + mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, executeSqlRequests.size()); + // Verify the request is executed using multiplexed sessions + assertTrue(mockSpanner.getSession(executeSqlRequests.get(0).getSession()).getMultiplexed()); + + List commitRequests = mockSpanner.getRequestsOfType(CommitRequest.class); + assertEquals(2, commitRequests.size()); + assertNotNull(commitRequests.get(0).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), + commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + + // Second CommitRequest should contain the latest precommit token received via the + // CommitResponse in previous attempt. + assertNotNull(commitRequests.get(1).getPrecommitToken()); + assertEquals( + ByteString.copyFromUtf8("CommitResponsePrecommitToken"), + commitRequests.get(1).getPrecommitToken().getPrecommitToken()); + + assertNotNull(client.multiplexedSessionDatabaseClient); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get()); + assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get()); + } + private void waitForSessionToBeReplaced(DatabaseClientImpl client) { assertNotNull(client.multiplexedSessionDatabaseClient); SessionReference sessionReference = From 2003ad4648f8706555ca989c1c26b192301e0589 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 11 Nov 2024 07:17:15 +0000 Subject: [PATCH 2/3] chore(spanner): directly invoke run --- .../google/cloud/spanner/TransactionRunnerImpl.java | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 54253b93c48..5219d0dae02 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -506,13 +506,12 @@ public void run() { opSpan.end(); // Retry the commit RPC with the latest precommit token from CommitResponse. - MoreExecutors.directExecutor() - .execute( - new CommitRunnable( - res, - prev, - requestBuilder, - /* retryAttemptDueToCommitProtocolExtension = */ true)); + new CommitRunnable( + res, + prev, + requestBuilder, + /* retryAttemptDueToCommitProtocolExtension = */ true) + .run(); // Exit to prevent further processing in this attempt. return; From 37e56ff779e80f5abf8f9f9d24a8a72542b92493 Mon Sep 17 00:00:00 2001 From: Sri Harsha CH Date: Mon, 11 Nov 2024 09:45:07 +0000 Subject: [PATCH 3/3] chore(spanner): review comments --- .../java/com/google/cloud/spanner/TransactionRunnerImpl.java | 2 -- .../MultiplexedSessionDatabaseClientMockServerTest.java | 4 ++++ 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java index 5219d0dae02..9e9fe62304a 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/TransactionRunnerImpl.java @@ -501,8 +501,6 @@ public void run() { onPrecommitToken(proto.getPrecommitToken()); span.addAnnotation( "Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field"); - opSpan.addAnnotation( - "Commit operation will be retried with new precommit token as the CommitResponse includes a MultiplexedSessionRetry field"); opSpan.end(); // Retry the commit RPC with the latest precommit token from CommitResponse. diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java index 97d47f17fa8..4dc1da62e7b 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java @@ -1618,6 +1618,8 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() { assertEquals( ByteString.copyFromUtf8("PartialResultSetPrecommitToken"), commitRequests.get(0).getPrecommitToken().getPrecommitToken()); + // Verify that the first request has mutations set + assertTrue(commitRequests.get(0).getMutationsCount() > 0); // Second CommitRequest should contain the latest precommit token received via the // CommitResponse in previous attempt. @@ -1625,6 +1627,8 @@ public void testReadWriteTransactionWithCommitRetryProtocolExtensionSet() { assertEquals( ByteString.copyFromUtf8("CommitResponsePrecommitToken"), commitRequests.get(1).getPrecommitToken().getPrecommitToken()); + // Verify that the commit retry request does not have any mutations set + assertEquals(0, commitRequests.get(1).getMutationsCount()); assertNotNull(client.multiplexedSessionDatabaseClient); assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());