From c94b6d422655f13e4bd8b1dfe1494e307a16976b Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 13:14:56 +0530 Subject: [PATCH 1/7] fix(spanner): fix grpc-gcp affinity cleanup and multiplexed channel usage leaks --- .../cloud/spanner/AbstractReadContext.java | 2 +- .../MultiplexedSessionDatabaseClient.java | 42 ++++- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 83 +++++++--- .../cloud/spanner/spi/v1/KeyAwareChannel.java | 30 ++++ .../cloud/spanner/spi/v1/SpannerRpc.java | 9 ++ .../MultiplexedSessionDatabaseClientTest.java | 82 ++++++++++ .../google/cloud/spanner/SessionImplTest.java | 28 ++++ .../spanner/spi/v1/GapicSpannerRpcTest.java | 150 ++++++++++++++++++ 8 files changed, 400 insertions(+), 26 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java index 856f876a14de..84af890b69b5 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/AbstractReadContext.java @@ -425,7 +425,7 @@ ByteString getTransactionId() { public void close() { ByteString id = getTransactionId(); if (id != null && !id.isEmpty()) { - rpc.clearTransactionAffinity(id); + rpc.clearTransactionAndChannelAffinity(id, Option.CHANNEL_HINT.getLong(channelHint)); } super.close(); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java index 5bd94da32cc6..0f1388c36f9c 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java @@ -161,7 +161,17 @@ public void close() { * Keeps track of which channels have been 'given' to single-use transactions for a given Spanner * instance. */ - private static final Map CHANNEL_USAGE = new HashMap<>(); + private static final class SharedChannelUsage { + private final BitSet channelUsage; + private int referenceCount; + + private SharedChannelUsage(int numChannels) { + this.channelUsage = new BitSet(numChannels); + this.referenceCount = 0; + } + } + + private static final Map CHANNEL_USAGE = new HashMap<>(); private static final EnumSet RETRYABLE_ERROR_CODES = EnumSet.of(ErrorCode.DEADLINE_EXCEEDED, ErrorCode.RESOURCE_EXHAUSTED, ErrorCode.UNAVAILABLE); @@ -182,6 +192,8 @@ public void close() { private final SessionClient sessionClient; + private final SpannerImpl spanner; + private final TraceWrapper tracer; /** The current multiplexed session that is used by this client. */ @@ -213,15 +225,20 @@ public void close() { @VisibleForTesting MultiplexedSessionDatabaseClient(SessionClient sessionClient, Clock clock) { - this.numChannels = sessionClient.getSpanner().getOptions().getNumChannels(); + this.spanner = sessionClient.getSpanner(); + this.numChannels = spanner.getOptions().getNumChannels(); synchronized (CHANNEL_USAGE) { - CHANNEL_USAGE.putIfAbsent(sessionClient.getSpanner(), new BitSet(numChannels)); - this.channelUsage = CHANNEL_USAGE.get(sessionClient.getSpanner()); + SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner); + if (sharedChannelUsage == null) { + sharedChannelUsage = new SharedChannelUsage(numChannels); + CHANNEL_USAGE.put(this.spanner, sharedChannelUsage); + } + sharedChannelUsage.referenceCount++; + this.channelUsage = sharedChannelUsage.channelUsage; } this.sessionExpirationDuration = Duration.ofMillis( - sessionClient - .getSpanner() + spanner .getOptions() .getSessionPoolOptions() .getMultiplexedSessionMaintenanceDuration() @@ -354,10 +371,23 @@ AtomicLong getNumSessionsReleased() { } void close() { + boolean releaseChannelUsage = false; synchronized (this) { if (!this.isClosed) { this.isClosed = true; this.maintainer.stop(); + releaseChannelUsage = true; + } + } + if (releaseChannelUsage) { + synchronized (CHANNEL_USAGE) { + SharedChannelUsage sharedChannelUsage = CHANNEL_USAGE.get(this.spanner); + if (sharedChannelUsage != null) { + sharedChannelUsage.referenceCount--; + if (sharedChannelUsage.referenceCount == 0) { + CHANNEL_USAGE.remove(this.spanner); + } + } } } } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 8e4fc2458420..7a2f716bfe12 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -64,6 +64,7 @@ import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.grpc.GcpManagedChannelBuilder; import com.google.cloud.grpc.GcpManagedChannelOptions; +import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.grpc.fallback.GcpFallbackChannel; @@ -301,6 +302,7 @@ public class GapicSpannerRpc implements SpannerRpc { private final boolean isGrpcGcpExtensionEnabled; private final boolean isDynamicChannelPoolEnabled; @Nullable private final KeyAwareChannel keyAwareChannel; + @Nullable private final GcpManagedChannel grpcGcpChannel; private final GrpcCallContext baseGrpcCallContext; @@ -420,6 +422,7 @@ public GapicSpannerRpc(final SpannerOptions options) { .build(); ClientContext clientContext = ClientContext.create(spannerStubSettings); this.keyAwareChannel = extractKeyAwareChannel(clientContext.getTransportChannel()); + this.grpcGcpChannel = extractGrpcGcpChannel(clientContext.getTransportChannel()); this.spannerStub = GrpcSpannerStubWithStubSettingsAndClientContext.create( spannerStubSettings, clientContext); @@ -540,6 +543,7 @@ public UnaryCallable createUnaryCalla } } else { this.keyAwareChannel = null; + this.grpcGcpChannel = null; this.databaseAdminStub = null; this.instanceAdminStub = null; this.spannerStub = null; @@ -589,6 +593,18 @@ private static KeyAwareChannel extractKeyAwareChannel(TransportChannel transport return null; } + @Nullable + private static GcpManagedChannel extractGrpcGcpChannel(TransportChannel transportChannel) { + if (!(transportChannel instanceof GrpcTransportChannel)) { + return null; + } + Channel channel = ((GrpcTransportChannel) transportChannel).getChannel(); + if (channel instanceof GcpManagedChannel) { + return (GcpManagedChannel) channel; + } + return null; + } + @Override public void clearTransactionAffinity(ByteString transactionId) { if (keyAwareChannel != null) { @@ -596,6 +612,32 @@ public void clearTransactionAffinity(ByteString transactionId) { } } + @Override + public void clearTransactionAndChannelAffinity( + ByteString transactionId, @Nullable Long channelHint) { + if (keyAwareChannel != null) { + keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint); + return; + } + clearTransactionAffinity(transactionId); + clearChannelHintAffinity(grpcGcpChannel, channelHint); + } + + @VisibleForTesting + static void clearChannelHintAffinity( + @Nullable ManagedChannel channel, @Nullable Long channelHint) { + if (!(channel instanceof GcpManagedChannel) || channelHint == null) { + return; + } + ClientCall call = + channel.newCall( + SpannerGrpc.getExecuteSqlMethod(), + CallOptions.DEFAULT + .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) + .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); + call.cancel("Cloud Spanner transaction closed", null); + } + private static String parseGrpcGcpApiConfig() { try { return Resources.toString( @@ -772,16 +814,30 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerO } optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build()); - // Configure dynamic channel pool options if enabled. - // Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults - // or user-provided configuration. - if (options.isDynamicChannelPoolEnabled()) { - optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions()); + // Always pass channel-pool options when grpc-gcp is enabled so affinity cleanup settings are + // applied regardless of whether dynamic channel pool is enabled. In the non-DCP path, only + // propagate the affinity cleanup configuration to avoid implicitly turning on dynamic scaling. + if (options.isGrpcGcpExtensionEnabled()) { + optionsBuilder.withChannelPoolOptions(getGrpcGcpChannelPoolOptions(options)); } return optionsBuilder.build(); } + @VisibleForTesting + static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options) { + GcpChannelPoolOptions channelPoolOptions = options.getGcpChannelPoolOptions(); + if (options.isDynamicChannelPoolEnabled()) { + return channelPoolOptions; + } + + return GcpChannelPoolOptions.newBuilder() + .disableDynamicScaling() + .setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime()) + .setCleanupInterval(channelPoolOptions.getCleanupInterval()) + .build(); + } + @SuppressWarnings("rawtypes") private static void maybeEnableGrpcGcpExtension( InstantiatingGrpcChannelProvider.Builder defaultChannelProviderBuilder, @@ -2275,20 +2331,9 @@ GrpcCallContext newCallContext( Long affinity = options == null ? null : Option.CHANNEL_HINT.getLong(options); if (affinity != null) { if (this.isGrpcGcpExtensionEnabled) { - // Set channel affinity in gRPC-GCP. - String affinityKey; - if (this.isDynamicChannelPoolEnabled) { - // When dynamic channel pooling is enabled, we use the raw affinity value as the key. - // This allows grpc-gcp to use round-robin for new keys, enabling new channels - // (created during scale-up) to receive requests. The affinity key lifetime setting - // ensures the affinity map doesn't grow unbounded. - affinityKey = String.valueOf(affinity); - } else { - // When DCP is disabled, compute bounded channel hint to prevent - // gRPC-GCP affinity map from getting unbounded. - int boundedChannelHint = affinity.intValue() % this.numChannels; - affinityKey = String.valueOf(boundedChannelHint); - } + // Set channel affinity in gRPC-GCP. Always use the raw affinity value as the key. + // Cleanup is handled explicitly by unbind on terminal/single-use operations. + String affinityKey = String.valueOf(affinity); context = context.withCallOptions( context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey)); diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index b6c22ad4e3f8..4ada7cd5c446 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -20,6 +20,7 @@ import com.google.api.core.InternalApi; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.spanner.XGoogSpannerRequestId; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -32,6 +33,7 @@ import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.RollbackRequest; +import com.google.spanner.v1.SpannerGrpc; import com.google.spanner.v1.Transaction; import com.google.spanner.v1.TransactionSelector; import io.grpc.CallOptions; @@ -294,6 +296,34 @@ void clearTransactionAffinity(ByteString transactionId) { clearAffinity(transactionId); } + void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) { + if (channelHint != null) { + ManagedChannel channel = defaultChannel; + String address = transactionAffinities.get(transactionId); + if (address != null) { + ChannelEndpoint endpoint = endpointCache.getIfPresent(address); + if (endpoint != null) { + channel = endpoint.getChannel(); + } + } + clearChannelHintAffinity(channel, channelHint); + } + clearAffinity(transactionId); + } + + private static void clearChannelHintAffinity(ManagedChannel channel, long channelHint) { + if (!(channel instanceof GcpManagedChannel)) { + return; + } + ClientCall call = + channel.newCall( + SpannerGrpc.getExecuteSqlMethod(), + CallOptions.DEFAULT + .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) + .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); + call.cancel("Cloud Spanner transaction closed", null); + } + private void maybeExcludeEndpointOnNextCall( @Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) { if (endpoint == null || logicalRequestKey == null) { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java index d75bdfa89b0e..efb305301d85 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/SpannerRpc.java @@ -199,6 +199,15 @@ default RequestIdCreator getRequestIdCreator() { /** Clears any client-side affinity associated with the given transaction id. */ default void clearTransactionAffinity(ByteString transactionId) {} + /** + * Clears any client-side transaction affinity and transport-level channel affinity associated + * with the given transaction. + */ + default void clearTransactionAndChannelAffinity( + ByteString transactionId, @Nullable Long channelHint) { + clearTransactionAffinity(transactionId); + } + // Instance admin APIs. Paginated listInstanceConfigs(int pageSize, @Nullable String pageToken) throws SpannerException; diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java index af178a4f1790..8c0eb2db976f 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java @@ -39,6 +39,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import org.junit.After; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -46,6 +47,10 @@ @RunWith(JUnit4.class) public class MultiplexedSessionDatabaseClientTest { + @After + public void tearDown() throws Exception { + clearChannelUsage(); + } @Test public void testMaintainer() { @@ -302,6 +307,83 @@ public void testGrpcGcpSingleUseDoesNotReserveBitsetChannelHint() throws Excepti assertEquals(0, counter.get()); } + @Test + public void testCloseRemovesChannelUsageEntryWhenLastClientCloses() throws Exception { + SessionClient sessionClient = createSessionClient(); + + MultiplexedSessionDatabaseClient client = + new MultiplexedSessionDatabaseClient(sessionClient, Clock.systemUTC()); + + assertEquals(1, getChannelUsage().size()); + + client.close(); + + assertEquals(0, getChannelUsage().size()); + } + + @Test + public void testCloseKeepsChannelUsageEntryWhileAnotherClientIsUsingSameSpanner() + throws Exception { + SpannerImpl spanner = mock(SpannerImpl.class); + SessionClient firstSessionClient = createSessionClient(spanner); + SessionClient secondSessionClient = createSessionClient(spanner); + + MultiplexedSessionDatabaseClient firstClient = + new MultiplexedSessionDatabaseClient(firstSessionClient, Clock.systemUTC()); + MultiplexedSessionDatabaseClient secondClient = + new MultiplexedSessionDatabaseClient(secondSessionClient, Clock.systemUTC()); + + assertEquals(1, getChannelUsage().size()); + + firstClient.close(); + assertEquals(1, getChannelUsage().size()); + + secondClient.close(); + assertEquals(0, getChannelUsage().size()); + } + + private SessionClient createSessionClient() { + return createSessionClient(mock(SpannerImpl.class)); + } + + private SessionClient createSessionClient(SpannerImpl spanner) { + SessionClient sessionClient = mock(SessionClient.class); + SpannerOptions spannerOptions = mock(SpannerOptions.class); + SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class); + + when(sessionClient.getSpanner()).thenReturn(spanner); + when(spanner.getOptions()).thenReturn(spannerOptions); + when(spannerOptions.getNumChannels()).thenReturn(4); + when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); + when(sessionPoolOptions.getMultiplexedSessionMaintenanceDuration()) + .thenReturn(Duration.ofDays(7)); + when(sessionPoolOptions.getWaitForMinSessions()).thenReturn(Duration.ZERO); + doAnswer( + (Answer) + invocationOnMock -> { + SessionConsumer consumer = invocationOnMock.getArgument(0); + consumer.onSessionCreateFailure( + SpannerExceptionFactory.newSpannerException( + ErrorCode.UNAUTHENTICATED, "test"), + 1); + return null; + }) + .when(sessionClient) + .asyncCreateMultiplexedSession(any(SessionConsumer.class)); + return sessionClient; + } + + @SuppressWarnings("unchecked") + private Map getChannelUsage() throws Exception { + Field field = MultiplexedSessionDatabaseClient.class.getDeclaredField("CHANNEL_USAGE"); + field.setAccessible(true); + return (Map) field.get(null); + } + + private void clearChannelUsage() throws Exception { + getChannelUsage().clear(); + } + private boolean isJava8() { return JavaVersionUtil.getJavaMajorVersion() == 8; } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java index 2b22a8c1d6b1..95e81cf7e6e5 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/SessionImplTest.java @@ -379,6 +379,34 @@ public void multiUseReadOnlyTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() assertThat(beginOptions).containsKey(SpannerRpc.Option.CHANNEL_HINT); } + @SuppressWarnings("unchecked") + @Test + public void multiUseReadOnlyTransactionCloseClearsGrpcGcpAffinityWhenEnabled() + throws ParseException { + when(spannerOptions.isGrpcGcpExtensionEnabled()).thenReturn(true); + ArgumentCaptor> beginOptionsCaptor = + ArgumentCaptor.forClass((Class) Map.class); + Transaction txnMetadata = + Transaction.newBuilder() + .setId(ByteString.copyFromUtf8("x")) + .setReadTimestamp(Timestamps.parse("2015-10-01T10:54:20.021Z")) + .build(); + Mockito.when(rpc.beginTransaction(Mockito.any(), beginOptionsCaptor.capture(), eq(false))) + .thenReturn(txnMetadata); + mockRead( + PartialResultSet.newBuilder() + .setMetadata(newMetadata(Type.struct(Type.StructField.of("C", Type.string())))) + .build()); + + ReadOnlyTransaction txn = session.readOnlyTransaction(TimestampBound.strong()); + txn.readRow("Dummy", Key.of(), Collections.singletonList("C")); + txn.close(); + + Long channelHint = SpannerRpc.Option.CHANNEL_HINT.getLong(beginOptionsCaptor.getValue()); + Mockito.verify(rpc) + .clearTransactionAndChannelAffinity(ByteString.copyFromUtf8("x"), channelHint); + } + @SuppressWarnings("unchecked") @Test public void readWriteTransactionUsesRandomChannelHintWhenGrpcGcpEnabled() { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 7a6a9f78d4b5..6566b6067dd1 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -40,7 +40,9 @@ import com.google.auth.oauth2.OAuth2Credentials; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; +import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.grpc.GcpManagedChannelOptions; +import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions; import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions; import com.google.cloud.grpc.fallback.GcpFallbackChannelOptions; import com.google.cloud.grpc.fallback.GcpFallbackOpenTelemetry; @@ -73,6 +75,8 @@ import com.google.spanner.v1.StructType; import com.google.spanner.v1.StructType.Field; import com.google.spanner.v1.TypeCode; +import io.grpc.CallOptions; +import io.grpc.ClientCall; import io.grpc.Context; import io.grpc.Contexts; import io.grpc.ManagedChannelBuilder; @@ -115,6 +119,8 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameters; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; @RunWith(Parameterized.class) public class GapicSpannerRpcTest { @@ -449,6 +455,78 @@ public void testNewCallContextWithNullRequestAndNullMethod() { rpc.shutdown(); } + @Test + public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithoutDcp() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .enableGrpcGcpExtension() + .disableDynamicChannelPool() + .setNumChannels(4) + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + Map grpcGcpOptions = new HashMap<>(); + grpcGcpOptions.put(Option.CHANNEL_HINT, 7L); + grpcGcpOptions.put(Option.UNBIND_CHANNEL_HINT, Boolean.TRUE); + + GrpcCallContext callContext = + rpc.newCallContext( + grpcGcpOptions, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod()); + + assertEquals("7", callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); + assertEquals( + Boolean.TRUE, + callContext.getCallOptions().getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY)); + rpc.shutdown(); + } + + @Test + public void testNewCallContextWithGrpcGcpUsesRawAffinityKeyWithDcp() { + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("some-project") + .enableGrpcGcpExtension() + .enableDynamicChannelPool() + .build(); + GapicSpannerRpc rpc = new GapicSpannerRpc(options, false); + Map grpcGcpOptions = new HashMap<>(); + grpcGcpOptions.put(Option.CHANNEL_HINT, 7L); + + GrpcCallContext callContext = + rpc.newCallContext( + grpcGcpOptions, + "/some/resource", + ExecuteSqlRequest.getDefaultInstance(), + SpannerGrpc.getExecuteSqlMethod()); + + assertEquals("7", callContext.getCallOptions().getOption(GcpManagedChannel.AFFINITY_KEY)); + rpc.shutdown(); + } + + @SuppressWarnings("unchecked") + @Test + public void testClearChannelHintAffinityCancelsSyntheticGrpcGcpCall() { + GcpManagedChannel channel = Mockito.mock(GcpManagedChannel.class); + ClientCall call = + Mockito.mock(ClientCall.class); + ArgumentCaptor callOptionsCaptor = ArgumentCaptor.forClass(CallOptions.class); + Mockito.when( + channel.newCall( + Mockito.eq(SpannerGrpc.getExecuteSqlMethod()), callOptionsCaptor.capture())) + .thenReturn(call); + + GapicSpannerRpc.clearChannelHintAffinity(channel, 7L); + + assertEquals("7", callOptionsCaptor.getValue().getOption(GcpManagedChannel.AFFINITY_KEY)); + assertEquals( + Boolean.TRUE, + callOptionsCaptor.getValue().getOption(GcpManagedChannel.UNBIND_AFFINITY_KEY)); + Mockito.verify(call).cancel("Cloud Spanner transaction closed", null); + } + @Test public void testNewCallContextWithRouteToLeaderHeader() { SpannerOptions options = @@ -1030,6 +1108,78 @@ public void testGrpcGcpOtelMetricsDisabledSkipsMeterInjection() throws Exception assertNull(metricsOptions.getOpenTelemetryMeter()); } + @Test + public void testGrpcGcpOptionsIncludeChannelPoolCleanupSettingsWithoutDcp() throws Exception { + Duration affinityKeyLifetime = Duration.ofMinutes(10); + Duration cleanupInterval = Duration.ofMinutes(5); + GcpChannelPoolOptions channelPoolOptions = + GcpChannelPoolOptions.newBuilder() + .setAffinityKeyLifetime(affinityKeyLifetime) + .setCleanupInterval(cleanupInterval) + .build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .enableGrpcGcpExtension() + .disableDynamicChannelPool() + .setGcpChannelPoolOptions(channelPoolOptions) + .build(); + + java.lang.reflect.Method method = + GapicSpannerRpc.class.getDeclaredMethod( + "grpcGcpOptionsWithMetricsAndDcp", SpannerOptions.class); + method.setAccessible(true); + GcpManagedChannelOptions grpcGcpOptions = + (GcpManagedChannelOptions) method.invoke(null, options); + + assertEquals( + affinityKeyLifetime, grpcGcpOptions.getChannelPoolOptions().getAffinityKeyLifetime()); + assertEquals(cleanupInterval, grpcGcpOptions.getChannelPoolOptions().getCleanupInterval()); + assertEquals(0, grpcGcpOptions.getChannelPoolOptions().getMinRpcPerChannel()); + assertEquals(0, grpcGcpOptions.getChannelPoolOptions().getMaxRpcPerChannel()); + assertEquals(Duration.ZERO, grpcGcpOptions.getChannelPoolOptions().getScaleDownInterval()); + } + + @Test + public void testGrpcGcpOptionsRetainDynamicChannelPoolSettingsWithDcp() throws Exception { + Duration affinityKeyLifetime = Duration.ofMinutes(10); + Duration cleanupInterval = Duration.ofMinutes(5); + Duration scaleDownInterval = Duration.ofMinutes(3); + GcpChannelPoolOptions channelPoolOptions = + GcpChannelPoolOptions.newBuilder() + .setInitSize(6) + .setMaxSize(15) + .setMinSize(3) + .setDynamicScaling(10, 50, scaleDownInterval) + .setAffinityKeyLifetime(affinityKeyLifetime) + .setCleanupInterval(cleanupInterval) + .build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("[PROJECT]") + .enableGrpcGcpExtension() + .enableDynamicChannelPool() + .setGcpChannelPoolOptions(channelPoolOptions) + .build(); + + java.lang.reflect.Method method = + GapicSpannerRpc.class.getDeclaredMethod( + "grpcGcpOptionsWithMetricsAndDcp", SpannerOptions.class); + method.setAccessible(true); + GcpManagedChannelOptions grpcGcpOptions = + (GcpManagedChannelOptions) method.invoke(null, options); + + assertEquals(6, grpcGcpOptions.getChannelPoolOptions().getInitSize()); + assertEquals(15, grpcGcpOptions.getChannelPoolOptions().getMaxSize()); + assertEquals(3, grpcGcpOptions.getChannelPoolOptions().getMinSize()); + assertEquals(10, grpcGcpOptions.getChannelPoolOptions().getMinRpcPerChannel()); + assertEquals(50, grpcGcpOptions.getChannelPoolOptions().getMaxRpcPerChannel()); + assertEquals(scaleDownInterval, grpcGcpOptions.getChannelPoolOptions().getScaleDownInterval()); + assertEquals( + affinityKeyLifetime, grpcGcpOptions.getChannelPoolOptions().getAffinityKeyLifetime()); + assertEquals(cleanupInterval, grpcGcpOptions.getChannelPoolOptions().getCleanupInterval()); + } + private static final class RecordingTransportChannelProvider implements TransportChannelProvider { private final String host; private final int port; From 1b90d0d6267bd4a5d6f1d31c70dae448d122ff74 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 15:00:12 +0530 Subject: [PATCH 2/7] fix test --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 15 ++-- .../cloud/spanner/ChannelUsageTest.java | 81 ++++++++++------- ...OpenTelemetryBuiltInMetricsTracerTest.java | 88 ++++++++++--------- .../spanner/spi/v1/GapicSpannerRpcTest.java | 7 +- 4 files changed, 113 insertions(+), 78 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index 7a2f716bfe12..faaab30327ae 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -831,7 +831,12 @@ static GcpChannelPoolOptions getGrpcGcpChannelPoolOptions(SpannerOptions options return channelPoolOptions; } + // When DCP is disabled, Spanner's numChannels should still produce a fixed grpc-gcp channel + // pool instead of only capping the pool's maximum size. return GcpChannelPoolOptions.newBuilder() + .setMaxSize(options.getNumChannels()) + .setMinSize(options.getNumChannels()) + .setInitSize(options.getNumChannels()) .disableDynamicScaling() .setAffinityKeyLifetime(channelPoolOptions.getAffinityKeyLifetime()) .setCleanupInterval(channelPoolOptions.getCleanupInterval()) @@ -849,10 +854,6 @@ private static void maybeEnableGrpcGcpExtension( final String jsonApiConfig = parseGrpcGcpApiConfig(); final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options); - // When dynamic channel pool is enabled, use the DCP initial size as the pool size. - // When disabled, use the explicitly configured numChannels. - final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels(); - ApiFunction baseConfigurator = defaultChannelProviderBuilder.getChannelConfigurator(); ApiFunction apiFunction = @@ -860,10 +861,12 @@ private static void maybeEnableGrpcGcpExtension( if (baseConfigurator != null) { channelBuilder = baseConfigurator.apply(channelBuilder); } + // The grpc-gcp pool is configured entirely through GcpChannelPoolOptions above. Avoid + // the deprecated setPoolSize path, which only adjusts maxSize and does not eagerly create + // a fixed-size pool. return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder) .withApiConfigJsonString(jsonApiConfig) - .withOptions(grpcGcpOptions) - .setPoolSize(poolSize); + .withOptions(grpcGcpOptions); }; // Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1. diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java index 182c9cc35b5e..a60cdc669d94 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java @@ -16,11 +16,11 @@ package com.google.cloud.spanner; -import static java.util.stream.Collectors.toSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import com.google.cloud.NoCredentials; +import com.google.cloud.spanner.MockSpannerServiceImpl.SimulatedExecutionTime; import com.google.cloud.spanner.MockSpannerServiceImpl.StatementResult; import com.google.protobuf.ListValue; import com.google.spanner.v1.ResultSetMetadata; @@ -39,10 +39,17 @@ import java.net.InetSocketAddress; import java.util.Arrays; import java.util.Collection; +import java.util.ArrayList; import java.util.Deque; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.junit.After; @@ -96,8 +103,7 @@ public static Collection data() { private static MockSpannerServiceImpl mockSpanner; private static Server server; private static InetSocketAddress address; - // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method - private static final Set batchCreateSessionChannelHints = ConcurrentHashMap.newKeySet(); + // Track channel hints (from X-Goog-Spanner-Request-Id header) for ExecuteStreamingSql. private static final Set executeSqlChannelHints = ConcurrentHashMap.newKeySet(); private static final Deque allExecuteSqlChannelHints = new ConcurrentLinkedDeque<>(); @@ -128,7 +134,7 @@ public ServerCall.Listener interceptCall( headers.get( Metadata.Key.of( "x-response-encoding", Metadata.ASCII_STRING_MARSHALLER))); - // Extract channel hint from X-Goog-Spanner-Request-Id header + // Extract channel hint from X-Goog-Spanner-Request-Id header. String requestId = headers.get(XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY); if (requestId != null) { // Format: @@ -137,10 +143,6 @@ public ServerCall.Listener interceptCall( if (parts.length >= 4) { try { long channelHint = Long.parseLong(parts[3]); - if (call.getMethodDescriptor() - .equals(SpannerGrpc.getBatchCreateSessionsMethod())) { - batchCreateSessionChannelHints.add(channelHint); - } if (call.getMethodDescriptor() .equals(SpannerGrpc.getExecuteStreamingSqlMethod())) { executeSqlChannelHints.add(channelHint); @@ -180,7 +182,6 @@ public static void resetLogging() { @After public void reset() { mockSpanner.reset(); - batchCreateSessionChannelHints.clear(); executeSqlChannelHints.clear(); allExecuteSqlChannelHints.clear(); } @@ -209,38 +210,58 @@ private SpannerOptions createSpannerOptions() { } @Test - public void testUsesAllChannels() throws InterruptedException { - final int multiplier = 10; + public void testUsesAllChannels() throws Exception { + final int multiplier = 25; + final int concurrentTransactions = numChannels * multiplier; + // grpc-gcp assigns new manual affinity keys to the least-busy channel. Sequential + // read-only transactions tend to keep picking the same idle channel, so keep reads + // overlapping to verify distribution across the fixed-size pool. + mockSpanner.setExecuteStreamingSqlExecutionTime( + SimulatedExecutionTime.ofMinimumAndRandomTime(500, 0)); + try (Spanner spanner = createSpannerOptions().getService()) { DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("p", "i", "d")); - for (int run = 0; run < numChannels * multiplier; run++) { - try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { - for (int i = 0; i < 2; i++) { - try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { - while (resultSet.next()) {} - } - } + ExecutorService executor = Executors.newFixedThreadPool(concurrentTransactions); + CountDownLatch ready = new CountDownLatch(concurrentTransactions); + CountDownLatch start = new CountDownLatch(1); + List> futures = new ArrayList<>(concurrentTransactions); + try { + for (int run = 0; run < concurrentTransactions; run++) { + futures.add( + executor.submit( + () -> { + ready.countDown(); + start.await(); + try (ReadOnlyTransaction transaction = client.readOnlyTransaction()) { + try (ResultSet resultSet = transaction.executeQuery(SELECT1)) { + while (resultSet.next()) {} + } + } + return null; + })); + } + assertTrue( + "Timed out while preparing concurrent transactions", + ready.await(10, TimeUnit.SECONDS)); + start.countDown(); + for (Future future : futures) { + future.get(20, TimeUnit.SECONDS); } + } finally { + executor.shutdownNow(); + executor.awaitTermination(10, TimeUnit.SECONDS); } } - // Bound the channel hints to numChannels (matching gRPC-GCP behavior) and verify - // that channels are being distributed. The raw channel hints may be unbounded (based on - // session index), but gRPC-GCP bounds them to the actual number of channels. - assertEquals(2 * numChannels * multiplier, allExecuteSqlChannelHints.size()); - Set boundedChannelHints = - executeSqlChannelHints.stream().map(hint -> hint % numChannels).collect(toSet()); - // Verify that channel distribution is working: - // - For numChannels=1, exactly 1 channel should be used - // - For numChannels>1, multiple channels should be used (at least half) + assertEquals(concurrentTransactions, allExecuteSqlChannelHints.size()); if (numChannels == 1) { - assertEquals(1, boundedChannelHints.size()); + assertEquals(1, executeSqlChannelHints.size()); } else { assertTrue( "Expected at least " + (numChannels / 2) + " channels to be used, but got " - + boundedChannelHints.size(), - boundedChannelHints.size() >= numChannels / 2); + + executeSqlChannelHints.size(), + executeSqlChannelHints.size() >= numChannels / 2); } } } diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java index a3273c2a6aa0..8c110c125c99 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java @@ -284,7 +284,7 @@ public void testNoNetworkConnection() { return null; }); - Spanner spanner = + try (Spanner spanner = builder .setProjectId("test-project") .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) @@ -303,46 +303,52 @@ public void testNoNetworkConnection() { .setBuiltInMetricsEnabled(false) .setApiTracerFactory(createMetricsTracerFactory()) .build() - .getService(); - String instance = "i"; - DatabaseClient client = spanner.getDatabaseClient(DatabaseId.of("test-project", instance, "d")); - - // Using this client will return UNAVAILABLE, as the server is not reachable and we have - // disabled retries. - SpannerException exception = - assertThrows( - SpannerException.class, () -> client.singleUse().executeQuery(SELECT_RANDOM).next()); - assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode()); - - Attributes expectedAttributesCreateSessionOK = - expectedCommonBaseAttributes.toBuilder() - .put(BuiltInMetricsConstant.STATUS_KEY, "OK") - .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") - // Include the additional attributes that are added by the HeaderInterceptor in the - // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never - // leaves the client. - .build(); - - Attributes expectedAttributesCreateSessionFailed = - expectedCommonBaseAttributes.toBuilder() - .put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE") - .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") - // Include the additional attributes that are added by the HeaderInterceptor in the - // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request never - // leaves the client. - .build(); - - MetricData attemptCountMetricData = - getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); - assertNotNull(attemptCountMetricData); - - // Attempt count should have a failed metric point for CreateSession. - assertEquals( - 1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0); - assertTrue( - checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)); - assertTrue( - checkIfMetricExists(metricReader, BuiltInMetricsConstant.AFE_CONNECTIVITY_ERROR_NAME)); + .getService()) { + String instance = "i"; + DatabaseClient client = + spanner.getDatabaseClient(DatabaseId.of("test-project", instance, "d")); + + // Using this client will return UNAVAILABLE, as the server is not reachable and we have + // disabled retries. + SpannerException exception = + assertThrows( + SpannerException.class, () -> client.singleUse().executeQuery(SELECT_RANDOM).next()); + assertEquals(ErrorCode.UNAVAILABLE, exception.getErrorCode()); + + Attributes expectedAttributesCreateSessionOK = + expectedCommonBaseAttributes.toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "OK") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") + // Include the additional attributes that are added by the HeaderInterceptor in the + // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request + // never leaves the client. + .build(); + + Attributes expectedAttributesCreateSessionFailed = + expectedCommonBaseAttributes.toBuilder() + .put(BuiltInMetricsConstant.STATUS_KEY, "UNAVAILABLE") + .put(BuiltInMetricsConstant.METHOD_KEY, "Spanner.CreateSession") + // Include the additional attributes that are added by the HeaderInterceptor in the + // filter. Note that the DIRECT_PATH_USED attribute is not added, as the request + // never leaves the client. + .build(); + + MetricData attemptCountMetricData = + getMetricData(metricReader, BuiltInMetricsConstant.ATTEMPT_COUNT_NAME); + assertNotNull(attemptCountMetricData); + + // Multiplexed session creation may continue retrying in the background, so the exact number + // of failed CreateSession attempts is timing-dependent. The request should never succeed, + // and we should observe at least one failed CreateSession attempt. + assertEquals( + 0, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionOK), 0); + assertTrue( + getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed) >= 1.0); + assertTrue( + checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME)); + assertTrue( + checkIfMetricExists(metricReader, BuiltInMetricsConstant.AFE_CONNECTIVITY_ERROR_NAME)); + } } @Test diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 9167e3f28457..7bee8514b00e 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -1109,7 +1109,7 @@ public void testGrpcGcpOtelMetricsDisabledSkipsMeterInjection() throws Exception } @Test - public void testGrpcGcpOptionsIncludeChannelPoolCleanupSettingsWithoutDcp() throws Exception { + public void testGrpcGcpOptionsIncludeStaticChannelPoolSettingsWithoutDcp() throws Exception { Duration affinityKeyLifetime = Duration.ofMinutes(10); Duration cleanupInterval = Duration.ofMinutes(5); GcpChannelPoolOptions channelPoolOptions = @@ -1117,11 +1117,13 @@ public void testGrpcGcpOptionsIncludeChannelPoolCleanupSettingsWithoutDcp() thro .setAffinityKeyLifetime(affinityKeyLifetime) .setCleanupInterval(cleanupInterval) .build(); + int numChannels = 7; SpannerOptions options = SpannerOptions.newBuilder() .setProjectId("[PROJECT]") .enableGrpcGcpExtension() .disableDynamicChannelPool() + .setNumChannels(numChannels) .setGcpChannelPoolOptions(channelPoolOptions) .build(); @@ -1132,6 +1134,9 @@ public void testGrpcGcpOptionsIncludeChannelPoolCleanupSettingsWithoutDcp() thro GcpManagedChannelOptions grpcGcpOptions = (GcpManagedChannelOptions) method.invoke(null, options); + assertEquals(numChannels, grpcGcpOptions.getChannelPoolOptions().getMaxSize()); + assertEquals(numChannels, grpcGcpOptions.getChannelPoolOptions().getMinSize()); + assertEquals(numChannels, grpcGcpOptions.getChannelPoolOptions().getInitSize()); assertEquals( affinityKeyLifetime, grpcGcpOptions.getChannelPoolOptions().getAffinityKeyLifetime()); assertEquals(cleanupInterval, grpcGcpOptions.getChannelPoolOptions().getCleanupInterval()); From 27117743e0f247d8005cc4c2247d43bf8da7eb37 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 15:09:31 +0530 Subject: [PATCH 3/7] fix lint --- .../test/java/com/google/cloud/spanner/ChannelUsageTest.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java index a60cdc669d94..b3d71c4925bb 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/ChannelUsageTest.java @@ -37,9 +37,9 @@ import io.grpc.ServerInterceptor; import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.ArrayList; import java.util.Deque; import java.util.List; import java.util.Set; @@ -241,8 +241,7 @@ public void testUsesAllChannels() throws Exception { })); } assertTrue( - "Timed out while preparing concurrent transactions", - ready.await(10, TimeUnit.SECONDS)); + "Timed out while preparing concurrent transactions", ready.await(10, TimeUnit.SECONDS)); start.countDown(); for (Future future : futures) { future.get(20, TimeUnit.SECONDS); From bb03696a8eab47ae067b3cae7f9c19ebf7c36c88 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 15:25:22 +0530 Subject: [PATCH 4/7] refactored clearChannelHintAffinity --- .../cloud/spanner/spi/v1/GapicSpannerRpc.java | 17 +------ .../spanner/spi/v1/GrpcGcpAffinityUtil.java | 44 +++++++++++++++++++ .../cloud/spanner/spi/v1/KeyAwareChannel.java | 17 +------ .../spanner/spi/v1/GapicSpannerRpcTest.java | 2 +- 4 files changed, 47 insertions(+), 33 deletions(-) create mode 100644 java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index faaab30327ae..c5ab73fbaf39 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -620,22 +620,7 @@ public void clearTransactionAndChannelAffinity( return; } clearTransactionAffinity(transactionId); - clearChannelHintAffinity(grpcGcpChannel, channelHint); - } - - @VisibleForTesting - static void clearChannelHintAffinity( - @Nullable ManagedChannel channel, @Nullable Long channelHint) { - if (!(channel instanceof GcpManagedChannel) || channelHint == null) { - return; - } - ClientCall call = - channel.newCall( - SpannerGrpc.getExecuteSqlMethod(), - CallOptions.DEFAULT - .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) - .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); - call.cancel("Cloud Spanner transaction closed", null); + GrpcGcpAffinityUtil.clearChannelHintAffinity(grpcGcpChannel, channelHint); } private static String parseGrpcGcpApiConfig() { diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java new file mode 100644 index 000000000000..93d502cf3d8e --- /dev/null +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java @@ -0,0 +1,44 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.cloud.grpc.GcpManagedChannel; +import com.google.spanner.v1.ExecuteSqlRequest; +import com.google.spanner.v1.ResultSet; +import com.google.spanner.v1.SpannerGrpc; +import io.grpc.CallOptions; +import io.grpc.ClientCall; +import io.grpc.ManagedChannel; +import javax.annotation.Nullable; + +final class GrpcGcpAffinityUtil { + private GrpcGcpAffinityUtil() {} + + static void clearChannelHintAffinity( + @Nullable ManagedChannel channel, @Nullable Long channelHint) { + if (!(channel instanceof GcpManagedChannel) || channelHint == null) { + return; + } + ClientCall call = + channel.newCall( + SpannerGrpc.getExecuteSqlMethod(), + CallOptions.DEFAULT + .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) + .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); + call.cancel("Cloud Spanner transaction closed", null); + } +} diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index b5fe09dd8cc7..108f585bb74a 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -20,7 +20,6 @@ import com.google.api.core.InternalApi; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; -import com.google.cloud.grpc.GcpManagedChannel; import com.google.cloud.spanner.XGoogSpannerRequestId; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; @@ -33,7 +32,6 @@ import com.google.spanner.v1.ReadRequest; import com.google.spanner.v1.ResultSet; import com.google.spanner.v1.RollbackRequest; -import com.google.spanner.v1.SpannerGrpc; import com.google.spanner.v1.Transaction; import com.google.spanner.v1.TransactionSelector; import io.grpc.CallOptions; @@ -316,24 +314,11 @@ void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channel = endpoint.getChannel(); } } - clearChannelHintAffinity(channel, channelHint); + GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, channelHint); } clearAffinity(transactionId); } - private static void clearChannelHintAffinity(ManagedChannel channel, long channelHint) { - if (!(channel instanceof GcpManagedChannel)) { - return; - } - ClientCall call = - channel.newCall( - SpannerGrpc.getExecuteSqlMethod(), - CallOptions.DEFAULT - .withOption(GcpManagedChannel.AFFINITY_KEY, String.valueOf(channelHint)) - .withOption(GcpManagedChannel.UNBIND_AFFINITY_KEY, true)); - call.cancel("Cloud Spanner transaction closed", null); - } - private void maybeExcludeEndpointOnNextCall( @Nullable ChannelEndpoint endpoint, @Nullable String logicalRequestKey) { if (endpoint == null || logicalRequestKey == null) { diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java index 7bee8514b00e..165557608ac3 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpcTest.java @@ -518,7 +518,7 @@ public void testClearChannelHintAffinityCancelsSyntheticGrpcGcpCall() { Mockito.eq(SpannerGrpc.getExecuteSqlMethod()), callOptionsCaptor.capture())) .thenReturn(call); - GapicSpannerRpc.clearChannelHintAffinity(channel, 7L); + GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, 7L); assertEquals("7", callOptionsCaptor.getValue().getOption(GcpManagedChannel.AFFINITY_KEY)); assertEquals( From 0dc3d69ce819d3ed57c935cd6bd8fb2eaf8ed11f Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 15:35:04 +0530 Subject: [PATCH 5/7] fix graalVM test --- .../MultiplexedSessionDatabaseClientTest.java | 122 +++++++++++------- 1 file changed, 75 insertions(+), 47 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java index 8c0eb2db976f..d398a78643aa 100644 --- a/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java +++ b/java-spanner/google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientTest.java @@ -29,6 +29,8 @@ import static org.mockito.Mockito.when; import com.google.api.core.ApiFutures; +import com.google.cloud.NoCredentials; +import com.google.cloud.grpc.GrpcTransportOptions.ExecutorFactory; import com.google.cloud.spanner.SessionClient.SessionConsumer; import java.io.PrintWriter; import java.io.StringWriter; @@ -37,6 +39,9 @@ import java.time.Duration; import java.time.Instant; import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.After; @@ -309,68 +314,58 @@ public void testGrpcGcpSingleUseDoesNotReserveBitsetChannelHint() throws Excepti @Test public void testCloseRemovesChannelUsageEntryWhenLastClientCloses() throws Exception { - SessionClient sessionClient = createSessionClient(); + try (SpannerImpl spanner = createTestSpanner(); + SessionClient sessionClient = createSessionClient(spanner)) { + MultiplexedSessionDatabaseClient client = + new MultiplexedSessionDatabaseClient(sessionClient, Clock.systemUTC()); - MultiplexedSessionDatabaseClient client = - new MultiplexedSessionDatabaseClient(sessionClient, Clock.systemUTC()); - - assertEquals(1, getChannelUsage().size()); + assertEquals(1, getChannelUsage().size()); - client.close(); + client.close(); - assertEquals(0, getChannelUsage().size()); + assertEquals(0, getChannelUsage().size()); + } } @Test public void testCloseKeepsChannelUsageEntryWhileAnotherClientIsUsingSameSpanner() throws Exception { - SpannerImpl spanner = mock(SpannerImpl.class); - SessionClient firstSessionClient = createSessionClient(spanner); - SessionClient secondSessionClient = createSessionClient(spanner); + try (SpannerImpl spanner = createTestSpanner(); + SessionClient firstSessionClient = createSessionClient(spanner); + SessionClient secondSessionClient = createSessionClient(spanner)) { + MultiplexedSessionDatabaseClient firstClient = + new MultiplexedSessionDatabaseClient(firstSessionClient, Clock.systemUTC()); + MultiplexedSessionDatabaseClient secondClient = + new MultiplexedSessionDatabaseClient(secondSessionClient, Clock.systemUTC()); - MultiplexedSessionDatabaseClient firstClient = - new MultiplexedSessionDatabaseClient(firstSessionClient, Clock.systemUTC()); - MultiplexedSessionDatabaseClient secondClient = - new MultiplexedSessionDatabaseClient(secondSessionClient, Clock.systemUTC()); + assertEquals(1, getChannelUsage().size()); - assertEquals(1, getChannelUsage().size()); + firstClient.close(); + assertEquals(1, getChannelUsage().size()); - firstClient.close(); - assertEquals(1, getChannelUsage().size()); - - secondClient.close(); - assertEquals(0, getChannelUsage().size()); - } - - private SessionClient createSessionClient() { - return createSessionClient(mock(SpannerImpl.class)); + secondClient.close(); + assertEquals(0, getChannelUsage().size()); + } } private SessionClient createSessionClient(SpannerImpl spanner) { - SessionClient sessionClient = mock(SessionClient.class); - SpannerOptions spannerOptions = mock(SpannerOptions.class); - SessionPoolOptions sessionPoolOptions = mock(SessionPoolOptions.class); + return new FailingMultiplexedSessionClient(spanner); + } - when(sessionClient.getSpanner()).thenReturn(spanner); - when(spanner.getOptions()).thenReturn(spannerOptions); - when(spannerOptions.getNumChannels()).thenReturn(4); - when(spannerOptions.getSessionPoolOptions()).thenReturn(sessionPoolOptions); - when(sessionPoolOptions.getMultiplexedSessionMaintenanceDuration()) - .thenReturn(Duration.ofDays(7)); - when(sessionPoolOptions.getWaitForMinSessions()).thenReturn(Duration.ZERO); - doAnswer( - (Answer) - invocationOnMock -> { - SessionConsumer consumer = invocationOnMock.getArgument(0); - consumer.onSessionCreateFailure( - SpannerExceptionFactory.newSpannerException( - ErrorCode.UNAUTHENTICATED, "test"), - 1); - return null; - }) - .when(sessionClient) - .asyncCreateMultiplexedSession(any(SessionConsumer.class)); - return sessionClient; + private SpannerImpl createTestSpanner() { + SessionPoolOptions sessionPoolOptions = + SessionPoolOptions.newBuilder() + .setMultiplexedSessionMaintenanceDuration(Duration.ofDays(7)) + .setWaitForMinSessionsDuration(Duration.ZERO) + .build(); + SpannerOptions options = + SpannerOptions.newBuilder() + .setProjectId("test-project") + .setCredentials(NoCredentials.getInstance()) + .setNumChannels(4) + .setSessionPoolOption(sessionPoolOptions) + .build(); + return new SpannerImpl(options); } @SuppressWarnings("unchecked") @@ -391,4 +386,37 @@ private boolean isJava8() { private boolean isWindows() { return System.getProperty("os.name").toLowerCase().contains("windows"); } + + private static final class TestExecutorFactory + implements ExecutorFactory { + @Override + public ScheduledExecutorService get() { + return Executors.newSingleThreadScheduledExecutor(); + } + + @Override + public void release(ScheduledExecutorService executor) { + executor.shutdown(); + try { + executor.awaitTermination(10L, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + + private static final class FailingMultiplexedSessionClient extends SessionClient { + private static final DatabaseId TEST_DATABASE_ID = + DatabaseId.of("test-project", "test-instance", "test-database"); + + private FailingMultiplexedSessionClient(SpannerImpl spanner) { + super(spanner, TEST_DATABASE_ID, new TestExecutorFactory()); + } + + @Override + void asyncCreateMultiplexedSession(SessionConsumer consumer) { + consumer.onSessionCreateFailure( + SpannerExceptionFactory.newSpannerException(ErrorCode.UNAUTHENTICATED, "test"), 1); + } + } } From 3288019a4070d88e51763561171df8602b51d222 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 16:36:14 +0530 Subject: [PATCH 6/7] address comments --- .../java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java | 1 - .../com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java | 2 ++ .../java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java | 4 ++-- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java index c5ab73fbaf39..7c1b6be1c1bd 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GapicSpannerRpc.java @@ -619,7 +619,6 @@ public void clearTransactionAndChannelAffinity( keyAwareChannel.clearTransactionAndChannelAffinity(transactionId, channelHint); return; } - clearTransactionAffinity(transactionId); GrpcGcpAffinityUtil.clearChannelHintAffinity(grpcGcpChannel, channelHint); } diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java index 93d502cf3d8e..d4bb41755595 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcGcpAffinityUtil.java @@ -33,6 +33,8 @@ static void clearChannelHintAffinity( if (!(channel instanceof GcpManagedChannel) || channelHint == null) { return; } + // TODO: Replace this synthetic call once grpc-gcp exposes a direct API for unbinding + // affinity keys without creating and immediately cancelling a ClientCall. ClientCall call = channel.newCall( SpannerGrpc.getExecuteSqlMethod(), diff --git a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java index 108f585bb74a..d7b32f72bcd6 100644 --- a/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java +++ b/java-spanner/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/KeyAwareChannel.java @@ -305,9 +305,10 @@ void clearTransactionAffinity(ByteString transactionId) { } void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long channelHint) { + String address = transactionAffinities.remove(transactionId); + readOnlyTxPreferLeader.invalidate(transactionId); if (channelHint != null) { ManagedChannel channel = defaultChannel; - String address = transactionAffinities.get(transactionId); if (address != null) { ChannelEndpoint endpoint = endpointCache.getIfPresent(address); if (endpoint != null) { @@ -316,7 +317,6 @@ void clearTransactionAndChannelAffinity(ByteString transactionId, @Nullable Long } GrpcGcpAffinityUtil.clearChannelHintAffinity(channel, channelHint); } - clearAffinity(transactionId); } private void maybeExcludeEndpointOnNextCall( From 5c4891d351d5a2b05167ef98b296187dd789d689 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 9 Apr 2026 16:41:54 +0530 Subject: [PATCH 7/7] fix github check related to .gitignore --- java-spanner/.gitignore | 1 - 1 file changed, 1 deletion(-) delete mode 100644 java-spanner/.gitignore diff --git a/java-spanner/.gitignore b/java-spanner/.gitignore deleted file mode 100644 index 722d5e71d93c..000000000000 --- a/java-spanner/.gitignore +++ /dev/null @@ -1 +0,0 @@ -.vscode