From 26c6c634b685bce66ce7caf05057a98e9cc6f5dc Mon Sep 17 00:00:00 2001 From: Arpan Mishra Date: Wed, 3 Jan 2024 23:58:20 +0530 Subject: [PATCH] feat: add support for Directed Read options (#2766) * fix: prevent illegal negative timeout values into thread sleep() method while retrying exceptions in unit tests. * For details on issue see - https://github.com/googleapis/java-spanner/issues/2206 * Fixing lint issues. * feat: add support for Directed Read options. * chore: fix lint issues. * test: add unit tests for options class. * test: add tests using mock spanner. * test: add unit test for partitioned read. * test: add unit test for partitioned read. * chore: adding option in spanner options. * chore: fix NPE. * chore: disabling test on emulator. * chore: adding test for query in RW transaction. * chore: adding IT for transaction manager interface. * chore: disable IT for emulator. * chore: PR comments. * chore: address PR comments. --- .../cloud/spanner/AbstractReadContext.java | 19 ++ .../google/cloud/spanner/BatchClientImpl.java | 8 +- .../com/google/cloud/spanner/Options.java | 46 ++- .../com/google/cloud/spanner/SessionImpl.java | 3 + .../google/cloud/spanner/SpannerOptions.java | 36 +++ .../spanner/AbstractReadContextTest.java | 22 ++ .../cloud/spanner/DatabaseClientImplTest.java | 303 +++++++++++++++++- .../cloud/spanner/MockSpannerTestUtil.java | 1 + .../com/google/cloud/spanner/OptionsTest.java | 73 ++++- .../cloud/spanner/SpannerOptionsTest.java | 24 ++ .../cloud/spanner/it/ITDirectedReadsTest.java | 185 +++++++++++ .../google/cloud/spanner/it/ITReadTest.java | 34 ++ 12 files changed, 740 insertions(+), 14 deletions(-) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectedReadsTest.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index a0b25cb64c..0f4310f9b4 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -42,6 +42,7 @@ import com.google.common.util.concurrent.MoreExecutors; import com.google.protobuf.ByteString; import com.google.spanner.v1.BeginTransactionRequest; +import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; @@ -72,6 +73,7 @@ abstract static class Builder, T extends AbstractReadCon private Span span = Tracing.getTracer().getCurrentSpan(); private int defaultPrefetchChunks = SpannerOptions.Builder.DEFAULT_PREFETCH_CHUNKS; private QueryOptions defaultQueryOptions = SpannerOptions.Builder.DEFAULT_QUERY_OPTIONS; + private DirectedReadOptions defaultDirectedReadOption; private ExecutorProvider executorProvider; private Clock clock = new Clock(); @@ -117,6 +119,11 @@ B setClock(Clock clock) { return self(); } + B setDefaultDirectedReadOptions(DirectedReadOptions directedReadOptions) { + this.defaultDirectedReadOption = directedReadOptions; + return self(); + } + abstract T build(); } @@ -399,6 +406,7 @@ void initTransaction() { private final int defaultPrefetchChunks; private final QueryOptions defaultQueryOptions; + private final DirectedReadOptions defaultDirectedReadOptions; private final Clock clock; @GuardedBy("lock") @@ -423,6 +431,7 @@ void initTransaction() { this.rpc = builder.rpc; this.defaultPrefetchChunks = builder.defaultPrefetchChunks; this.defaultQueryOptions = builder.defaultQueryOptions; + this.defaultDirectedReadOptions = builder.defaultDirectedReadOption; this.span = builder.span; this.executorProvider = builder.executorProvider; this.clock = builder.clock; @@ -623,6 +632,11 @@ ExecuteSqlRequest.Builder getExecuteSqlRequestBuilder( if (options.hasDataBoostEnabled()) { builder.setDataBoostEnabled(options.dataBoostEnabled()); } + if (options.hasDirectedReadOptions()) { + builder.setDirectedReadOptions(options.directedReadOptions()); + } else if (defaultDirectedReadOptions != null) { + builder.setDirectedReadOptions(defaultDirectedReadOptions); + } builder.setSeqno(getSeqNo()); builder.setQueryOptions(buildQueryOptions(statement.getQueryOptions())); builder.setRequestOptions(buildRequestOptions(options)); @@ -811,6 +825,11 @@ ResultSet readInternalWithOptions( if (readOptions.hasDataBoostEnabled()) { builder.setDataBoostEnabled(readOptions.dataBoostEnabled()); } + if (readOptions.hasDirectedReadOptions()) { + builder.setDirectedReadOptions(readOptions.directedReadOptions()); + } else if (defaultDirectedReadOptions != null) { + builder.setDirectedReadOptions(defaultDirectedReadOptions); + } final int prefetchChunks = readOptions.hasPrefetchChunks() ? readOptions.prefetchChunks() : defaultPrefetchChunks; ResumableStreamIterator stream = diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java index 0191a11be1..eab90a266c 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/BatchClientImpl.java @@ -60,7 +60,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) { .setDefaultQueryOptions( sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId())) .setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider()) - .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()), + .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()) + .setDefaultDirectedReadOptions( + sessionClient.getSpanner().getOptions().getDirectedReadOptions()), checkNotNull(bound)); } @@ -77,7 +79,9 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc .setDefaultQueryOptions( sessionClient.getSpanner().getDefaultQueryOptions(sessionClient.getDatabaseId())) .setExecutorProvider(sessionClient.getSpanner().getAsyncExecutorProvider()) - .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()), + .setDefaultPrefetchChunks(sessionClient.getSpanner().getDefaultPrefetchChunks()) + .setDefaultDirectedReadOptions( + sessionClient.getSpanner().getOptions().getDirectedReadOptions()), batchTransactionId); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java index 2bd35ec785..dda12b60d6 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/Options.java @@ -17,6 +17,7 @@ package com.google.cloud.spanner; import com.google.common.base.Preconditions; +import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.RequestOptions.Priority; import java.io.Serializable; import java.util.Objects; @@ -224,6 +225,18 @@ public static CreateUpdateDeleteAdminApiOption validateOnly(Boolean validateOnly return new ValidateOnlyOption(validateOnly); } + /** + * Option to request DirectedRead for ReadOnlyTransaction and SingleUseTransaction. + * + *

The DirectedReadOptions can be used to indicate which replicas or regions should be used for + * non-transactional reads or queries. Not all requests can be sent to non-leader replicas. In + * particular, some requests such as reads within read-write transactions must be sent to a + * designated leader replica. These requests ignore DirectedReadOptions. + */ + public static ReadAndQueryOption directedRead(DirectedReadOptions directedReadOptions) { + return new DirectedReadOption(directedReadOptions); + } + /** Option to request {@link CommitStats} for read/write transactions. */ static final class CommitStatsOption extends InternalOption implements TransactionOption { @Override @@ -325,6 +338,21 @@ void appendToOptions(Options options) { } } + static final class DirectedReadOption extends InternalOption implements ReadAndQueryOption { + private final DirectedReadOptions directedReadOptions; + + DirectedReadOption(DirectedReadOptions directedReadOptions) { + this.directedReadOptions = + Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null"); + ; + } + + @Override + void appendToOptions(Options options) { + options.directedReadOptions = directedReadOptions; + } + } + private boolean withCommitStats; private Long limit; private Integer prefetchChunks; @@ -338,6 +366,7 @@ void appendToOptions(Options options) { private Boolean validateOnly; private Boolean withOptimisticLock; private Boolean dataBoostEnabled; + private DirectedReadOptions directedReadOptions; // Construction is via factory methods below. private Options() {} @@ -438,6 +467,14 @@ Boolean dataBoostEnabled() { return dataBoostEnabled; } + boolean hasDirectedReadOptions() { + return directedReadOptions != null; + } + + DirectedReadOptions directedReadOptions() { + return directedReadOptions; + } + @Override public String toString() { StringBuilder b = new StringBuilder(); @@ -477,6 +514,9 @@ public String toString() { if (dataBoostEnabled != null) { b.append("dataBoostEnabled: ").append(dataBoostEnabled).append(' '); } + if (directedReadOptions != null) { + b.append("directedReadOptions: ").append(directedReadOptions).append(' '); + } return b.toString(); } @@ -512,7 +552,8 @@ public boolean equals(Object o) { && Objects.equals(etag(), that.etag()) && Objects.equals(validateOnly(), that.validateOnly()) && Objects.equals(withOptimisticLock(), that.withOptimisticLock()) - && Objects.equals(dataBoostEnabled(), that.dataBoostEnabled()); + && Objects.equals(dataBoostEnabled(), that.dataBoostEnabled()) + && Objects.equals(directedReadOptions(), that.directedReadOptions()); } @Override @@ -557,6 +598,9 @@ public int hashCode() { if (dataBoostEnabled != null) { result = 31 * result + dataBoostEnabled.hashCode(); } + if (directedReadOptions != null) { + result = 31 * result + directedReadOptions.hashCode(); + } return result; } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java index 0e763dbc93..53bf37feb0 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SessionImpl.java @@ -255,6 +255,7 @@ public ReadContext singleUse(TimestampBound bound) { .setRpc(spanner.getRpc()) .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) + .setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions()) .setSpan(currentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) .build()); @@ -274,6 +275,7 @@ public ReadOnlyTransaction singleUseReadOnlyTransaction(TimestampBound bound) { .setRpc(spanner.getRpc()) .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) + .setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions()) .setSpan(currentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) .buildSingleUseReadOnlyTransaction()); @@ -293,6 +295,7 @@ public ReadOnlyTransaction readOnlyTransaction(TimestampBound bound) { .setRpc(spanner.getRpc()) .setDefaultQueryOptions(spanner.getDefaultQueryOptions(databaseId)) .setDefaultPrefetchChunks(spanner.getDefaultPrefetchChunks()) + .setDefaultDirectedReadOptions(spanner.getOptions().getDirectedReadOptions()) .setSpan(currentSpan) .setExecutorProvider(spanner.getAsyncExecutorProvider()) .build()); diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java index ba22ec5448..877ea72e46 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/SpannerOptions.java @@ -33,6 +33,7 @@ import com.google.cloud.TransportOptions; import com.google.cloud.grpc.GcpManagedChannelOptions; import com.google.cloud.grpc.GrpcTransportOptions; +import com.google.cloud.spanner.Options.DirectedReadOption; import com.google.cloud.spanner.Options.QueryOption; import com.google.cloud.spanner.Options.UpdateOption; import com.google.cloud.spanner.admin.database.v1.DatabaseAdminSettings; @@ -50,6 +51,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.spanner.v1.DirectedReadOptions; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; import com.google.spanner.v1.SpannerGrpc; @@ -137,6 +139,7 @@ public class SpannerOptions extends ServiceOptions { private final String compressorName; private final boolean leaderAwareRoutingEnabled; private final boolean attemptDirectPath; + private final DirectedReadOptions directedReadOptions; /** Interface that can be used to provide {@link CallCredentials} to {@link SpannerOptions}. */ public interface CallCredentialsProvider { @@ -627,6 +630,7 @@ private SpannerOptions(Builder builder) { compressorName = builder.compressorName; leaderAwareRoutingEnabled = builder.leaderAwareRoutingEnabled; attemptDirectPath = builder.attemptDirectPath; + directedReadOptions = builder.directedReadOptions; } /** @@ -729,6 +733,7 @@ public static class Builder private String emulatorHost = System.getenv("SPANNER_EMULATOR_HOST"); private boolean leaderAwareRoutingEnabled = true; private boolean attemptDirectPath = true; + private DirectedReadOptions directedReadOptions; private static String createCustomClientLibToken(String token) { return token + " " + ServiceOptions.getGoogApiClientLibName(); @@ -789,6 +794,7 @@ private Builder() { this.channelConfigurator = options.channelConfigurator; this.interceptorProvider = options.interceptorProvider; this.attemptDirectPath = options.attemptDirectPath; + this.directedReadOptions = options.directedReadOptions; } @Override @@ -1153,6 +1159,32 @@ public Builder setAsyncExecutorProvider(CloseableExecutorProvider provider) { return this; } + /** + * Sets the {@link DirectedReadOption} that specify which replicas or regions should be used for + * non-transactional reads or queries. + * + *

DirectedReadOptions set at the request level will take precedence over the options set + * using this method. + * + *

An example below of how {@link DirectedReadOptions} can be constructed by including a + * replica. + * + *


+     * DirectedReadOptions.newBuilder()
+     *           .setIncludeReplicas(
+     *               IncludeReplicas.newBuilder()
+     *                   .addReplicaSelections(
+     *                       ReplicaSelection.newBuilder().setLocation("us-east1").build()))
+     *           .build();
+     *           }
+     * 
+ */ + public Builder setDirectedReadOptions(DirectedReadOptions directedReadOptions) { + this.directedReadOptions = + Preconditions.checkNotNull(directedReadOptions, "DirectedReadOptions cannot be null"); + return this; + } + /** * Specifying this will allow the client to prefetch up to {@code prefetchChunks} {@code * PartialResultSet} chunks for each read and query. The data size of each chunk depends on the @@ -1371,6 +1403,10 @@ public boolean isLeaderAwareRoutingEnabled() { return leaderAwareRoutingEnabled; } + public DirectedReadOptions getDirectedReadOptions() { + return directedReadOptions; + } + @BetaApi public boolean isAttemptDirectPath() { return attemptDirectPath; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java index 31b73581f6..16e4aa9600 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/AbstractReadContextTest.java @@ -25,6 +25,9 @@ import com.google.api.gax.core.ExecutorProvider; import com.google.cloud.spanner.Options.RpcPriority; import com.google.cloud.spanner.spi.v1.SpannerRpc; +import com.google.spanner.v1.DirectedReadOptions; +import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas; +import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; @@ -45,6 +48,14 @@ @RunWith(Parameterized.class) public class AbstractReadContextTest { + private static final DirectedReadOptions DIRECTED_READ_OPTIONS = + DirectedReadOptions.newBuilder() + .setIncludeReplicas( + IncludeReplicas.newBuilder() + .addReplicaSelections( + ReplicaSelection.newBuilder().setLocation("us-west1").build())) + .build(); + @Parameter(0) public QueryOptions defaultQueryOptions; @@ -250,4 +261,15 @@ public void executeSqlRequestBuilderWithRequestOptionsWithTxnTag() { .isEqualTo("app=spanner,env=test,action=query"); assertThat(request.getRequestOptions().getTransactionTag()).isEqualTo("app=spanner,env=test"); } + + @Test + public void testGetExecuteSqlRequestBuilderWithDirectedReadOptions() { + ExecuteSqlRequest.Builder request = + context.getExecuteSqlRequestBuilder( + Statement.of("SELECT * FROM FOO"), + QueryMode.NORMAL, + Options.fromQueryOptions(Options.directedRead(DIRECTED_READ_OPTIONS)), + false); + assertEquals(DIRECTED_READ_OPTIONS, request.getDirectedReadOptions()); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java index aea8a4dcb6..e6263568fe 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java @@ -21,6 +21,7 @@ import static com.google.cloud.spanner.MockSpannerTestUtil.READ_ONE_KEY_VALUE_STATEMENT; import static com.google.cloud.spanner.MockSpannerTestUtil.READ_TABLE_NAME; import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; +import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1_FROM_TABLE; import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1_RESULTSET; import static com.google.cloud.spanner.SpannerApiFutures.get; import static com.google.common.truth.Truth.assertThat; @@ -75,6 +76,9 @@ import com.google.spanner.v1.BatchWriteResponse; import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.DeleteSessionRequest; +import com.google.spanner.v1.DirectedReadOptions; +import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas; +import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryMode; @@ -169,6 +173,20 @@ public class DatabaseClientImplTest { .setStatus(STATUS_OK) .addAllIndexes(ImmutableList.of(2, 3)) .build()); + private static final DirectedReadOptions DIRECTED_READ_OPTIONS1 = + DirectedReadOptions.newBuilder() + .setIncludeReplicas( + IncludeReplicas.newBuilder() + .addReplicaSelections( + ReplicaSelection.newBuilder().setLocation("us-west1").build())) + .build(); + private static final DirectedReadOptions DIRECTED_READ_OPTIONS2 = + DirectedReadOptions.newBuilder() + .setIncludeReplicas( + IncludeReplicas.newBuilder() + .addReplicaSelections( + ReplicaSelection.newBuilder().setLocation("us-east1").build())) + .build(); private Spanner spanner; private Spanner spannerWithEmptySessionPool; private static ExecutorService executor; @@ -186,6 +204,8 @@ public static void startStaticServer() throws IOException { StatementResult.exception( INVALID_UPDATE_STATEMENT, Status.INVALID_ARGUMENT.withDescription("invalid statement").asRuntimeException())); + mockSpanner.putStatementResult( + StatementResult.query(SELECT1_FROM_TABLE, MockSpannerTestUtil.SELECT1_RESULTSET)); mockSpanner.setBatchWriteResult(BATCH_WRITE_RESPONSES); executor = Executors.newSingleThreadExecutor(); @@ -1518,6 +1538,69 @@ public void testExecuteQueryWithTag() { assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); } + @Test + public void testExecuteQuery_withDirectedReadOptionsViaRequest() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = + client.singleUse().executeQuery(SELECT1, Options.directedRead(DIRECTED_READ_OPTIONS1))) { + while (resultSet.next()) {} + } + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(1, requests.size()); + ExecuteSqlRequest request = requests.get(0); + assertTrue(request.hasDirectedReadOptions()); + assertEquals(DIRECTED_READ_OPTIONS1, request.getDirectedReadOptions()); + } + + @Test + public void testExecuteQuery_withDirectedReadOptionsViaSpannerOptions() { + Spanner spannerWithDirectedReadOptions = + spanner + .getOptions() + .toBuilder() + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) + .build() + .getService(); + DatabaseClient client = + spannerWithDirectedReadOptions.getDatabaseClient( + DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = client.singleUse().executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(requests.size(), 1); + ExecuteSqlRequest request = requests.get(0); + assertTrue(request.hasDirectedReadOptions()); + assertEquals(DIRECTED_READ_OPTIONS2, request.getDirectedReadOptions()); + } + + @Test + public void testExecuteQuery_whenMultipleDirectedReadsOptions_preferRequestOption() { + Spanner spannerWithDirectedReadOptions = + spanner + .getOptions() + .toBuilder() + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) + .build() + .getService(); + DatabaseClient client = + spannerWithDirectedReadOptions.getDatabaseClient( + DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = + client.singleUse().executeQuery(SELECT1, Options.directedRead(DIRECTED_READ_OPTIONS1))) { + while (resultSet.next()) {} + } + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(requests.size(), 1); + ExecuteSqlRequest request = requests.get(0); + assertTrue(request.hasDirectedReadOptions()); + assertEquals(DIRECTED_READ_OPTIONS1, request.getDirectedReadOptions()); + } + @Test public void testExecuteReadWithTag() { DatabaseClient client = @@ -1542,6 +1625,79 @@ public void testExecuteReadWithTag() { assertThat(request.getRequestOptions().getTransactionTag()).isEmpty(); } + @Test + public void testExecuteReadWithDirectedReadOptions() { + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = + client + .singleUse() + .read( + READ_TABLE_NAME, + KeySet.singleKey(Key.of(1L)), + READ_COLUMN_NAMES, + Options.directedRead(DIRECTED_READ_OPTIONS1))) { + while (resultSet.next()) {} + } + + List requests = mockSpanner.getRequestsOfType(ReadRequest.class); + assertEquals(1, requests.size()); + ReadRequest request = requests.get(0); + assertTrue(request.hasDirectedReadOptions()); + assertEquals(DIRECTED_READ_OPTIONS1, request.getDirectedReadOptions()); + } + + @Test + public void testExecuteReadWithDirectedReadOptionsViaSpannerOptions() { + Spanner spannerWithDirectedReadOptions = + spanner + .getOptions() + .toBuilder() + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) + .build() + .getService(); + DatabaseClient client = + spannerWithDirectedReadOptions.getDatabaseClient( + DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + try (ResultSet resultSet = + client.singleUse().read(READ_TABLE_NAME, KeySet.singleKey(Key.of(1L)), READ_COLUMN_NAMES)) { + while (resultSet.next()) {} + } + + List requests = mockSpanner.getRequestsOfType(ReadRequest.class); + assertEquals(requests.size(), 1); + ReadRequest request = requests.get(0); + assertTrue(request.hasDirectedReadOptions()); + assertEquals(DIRECTED_READ_OPTIONS2, request.getDirectedReadOptions()); + } + + @Test + public void testReadWriteExecuteQueryWithDirectedReadOptionsViaSpannerOptions() { + Spanner spannerWithDirectedReadOptions = + spanner + .getOptions() + .toBuilder() + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) + .build() + .getService(); + DatabaseClient client = + spannerWithDirectedReadOptions.getDatabaseClient( + DatabaseId.of(TEST_PROJECT, TEST_INSTANCE, TEST_DATABASE)); + TransactionRunner runner = client.readWriteTransaction(); + runner.run( + transaction -> { + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + return null; + }); + + List requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class); + assertEquals(requests.size(), 1); + ExecuteSqlRequest request = requests.get(0); + assertFalse(request.hasDirectedReadOptions()); + } + @Test public void testReadWriteExecuteQueryWithTag() { DatabaseClient client = @@ -2728,7 +2884,7 @@ public void testNestedTransactionsUsingTwoDatabases() throws InterruptedExceptio @Test public void testBackendQueryOptions() { - // Use a Spanner instance with MinSession=0 and WriteFraction=0.0 to prevent background requests + // Use a Spanner instance with MinSession=0 to prevent background requests // from the session pool interfering with the test case. try (Spanner spanner = SpannerOptions.newBuilder() @@ -2769,7 +2925,7 @@ public void testBackendQueryOptions() { @Test public void testBackendQueryOptionsWithAnalyzeQuery() { - // Use a Spanner instance with MinSession=0 and WriteFraction=0.0 to prevent background requests + // Use a Spanner instance with MinSession=0 to prevent background requests // from the session pool interfering with the test case. try (Spanner spanner = SpannerOptions.newBuilder() @@ -2812,7 +2968,7 @@ public void testBackendQueryOptionsWithAnalyzeQuery() { @Test public void testBackendPartitionQueryOptions() { - // Use a Spanner instance with MinSession=0 and WriteFraction=0.0 to prevent background requests + // Use a Spanner instance with MinSession=0 to prevent background requests // from the session pool interfering with the test case. try (Spanner spanner = SpannerOptions.newBuilder() @@ -2820,6 +2976,58 @@ public void testBackendPartitionQueryOptions() { .setChannelProvider(channelProvider) .setCredentials(NoCredentials.getInstance()) .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()) + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) + .build() + .getService()) { + BatchClient client = + spanner.getBatchClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE")); + BatchReadOnlyTransaction transaction = + client.batchReadOnlyTransaction(TimestampBound.strong()); + List partitions = + transaction.partitionQuery( + PartitionOptions.newBuilder().setMaxPartitions(10L).build(), + Statement.newBuilder(SELECT1.getSql()) + .withQueryOptions( + QueryOptions.newBuilder() + .setOptimizerVersion("1") + .setOptimizerStatisticsPackage("custom-package") + .build()) + .build(), + Options.directedRead(DIRECTED_READ_OPTIONS1)); + try (ResultSet rs = transaction.execute(partitions.get(0))) { + // Just iterate over the results to execute the query. + while (rs.next()) {} + } finally { + transaction.cleanup(); + } + // Check if the last query executed is a DeleteSessionRequest and the second last query + // executed is a ExecuteSqlRequest and was executed using a custom optimizer version, + // statistics package and directed read options. + List requests = mockSpanner.getRequests(); + assert requests.size() >= 2 : "required to have at least 2 requests"; + assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class); + assertThat(requests.get(requests.size() - 2)).isInstanceOf(ExecuteSqlRequest.class); + ExecuteSqlRequest executeSqlRequest = (ExecuteSqlRequest) requests.get(requests.size() - 2); + assertThat(executeSqlRequest.getQueryOptions()).isNotNull(); + assertThat(executeSqlRequest.getQueryOptions().getOptimizerVersion()).isEqualTo("1"); + assertThat(executeSqlRequest.getQueryOptions().getOptimizerStatisticsPackage()) + .isEqualTo("custom-package"); + assertThat(executeSqlRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS1); + } + } + + @Test + public void + testBackendPartitionQueryOptions_whenDirectedReadOptionsViaSpannerOptions_assertOptions() { + // Use a Spanner instance with MinSession=0 to prevent background requests + // from the session pool interfering with the test case. + try (Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()) + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) .build() .getService()) { BatchClient client = @@ -2843,8 +3051,8 @@ public void testBackendPartitionQueryOptions() { transaction.cleanup(); } // Check if the last query executed is a DeleteSessionRequest and the second last query - // executed is a ExecuteSqlRequest and was executed using a custom optimizer version and - // statistics package. + // executed is a ExecuteSqlRequest and was executed using a custom optimizer version, + // statistics package and directed read options. List requests = mockSpanner.getRequests(); assert requests.size() >= 2 : "required to have at least 2 requests"; assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class); @@ -2854,6 +3062,91 @@ public void testBackendPartitionQueryOptions() { assertThat(executeSqlRequest.getQueryOptions().getOptimizerVersion()).isEqualTo("1"); assertThat(executeSqlRequest.getQueryOptions().getOptimizerStatisticsPackage()) .isEqualTo("custom-package"); + assertThat(executeSqlRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS2); + } + } + + @Test + public void testBackendPartitionReadOptions() { + // Use a Spanner instance with MinSession=0 to prevent background requests + // from the session pool interfering with the test case. + try (Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()) + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) + .build() + .getService()) { + BatchClient client = + spanner.getBatchClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE")); + BatchReadOnlyTransaction transaction = + client.batchReadOnlyTransaction(TimestampBound.strong()); + List partitions = + transaction.partitionRead( + PartitionOptions.newBuilder().setMaxPartitions(10L).build(), + "FOO", + KeySet.all(), + Lists.newArrayList("1"), + Options.directedRead(DIRECTED_READ_OPTIONS1)); + try (ResultSet rs = transaction.execute(partitions.get(0))) { + // Just iterate over the results to execute the query. + while (rs.next()) {} + } finally { + transaction.cleanup(); + } + // Check if the last query executed is a DeleteSessionRequest and the second last query + // executed is a ExecuteSqlRequest and was executed using a custom optimizer version, + // statistics package and directed read options. + List requests = mockSpanner.getRequests(); + assert requests.size() >= 2 : "required to have at least 2 requests"; + assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class); + assertThat(requests.get(requests.size() - 2)).isInstanceOf(ReadRequest.class); + ReadRequest readRequest = (ReadRequest) requests.get(requests.size() - 2); + assertThat(readRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS1); + } + } + + @Test + public void + testBackendPartitionReadOptions_whenDirectedReadOptionsViaSpannerOptions_assertOptions() { + // Use a Spanner instance with MinSession=0 to prevent background requests + // from the session pool interfering with the test case. + try (Spanner spanner = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setChannelProvider(channelProvider) + .setCredentials(NoCredentials.getInstance()) + .setSessionPoolOption(SessionPoolOptions.newBuilder().setMinSessions(0).build()) + .setDirectedReadOptions(DIRECTED_READ_OPTIONS2) + .build() + .getService()) { + BatchClient client = + spanner.getBatchClient(DatabaseId.of("[PROJECT]", "[INSTANCE]", "[DATABASE")); + BatchReadOnlyTransaction transaction = + client.batchReadOnlyTransaction(TimestampBound.strong()); + List partitions = + transaction.partitionRead( + PartitionOptions.newBuilder().setMaxPartitions(10L).build(), + "FOO", + KeySet.all(), + Lists.newArrayList("1")); + try (ResultSet rs = transaction.execute(partitions.get(0))) { + // Just iterate over the results to execute the query. + while (rs.next()) {} + } finally { + transaction.cleanup(); + } + // Check if the last query executed is a DeleteSessionRequest and the second last query + // executed is a ExecuteSqlRequest and was executed using a custom optimizer version, + // statistics package and directed read options. + List requests = mockSpanner.getRequests(); + assert requests.size() >= 2 : "required to have at least 2 requests"; + assertThat(requests.get(requests.size() - 1)).isInstanceOf(DeleteSessionRequest.class); + assertThat(requests.get(requests.size() - 2)).isInstanceOf(ReadRequest.class); + ReadRequest readRequest = (ReadRequest) requests.get(requests.size() - 2); + assertThat(readRequest.getDirectedReadOptions()).isEqualTo(DIRECTED_READ_OPTIONS2); } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java index af336d3f58..83bb1728ac 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MockSpannerTestUtil.java @@ -49,6 +49,7 @@ public class MockSpannerTestUtil { .build()) .setMetadata(SELECT1_METADATA) .build(); + public static final Statement SELECT1_FROM_TABLE = Statement.of("SELECT 1 FROM FOO WHERE 1=1"); static final String TEST_PROJECT = "my-project"; static final String TEST_INSTANCE = "my-instance"; diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java index d40f9b39ea..e0bbf81f29 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OptionsTest.java @@ -24,6 +24,9 @@ import static org.junit.Assert.assertTrue; import com.google.cloud.spanner.Options.RpcPriority; +import com.google.spanner.v1.DirectedReadOptions; +import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas; +import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection; import com.google.spanner.v1.RequestOptions.Priority; import org.junit.Test; import org.junit.runner.RunWith; @@ -32,6 +35,13 @@ /** Unit tests for {@link Options}. */ @RunWith(JUnit4.class) public class OptionsTest { + private static final DirectedReadOptions DIRECTED_READ_OPTIONS = + DirectedReadOptions.newBuilder() + .setIncludeReplicas( + IncludeReplicas.newBuilder() + .addReplicaSelections( + ReplicaSelection.newBuilder().setLocation("us-west1").build())) + .build(); @Test public void negativeLimitsNotAllowed() { @@ -65,13 +75,18 @@ public void zeroPrefetchChunksNotAllowed() { public void allOptionsPresent() { Options options = Options.fromReadOptions( - Options.limit(10), Options.prefetchChunks(1), Options.dataBoostEnabled(true)); + Options.limit(10), + Options.prefetchChunks(1), + Options.dataBoostEnabled(true), + Options.directedRead(DIRECTED_READ_OPTIONS)); assertThat(options.hasLimit()).isTrue(); assertThat(options.limit()).isEqualTo(10); assertThat(options.hasPrefetchChunks()).isTrue(); assertThat(options.prefetchChunks()).isEqualTo(1); assertThat(options.hasDataBoostEnabled()).isTrue(); assertTrue(options.dataBoostEnabled()); + assertTrue(options.hasDirectedReadOptions()); + assertEquals(DIRECTED_READ_OPTIONS, options.directedReadOptions()); } @Test @@ -84,6 +99,7 @@ public void allOptionsAbsent() { assertThat(options.hasPriority()).isFalse(); assertThat(options.hasTag()).isFalse(); assertThat(options.hasDataBoostEnabled()).isFalse(); + assertThat(options.hasDirectedReadOptions()).isFalse(); assertThat(options.toString()).isEqualTo(""); assertThat(options.equals(options)).isTrue(); assertThat(options.equals(null)).isFalse(); @@ -161,14 +177,28 @@ public void readOptionsTest() { boolean dataBoost = true; Options options = Options.fromReadOptions( - Options.limit(limit), Options.tag(tag), Options.dataBoostEnabled(true)); + Options.limit(limit), + Options.tag(tag), + Options.dataBoostEnabled(true), + Options.directedRead(DIRECTED_READ_OPTIONS)); assertThat(options.toString()) .isEqualTo( - "limit: " + limit + " " + "tag: " + tag + " " + "dataBoostEnabled: " + dataBoost + " "); + "limit: " + + limit + + " " + + "tag: " + + tag + + " " + + "dataBoostEnabled: " + + dataBoost + + " " + + "directedReadOptions: " + + DIRECTED_READ_OPTIONS + + " "); assertThat(options.tag()).isEqualTo(tag); assertEquals(dataBoost, options.dataBoostEnabled()); - assertThat(options.hashCode()).isEqualTo(-96091607); + assertEquals(DIRECTED_READ_OPTIONS, options.directedReadOptions()); } @Test @@ -199,7 +229,10 @@ public void queryOptionsTest() { boolean dataBoost = true; Options options = Options.fromQueryOptions( - Options.prefetchChunks(chunks), Options.tag(tag), Options.dataBoostEnabled(true)); + Options.prefetchChunks(chunks), + Options.tag(tag), + Options.dataBoostEnabled(true), + Options.directedRead(DIRECTED_READ_OPTIONS)); assertThat(options.toString()) .isEqualTo( "prefetchChunks: " @@ -210,11 +243,14 @@ public void queryOptionsTest() { + " " + "dataBoostEnabled: " + dataBoost + + " " + + "directedReadOptions: " + + DIRECTED_READ_OPTIONS + " "); assertThat(options.prefetchChunks()).isEqualTo(chunks); assertThat(options.tag()).isEqualTo(tag); assertEquals(dataBoost, options.dataBoostEnabled()); - assertThat(options.hashCode()).isEqualTo(1274581983); + assertEquals(DIRECTED_READ_OPTIONS, options.directedReadOptions()); } @Test @@ -630,4 +666,29 @@ public void optimisticLockHashCode() { assertEquals(option1.hashCode(), option2.hashCode()); assertNotEquals(option1.hashCode(), option3.hashCode()); } + + @Test + public void directedReadEquality() { + Options option1 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS)); + Options option2 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS)); + Options option3 = Options.fromTransactionOptions(); + + assertEquals(option1, option2); + assertNotEquals(option1, option3); + } + + @Test + public void directedReadHashCode() { + Options option1 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS)); + Options option2 = Options.fromReadOptions(Options.directedRead(DIRECTED_READ_OPTIONS)); + Options option3 = Options.fromTransactionOptions(); + + assertEquals(option1.hashCode(), option2.hashCode()); + assertNotEquals(option1.hashCode(), option3.hashCode()); + } + + @Test + public void directedReadsNullNotAllowed() { + assertThrows(NullPointerException.class, () -> Options.directedRead(null)); + } } diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java index 635838512c..42e53a6b8e 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SpannerOptionsTest.java @@ -48,6 +48,9 @@ import com.google.spanner.v1.CommitRequest; import com.google.spanner.v1.CreateSessionRequest; import com.google.spanner.v1.DeleteSessionRequest; +import com.google.spanner.v1.DirectedReadOptions; +import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas; +import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection; import com.google.spanner.v1.ExecuteBatchDmlRequest; import com.google.spanner.v1.ExecuteSqlRequest; import com.google.spanner.v1.ExecuteSqlRequest.QueryOptions; @@ -698,6 +701,27 @@ public void testLeaderAwareRoutingEnablement() { .isLeaderAwareRoutingEnabled()); } + @Test + public void testSetDirectedReadOptions() { + final DirectedReadOptions directedReadOptions = + DirectedReadOptions.newBuilder() + .setIncludeReplicas( + IncludeReplicas.newBuilder() + .addReplicaSelections( + ReplicaSelection.newBuilder().setLocation("us-west1").build()) + .build()) + .build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .setDirectedReadOptions(directedReadOptions) + .build(); + assertEquals(options.getDirectedReadOptions(), directedReadOptions); + assertThrows( + NullPointerException.class, + () -> SpannerOptions.newBuilder().setDirectedReadOptions(null).build()); + } + @Test public void testSpannerCallContextTimeoutConfigurator_NullValues() { SpannerCallContextTimeoutConfigurator configurator = diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectedReadsTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectedReadsTest.java new file mode 100644 index 0000000000..217da5f4bc --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITDirectedReadsTest.java @@ -0,0 +1,185 @@ +/* + * Copyright 2023 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.it; + +import static com.google.cloud.spanner.MockSpannerTestUtil.SELECT1; +import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeFalse; + +import com.google.cloud.spanner.AbortedException; +import com.google.cloud.spanner.Database; +import com.google.cloud.spanner.DatabaseClient; +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.IntegrationTestEnv; +import com.google.cloud.spanner.Key; +import com.google.cloud.spanner.KeySet; +import com.google.cloud.spanner.Options; +import com.google.cloud.spanner.ParallelIntegrationTest; +import com.google.cloud.spanner.ResultSet; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerException; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.spanner.TransactionContext; +import com.google.cloud.spanner.TransactionManager; +import com.google.cloud.spanner.TransactionRunner; +import com.google.common.collect.Lists; +import com.google.spanner.v1.DirectedReadOptions; +import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas; +import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@Category(ParallelIntegrationTest.class) +@RunWith(JUnit4.class) +public class ITDirectedReadsTest { + + private static final DirectedReadOptions DIRECTED_READ_OPTIONS = + DirectedReadOptions.newBuilder() + .setIncludeReplicas( + IncludeReplicas.newBuilder() + .addReplicaSelections( + ReplicaSelection.newBuilder().setLocation("us-west1").build())) + .build(); + + @ClassRule public static IntegrationTestEnv env = new IntegrationTestEnv(); + private static Database db; + + @BeforeClass + public static void setUp() { + db = + env.getTestHelper() + .createTestDatabase("CREATE TABLE TEST (ID INT64, NAME STRING(100)) PRIMARY KEY (ID)"); + } + + @AfterClass + public static void tearDown() { + db.drop(); + } + + @Test + public void testReadWriteTransactionRunner_queryWithDirectedReadOptionsViaRequest_throwsError() { + // Directed Read Options set at an RPC level is not acceptable for RW transaction + + assumeFalse("Emulator does not support directed reads", isUsingEmulator()); + SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build(); + try (Spanner spanner = options.getService()) { + DatabaseClient client = spanner.getDatabaseClient(db.getId()); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> { + try (ResultSet resultSet = + transaction.executeQuery( + SELECT1, Options.directedRead(DIRECTED_READ_OPTIONS))) { + while (resultSet.next()) {} + } + return null; + })); + + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + assertTrue( + e.getMessage() + .contains("Directed reads can only be performed in a read-only transaction.")); + } + } + + @Test + public void testReadWriteTransactionRunner_readWithDirectedReadOptionsViaRequest_throwsError() { + // Directed Read Options set at an RPC level is not acceptable for RW transaction + + assumeFalse("Emulator does not support directed reads", isUsingEmulator()); + SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build(); + try (Spanner spanner = options.getService()) { + DatabaseClient client = spanner.getDatabaseClient(db.getId()); + TransactionRunner runner = client.readWriteTransaction(); + SpannerException e = + assertThrows( + SpannerException.class, + () -> + runner.run( + transaction -> { + try (ResultSet resultSet = + transaction.read( + "TEST", + KeySet.singleKey(Key.of(1L)), + Lists.newArrayList("NAME"), + Options.directedRead(DIRECTED_READ_OPTIONS))) { + while (resultSet.next()) {} + } + return null; + })); + + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + assertTrue( + e.getMessage() + .contains("Directed reads can only be performed in a read-only transaction.")); + } + } + + @Test + public void testReadWriteTransactionManager_readWithDirectedReadOptionsViaRequest_throwsError() { + // Directed Read Options set at an RPC level is not acceptable for RW transaction + + assumeFalse("Emulator does not support directed reads", isUsingEmulator()); + SpannerOptions options = env.getTestHelper().getOptions().toBuilder().build(); + try (Spanner spanner = options.getService()) { + DatabaseClient client = spanner.getDatabaseClient(db.getId()); + try (TransactionManager manager = client.transactionManager()) { + SpannerException e = + assertThrows( + SpannerException.class, + () -> { + TransactionContext transaction = manager.begin(); + try { + while (true) { + + ResultSet resultSet = + transaction.read( + "TEST", + KeySet.singleKey(Key.of(1L)), + Lists.newArrayList("NAME"), + Options.directedRead(DIRECTED_READ_OPTIONS)); + while (resultSet.next()) {} + + manager.commit(); + assertNotNull(manager.getCommitTimestamp()); + break; + } + } catch (AbortedException ex) { + transaction = manager.resetForRetry(); + } + }); + assertEquals(ErrorCode.INVALID_ARGUMENT, e.getErrorCode()); + assertTrue( + e.getMessage() + .contains("Directed reads can only be performed in a read-only transaction.")); + } + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java index c28b48c529..2d88899646 100644 --- a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/it/ITReadTest.java @@ -21,6 +21,9 @@ import static com.google.cloud.spanner.testing.EmulatorSpannerHelper.isUsingEmulator; import static com.google.common.truth.Truth.assertThat; import static com.google.common.truth.Truth.assertWithMessage; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import com.google.cloud.spanner.Database; @@ -42,6 +45,9 @@ import com.google.cloud.spanner.Type; import com.google.cloud.spanner.connection.ConnectionOptions; import com.google.cloud.spanner.testing.RemoteSpannerHelper; +import com.google.spanner.v1.DirectedReadOptions; +import com.google.spanner.v1.DirectedReadOptions.IncludeReplicas; +import com.google.spanner.v1.DirectedReadOptions.ReplicaSelection; import io.grpc.Context; import java.util.ArrayList; import java.util.Arrays; @@ -77,6 +83,17 @@ public class ITReadTest { private static final Type TABLE_TYPE = Type.struct( StructField.of("key", Type.string()), StructField.of("stringvalue", Type.string())); + private static DirectedReadOptions DIRECTED_READ_OPTIONS = + DirectedReadOptions.newBuilder() + .setIncludeReplicas( + IncludeReplicas.newBuilder() + .addReplicaSelections( + ReplicaSelection.newBuilder() + .setLocation("us-west1") + .setType(ReplicaSelection.Type.READ_ONLY) + .build()) + .setAutoFailoverDisabled(true)) + .build(); private static DatabaseClient googleStandardSQLClient; private static DatabaseClient postgreSQLClient; @@ -336,6 +353,23 @@ public void rowsAreSnapshots() { assertThat(rows.get(2).getString(1)).isEqualTo("v4"); } + @Test + public void pointReadWithDirectedReadOptions() { + try (ResultSet rs = + getClient(dialect.dialect) + .singleUse() + .read( + TABLE_NAME, + KeySet.singleKey(Key.of("k1")), + ALL_COLUMNS, + Options.directedRead(DIRECTED_READ_OPTIONS))) { + assertTrue(rs.next()); + assertEquals("k1", rs.getString(0)); + assertEquals("v1", rs.getString(1)); + assertFalse(rs.next()); + } + } + @Test public void invalidDatabase() { RemoteSpannerHelper helper = env.getTestHelper();