Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.cloud.ServiceRpc;
import com.google.cloud.TransportOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.spanner.Options.DirectedReadOption;
import com.google.cloud.spanner.Options.QueryOption;
Expand Down Expand Up @@ -134,6 +135,72 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
// is enabled, to make sure there are sufficient channels available to move the sessions to a
// different channel if a network connection in a particular channel fails.
@VisibleForTesting static final int GRPC_GCP_ENABLED_DEFAULT_CHANNELS = 8;

// Dynamic Channel Pool (DCP) default values and bounds
/** Default max concurrent RPCs per channel before triggering scale up. */
public static final int DEFAULT_DYNAMIC_POOL_MAX_RPC = 25;

/** Default min concurrent RPCs per channel for scale down check. */
public static final int DEFAULT_DYNAMIC_POOL_MIN_RPC = 15;

/** Default scale down check interval. */
public static final Duration DEFAULT_DYNAMIC_POOL_SCALE_DOWN_INTERVAL = Duration.ofMinutes(3);

/** Default initial number of channels for dynamic pool. */
public static final int DEFAULT_DYNAMIC_POOL_INITIAL_SIZE = 4;

/** Default max number of channels for dynamic pool. */
public static final int DEFAULT_DYNAMIC_POOL_MAX_CHANNELS = 10;

/** Default min number of channels for dynamic pool. */
public static final int DEFAULT_DYNAMIC_POOL_MIN_CHANNELS = 2;

/**
* Default affinity key lifetime for dynamic channel pool. This is how long to keep an affinity
* key after its last use. Zero means keeping keys forever. Default is 10 minutes, which is
* sufficient to ensure that requests within a single transaction use the same channel.
*/
public static final Duration DEFAULT_DYNAMIC_POOL_AFFINITY_KEY_LIFETIME = Duration.ofMinutes(10);

/**
* Default cleanup interval for dynamic channel pool affinity keys. This is how frequently the
* affinity key cleanup process runs. Default is 1 minute (1/10 of default affinity key lifetime).
*/
public static final Duration DEFAULT_DYNAMIC_POOL_CLEANUP_INTERVAL = Duration.ofMinutes(1);

/**
* Creates a {@link GcpChannelPoolOptions} instance with Spanner-specific defaults for dynamic
* channel pooling. These defaults are optimized for typical Spanner workloads.
*
* <p>Default values:
*
* <ul>
* <li>Max size: {@value #DEFAULT_DYNAMIC_POOL_MAX_CHANNELS}
* <li>Min size: {@value #DEFAULT_DYNAMIC_POOL_MIN_CHANNELS}
* <li>Initial size: {@value #DEFAULT_DYNAMIC_POOL_INITIAL_SIZE}
* <li>Max RPC per channel: {@value #DEFAULT_DYNAMIC_POOL_MAX_RPC}
* <li>Min RPC per channel: {@value #DEFAULT_DYNAMIC_POOL_MIN_RPC}
* <li>Scale down interval: 3 minutes
* <li>Affinity key lifetime: 10 minutes
* <li>Cleanup interval: 1 minute
* </ul>
*
* @return a new {@link GcpChannelPoolOptions} instance with Spanner defaults
*/
public static GcpChannelPoolOptions createDefaultDynamicChannelPoolOptions() {
return GcpChannelPoolOptions.newBuilder()
.setMaxSize(DEFAULT_DYNAMIC_POOL_MAX_CHANNELS)
.setMinSize(DEFAULT_DYNAMIC_POOL_MIN_CHANNELS)
.setInitSize(DEFAULT_DYNAMIC_POOL_INITIAL_SIZE)
.setDynamicScaling(
DEFAULT_DYNAMIC_POOL_MIN_RPC,
DEFAULT_DYNAMIC_POOL_MAX_RPC,
DEFAULT_DYNAMIC_POOL_SCALE_DOWN_INTERVAL)
.setAffinityKeyLifetime(DEFAULT_DYNAMIC_POOL_AFFINITY_KEY_LIFETIME)
.setCleanupInterval(DEFAULT_DYNAMIC_POOL_CLEANUP_INTERVAL)
.build();
}

private final TransportChannelProvider channelProvider;

@SuppressWarnings("rawtypes")
Expand All @@ -153,6 +220,8 @@ public class SpannerOptions extends ServiceOptions<Spanner, SpannerOptions> {
private final Duration partitionedDmlTimeout;
private final boolean grpcGcpExtensionEnabled;
private final GcpManagedChannelOptions grpcGcpOptions;
private final boolean dynamicChannelPoolEnabled;
private final GcpChannelPoolOptions gcpChannelPoolOptions;
private final boolean autoThrottleAdministrativeRequests;
private final RetrySettings retryAdministrativeRequestsSettings;
private final boolean trackTransactionStarter;
Expand Down Expand Up @@ -800,6 +869,26 @@ protected SpannerOptions(Builder builder) {
partitionedDmlTimeout = builder.partitionedDmlTimeout;
grpcGcpExtensionEnabled = builder.grpcGcpExtensionEnabled;
grpcGcpOptions = builder.grpcGcpOptions;

// Dynamic channel pooling is disabled by default.
// It is only enabled when:
// 1. enableDynamicChannelPool() was explicitly called, AND
// 2. grpc-gcp extension is enabled, AND
// 3. numChannels was not explicitly set
if (builder.dynamicChannelPoolEnabled != null && builder.dynamicChannelPoolEnabled) {
// DCP was explicitly enabled, but respect numChannels if set
dynamicChannelPoolEnabled = grpcGcpExtensionEnabled && !builder.numChannelsExplicitlySet;
} else {
// DCP is disabled by default, or was explicitly disabled
dynamicChannelPoolEnabled = false;
}

// Use user-provided GcpChannelPoolOptions or create Spanner-specific defaults
gcpChannelPoolOptions =
builder.gcpChannelPoolOptions != null
? builder.gcpChannelPoolOptions
: createDefaultDynamicChannelPoolOptions();

autoThrottleAdministrativeRequests = builder.autoThrottleAdministrativeRequests;
retryAdministrativeRequestsSettings = builder.retryAdministrativeRequestsSettings;
trackTransactionStarter = builder.trackTransactionStarter;
Expand Down Expand Up @@ -1010,6 +1099,7 @@ public static class Builder
private GrpcInterceptorProvider interceptorProvider;

private Integer numChannels;
private boolean numChannelsExplicitlySet = false;

private String transportChannelExecutorThreadNameFormat = "Cloud-Spanner-TransportChannel-%d";

Expand All @@ -1027,6 +1117,8 @@ public static class Builder
private Duration partitionedDmlTimeout = Duration.ofHours(2L);
private boolean grpcGcpExtensionEnabled = true;
private GcpManagedChannelOptions grpcGcpOptions;
private Boolean dynamicChannelPoolEnabled;
private GcpChannelPoolOptions gcpChannelPoolOptions;
private RetrySettings retryAdministrativeRequestsSettings =
DEFAULT_ADMIN_REQUESTS_LIMIT_EXCEEDED_RETRY_SETTINGS;
private boolean autoThrottleAdministrativeRequests = false;
Expand Down Expand Up @@ -1099,6 +1191,8 @@ protected Builder() {
this.partitionedDmlTimeout = options.partitionedDmlTimeout;
this.grpcGcpExtensionEnabled = options.grpcGcpExtensionEnabled;
this.grpcGcpOptions = options.grpcGcpOptions;
this.dynamicChannelPoolEnabled = options.dynamicChannelPoolEnabled;
this.gcpChannelPoolOptions = options.gcpChannelPoolOptions;
this.autoThrottleAdministrativeRequests = options.autoThrottleAdministrativeRequests;
this.retryAdministrativeRequestsSettings = options.retryAdministrativeRequestsSettings;
this.trackTransactionStarter = options.trackTransactionStarter;
Expand Down Expand Up @@ -1189,6 +1283,7 @@ public Builder setInterceptorProvider(GrpcInterceptorProvider interceptorProvide
*/
public Builder setNumChannels(int numChannels) {
this.numChannels = numChannels;
this.numChannelsExplicitlySet = true;
return this;
}

Expand Down Expand Up @@ -1578,6 +1673,62 @@ public Builder disableGrpcGcpExtension() {
return this;
}

/**
* Enables dynamic channel pooling. When enabled, the client will automatically scale the number
* of channels based on load. This requires the gRPC-GCP extension to be enabled.
*
* <p>Dynamic channel pooling is disabled by default. Use this method to explicitly enable it.
* Note that calling {@link #setNumChannels(int)} will disable dynamic channel pooling even if
* this method was called.
*/
public Builder enableDynamicChannelPool() {
this.dynamicChannelPoolEnabled = true;
return this;
}

/**
* Disables dynamic channel pooling. When disabled, the client will use a static number of
* channels as configured by {@link #setNumChannels(int)}.
*
* <p>Dynamic channel pooling is disabled by default, so this method is typically not needed
* unless you want to explicitly disable it after enabling it.
*/
public Builder disableDynamicChannelPool() {
this.dynamicChannelPoolEnabled = false;
return this;
}

/**
* Sets the channel pool options for dynamic channel pooling. Use this to configure the dynamic
* channel pool behavior when {@link #enableDynamicChannelPool()} is enabled.
*
* <p>If not set, Spanner-specific defaults will be used (see {@link
* #createDefaultDynamicChannelPoolOptions()}).
*
* <p>Example usage:
*
* <pre>{@code
* SpannerOptions options = SpannerOptions.newBuilder()
* .setProjectId("my-project")
* .enableDynamicChannelPool()
* .setGcpChannelPoolOptions(
* GcpChannelPoolOptions.newBuilder()
* .setMaxSize(15)
* .setMinSize(3)
* .setInitSize(5)
* .setDynamicScaling(10, 30, Duration.ofMinutes(5))
* .build())
* .build();
* }</pre>
*
* @param gcpChannelPoolOptions the channel pool options to use
* @return this builder for chaining
*/
public Builder setGcpChannelPoolOptions(GcpChannelPoolOptions gcpChannelPoolOptions) {
this.gcpChannelPoolOptions = Preconditions.checkNotNull(gcpChannelPoolOptions);
return this;
}

/**
* Sets the host of an emulator to use. By default the value is read from an environment
* variable. If the environment variable is not set, this will be <code>null</code>.
Expand Down Expand Up @@ -1990,6 +2141,26 @@ public GcpManagedChannelOptions getGrpcGcpOptions() {
return grpcGcpOptions;
}

/**
* Returns whether dynamic channel pooling is enabled. Dynamic channel pooling is disabled by
* default. Use {@link Builder#enableDynamicChannelPool()} to explicitly enable it. Note that
* calling {@link Builder#setNumChannels(int)} will disable dynamic channel pooling even if it was
* explicitly enabled.
*/
public boolean isDynamicChannelPoolEnabled() {
return dynamicChannelPoolEnabled;
}

/**
* Returns the channel pool options for dynamic channel pooling. If no options were explicitly
* set, returns the Spanner-specific defaults.
*
* @see #createDefaultDynamicChannelPoolOptions()
*/
public GcpChannelPoolOptions getGcpChannelPoolOptions() {
return gcpChannelPoolOptions;
}

public boolean isAutoThrottleAdministrativeRequests() {
return autoThrottleAdministrativeRequests;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ public class GapicSpannerRpc implements SpannerRpc {
private final boolean endToEndTracingEnabled;
private final int numChannels;
private final boolean isGrpcGcpExtensionEnabled;
private final boolean isDynamicChannelPoolEnabled;

private final GrpcCallContext baseGrpcCallContext;

Expand Down Expand Up @@ -337,6 +338,7 @@ public GapicSpannerRpc(final SpannerOptions options) {
this.endToEndTracingEnabled = options.isEndToEndTracingEnabled();
this.numChannels = options.getNumChannels();
this.isGrpcGcpExtensionEnabled = options.isGrpcGcpExtensionEnabled();
this.isDynamicChannelPoolEnabled = options.isDynamicChannelPoolEnabled();
this.baseGrpcCallContext = createBaseCallContext();

if (initializeStubs) {
Expand Down Expand Up @@ -475,7 +477,8 @@ public GapicSpannerRpc(final SpannerOptions options) {
.withCheckInterval(pdmlSettings.getStreamWatchdogCheckInterval()));
}
this.partitionedDmlStub =
GrpcSpannerStubWithStubSettingsAndClientContext.create(pdmlSettings.build());
GrpcSpannerStubWithStubSettingsAndClientContext.create(
pdmlSettings.build(), clientContext);
this.instanceAdminStubSettings =
options.getInstanceAdminStubSettings().toBuilder()
.setTransportChannelProvider(channelProvider)
Expand Down Expand Up @@ -569,10 +572,14 @@ private static String parseGrpcGcpApiConfig() {
}
}

// Enhance metric options for gRPC-GCP extension.
private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions options) {
// Enhance gRPC-GCP options with metrics and dynamic channel pool configuration.
private static GcpManagedChannelOptions grpcGcpOptionsWithMetricsAndDcp(SpannerOptions options) {
GcpManagedChannelOptions grpcGcpOptions =
MoreObjects.firstNonNull(options.getGrpcGcpOptions(), new GcpManagedChannelOptions());
GcpManagedChannelOptions.Builder optionsBuilder =
GcpManagedChannelOptions.newBuilder(grpcGcpOptions);

// Configure metrics options with OpenTelemetry meter
GcpMetricsOptions metricsOptions =
MoreObjects.firstNonNull(
grpcGcpOptions.getMetricsOptions(), GcpMetricsOptions.newBuilder().build());
Expand All @@ -581,9 +588,21 @@ private static GcpManagedChannelOptions grpcGcpOptionsWithMetrics(SpannerOptions
if (metricsOptions.getNamePrefix().equals("")) {
metricsOptionsBuilder.withNamePrefix("cloud.google.com/java/spanner/gcp-channel-pool/");
}
return GcpManagedChannelOptions.newBuilder(grpcGcpOptions)
.withMetricsOptions(metricsOptionsBuilder.build())
.build();
// Pass OpenTelemetry meter to grpc-gcp for channel pool metrics
if (metricsOptions.getOpenTelemetryMeter() == null) {
metricsOptionsBuilder.withOpenTelemetryMeter(
options.getOpenTelemetry().getMeter("com.google.cloud.spanner"));
}
optionsBuilder.withMetricsOptions(metricsOptionsBuilder.build());

// Configure dynamic channel pool options if enabled.
// Uses the GcpChannelPoolOptions from SpannerOptions, which contains Spanner-specific defaults
// or user-provided configuration.
if (options.isDynamicChannelPoolEnabled()) {
optionsBuilder.withChannelPoolOptions(options.getGcpChannelPoolOptions());
}

return optionsBuilder.build();
}

@SuppressWarnings("rawtypes")
Expand All @@ -595,7 +614,11 @@ private static void maybeEnableGrpcGcpExtension(
}

final String jsonApiConfig = parseGrpcGcpApiConfig();
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetrics(options);
final GcpManagedChannelOptions grpcGcpOptions = grpcGcpOptionsWithMetricsAndDcp(options);

// When dynamic channel pool is enabled, use the DCP initial size as the pool size.
// When disabled, use the explicitly configured numChannels.
final int poolSize = options.isDynamicChannelPoolEnabled() ? 0 : options.getNumChannels();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this change the behavior for customers who are currently using grpc-gcp? (I would think not, considering that it seems like the channel pool size was set to options.getNumChannels() before this change as well)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because when poolSize = 0, grpc-gcp uses initSize from GcpChannelPoolOptions. Previously, numChannels was used as the pool size. The change is intentional to let DCP control initialization.


ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> apiFunction =
channelBuilder -> {
Expand All @@ -605,7 +628,7 @@ private static void maybeEnableGrpcGcpExtension(
return GcpManagedChannelBuilder.forDelegateBuilder(channelBuilder)
.withApiConfigJsonString(jsonApiConfig)
.withOptions(grpcGcpOptions)
.setPoolSize(options.getNumChannels());
.setPoolSize(poolSize);
};

// Disable the GAX channel pooling functionality by setting the GAX channel pool size to 1.
Expand Down Expand Up @@ -2060,20 +2083,34 @@ <ReqT, RespT> GrpcCallContext newCallContext(
if (affinity != null) {
if (this.isGrpcGcpExtensionEnabled) {
// Set channel affinity in gRPC-GCP.
// Compute bounded channel hint to prevent gRPC-GCP affinity map from getting unbounded.
int boundedChannelHint = affinity.intValue() % this.numChannels;
String affinityKey;
if (this.isDynamicChannelPoolEnabled) {
// When dynamic channel pooling is enabled, we use the raw affinity value as the key.
// This allows grpc-gcp to use round-robin for new keys, enabling new channels
// (created during scale-up) to receive requests. The affinity key lifetime setting
// ensures the affinity map doesn't grow unbounded.
affinityKey = String.valueOf(affinity);
} else {
// When DCP is disabled, compute bounded channel hint to prevent
// gRPC-GCP affinity map from getting unbounded.
int boundedChannelHint = affinity.intValue() % this.numChannels;
affinityKey = String.valueOf(boundedChannelHint);
}
context =
context.withCallOptions(
context
.getCallOptions()
.withOption(
GcpManagedChannel.AFFINITY_KEY, String.valueOf(boundedChannelHint)));
context.getCallOptions().withOption(GcpManagedChannel.AFFINITY_KEY, affinityKey));
} else {
// Set channel affinity in GAX.
context = context.withChannelAffinity(affinity.intValue());
}
}
int requestIdChannel = convertToRequestIdChannelNumber(affinity);
// When grpc-gcp extension with dynamic channel pooling is enabled, the actual channel ID
// will be set by RequestIdInterceptor after grpc-gcp selects the channel.
// Set to 0 (unknown) here as a placeholder.
int requestIdChannel =
(this.isGrpcGcpExtensionEnabled && this.isDynamicChannelPoolEnabled)
? 0
: convertToRequestIdChannelNumber(affinity);
if (requestId == null) {
requestId = requestIdCreator.nextRequestId(requestIdChannel);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID_CALL_OPTIONS_KEY;
import static com.google.cloud.spanner.XGoogSpannerRequestId.REQUEST_ID_HEADER_KEY;

import com.google.cloud.grpc.GcpManagedChannel;
import com.google.cloud.spanner.XGoogSpannerRequestId;
import io.grpc.CallOptions;
import io.grpc.Channel;
Expand Down Expand Up @@ -47,6 +48,15 @@ public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
public void start(Listener<RespT> responseListener, Metadata headers) {
XGoogSpannerRequestId requestId = callOptions.getOption(REQUEST_ID_CALL_OPTIONS_KEY);
if (requestId != null) {
// If grpc-gcp has set the actual channel ID, use it to update the request ID.
// This provides the real channel ID used after channel selection, especially
// important when dynamic channel pooling is enabled.
Integer gcpChannelId = callOptions.getOption(GcpManagedChannel.CHANNEL_ID_KEY);
if (gcpChannelId != null) {
// Channel IDs from grpc-gcp are 0-based, add 1 to match request ID convention
// where 0 means unknown and >0 means a known channel.
requestId.setChannelId(gcpChannelId + 1);
}
requestId.incrementAttempt();
headers.put(REQUEST_ID_HEADER_KEY, requestId.getHeaderValue());
}
Expand Down
Loading