diff --git a/java-spanner/.gitignore b/java-spanner/.gitignore deleted file mode 100644 index 722d5e71d93c..000000000000 --- a/java-spanner/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.vscode diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 2638bf4b94aa..16d30e8415fa 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -466,7 +466,7 @@ ByteString getTransactionId() { public void close() { ByteString id = getTransactionId(); if (id != null && !id.isEmpty()) { - rpc.clearTransactionAffinity(id); + rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint)); } super.close(); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 5bd94da32cc6..0f1388c36f9c 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -161,7 +161,17 @@ public void close() { * Keeps track of which channels have been 'given' to single-use transactions for a given Spanner * instance. */ - private static final Map CHANNEL_USAGE = new HashMap<>(); + private static final class SharedChannelUsage { + private final BitSet channelUsage; + private int referenceCount; + + private SharedChannelUsage(int numChannels) { + this.channelUsage = new BitSet(numChannels); + this.referenceCount = 0; + } + } + + private static final Map CHANNEL_USAGE = new HashMap<>(); private static final EnumSet RETRYABLE_ERROR_CODES = EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE); @@ -182,6 +192,8 @@ public void close() { private final SessionClient sessionClient; + private final SpannerImpl spanner; + private final TraceWrapper tracer; /** The current multiplexed session that is used by this client. */ @@ -213,15 +225,20 @@ public void close() { @VisibleForTesting MultiplexedSessionDatabaseClient(SessionClient sessionClient, Clock clock) { - this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); + this.spanner = sessionClient.getSpanner(); + this.numChannels = spanner.getOptions().getNumChannels(); synchronized (CHANNEL_USAGE) { - CHANNEL_USAGE.putIfAbsent(sessionClient.getSpanner(), new BitSet(numChannels)); - this.channelUsage = CHANNEL_USAGE.get(sessionClient.getSpanner()); + SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner); + if (sharedChannelUsage == null) { + sharedChannelUsage = new SharedChannelUsage(numChannels); + CHANNEL_USAGE.put(this.spanner, sharedChannelUsage); + } + sharedChannelUsage.referenceCount++; + this.channelUsage = sharedChannelUsage.channelUsage; } this.sessionExpirationDuration = Duration.ofMillis( - sessionClient - .getSpanner() + spanner .getOptions() .getSessionPoolOptions() .getMultiplexedSessionMaintenanceDuration() @@ -354,10 +371,23 @@ AtomicLong getNumSessionsReleased() { } void close() { + boolean releaseChannelUsage = false; synchronized (this) { if (!this.isClosed) { this.isClosed = true; this.maintainer.stop(); + releaseChannelUsage = true; + } + } + if (releaseChannelUsage) { + synchronized (CHANNEL_USAGE) { + SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner); + if (sharedChannelUsage != null) { + sharedChannelUsage.referenceCount--; + if (sharedChannelUsage.referenceCount == 0) { + CHANNEL_USAGE.remove(this.spanner); + } + } } } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 8e4fc2458420..7c1b6be1c1bd 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -64,6 +64,7 @@ import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.grpc.GcpManagedChannelBuilder; import com.google.cloud.grpc.GcpManagedChannelOptions; +import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.fallback.GcpFallbackChannel; @@ -301,6 +302,7 @@ public class GapicSpannerRpc implements SpannerRpc { private final boolean isGrpcGcpExtensionEnabled; private final boolean isDynamicChannelPoolEnabled; @Nullable private final KeyAwareChannel keyAwareChannel; + @Nullable private final GcpManagedChannel grpcGcpChannel; private final GrpcCallContext baseGrpcCallContext; @@ -420,6 +422,7 @@ public GapicSpannerRpc(final SpannerOptions options) { .build(); ClientContext clientContext = ClientContext.create(spannerStubSettings); this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel()); + this.grpcGcpChannel = extractGrpcGcpChannel(clientContext.getTransportChannel()); this.spannerStub = GrpcSpannerStubWithStubSettingsAndClientContext.create( spannerStubSettings, clientContext); @@ -540,6 +543,7 @@ public UnaryCallable createUnaryCalla } } else { this.keyAwareChannel = null; + this.grpcGcpChannel = null; this.databaseAdminStub = null; this.instanceAdminStub = null; this.spannerStub = null; @@ -589,6 +593,18 @@ private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transport return null; } + @Nullable + private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transportChannel) { + if (!(transportChannel instanceof GrpcTransportChannel)) { + return null; + } + Channel channel = ((GrpcTransportChannel) transportChannel).getChannel(); + if (channel instanceof GcpManagedChannel) { + return (GcpManagedChannel) channel; + } + return null; + } + @Override public void clearTransactionAffinity(ByteString transactionId) { if (keyAwareChannel != null) { @@ -596,6 +612,16 @@ public void clearTransactionAffinity(ByteString transactionId) { } } + @Override + public void clearTransactionAndChannelAffinity( + ByteString transactionId, @Nullable Long channelHint) { + if (keyAwareChannel != null) { + keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint); + return; + } + GrpcGcpAffinityUtil.clearChannelHintAffinity(grpcGcpChannel, channelHint); + } + private static String parseGrpcGcpApiConfig() { try { return Resources.toString( @@ -772,16 +798,35 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerO } optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build()); - // Configure dynamic channel pool options if enabled. - // Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults - // or user-provided configuration. - if (options.isDynamicChannelPoolEnabled()) { - optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions()); + // Always pass channel-pool options when grpc-gcp is enabled so affinity cleanup settings are + // applied regardless of whether dynamic channel pool is enabled. In the non-DCP path, only + // propagate the affinity cleanup configuration to avoid implicitly turning on dynamic scaling. + if (options.isGrpcGcpExtensionEnabled()) { + optionsBuilder.withChannelPoolOptions(getGrpcGcpChannelPoolOptions(options)); } return optionsBuilder.build(); } + @VisibleForTesting + static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) { + GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions(); + if (options.isDynamicChannelPoolEnabled()) { + return channelPoolOptions; + } + + // When DCP is disabled, Spanner's numChannels should still produce a fixed grpc-gcp channel + // pool instead of only capping the pool's maximum size. + return GcpChannelPoolOptions.newBuilder() + .setMaxSize(options.getNumChannels()) + .setMinSize(options.getNumChannels()) + .setInitSize(options.getNumChannels()) + .disableDynamicScaling() + .setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime()) + .setCleanupInterval(channelPoolOptions.getCleanupInterval()) + .build(); + } + @SuppressWarnings("rawtypes") private static void maybeEnableGrpcGcpExtension( InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder, @@ -793,10 +838,6 @@ private static void maybeEnableGrpcGcpExtension( final String jsonApiConfig = parseGrpcGcpApiConfig(); final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options); - // When dynamic channel pool is enabled, use the DCP initial size as the pool size. - // When disabled, use the explicitly configured numChannels. - final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels(); - ApiFunction baseConfigurator = defaultChannelProviderBuilder.getChannelConfigurator(); ApiFunction apiFunction = @@ -804,10 +845,12 @@ private static void maybeEnableGrpcGcpExtension( if (baseConfigurator != null) { channelBuilder = baseConfigurator.apply(channelBuilder); } + // The grpc-gcp pool is configured entirely through GcpChannelPoolOptions above. Avoid + // the deprecated setPoolSize path, which only adjusts maxSize and does not eagerly create + // a fixed-size pool. return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder) .withApiConfigJsonString(jsonApiConfig) - .withOptions(grpcGcpOptions) - .setPoolSize(poolSize); + .withOptions(grpcGcpOptions); }; // Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1. @@ -2275,20 +2318,9 @@ GrpcCallContext newCallContext( Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options); if (affinity != null) { if (this.isGrpcGcpExtensionEnabled) { - // Set channel affinity in gRPC-GCP. - String affinityKey; - if (this.isDynamicChannelPoolEnabled) { - // When dynamic channel pooling is enabled, we use the raw affinity value as the key. - // This allows grpc-gcp to use round-robin for new keys, enabling new channels - // (created during scale-up) to receive requests. The affinity key lifetime setting - // ensures the affinity map doesn't grow unbounded. - affinityKey = String.valueOf(affinity); - } else { - // When DCP is disabled, compute bounded channel hint to prevent - // gRPC-GCP affinity map from getting unbounded. - int boundedChannelHint = affinity.intValue() % this.numChannels; - affinityKey = String.valueOf(boundedChannelHint); - } + // Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key. + // Cleanup is handled explicitly by unbind on terminal/single-use operations. + String affinityKey = String.valueOf(affinity); context = context.withCallOptions( context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey)); diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java new file mode 100644 index 000000000000..d4bb41755595 --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java @@ -0,0 +1,46 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.cloud.grpc.GcpManagedChannel; +import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.ResultSet; +import com.google.spanner.v1.SpannerGrpc; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import javax.annotation.Nullable; + +final class GrpcGcpAffinityUtil { + private GrpcGcpAffinityUtil() {} + + static void clearChannelHintAffinity( + @Nullable ManagedChannel channel, @Nullable Long channelHint) { + if (!(channel instanceof GcpManagedChannel) || channelHint == null) { + return; + } + // TODO: Replace this synthetic call once grpc-gcp exposes a direct API for unbinding + // affinity keys without creating and immediately cancelling a ClientCall. + ClientCall call = + channel.newCall( + SpannerGrpc.getExecuteSqlMethod(), + CallOptions.DEFAULT + .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) + .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); + call.cancel("Cloud Spanner transaction closed", null); + } +} diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index e8a10d5675bb..d7b32f72bcd6 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -304,6 +304,21 @@ void clearTransactionAffinity(ByteString transactionId) { clearAffinity(transactionId); } + void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) { + String address = transactionAffinities.remove(transactionId); + readOnlyTxPreferLeader.invalidate(transactionId); + if (channelHint != null) { + ManagedChannel channel = defaultChannel; + if (address != null) { + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint != null) { + channel = endpoint.getChannel(); + } + } + GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, channelHint); + } + } + private void maybeExcludeEndpointOnNextCall( @Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) { if (endpoint == null || logicalRequestKey == null) { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index d75bdfa89b0e..efb305301d85 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -199,6 +199,15 @@ default RequestIdCreator getRequestIdCreator() { /** Clears any client-side affinity associated with the given transaction id. */ default void clearTransactionAffinity(ByteString transactionId) {} + /** + * Clears any client-side transaction affinity and transport-level channel affinity associated + * with the given transaction. + */ + default void clearTransactionAndChannelAffinity( + ByteString transactionId, @Nullable Long channelHint) { + clearTransactionAffinity(transactionId); + } + // Instance admin APIs. Paginated listInstanceConfigs(int pageSize, @Nullable String pageToken) throws SpannerException; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java index 182c9cc35b5e..b3d71c4925bb 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java @@ -16,11 +16,11 @@ package com.google.cloud.spanner; -import static java.util.stream.Collectors.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; @@ -37,12 +37,19 @@ import io.grpc.ServerInterceptor; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Deque; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.junit.After; @@ -96,8 +103,7 @@ public static Collection data() { private static MockSpannerServiceImpl mockSpanner; private static Server server; private static InetSocketAddress address; - // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method - private static final Set batchCreateSessionChannelHints = ConcurrentHashMap.newKeySet(); + // Track channel hints (from X-Goog-Spanner-Request-Id header) for ExecuteStreamingSql. private static final Set executeSqlChannelHints = ConcurrentHashMap.newKeySet(); private static final Deque allExecuteSqlChannelHints = new ConcurrentLinkedDeque<>(); @@ -128,7 +134,7 @@ public ServerCall.Listener interceptCall( headers.get( Metadata.Key.of( "x-response-encoding", Metadata.ASCII_STRING_MARSHALLER))); - // Extract channel hint from X-Goog-Spanner-Request-Id header + // Extract channel hint from X-Goog-Spanner-Request-Id header. String requestId = headers.get(XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY); if (requestId != null) { // Format: @@ -137,10 +143,6 @@ public ServerCall.Listener interceptCall( if (parts.length >= 4) { try { long channelHint = Long.parseLong(parts[3]); - if (call.getMethodDescriptor() - .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { - batchCreateSessionChannelHints.add(channelHint); - } if (call.getMethodDescriptor() .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { executeSqlChannelHints.add(channelHint); @@ -180,7 +182,6 @@ public static void resetLogging() { @After public void reset() { mockSpanner.reset(); - batchCreateSessionChannelHints.clear(); executeSqlChannelHints.clear(); allExecuteSqlChannelHints.clear(); } @@ -209,38 +210,57 @@ private SpannerOptions createSpannerOptions() { } @Test - public void testUsesAllChannels() throws InterruptedException { - final int multiplier = 10; + public void testUsesAllChannels() throws Exception { + final int multiplier = 25; + final int concurrentTransactions = numChannels * multiplier; + // grpc-gcp assigns new manual affinity keys to the least-busy channel. Sequential + // read-only transactions tend to keep picking the same idle channel, so keep reads + // overlapping to verify distribution across the fixed-size pool. + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime(500, 0)); + try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - for (int run = 0; run < numChannels * multiplier; run++) { - try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { - for (int i = 0; i < 2; i++) { - try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { - while (resultSet.next()) {} - } - } + ExecutorService executor = Executors.newFixedThreadPool(concurrentTransactions); + CountDownLatch ready = new CountDownLatch(concurrentTransactions); + CountDownLatch start = new CountDownLatch(1); + List> futures = new ArrayList<>(concurrentTransactions); + try { + for (int run = 0; run < concurrentTransactions; run++) { + futures.add( + executor.submit( + () -> { + ready.countDown(); + start.await(); + try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + } + return null; + })); + } + assertTrue( + "Timed out while preparing concurrent transactions", ready.await(10, TimeUnit.SECONDS)); + start.countDown(); + for (Future future : futures) { + future.get(20, TimeUnit.SECONDS); } + } finally { + executor.shutdownNow(); + executor.awaitTermination(10, TimeUnit.SECONDS); } } - // Bound the channel hints to numChannels (matching gRPC-GCP behavior) and verify - // that channels are being distributed. The raw channel hints may be unbounded (based on - // session index), but gRPC-GCP bounds them to the actual number of channels. - assertEquals(2 * numChannels * multiplier, allExecuteSqlChannelHints.size()); - Set boundedChannelHints = - executeSqlChannelHints.stream().map(hint -> hint % numChannels).collect(toSet()); - // Verify that channel distribution is working: - // - For numChannels=1, exactly 1 channel should be used - // - For numChannels>1, multiple channels should be used (at least half) + assertEquals(concurrentTransactions, allExecuteSqlChannelHints.size()); if (numChannels == 1) { - assertEquals(1, boundedChannelHints.size()); + assertEquals(1, executeSqlChannelHints.size()); } else { assertTrue( "Expected at least " + (numChannels / 2) + " channels to be used, but got " - + boundedChannelHints.size(), - boundedChannelHints.size() >= numChannels / 2); + + executeSqlChannelHints.size(), + executeSqlChannelHints.size() >= numChannels / 2); } } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java index af178a4f1790..d398a78643aa 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java @@ -29,6 +29,8 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; +import com.google.cloud.NoCredentials; +import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionClient.SessionConsumer; import java.io.PrintWriter; import java.io.StringWriter; @@ -37,8 +39,12 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -46,6 +52,10 @@ @RunWith(JUnit4.class) public class MultiplexedSessionDatabaseClientTest { + @After + public void tearDown() throws Exception { + clearChannelUsage(); + } @Test public void testMaintainer() { @@ -302,6 +312,73 @@ public void testGrpcGcpSingleUseDoesNotReserveBitsetChannelHint() throws Excepti assertEquals(0, counter.get()); } + @Test + public void testCloseRemovesChannelUsageEntryWhenLastClientCloses() throws Exception { + try (SpannerImpl spanner = createTestSpanner(); + SessionClient sessionClient = createSessionClient(spanner)) { + MultiplexedSessionDatabaseClient client = + new MultiplexedSessionDatabaseClient(sessionClient, Clock.systemUTC()); + + assertEquals(1, getChannelUsage().size()); + + client.close(); + + assertEquals(0, getChannelUsage().size()); + } + } + + @Test + public void testCloseKeepsChannelUsageEntryWhileAnotherClientIsUsingSameSpanner() + throws Exception { + try (SpannerImpl spanner = createTestSpanner(); + SessionClient firstSessionClient = createSessionClient(spanner); + SessionClient secondSessionClient = createSessionClient(spanner)) { + MultiplexedSessionDatabaseClient firstClient = + new MultiplexedSessionDatabaseClient(firstSessionClient, Clock.systemUTC()); + MultiplexedSessionDatabaseClient secondClient = + new MultiplexedSessionDatabaseClient(secondSessionClient, Clock.systemUTC()); + + assertEquals(1, getChannelUsage().size()); + + firstClient.close(); + assertEquals(1, getChannelUsage().size()); + + secondClient.close(); + assertEquals(0, getChannelUsage().size()); + } + } + + private SessionClient createSessionClient(SpannerImpl spanner) { + return new FailingMultiplexedSessionClient(spanner); + } + + private SpannerImpl createTestSpanner() { + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMultiplexedSessionMaintenanceDuration(Duration.ofDays(7)) + .setWaitForMinSessionsDuration(Duration.ZERO) + .build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .setNumChannels(4) + .setSessionPoolOption(sessionPoolOptions) + .build(); + return new SpannerImpl(options); + } + + @SuppressWarnings("unchecked") + private Map getChannelUsage() throws Exception { + Field field = MultiplexedSessionDatabaseClient.class.getDeclaredField("CHANNEL_USAGE"); + field.setAccessible(true); + return (Map) field.get(null); + } + + private void clearChannelUsage() throws Exception { + getChannelUsage().clear(); + } + private boolean isJava8() { return JavaVersionUtil.getJavaMajorVersion() == 8; } @@ -309,4 +386,37 @@ private boolean isJava8() { private boolean isWindows() { return System.getProperty("os.name").toLowerCase().contains("windows"); } + + private static final class TestExecutorFactory + implements ExecutorFactory { + @Override + public ScheduledExecutorService get() { + return Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void release(ScheduledExecutorService executor) { + executor.shutdown(); + try { + executor.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static final class FailingMultiplexedSessionClient extends SessionClient { + private static final DatabaseId TEST_DATABASE_ID = + DatabaseId.of("test-project", "test-instance", "test-database"); + + private FailingMultiplexedSessionClient(SpannerImpl spanner) { + super(spanner, TEST_DATABASE_ID, new TestExecutorFactory()); + } + + @Override + void asyncCreateMultiplexedSession(SessionConsumer consumer) { + consumer.onSessionCreateFailure( + SpannerExceptionFactory.newSpannerException(ErrorCode.UNAUTHENTICATED, "test"), 1); + } + } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index a3273c2a6aa0..8c110c125c99 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -284,7 +284,7 @@ public void testNoNetworkConnection() { return null; }); - Spanner spanner = + try (Spanner spanner = builder .setProjectId("test-project") .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) @@ -303,46 +303,52 @@ public void testNoNetworkConnection() { .setBuiltInMetricsEnabled(false) .setApiTracerFactory(createMetricsTracerFactory()) .build() - .getService(); - String instance = "i"; - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("test-project", instance, "d")); - - // Using this client will return UNAVAILABLE, as the server is not reachable and we have - // disabled retries. - SpannerException exception = - assertThrows( - SpannerException.class, () -> client.singleUse().executeQuery(SELECT_RANDOM).next()); - assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode()); - - Attributes expectedAttributesCreateSessionOK = - expectedCommonBaseAttributes.toBuilder() - .put(BuiltInMetricsConstant.STATUS_KEY, "OK") - .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") - // Include the additional attributes that are added by the HeaderInterceptor in the - // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never - // leaves the client. - .build(); - - Attributes expectedAttributesCreateSessionFailed = - expectedCommonBaseAttributes.toBuilder() - .put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE") - .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") - // Include the additional attributes that are added by the HeaderInterceptor in the - // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never - // leaves the client. - .build(); - - MetricData attemptCountMetricData = - getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); - assertNotNull(attemptCountMetricData); - - // Attempt count should have a failed metric point for CreateSession. - assertEquals( - 1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0); - assertTrue( - checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)); - assertTrue( - checkIfMetricExists(metricReader, BuiltInMetricsConstant.AFE_CONNECTIVITY_ERROR_NAME)); + .getService()) { + String instance = "i"; + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("test-project", instance, "d")); + + // Using this client will return UNAVAILABLE, as the server is not reachable and we have + // disabled retries. + SpannerException exception = + assertThrows( + SpannerException.class, () -> client.singleUse().executeQuery(SELECT_RANDOM).next()); + assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode()); + + Attributes expectedAttributesCreateSessionOK = + expectedCommonBaseAttributes.toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "OK") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") + // Include the additional attributes that are added by the HeaderInterceptor in the + // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request + // never leaves the client. + .build(); + + Attributes expectedAttributesCreateSessionFailed = + expectedCommonBaseAttributes.toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") + // Include the additional attributes that are added by the HeaderInterceptor in the + // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request + // never leaves the client. + .build(); + + MetricData attemptCountMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); + assertNotNull(attemptCountMetricData); + + // Multiplexed session creation may continue retrying in the background, so the exact number + // of failed CreateSession attempts is timing-dependent. The request should never succeed, + // and we should observe at least one failed CreateSession attempt. + assertEquals( + 0, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionOK), 0); + assertTrue( + getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed) >= 1.0); + assertTrue( + checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)); + assertTrue( + checkIfMetricExists(metricReader, BuiltInMetricsConstant.AFE_CONNECTIVITY_ERROR_NAME)); + } } @Test diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 2b22a8c1d6b1..95e81cf7e6e5 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -379,6 +379,34 @@ public void multiUseReadOnlyTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); } + @SuppressWarnings("unchecked") + @Test + public void multiUseReadOnlyTransactionCloseClearsGrpcGcpAffinityWhenEnabled() + throws ParseException { + when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); + ArgumentCaptor> beginOptionsCaptor = + ArgumentCaptor.forClass((Class) Map.class); + Transaction txnMetadata = + Transaction.newBuilder() + .setId(ByteString.copyFromUtf8("x")) + .setReadTimestamp(Timestamps.parse("2015-10-01T10:54:20.021Z")) + .build(); + Mockito.when(rpc.beginTransaction(Mockito.any(), beginOptionsCaptor.capture(), eq(false))) + .thenReturn(txnMetadata); + mockRead( + PartialResultSet.newBuilder() + .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) + .build()); + + ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); + txn.readRow("Dummy", Key.of(), Collections.singletonList("C")); + txn.close(); + + Long channelHint = SpannerRpc.Option.CHANNEL_HINT.getLong(beginOptionsCaptor.getValue()); + Mockito.verify(rpc) + .clearTransactionAndChannelAffinity(ByteString.copyFromUtf8("x"), channelHint); + } + @SuppressWarnings("unchecked") @Test public void readWriteTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index f7b3b657d704..165557608ac3 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -40,7 +40,9 @@ import com.google.auth.oauth2.OAuth2Credentials; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; +import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.grpc.GcpManagedChannelOptions; +import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.fallback.GcpFallbackChannelOptions; import com.google.cloud.grpc.fallback.GcpFallbackOpenTelemetry; @@ -73,6 +75,8 @@ import com.google.spanner.v1.StructType; import com.google.spanner.v1.StructType.Field; import com.google.spanner.v1.TypeCode; +import io.grpc.CallOptions; +import io.grpc.ClientCall; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.ManagedChannelBuilder; @@ -115,6 +119,8 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; @RunWith(Parameterized.class) public class GapicSpannerRpcTest { @@ -449,6 +455,78 @@ public void testNewCallContextWithNullRequestAndNullMethod() { rpc.shutdown(); } + @Test + public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithoutDcp() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .enableGrpcGcpExtension() + .disableDynamicChannelPool() + .setNumChannels(4) + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + Map grpcGcpOptions = new HashMap<>(); + grpcGcpOptions.put(Option.CHANNEL_HINT, 7L); + grpcGcpOptions.put(Option.UNBIND_CHANNEL_HINT, Boolean.TRUE); + + GrpcCallContext callContext = + rpc.newCallContext( + grpcGcpOptions, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod()); + + assertEquals("7", callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); + assertEquals( + Boolean.TRUE, + callContext.getCallOptions().getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY)); + rpc.shutdown(); + } + + @Test + public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithDcp() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .enableGrpcGcpExtension() + .enableDynamicChannelPool() + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + Map grpcGcpOptions = new HashMap<>(); + grpcGcpOptions.put(Option.CHANNEL_HINT, 7L); + + GrpcCallContext callContext = + rpc.newCallContext( + grpcGcpOptions, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod()); + + assertEquals("7", callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); + rpc.shutdown(); + } + + @SuppressWarnings("unchecked") + @Test + public void testClearChannelHintAffinityCancelsSyntheticGrpcGcpCall() { + GcpManagedChannel channel = Mockito.mock(GcpManagedChannel.class); + ClientCall call = + Mockito.mock(ClientCall.class); + ArgumentCaptor callOptionsCaptor = ArgumentCaptor.forClass(CallOptions.class); + Mockito.when( + channel.newCall( + Mockito.eq(SpannerGrpc.getExecuteSqlMethod()), callOptionsCaptor.capture())) + .thenReturn(call); + + GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, 7L); + + assertEquals("7", callOptionsCaptor.getValue().getOption(GcpManagedChannel.AFFINITY_KEY)); + assertEquals( + Boolean.TRUE, + callOptionsCaptor.getValue().getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY)); + Mockito.verify(call).cancel("Cloud Spanner transaction closed", null); + } + @Test public void testNewCallContextWithRouteToLeaderHeader() { SpannerOptions options = @@ -1030,6 +1108,83 @@ public void testGrpcGcpOtelMetricsDisabledSkipsMeterInjection() throws Exception assertNull(metricsOptions.getOpenTelemetryMeter()); } + @Test + public void testGrpcGcpOptionsIncludeStaticChannelPoolSettingsWithoutDcp() throws Exception { + Duration affinityKeyLifetime = Duration.ofMinutes(10); + Duration cleanupInterval = Duration.ofMinutes(5); + GcpChannelPoolOptions channelPoolOptions = + GcpChannelPoolOptions.newBuilder() + .setAffinityKeyLifetime(affinityKeyLifetime) + .setCleanupInterval(cleanupInterval) + .build(); + int numChannels = 7; + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .enableGrpcGcpExtension() + .disableDynamicChannelPool() + .setNumChannels(numChannels) + .setGcpChannelPoolOptions(channelPoolOptions) + .build(); + + java.lang.reflect.Method method = + GapicSpannerRpc.class.getDeclaredMethod( + "grpcGcpOptionsWithMetricsAndDcp", SpannerOptions.class); + method.setAccessible(true); + GcpManagedChannelOptions grpcGcpOptions = + (GcpManagedChannelOptions) method.invoke(null, options); + + assertEquals(numChannels, grpcGcpOptions.getChannelPoolOptions().getMaxSize()); + assertEquals(numChannels, grpcGcpOptions.getChannelPoolOptions().getMinSize()); + assertEquals(numChannels, grpcGcpOptions.getChannelPoolOptions().getInitSize()); + assertEquals( + affinityKeyLifetime, grpcGcpOptions.getChannelPoolOptions().getAffinityKeyLifetime()); + assertEquals(cleanupInterval, grpcGcpOptions.getChannelPoolOptions().getCleanupInterval()); + assertEquals(0, grpcGcpOptions.getChannelPoolOptions().getMinRpcPerChannel()); + assertEquals(0, grpcGcpOptions.getChannelPoolOptions().getMaxRpcPerChannel()); + assertEquals(Duration.ZERO, grpcGcpOptions.getChannelPoolOptions().getScaleDownInterval()); + } + + @Test + public void testGrpcGcpOptionsRetainDynamicChannelPoolSettingsWithDcp() throws Exception { + Duration affinityKeyLifetime = Duration.ofMinutes(10); + Duration cleanupInterval = Duration.ofMinutes(5); + Duration scaleDownInterval = Duration.ofMinutes(3); + GcpChannelPoolOptions channelPoolOptions = + GcpChannelPoolOptions.newBuilder() + .setInitSize(6) + .setMaxSize(15) + .setMinSize(3) + .setDynamicScaling(10, 50, scaleDownInterval) + .setAffinityKeyLifetime(affinityKeyLifetime) + .setCleanupInterval(cleanupInterval) + .build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .enableGrpcGcpExtension() + .enableDynamicChannelPool() + .setGcpChannelPoolOptions(channelPoolOptions) + .build(); + + java.lang.reflect.Method method = + GapicSpannerRpc.class.getDeclaredMethod( + "grpcGcpOptionsWithMetricsAndDcp", SpannerOptions.class); + method.setAccessible(true); + GcpManagedChannelOptions grpcGcpOptions = + (GcpManagedChannelOptions) method.invoke(null, options); + + assertEquals(6, grpcGcpOptions.getChannelPoolOptions().getInitSize()); + assertEquals(15, grpcGcpOptions.getChannelPoolOptions().getMaxSize()); + assertEquals(3, grpcGcpOptions.getChannelPoolOptions().getMinSize()); + assertEquals(10, grpcGcpOptions.getChannelPoolOptions().getMinRpcPerChannel()); + assertEquals(50, grpcGcpOptions.getChannelPoolOptions().getMaxRpcPerChannel()); + assertEquals(scaleDownInterval, grpcGcpOptions.getChannelPoolOptions().getScaleDownInterval()); + assertEquals( + affinityKeyLifetime, grpcGcpOptions.getChannelPoolOptions().getAffinityKeyLifetime()); + assertEquals(cleanupInterval, grpcGcpOptions.getChannelPoolOptions().getCleanupInterval()); + } + @Test public void testBuiltInMetricsDisabledSkipsGrpcBuiltInMetricsConfigurator() { try {