From 89749401288bcf93841d3fbedd5a7de7724e091e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Wed, 8 Apr 2026 17:50:57 +0200 Subject: [PATCH 1/2] fix(spanner): ensure executeQueryAsync is non-blocking When `executeQueryAsync(..)` was called for a read-only transaction, the method would execute a blocking `BeginTransaction` RPC for the first query. This change ensures that the `BeginTransaction` RPC instead is executed using the background executor thread pool. --- java-spanner/.gitignore | 1 + .../cloud/spanner/AbstractReadContext.java | 9 +- .../cloud/spanner/AsyncResultSetImpl.java | 16 +- .../spanner/AsyncReadOnlyTransactionTest.java | 151 ++++++++++++++++++ 4 files changed, 171 insertions(+), 6 deletions(-) create mode 100644 java-spanner/.gitignore create mode 100644 java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java diff --git a/java-spanner/.gitignore b/java-spanner/.gitignore new file mode 100644 index 000000000000..722d5e71d93c --- /dev/null +++ b/java-spanner/.gitignore @@ -0,0 +1 @@ +.vscode diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 856f876a14de..4fc880cb66f7 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -588,7 +588,7 @@ public ListenableAsyncResultSet readAsync( ? readOptions.bufferRows() : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( - executorProvider, readInternal(table, null, keys, columns, options), bufferRows); + executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows); } @Override @@ -607,7 +607,7 @@ public ListenableAsyncResultSet readUsingIndexAsync( : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( executorProvider, - readInternal(table, checkNotNull(index), keys, columns, options), + () -> readInternal(table, checkNotNull(index), keys, columns, options), bufferRows); } @@ -659,8 +659,9 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( executorProvider, - executeQueryInternal( - statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options), + () -> + executeQueryInternal( + statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options), bufferRows); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java index e53e4db94b66..d2e4c6a00fb8 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AsyncResultSetImpl.java @@ -157,9 +157,21 @@ private enum State { AsyncResultSetImpl( ExecutorProvider executorProvider, Supplier delegate, int bufferSize) { - super(delegate); + this( + executorProvider, + Suppliers.memoize(Preconditions.checkNotNull(delegate)), + bufferSize, + true); + } + + private AsyncResultSetImpl( + ExecutorProvider executorProvider, + Supplier memoizedDelegate, + int bufferSize, + boolean dummy) { + super(memoizedDelegate); this.executorProvider = Preconditions.checkNotNull(executorProvider); - this.delegateResultSet = Preconditions.checkNotNull(delegate); + this.delegateResultSet = memoizedDelegate; this.service = MoreExecutors.listeningDecorator(executorProvider.getExecutor()); this.buffer = new LinkedBlockingDeque<>(bufferSize); } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java new file mode 100644 index 000000000000..9d38316ce802 --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AsyncReadOnlyTransactionTest.java @@ -0,0 +1,151 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner; + +import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertTrue; + +import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.ExecuteSqlRequest; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class AsyncReadOnlyTransactionTest extends AbstractAsyncTransactionTest { + + @Test + public void asyncReadOnlyTransactionIsNonBlocking() throws Exception { + // Warm up session pool to avoid CreateSession blocking when server is frozen. + try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) { + while (resultSet.next()) {} + } + mockSpanner.reset(); + + try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) { + mockSpanner.freeze(); + // Call executeQueryAsync. It should not block even though mock server is + // frozen! + AsyncResultSet rs = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT); + + // Verify that no requests have been sent yet. + assertTrue(mockSpanner.getRequestTypes().isEmpty()); + + // Now register a callback to start the stream. + final CountDownLatch latch = new CountDownLatch(1); + rs.setCallback( + executor, + resultSet -> { + try { + AsyncResultSet.CursorState state; + while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) { + // consume + } + if (state == AsyncResultSet.CursorState.DONE) { + latch.countDown(); + } + return AsyncResultSet.CallbackResponse.CONTINUE; + } catch (Throwable t) { + latch.countDown(); + return AsyncResultSet.CallbackResponse.DONE; + } + }); + + // Unfreeze the mock server so the background thread can proceed. + mockSpanner.unfreeze(); + + // Wait for the callback to complete. + assertTrue("Timeout waiting for callback", latch.await(10, TimeUnit.SECONDS)); + + // Verify that requests were sent on the background thread. + // It should contain one BeginTransaction and one ExecuteSql. + assertThat(mockSpanner.getRequestTypes()) + .containsExactly(BeginTransactionRequest.class, ExecuteSqlRequest.class); + } + } + + @Test + public void testMultipleQueriesOnlyCallsBeginTransactionOnce() throws Exception { + // Warm up session pool to avoid CreateSession blocking when server is frozen. + try (ResultSet resultSet = client().singleUse().executeQuery(READ_ONE_KEY_VALUE_STATEMENT)) { + while (resultSet.next()) {} + } + mockSpanner.reset(); + + try (ReadOnlyTransaction transaction = client().readOnlyTransaction()) { + mockSpanner.freeze(); + // Call executeQueryAsync twice. + AsyncResultSet rs1 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT); + AsyncResultSet rs2 = transaction.executeQueryAsync(READ_ONE_KEY_VALUE_STATEMENT); + + // Verify that no requests have been sent yet. + assertTrue(mockSpanner.getRequestTypes().isEmpty()); + + // Unfreeze the mock server. + mockSpanner.unfreeze(); + + // Now register callbacks to start the streams. + final CountDownLatch latch1 = new CountDownLatch(1); + final CountDownLatch latch2 = new CountDownLatch(1); + + rs1.setCallback( + executor, + resultSet -> { + try { + AsyncResultSet.CursorState state; + while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {} + if (state == AsyncResultSet.CursorState.DONE) { + latch1.countDown(); + } + return AsyncResultSet.CallbackResponse.CONTINUE; + } catch (Throwable t) { + latch1.countDown(); + return AsyncResultSet.CallbackResponse.DONE; + } + }); + + rs2.setCallback( + executor, + resultSet -> { + try { + AsyncResultSet.CursorState state; + while ((state = resultSet.tryNext()) == AsyncResultSet.CursorState.OK) {} + if (state == AsyncResultSet.CursorState.DONE) { + latch2.countDown(); + } + return AsyncResultSet.CallbackResponse.CONTINUE; + } catch (Throwable t) { + latch2.countDown(); + return AsyncResultSet.CallbackResponse.DONE; + } + }); + + // Wait for both callbacks to complete. + assertTrue("Timeout waiting for callback 1", latch1.await(10, TimeUnit.SECONDS)); + assertTrue("Timeout waiting for callback 2", latch2.await(10, TimeUnit.SECONDS)); + + // Verify that requests were sent. + // It should contain one BeginTransaction and two ExecuteSql. + assertThat(mockSpanner.getRequestTypes()) + .containsExactly( + BeginTransactionRequest.class, ExecuteSqlRequest.class, ExecuteSqlRequest.class); + } + } +} From 5f422002ee70e29f94ea4454b289d0eaaf956200 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Knut=20Olav=20L=C3=B8ite?= Date: Thu, 9 Apr 2026 08:30:02 +0200 Subject: [PATCH 2/2] fix(spanner): only use background executor for read-only tx The background executor should only be used to execute queries/reads for read-only transactions, as using the background executor does not guarantee the order of execution of statements. Read/write transactions require strict ordering of statements, and also require that all statements have been executed before a Commit RPC is invoked. This is not required for read-only transactions. --- .../cloud/spanner/AbstractReadContext.java | 54 ++++++++++++++++--- 1 file changed, 47 insertions(+), 7 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 4fc880cb66f7..2638bf4b94aa 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -407,6 +407,47 @@ TransactionSelector getTransactionSelector() { return selector; } + @Override + public ListenableAsyncResultSet readAsync( + String table, KeySet keys, Iterable columns, ReadOption... options) { + Options readOptions = Options.fromReadOptions(options); + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; + return new AsyncResultSetImpl( + executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows); + } + + @Override + public ListenableAsyncResultSet readUsingIndexAsync( + String table, String index, KeySet keys, Iterable columns, ReadOption... options) { + Options readOptions = Options.fromReadOptions(options); + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; + return new AsyncResultSetImpl( + executorProvider, + () -> readInternal(table, checkNotNull(index), keys, columns, options), + bufferRows); + } + + @Override + public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOption... options) { + Options readOptions = Options.fromQueryOptions(options); + final int bufferRows = + readOptions.hasBufferRows() + ? readOptions.bufferRows() + : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; + return new AsyncResultSetImpl( + executorProvider, + () -> + executeQueryInternal( + statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options), + bufferRows); + } + @Override public Timestamp getReadTimestamp() { synchronized (txnLock) { @@ -588,7 +629,7 @@ public ListenableAsyncResultSet readAsync( ? readOptions.bufferRows() : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( - executorProvider, () -> readInternal(table, null, keys, columns, options), bufferRows); + executorProvider, readInternal(table, null, keys, columns, options), bufferRows); } @Override @@ -607,7 +648,7 @@ public ListenableAsyncResultSet readUsingIndexAsync( : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( executorProvider, - () -> readInternal(table, checkNotNull(index), keys, columns, options), + readInternal(table, checkNotNull(index), keys, columns, options), bufferRows); } @@ -659,9 +700,8 @@ public ListenableAsyncResultSet executeQueryAsync(Statement statement, QueryOpti : AsyncResultSetImpl.DEFAULT_BUFFER_SIZE; return new AsyncResultSetImpl( executorProvider, - () -> - executeQueryInternal( - statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options), + executeQueryInternal( + statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode.NORMAL, options), bufferRows); } @@ -680,7 +720,7 @@ public final ResultSet analyzeQuery(Statement statement, QueryAnalyzeMode readCo } } - private ResultSet executeQueryInternal( + ResultSet executeQueryInternal( Statement statement, com.google.spanner.v1.ExecuteSqlRequest.QueryMode queryMode, QueryOption... options) { @@ -988,7 +1028,7 @@ public void onDone(boolean withBeginTransaction) { @Override public void onPrecommitToken(MultiplexedSessionPrecommitToken token) {} - private ResultSet readInternal( + ResultSet readInternal( String table, @Nullable String index, KeySet keys,