Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add switching delay to GcpMultiEndpointChannel #145

Merged
merged 2 commits into from
Sep 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion grpc-gcp/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ dependencies {
testImplementation "io.grpc:grpc-auth:${grpcVersion}"
testImplementation "io.grpc:grpc-stub:${grpcVersion}"
testImplementation "com.google.truth:truth:0.42"
testImplementation "junit:junit:4.12"
testImplementation 'junit:junit:4.13.2'
}

protobuf {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -882,7 +883,12 @@ private void recordUnresponsiveDetection(long nanos, long dropCount) {
@Override
public void notifyWhenStateChanged(ConnectivityState source, Runnable callback) {
if (!getState(false).equals(source)) {
stateNotificationExecutor.execute(callback);
try {
stateNotificationExecutor.execute(callback);
} catch (RejectedExecutionException e) {
// Ignore exceptions on shutdown.
logger.fine(log("State notification change task rejected: %s", e.getMessage()));
}
return;
}
stateChangeCallbacks.add(callback);
Expand Down Expand Up @@ -938,7 +944,12 @@ public void run() {
private synchronized void executeStateChangeCallbacks() {
List<Runnable> callbacksToTrigger = stateChangeCallbacks;
stateChangeCallbacks = new LinkedList<>();
callbacksToTrigger.forEach(stateNotificationExecutor::execute);
try {
callbacksToTrigger.forEach(stateNotificationExecutor::execute);
} catch (RejectedExecutionException e) {
// Ignore exceptions on shutdown.
logger.fine(log("State notification change task rejected: %s", e.getMessage()));
}
}

void processChannelStateChange(int channelId, ConnectivityState state) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package com.google.cloud.grpc;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

import com.google.cloud.grpc.GcpManagedChannelOptions.GcpChannelPoolOptions;
import com.google.cloud.grpc.GcpManagedChannelOptions.GcpMetricsOptions;
import com.google.cloud.grpc.multiendpoint.MultiEndpoint;
import com.google.cloud.grpc.proto.ApiConfig;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.ClientCall.Listener;
Expand All @@ -34,12 +37,17 @@
import io.grpc.MethodDescriptor;
import io.opencensus.metrics.LabelKey;
import io.opencensus.metrics.LabelValue;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
Expand Down Expand Up @@ -119,6 +127,11 @@ public class GcpMultiEndpointChannel extends ManagedChannel {

private final Map<String, GcpManagedChannel> pools = new ConcurrentHashMap<>();

@GuardedBy("this")
private final Set<String> currentEndpoints = new HashSet<>();

private final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(1);

/**
* Constructor for {@link GcpMultiEndpointChannel}.
*
Expand Down Expand Up @@ -219,11 +232,10 @@ private GcpManagedChannelOptions prepareGcpManagedChannelConfig(
* no channel credentials change, nor channel configurator change).
* </ul>
*/
public void setMultiEndpoints(List<GcpMultiEndpointOptions> meOptions) {
public synchronized void setMultiEndpoints(List<GcpMultiEndpointOptions> meOptions) {
Preconditions.checkNotNull(meOptions);
Preconditions.checkArgument(!meOptions.isEmpty(), "MultiEndpoints list is empty");
Set<String> currentMultiEndpoints = new HashSet<>();
Set<String> currentEndpoints = new HashSet<>();

// Must have all multiendpoints before initializing the pools so that all multiendpoints
// can get status update of every pool.
Expand All @@ -236,10 +248,12 @@ public void setMultiEndpoints(List<GcpMultiEndpointOptions> meOptions) {
multiEndpoints.put(options.getName(),
(new MultiEndpoint.Builder(options.getEndpoints()))
.withRecoveryTimeout(options.getRecoveryTimeout())
.withSwitchingDelay(options.getSwitchingDelay())
.build());
}
});

currentEndpoints.clear();
// TODO: Support the same endpoint in different MultiEndpoint to use different channel
// credentials.
// TODO: Support different endpoints in the same MultiEndpoint to use different channel
Expand Down Expand Up @@ -289,11 +303,32 @@ public void setMultiEndpoints(List<GcpMultiEndpointOptions> meOptions) {
multiEndpoints.keySet().removeIf(name -> !currentMultiEndpoints.contains(name));

// Shutdown and remove the pools not present in options.
for (String endpoint : pools.keySet()) {
if (!currentEndpoints.contains(endpoint)) {
pools.get(endpoint).shutdown();
pools.remove(endpoint);
final Set<String> poolsToRemove = new HashSet<>(pools.keySet());
poolsToRemove.removeIf(currentEndpoints::contains);
if (!poolsToRemove.isEmpty()) {
// Get max switching delay.
Optional<Duration> maxDelay = meOptions.stream()
.map(GcpMultiEndpointOptions::getSwitchingDelay)
.max(Comparator.naturalOrder());
if (maxDelay.isPresent() && maxDelay.get().toMillis() > 0) {
executor.schedule(
() -> maybeCleanupPools(poolsToRemove),
maxDelay.get().toMillis(),
MILLISECONDS
);
} else {
maybeCleanupPools(poolsToRemove);
}
}
}

private synchronized void maybeCleanupPools(Set<String> endpoints) {
for (String endpoint : endpoints) {
if (currentEndpoints.contains(endpoint)) {
continue;
}
pools.get(endpoint).shutdown();
pools.remove(endpoint);
}
}

Expand All @@ -305,6 +340,7 @@ public void setMultiEndpoints(List<GcpMultiEndpointOptions> meOptions) {
* @since 1.0.0
*/
@Override
@CanIgnoreReturnValue
public ManagedChannel shutdown() {
pools.values().forEach(GcpManagedChannel::shutdown);
return this;
Expand Down Expand Up @@ -344,6 +380,7 @@ public boolean isTerminated() {
* @since 1.0.0
*/
@Override
@CanIgnoreReturnValue
public ManagedChannel shutdownNow() {
pools.values().forEach(GcpManagedChannel::shutdownNow);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFunction;
import com.google.cloud.grpc.multiendpoint.MultiEndpoint;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ChannelCredentials;
import io.grpc.ManagedChannelBuilder;
import java.time.Duration;
Expand All @@ -34,6 +35,7 @@ public class GcpMultiEndpointOptions {
private final ApiFunction<ManagedChannelBuilder<?>, ManagedChannelBuilder<?>> channelConfigurator;
private final ChannelCredentials channelCredentials;
private final Duration recoveryTimeout;
private final Duration switchingDelay;

public static String DEFAULT_NAME = "default";

Expand All @@ -43,6 +45,7 @@ public GcpMultiEndpointOptions(Builder builder) {
this.channelConfigurator = builder.channelConfigurator;
this.channelCredentials = builder.channelCredentials;
this.recoveryTimeout = builder.recoveryTimeout;
this.switchingDelay = builder.switchingDelay;
}

/**
Expand Down Expand Up @@ -81,13 +84,18 @@ public Duration getRecoveryTimeout() {
return recoveryTimeout;
}

public Duration getSwitchingDelay() {
return switchingDelay;
}

public static class Builder {

private String name = GcpMultiEndpointOptions.DEFAULT_NAME;
private List<String> endpoints;
private ApiFunction<ManagedChannelBuilder<?>, ManagedChannelBuilder<?>> channelConfigurator;
private ChannelCredentials channelCredentials;
private Duration recoveryTimeout = Duration.ZERO;
private Duration switchingDelay = Duration.ZERO;

public Builder(List<String> endpoints) {
setEndpoints(endpoints);
Expand All @@ -99,6 +107,7 @@ public Builder(GcpMultiEndpointOptions options) {
this.channelConfigurator = options.getChannelConfigurator();
this.channelCredentials = options.getChannelCredentials();
this.recoveryTimeout = options.getRecoveryTimeout();
this.switchingDelay = options.getSwitchingDelay();
}

public GcpMultiEndpointOptions build() {
Expand All @@ -119,6 +128,7 @@ private void setEndpoints(List<String> endpoints) {
*
* @param name MultiEndpoint name.
*/
@CanIgnoreReturnValue
public GcpMultiEndpointOptions.Builder withName(String name) {
this.name = name;
return this;
Expand All @@ -129,6 +139,7 @@ public GcpMultiEndpointOptions.Builder withName(String name) {
*
* @param endpoints List of endpoints in the form of host:port in descending priority order.
*/
@CanIgnoreReturnValue
public GcpMultiEndpointOptions.Builder withEndpoints(List<String> endpoints) {
this.setEndpoints(endpoints);
return this;
Expand All @@ -140,6 +151,7 @@ public GcpMultiEndpointOptions.Builder withEndpoints(List<String> endpoints) {
* @param channelConfigurator function to perform on the ManagedChannelBuilder in the channel
* pool.
*/
@CanIgnoreReturnValue
public GcpMultiEndpointOptions.Builder withChannelConfigurator(
ApiFunction<ManagedChannelBuilder<?>, ManagedChannelBuilder<?>> channelConfigurator) {
this.channelConfigurator = channelConfigurator;
Expand All @@ -151,6 +163,7 @@ public GcpMultiEndpointOptions.Builder withChannelConfigurator(
*
* @param channelCredentials channel credentials.
*/
@CanIgnoreReturnValue
public GcpMultiEndpointOptions.Builder withChannelCredentials(
ChannelCredentials channelCredentials) {
this.channelCredentials = channelCredentials;
Expand All @@ -162,9 +175,23 @@ public GcpMultiEndpointOptions.Builder withChannelCredentials(
*
* @param recoveryTimeout recovery timeout.
*/
@CanIgnoreReturnValue
public GcpMultiEndpointOptions.Builder withRecoveryTimeout(Duration recoveryTimeout) {
this.recoveryTimeout = recoveryTimeout;
return this;
}

/**
* Sets the switching delay for the MultiEndpoint.
* <p>When switching between endpoints the MultiEndpoint will stick to previous endpoint for the
* switching delay.
*
* @param switchingDelay switching delay.
*/
@CanIgnoreReturnValue
public GcpMultiEndpointOptions.Builder withSwitchingDelay(Duration switchingDelay) {
this.switchingDelay = switchingDelay;
return this;
}
}
}