Skip to content

Commit

Permalink
Move GrpcManagedChannelPool timeout parameters to GrpcChannel and Grp…
Browse files Browse the repository at this point in the history
…cChannelBuilder (#8339)

* Move channel timeout parameters into ManagedChannelPool.ChannelKey

By moving the parameters into the channel key we can acquire and shutdown channels based on per-client configurations rather than using a load-once-per-jvm strategy that was used previously.

The advantage of doing this is that users who choose to modify timeouts within the same JVM across different client objects will have those timeouts respected by the channel pool when acquiring and releasing channels.

* Set timeouts in ChannelKey constructor

By utilizing the constructor it will force us to use the defaults from configuration if the values are not set by the programmer.

* Fix Checkstyle issues

* remove timeouts from equality and hash code

It's not required to put the health check timeout and shutdown timeout inside of the equality and hashCode functions for the ChannelKey.

The shutdown of a channel will always immediately return if there are other references present. The last key to shut down the channel will respect the timeout however which is what we want. Similarly for health check which is only used when acquiring the channel - it will come from the key and is not stored with the channel so it is not required in those functions.

* minor style fixes

* move argument locations of timeouts

ChannelKey may not have been the most intuitive place to place the parameters. Moving out the the GrpcChannel which wraps the ManagedChannel Reference and also using the configuration object from within the GrpcChannelBuilder in order to reference the timeouts.
  • Loading branch information
ZacBlanco authored and calvinjia committed Feb 7, 2019
1 parent fc08cfe commit 9af970e
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 91 deletions.
7 changes: 5 additions & 2 deletions core/common/src/main/java/alluxio/grpc/GrpcChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,19 @@ public final class GrpcChannel extends Channel {
private final Channel mChannel;
private boolean mChannelReleased;
private boolean mChannelHealthy = true;
private final long mShutdownTimeoutMs;

/**
* Create a new instance of {@link GrpcChannel}.
*
* @param channel the grpc channel to wrap
*/
public GrpcChannel(GrpcManagedChannelPool.ChannelKey channelKey, Channel channel) {
public GrpcChannel(GrpcManagedChannelPool.ChannelKey channelKey, Channel channel,
long shutdownTimeoutMs) {
mChannelKey = channelKey;
mChannel = ClientInterceptors.intercept(channel, new ChannelResponseTracker((this)));
mChannelReleased = false;
mShutdownTimeoutMs = shutdownTimeoutMs;
}

@Override
Expand All @@ -59,7 +62,7 @@ public String authority() {
*/
public void shutdown() {
if(!mChannelReleased) {
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(mChannelKey);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(mChannelKey, mShutdownTimeoutMs);
}
mChannelReleased = true;
}
Expand Down
12 changes: 8 additions & 4 deletions core/common/src/main/java/alluxio/grpc/GrpcChannelBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public final class GrpcChannelBuilder {
protected AlluxioConfiguration mConfiguration;

private GrpcChannelBuilder(SocketAddress address, AlluxioConfiguration conf) {
mChannelKey = GrpcManagedChannelPool.ChannelKey.create();
mChannelKey = GrpcManagedChannelPool.ChannelKey.create(conf);
mChannelKey.setAddress(address).usePlaintext();
mUseSubject = true;
mAuthenticateChannel = true;
Expand Down Expand Up @@ -192,7 +192,9 @@ public GrpcChannelBuilder setPoolingStrategy(GrpcManagedChannelPool.PoolingStrat
*/
public GrpcChannel build() throws UnauthenticatedException, UnavailableException {
ManagedChannel underlyingChannel =
GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(mChannelKey);
GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(mChannelKey,
mConfiguration.getMs(PropertyKey.NETWORK_CONNECTION_HEALTH_CHECK_TIMEOUT_MS),
mConfiguration.getMs(PropertyKey.MASTER_GRPC_CHANNEL_SHUTDOWN_TIMEOUT));
try {
Channel clientChannel = underlyingChannel;

Expand All @@ -211,10 +213,12 @@ public GrpcChannel build() throws UnauthenticatedException, UnavailableException
clientChannel = channelAuthenticator.authenticate(underlyingChannel, mConfiguration);
}
// Create the channel after authentication with the target.
return new GrpcChannel(mChannelKey, clientChannel);
return new GrpcChannel(mChannelKey, clientChannel,
mConfiguration.getMs(PropertyKey.MASTER_GRPC_CHANNEL_SHUTDOWN_TIMEOUT));
} catch (Exception exc) {
// Release the managed channel to the pool before throwing.
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(mChannelKey);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(mChannelKey,
mConfiguration.getMs(PropertyKey.MASTER_GRPC_CHANNEL_SHUTDOWN_TIMEOUT));
throw exc;
}
}
Expand Down
56 changes: 17 additions & 39 deletions core/common/src/main/java/alluxio/grpc/GrpcManagedChannelPool.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
package alluxio.grpc;

import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.collections.Pair;
import alluxio.conf.AlluxioConfiguration;
import alluxio.resource.LockResource;
import alluxio.util.ConfigurationUtils;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.CommonUtils;
import alluxio.util.WaitForOptions;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
import com.google.common.base.Verify;
import io.grpc.ConnectivityState;
Expand Down Expand Up @@ -47,22 +43,7 @@ public class GrpcManagedChannelPool {

static {
// TODO(zac): Find a better way to handle handle this instance
sInstance = new GrpcManagedChannelPool(
new InstancedConfiguration(ConfigurationUtils.defaults())
.getMs(PropertyKey.NETWORK_CONNECTION_HEALTH_CHECK_TIMEOUT_MS),
new InstancedConfiguration(ConfigurationUtils.defaults())
.getMs(PropertyKey.MASTER_GRPC_CHANNEL_SHUTDOWN_TIMEOUT));
}

/**
* Creates a new channel pool with the given health check timeout and channel shutdown timeout
*
* @param networkHealthCheckTimeoutMs timeout in ms for channel health check
* @param channelShutdownTimeoutMs timeout in ms for channel shutdown.
*/
@VisibleForTesting
public static void renewChannelPool(long networkHealthCheckTimeoutMs, long channelShutdownTimeoutMs) {
sInstance = new GrpcManagedChannelPool(networkHealthCheckTimeoutMs, channelShutdownTimeoutMs);
sInstance = new GrpcManagedChannelPool();
}

/**
Expand All @@ -83,27 +64,21 @@ public static GrpcManagedChannelPool INSTANCE() {
/** Used to control access to mChannel */
private ReentrantReadWriteLock mLock;

private final long mChannelShutdownTimeoutMs;

/** Scheduler for destruction of idle channels. */
protected ScheduledExecutorService mScheduler;
/** Timeout for health check on managed channels. */
private final long mHealthCheckTimeoutMs;

/**
* Creates a new {@link GrpcManagedChannelPool}.
*/
public GrpcManagedChannelPool(long healthCheckTimeoutMs, long channelShutdownTimeoutMs) {
public GrpcManagedChannelPool() {
mChannels = new HashMap<>();
mLock = new ReentrantReadWriteLock(true);
mChannelShutdownTimeoutMs = channelShutdownTimeoutMs;
mHealthCheckTimeoutMs = healthCheckTimeoutMs;
}

private void shutdownManagedChannel(ManagedChannel managedChannel) {
private void shutdownManagedChannel(ManagedChannel managedChannel, long shutdownTimeoutMs) {
managedChannel.shutdown();
try {
managedChannel.awaitTermination(mChannelShutdownTimeoutMs, TimeUnit.MILLISECONDS);
managedChannel.awaitTermination(shutdownTimeoutMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Allow thread to exit.
Expand All @@ -113,7 +88,7 @@ private void shutdownManagedChannel(ManagedChannel managedChannel) {
Verify.verify(managedChannel.isShutdown());
}

private boolean waitForChannelReady(ManagedChannel managedChannel) {
private boolean waitForChannelReady(ManagedChannel managedChannel, long healthCheckTimeoutMs) {
try {
Boolean res = CommonUtils.waitForResult("channel to be ready", () -> {
ConnectivityState currentState = managedChannel.getState(true);
Expand All @@ -129,7 +104,7 @@ private boolean waitForChannelReady(ManagedChannel managedChannel) {
default:
return null;
}
}, WaitForOptions.defaults().setTimeoutMs((int) mHealthCheckTimeoutMs));
}, WaitForOptions.defaults().setTimeoutMs((int) healthCheckTimeoutMs));
return res;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -145,12 +120,14 @@ private boolean waitForChannelReady(ManagedChannel managedChannel) {
* @param channelKey channel key
* @return a {@link ManagedChannel}
*/
public ManagedChannel acquireManagedChannel(ChannelKey channelKey) {
public ManagedChannel acquireManagedChannel(ChannelKey channelKey, long healthCheckTimeoutMs,
long shutdownTimeoutMs) {
boolean shutdownExistingChannel = false;
try (LockResource lockShared = new LockResource(mLock.readLock())) {
if (mChannels.containsKey(channelKey)) {
ManagedChannelReference managedChannelRef = mChannels.get(channelKey);
if (waitForChannelReady(managedChannelRef.get())) {
if (waitForChannelReady(mChannels.get(channelKey).get(),
healthCheckTimeoutMs)) {
return managedChannelRef.reference();
} else {
// Postpone channel shutdown under exclusive lock below.
Expand All @@ -162,7 +139,7 @@ public ManagedChannel acquireManagedChannel(ChannelKey channelKey) {
// Dispose existing channel if required.
int existingRefCount = 0;
if (shutdownExistingChannel && mChannels.containsKey(channelKey)) {
shutdownManagedChannel(mChannels.get(channelKey).get());
shutdownManagedChannel(mChannels.get(channelKey).get(), shutdownTimeoutMs);
existingRefCount = mChannels.get(channelKey).getRefCount();
mChannels.remove(channelKey);
}
Expand All @@ -181,7 +158,7 @@ public ManagedChannel acquireManagedChannel(ChannelKey channelKey) {
*
* @param channelKey host address
*/
public void releaseManagedChannel(ChannelKey channelKey) {
public void releaseManagedChannel(ChannelKey channelKey, long shutdownTimeoutMs) {
boolean shutdownManagedChannel;
try (LockResource lockShared = new LockResource(mLock.readLock())) {
Verify.verify(mChannels.containsKey(channelKey));
Expand All @@ -193,7 +170,7 @@ public void releaseManagedChannel(ChannelKey channelKey) {
if (mChannels.containsKey(channelKey)) {
ManagedChannelReference channelRef = mChannels.get(channelKey);
if (channelRef.getRefCount() <= 0) {
shutdownManagedChannel(mChannels.remove(channelKey).get());
shutdownManagedChannel(mChannels.remove(channelKey).get(), shutdownTimeoutMs);
}
}
}
Expand Down Expand Up @@ -294,12 +271,13 @@ public static class ChannelKey {
private Optional<Class<? extends io.netty.channel.Channel>> mChannelType = Optional.empty();
private Optional<EventLoopGroup> mEventLoopGroup = Optional.empty();
private long mPoolKey = 0;
private ChannelKey() {}

public static ChannelKey create() {
public static ChannelKey create(AlluxioConfiguration conf) {
return new ChannelKey();
}

private ChannelKey() {}

/**
* @param address destination address of the channel
* @return the modified {@link ChannelKey}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,9 @@
import alluxio.ConfigurationTestUtils;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.util.SleepUtils;

import io.grpc.ManagedChannel;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

Expand All @@ -34,6 +32,10 @@
public final class GrpcManagedChannelPoolTest {

private static InstancedConfiguration sConf = ConfigurationTestUtils.defaults();
private static final long SHUTDOWN_TIMEOUT =
sConf.getMs(PropertyKey.MASTER_GRPC_CHANNEL_SHUTDOWN_TIMEOUT);
private static final long HEALTH_CHECK_TIMEOUT =
sConf.getMs(PropertyKey.NETWORK_CONNECTION_HEALTH_CHECK_TIMEOUT_MS);

@BeforeClass
public static void classSetup() {
Expand All @@ -48,8 +50,8 @@ public void after() throws Exception {
@Test
public void testEqualKeys() throws Exception {

GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create(sConf);
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create(sConf);

GrpcServer server1 =
GrpcServerBuilder.forAddress(new InetSocketAddress("0.0.0.0", 0), sConf).build().start();
Expand All @@ -58,21 +60,23 @@ public void testEqualKeys() throws Exception {
key1.setAddress(address);
key2.setAddress(address);

ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2);
ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);

assertTrue(channel1 == channel2);

GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1, SHUTDOWN_TIMEOUT);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2, SHUTDOWN_TIMEOUT);
server1.shutdown();
}

@Test
public void testUnhealthyChannelRecreation() throws Exception {

GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create(sConf);
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create(sConf);

// Not creating the coresponding server will ensure, the channels will never
// be ready.
Expand All @@ -81,19 +85,21 @@ public void testUnhealthyChannelRecreation() throws Exception {
key1.setAddress(address);
key2.setAddress(address);

ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2);
ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);

assertTrue(channel1 != channel2);

GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1, SHUTDOWN_TIMEOUT);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2, SHUTDOWN_TIMEOUT);
}

@Test
public void testEqualKeysComplex() throws Exception {
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create(sConf);
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create(sConf);

GrpcServer server1 =
GrpcServerBuilder.forAddress(new InetSocketAddress("0.0.0.0", 0), sConf).build().start();
Expand All @@ -115,20 +121,22 @@ public void testEqualKeysComplex() throws Exception {
key1.setKeepAliveTimeout(100, TimeUnit.MINUTES);
key2.setKeepAliveTimeout(100, TimeUnit.MINUTES);

ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2);
ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);

assertTrue(channel1 == channel2);

GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1, SHUTDOWN_TIMEOUT);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2, SHUTDOWN_TIMEOUT);
server1.shutdown();
}

@Test
public void testNotEqualKeys() throws Exception {
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create();
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create(sConf);
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create(sConf);

GrpcServer server1 =
GrpcServerBuilder.forAddress(new InetSocketAddress("0.0.0.0", 0), sConf).build().start();
Expand All @@ -141,22 +149,24 @@ public void testNotEqualKeys() throws Exception {
key1.setAddress(address1);
key2.setAddress(address2);

ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2);
ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);

assertTrue(channel1 != channel2);

GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1, SHUTDOWN_TIMEOUT);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2, SHUTDOWN_TIMEOUT);
server2.shutdown();
server2.shutdown();
}

@Test
public void testEqualKeysNoPooling() throws Exception {
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create()
GrpcManagedChannelPool.ChannelKey key1 = GrpcManagedChannelPool.ChannelKey.create(sConf)
.setPoolingStrategy(GrpcManagedChannelPool.PoolingStrategy.DISABLED);
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create()
GrpcManagedChannelPool.ChannelKey key2 = GrpcManagedChannelPool.ChannelKey.create(sConf)
.setPoolingStrategy(GrpcManagedChannelPool.PoolingStrategy.DISABLED);

GrpcServer server1 =
Expand All @@ -167,13 +177,16 @@ public void testEqualKeysNoPooling() throws Exception {
key1.setAddress(address);
key2.setAddress(address);

ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2);
ManagedChannel channel1 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key1,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);
ManagedChannel channel2 = GrpcManagedChannelPool.INSTANCE().acquireManagedChannel(key2,
HEALTH_CHECK_TIMEOUT, SHUTDOWN_TIMEOUT);

assertTrue(channel1 != channel2);

GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key1, SHUTDOWN_TIMEOUT);
GrpcManagedChannelPool.INSTANCE().releaseManagedChannel(key2, SHUTDOWN_TIMEOUT);

server1.shutdown();
}
}
Loading

0 comments on commit 9af970e

Please sign in to comment.