Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate channel pool config in the GcpChannelPoolOptions. #109

Merged
merged 1 commit into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
44 changes: 34 additions & 10 deletions grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
public class GcpManagedChannel extends ManagedChannel {
private static final Logger logger = Logger.getLogger(GcpManagedChannel.class.getName());
static final AtomicInteger channelPoolIndex = new AtomicInteger();
private static final int DEFAULT_MAX_CHANNEL = 10;
private static final int DEFAULT_MAX_STREAM = 100;
static final int DEFAULT_MAX_CHANNEL = 10;
static final int DEFAULT_MAX_STREAM = 100;

private final ManagedChannelBuilder<?> delegateChannelBuilder;
private final GcpManagedChannelOptions options;
Expand Down Expand Up @@ -141,21 +141,17 @@ public class GcpManagedChannel extends ManagedChannel {
* @param options the options for GcpManagedChannel.
*/
public GcpManagedChannel(
ManagedChannelBuilder<?> delegateChannelBuilder,
ApiConfig apiConfig,
int poolSize,
GcpManagedChannelOptions options) {
ManagedChannelBuilder<?> delegateChannelBuilder,
ApiConfig apiConfig,
GcpManagedChannelOptions options) {
loadApiConfig(apiConfig);
if (poolSize != 0) {
this.maxSize = poolSize;
}
this.delegateChannelBuilder = delegateChannelBuilder;
this.options = options;
initOptions();
if (options.getResiliencyOptions() != null) {
fallbackEnabled = options.getResiliencyOptions().isNotReadyFallbackEnabled();
unresponsiveDetectionEnabled =
options.getResiliencyOptions().isUnresponsiveDetectionEnabled();
options.getResiliencyOptions().isUnresponsiveDetectionEnabled();
unresponsiveMs = options.getResiliencyOptions().getUnresponsiveDetectionMs();
unresponsiveDropCount = options.getResiliencyOptions().getUnresponsiveDetectionDroppedCount();
} else {
Expand All @@ -166,7 +162,35 @@ public GcpManagedChannel(
}
}

/**
* Constructor for GcpManagedChannel.
* Deprecated. Use the one without the poolSize and set the maximum pool size in options. However, note that if
* setting the pool size from options then concurrent streams low watermark (even the default one) will be also taken
* from the options and not apiConfig.
*
* @param delegateChannelBuilder the underlying delegate ManagedChannelBuilder.
* @param apiConfig the ApiConfig object for configuring GcpManagedChannel.
* @param poolSize maximum number of channels the pool can have.
* @param options the options for GcpManagedChannel.
*/
@Deprecated
public GcpManagedChannel(
ManagedChannelBuilder<?> delegateChannelBuilder,
ApiConfig apiConfig,
int poolSize,
GcpManagedChannelOptions options) {
this(delegateChannelBuilder, apiConfig, options);
if (poolSize != 0) {
this.maxSize = poolSize;
}
}

private void initOptions() {
GcpManagedChannelOptions.GcpChannelPoolOptions poolOptions = options.getChannelPoolOptions();
if (poolOptions != null) {
this.maxSize = poolOptions.getMaxSize();
this.maxConcurrentStreamsLowWatermark = poolOptions.getConcurrentStreamsLowWatermark();
}
initMetrics();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,10 @@ public static GcpManagedChannelBuilder forDelegateBuilder(ManagedChannelBuilder<
return new GcpManagedChannelBuilder(delegate);
}

/** Sets the channel pool size. This will override the pool size configuration in ApiConfig. */
/** Sets the maximum channel pool size. This will override the pool size configuration in ApiConfig.
* Deprecated. Use maxSize in GcpManagedChannelOptions.GcpChannelPoolOptions.
*/
@Deprecated
public GcpManagedChannelBuilder setPoolSize(int poolSize) {
this.poolSize = poolSize;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,27 @@
public class GcpManagedChannelOptions {
private static final Logger logger = Logger.getLogger(GcpManagedChannelOptions.class.getName());

@Nullable private final GcpChannelPoolOptions channelPoolOptions;
@Nullable private final GcpMetricsOptions metricsOptions;
@Nullable private final GcpResiliencyOptions resiliencyOptions;

public GcpManagedChannelOptions() {
channelPoolOptions = null;
metricsOptions = null;
resiliencyOptions = null;
}

public GcpManagedChannelOptions(Builder builder) {
channelPoolOptions = builder.channelPoolOptions;
metricsOptions = builder.metricsOptions;
resiliencyOptions = builder.resiliencyOptions;
}

@Nullable
public GcpChannelPoolOptions getChannelPoolOptions() {
return channelPoolOptions;
}

@Nullable
public GcpMetricsOptions getMetricsOptions() {
return metricsOptions;
Expand All @@ -63,12 +71,14 @@ public static Builder newBuilder(GcpManagedChannelOptions options) {
}

public static class Builder {
private GcpChannelPoolOptions channelPoolOptions;
private GcpMetricsOptions metricsOptions;
private GcpResiliencyOptions resiliencyOptions;

public Builder() {}

public Builder(GcpManagedChannelOptions options) {
this.channelPoolOptions = options.getChannelPoolOptions();
this.metricsOptions = options.getMetricsOptions();
this.resiliencyOptions = options.getResiliencyOptions();
}
Expand All @@ -77,6 +87,17 @@ public GcpManagedChannelOptions build() {
return new GcpManagedChannelOptions(this);
}

/**
* Sets the channel pool configuration for the {@link GcpManagedChannel}.
*
* @param channelPoolOptions a {@link GcpChannelPoolOptions} to use as a channel pool
* configuration.
*/
public Builder withChannelPoolOptions(GcpChannelPoolOptions channelPoolOptions) {
this.channelPoolOptions = channelPoolOptions;
return this;
}

/**
* Sets the metrics configuration for the {@link GcpManagedChannel}.
*
Expand Down Expand Up @@ -127,6 +148,82 @@ public Builder withResiliencyOptions(GcpResiliencyOptions resiliencyOptions) {
}
}

/** Channel pool configuration for the GCP managed channel. */
public static class GcpChannelPoolOptions {
// The maximum number of channels in the pool.
private final int maxSize;
// If every channel in the pool has at least this amount of concurrent streams then a new channel will be created
// in the pool unless the pool reached its maximum size.
private final int concurrentStreamsLowWatermark;

public GcpChannelPoolOptions(Builder builder) {
maxSize = builder.maxSize;
concurrentStreamsLowWatermark = builder.concurrentStreamsLowWatermark;
}

public int getMaxSize() {
return maxSize;
}

public int getConcurrentStreamsLowWatermark() {
return concurrentStreamsLowWatermark;
}

/** Creates a new GcpChannelPoolOptions.Builder. */
public static GcpChannelPoolOptions.Builder newBuilder() {
return new GcpChannelPoolOptions.Builder();
}

/** Creates a new GcpChannelPoolOptions.Builder from GcpChannelPoolOptions. */
public static GcpChannelPoolOptions.Builder newBuilder(GcpChannelPoolOptions options) {
return new GcpChannelPoolOptions.Builder(options);
}

public static class Builder {
private int maxSize = GcpManagedChannel.DEFAULT_MAX_CHANNEL;
private int concurrentStreamsLowWatermark = GcpManagedChannel.DEFAULT_MAX_STREAM;

public Builder() {}

public Builder(GcpChannelPoolOptions options) {
this();
if (options == null) {
return;
}
this.maxSize = options.getMaxSize();
this.concurrentStreamsLowWatermark = options.getConcurrentStreamsLowWatermark();
}

public GcpChannelPoolOptions build() {
return new GcpChannelPoolOptions(this);
}

/**
* Sets the maximum size of the channel pool.
*
* @param maxSize maximum number of channels the pool can have.
*/
public Builder setMaxSize(int maxSize) {
Preconditions.checkArgument(maxSize > 0, "Channel pool size must be positive.");
this.maxSize = maxSize;
return this;
}

/**
* Sets the concurrent streams low watermark.
* If every channel in the pool has at least this amount of concurrent streams then a new
* channel will be created in the pool unless the pool reached its maximum size.
*
* @param concurrentStreamsLowWatermark number of streams every channel must reach before adding a new channel
* to the pool.
*/
public Builder setConcurrentStreamsLowWatermark(int concurrentStreamsLowWatermark) {
this.concurrentStreamsLowWatermark = concurrentStreamsLowWatermark;
return this;
}
}
}

/** Metrics configuration for the GCP managed channel. */
public static class GcpMetricsOptions {
private final MetricRegistry metricRegistry;
Expand Down
7 changes: 6 additions & 1 deletion grpc-gcp/src/main/proto/google/grpc/gcp/proto/grpc_gcp.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@ option java_outer_classname = "GcpExtensionProto";
option java_package = "com.google.cloud.grpc.proto";

message ApiConfig {
// Deprecated. Use GcpManagedChannelOptions.GcpChannelPoolOptions class.
// The channel pool configurations.
ChannelPoolConfig channel_pool = 2;
ChannelPoolConfig channel_pool = 2 [deprecated = true];

// The method configurations.
repeated MethodConfig method = 1001;
}


// Deprecated. Use GcpManagedChannelOptions.GcpChannelPoolOptions class.
message ChannelPoolConfig {
option deprecated = true;

// The max number of channels in the pool.
uint32 max_size = 1;
// The idle timeout (seconds) of channels without bound affinity sessions.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertNotNull;

import com.google.cloud.grpc.GcpManagedChannel.ChannelRef;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpResiliencyOptions;
import com.google.cloud.grpc.MetricRegistryTestUtils.FakeMetricRegistry;
Expand Down Expand Up @@ -130,6 +131,51 @@ public void testLoadApiConfigString() throws Exception {
assertEquals(3, gcpChannel.methodToAffinity.size());
}

@Test
public void testUsesPoolOptions() {
resetGcpChannel();
GcpChannelPoolOptions poolOptions = GcpChannelPoolOptions.newBuilder()
.setMaxSize(5)
.setConcurrentStreamsLowWatermark(50)
.build();
GcpManagedChannelOptions options = GcpManagedChannelOptions.newBuilder()
.withChannelPoolOptions(poolOptions)
.build();
gcpChannel =
(GcpManagedChannel)
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withOptions(options)
.build();
assertEquals(0, gcpChannel.channelRefs.size());
assertEquals(5, gcpChannel.getMaxSize());
assertEquals(50, gcpChannel.getStreamsLowWatermark());
}

@Test
public void testPoolOptionsOverrideApiConfig() {
resetGcpChannel();
final URL resource = GcpManagedChannelTest.class.getClassLoader().getResource(API_FILE);
assertNotNull(resource);
File configFile = new File(resource.getFile());
GcpChannelPoolOptions poolOptions = GcpChannelPoolOptions.newBuilder()
.setMaxSize(5)
.setConcurrentStreamsLowWatermark(50)
.build();
GcpManagedChannelOptions options = GcpManagedChannelOptions.newBuilder()
.withChannelPoolOptions(poolOptions)
.build();
gcpChannel =
(GcpManagedChannel)
GcpManagedChannelBuilder.forDelegateBuilder(builder)
.withApiConfigJsonFile(configFile)
.withOptions(options)
.build();
assertEquals(0, gcpChannel.channelRefs.size());
assertEquals(5, gcpChannel.getMaxSize());
assertEquals(50, gcpChannel.getStreamsLowWatermark());
assertEquals(3, gcpChannel.methodToAffinity.size());
}

@Test
public void testGetChannelRefInitialization() {
// Should not have a managedchannel by default.
Expand Down