Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi-endpoint feature #135

Merged
merged 11 commits into from
Jul 12, 2022
1 change: 1 addition & 0 deletions grpc-gcp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ dependencies {
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.opencensus:opencensus-api:${opencensusVersion}"
implementation "com.google.api:api-common:2.1.5"

compileOnly "org.apache.tomcat:annotations-api:6.0.53" // necessary for Java 9+

Expand Down
93 changes: 83 additions & 10 deletions grpc-gcp/src/main/java/com/google/cloud/grpc/GcpManagedChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.grpc.proto.ApiConfig;
import com.google.cloud.grpc.proto.MethodConfig;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.base.Joiner;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.protobuf.Descriptors.FieldDescriptor;
Expand All @@ -48,11 +49,13 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.LongSummaryStatistics;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -92,6 +95,10 @@ public class GcpManagedChannel extends ManagedChannel {

@VisibleForTesting final List<ChannelRef> channelRefs = new CopyOnWriteArrayList<>();

private final ExecutorService stateNotificationExecutor = Executors.newCachedThreadPool(
new ThreadFactoryBuilder().setNameFormat("gcp-mc-state-notifications-%d").build());
private List<Runnable> stateChangeCallbacks = Collections.synchronizedList(new LinkedList<>());

// Metrics configuration.
private MetricRegistry metricRegistry;
private final List<LabelKey> labelKeys = new ArrayList<>();
Expand Down Expand Up @@ -872,6 +879,15 @@ private void recordUnresponsiveDetection(long nanos, long dropCount) {
}
}

@Override
public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
if (!getState(false).equals(source)) {
stateNotificationExecutor.execute(callback);
return;
}
stateChangeCallbacks.add(callback);
}

/**
* ChannelStateMonitor subscribes to channel's state changes and informs {@link GcpManagedChannel}
* on any new state. This monitor allows to detect when a channel is not ready and temporarily
Expand Down Expand Up @@ -919,7 +935,14 @@ public void run() {
}
}

private synchronized void executeStateChangeCallbacks() {
List<Runnable> callbacksToTrigger = stateChangeCallbacks;
stateChangeCallbacks = new LinkedList<>();
callbacksToTrigger.forEach(stateNotificationExecutor::execute);
}

void processChannelStateChange(int channelId, ConnectivityState state) {
executeStateChangeCallbacks();
if (!fallbackEnabled) {
return;
}
Expand Down Expand Up @@ -967,10 +990,12 @@ protected ChannelRef getChannelRefForBind() {
ChannelRef channelRef;
if (options.getChannelPoolOptions() != null && options.getChannelPoolOptions().isUseRoundRobinOnBind()) {
channelRef = getChannelRefRoundRobin();
logger.finest(log(
"Channel %d picked for bind operation using round-robin.", channelRef.getId()));
} else {
channelRef = getChannelRef(null);
logger.finest(log("Channel %d picked for bind operation.", channelRef.getId()));
}
logger.finest(log("Channel %d picked for bind operation.", channelRef.getId()));
return channelRef;
}

Expand Down Expand Up @@ -1061,15 +1086,45 @@ private synchronized ChannelRef createNewChannel() {
return channelRef;
}

// Returns first newly created channel or null if there are already some channels in the pool.
@Nullable
private ChannelRef createFirstChannel() {
mohanli-ml marked this conversation as resolved.
Show resolved Hide resolved
if (!channelRefs.isEmpty()) {
return null;
}
synchronized (this) {
if (channelRefs.isEmpty()) {
return createNewChannel();
}
}
return null;
}

// Creates new channel if maxSize is not reached.
// Returns new channel or null.
@Nullable
private ChannelRef tryCreateNewChannel() {
if (channelRefs.size() >= maxSize) {
return null;
}
synchronized (this) {
if (channelRefs.size() < maxSize) {
return createNewChannel();
}
}
return null;
}

/**
* Pick a {@link ChannelRef} (and create a new one if necessary). If notReadyFallbackEnabled is
* true in the {@link GcpResiliencyOptions} then instead of a channel in a non-READY state another
* channel in the READY state and having fewer than maximum allowed number of active streams will
* be provided if available.
*/
private ChannelRef pickLeastBusyChannel(boolean forFallback) {
if (channelRefs.isEmpty()) {
return createNewChannel();
ChannelRef first = createFirstChannel();
if (first != null) {
return first;
}

// Pick the least busy channel and the least busy ready and not overloaded channel (this could
Expand All @@ -1095,17 +1150,23 @@ private ChannelRef pickLeastBusyChannel(boolean forFallback) {

if (!fallbackEnabled) {
if (channelRefs.size() < maxSize && minStreams >= maxConcurrentStreamsLowWatermark) {
return createNewChannel();
ChannelRef newChannel = tryCreateNewChannel();
if (newChannel != null) {
return newChannel;
}
}
return channelCandidate;
}

if (channelRefs.size() < maxSize && readyMinStreams >= maxConcurrentStreamsLowWatermark) {
if (!forFallback && readyCandidate == null) {
logger.finest(log("Fallback to newly created channel"));
fallbacksSucceeded.incrementAndGet();
ChannelRef newChannel = tryCreateNewChannel();
if (newChannel != null) {
if (!forFallback && readyCandidate == null) {
logger.finest(log("Fallback to newly created channel %d", newChannel.getId()));
fallbacksSucceeded.incrementAndGet();
}
return newChannel;
}
return createNewChannel();
}

if (readyCandidate != null) {
Expand Down Expand Up @@ -1164,6 +1225,9 @@ public ManagedChannel shutdownNow() {
if (logMetricService != null && !logMetricService.isTerminated()) {
logMetricService.shutdownNow();
}
if (!stateNotificationExecutor.isTerminated()) {
stateNotificationExecutor.shutdownNow();
}
return this;
}

Expand All @@ -1176,6 +1240,7 @@ public ManagedChannel shutdown() {
if (logMetricService != null) {
logMetricService.shutdown();
}
stateNotificationExecutor.shutdown();
return this;
}

Expand All @@ -1197,6 +1262,11 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE
//noinspection ResultOfMethodCallIgnored
logMetricService.awaitTermination(awaitTimeNanos, NANOSECONDS);
}
awaitTimeNanos = endTimeNanos - System.nanoTime();
if (awaitTimeNanos > 0) {
//noinspection ResultOfMethodCallIgnored
stateNotificationExecutor.awaitTermination(awaitTimeNanos, NANOSECONDS);
}
return isTerminated();
}

Expand All @@ -1210,7 +1280,7 @@ public boolean isShutdown() {
if (logMetricService != null) {
return logMetricService.isShutdown();
}
return true;
return stateNotificationExecutor.isShutdown();
}

@Override
Expand All @@ -1223,12 +1293,15 @@ public boolean isTerminated() {
if (logMetricService != null) {
return logMetricService.isTerminated();
}
return true;
return stateNotificationExecutor.isTerminated();
}

/** Get the current connectivity state of the channel pool. */
@Override
public ConnectivityState getState(boolean requestConnection) {
if (requestConnection && getNumberOfChannels() == 0) {
createFirstChannel();
}
int ready = 0;
int idle = 0;
int connecting = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.opencensus.metrics.LabelKey;
import io.opencensus.metrics.LabelValue;
import io.opencensus.metrics.MetricRegistry;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
Expand Down Expand Up @@ -64,10 +65,10 @@ public GcpResiliencyOptions getResiliencyOptions() {
@Override
public String toString() {
return String.format(
"{channelPoolOptions: %s, metricsOptions: %s, resiliencyOptions: %s}",
"{channelPoolOptions: %s, resiliencyOptions: %s, metricsOptions: %s}",
getChannelPoolOptions(),
getMetricsOptions(),
getResiliencyOptions()
getResiliencyOptions(),
getMetricsOptions()
);
}

Expand Down Expand Up @@ -208,8 +209,9 @@ public boolean isUseRoundRobinOnBind() {
@Override
public String toString() {
return String.format(
"{maxSize: %d, concurrentStreamsLowWatermark: %d, useRoundRobinOnBind: %s}",
"{maxSize: %d, minSize: %d, concurrentStreamsLowWatermark: %d, useRoundRobinOnBind: %s}",
getMaxSize(),
getMinSize(),
getConcurrentStreamsLowWatermark(),
isUseRoundRobinOnBind()
);
Expand Down