Skip to content

Commit

Permalink
HBASE-13097 Use same EventLoopGroup for different AsyncRpcClients if possible
Browse files Browse the repository at this point in the history

Signed-off-by: stack <stack@apache.org>
  • Loading branch information
Apache9 authored and saintstack committed Feb 27, 2015
1 parent f670649 commit d1619bc
Show file tree
Hide file tree
Showing 5 changed files with 752 additions and 555 deletions.
Expand Up @@ -44,6 +44,7 @@


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
Expand All @@ -63,11 +64,12 @@
/** /**
* Netty client for the requests and responses * Netty client for the requests and responses
*/ */
@InterfaceAudience.Private @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
public class AsyncRpcClient extends AbstractRpcClient { public class AsyncRpcClient extends AbstractRpcClient {


public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max"; public static final String CLIENT_MAX_THREADS = "hbase.rpc.client.threads.max";
public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.useNativeTransport"; public static final String USE_NATIVE_TRANSPORT = "hbase.rpc.client.nativetransport";
public static final String USE_GLOBAL_EVENT_LOOP_GROUP = "hbase.rpc.client.globaleventloopgroup";


public static final HashedWheelTimer WHEEL_TIMER = public static final HashedWheelTimer WHEEL_TIMER =
new HashedWheelTimer(100, TimeUnit.MILLISECONDS); new HashedWheelTimer(100, TimeUnit.MILLISECONDS);
Expand All @@ -82,12 +84,54 @@ protected void initChannel(SocketChannel ch) throws Exception {


protected final AtomicInteger callIdCnt = new AtomicInteger(); protected final AtomicInteger callIdCnt = new AtomicInteger();


private final EventLoopGroup eventLoopGroup;
private final PoolMap<Integer, AsyncRpcChannel> connections; private final PoolMap<Integer, AsyncRpcChannel> connections;


final FailedServers failedServers; final FailedServers failedServers;


private final Bootstrap bootstrap; @VisibleForTesting
final Bootstrap bootstrap;

private final boolean useGlobalEventLoopGroup;

@VisibleForTesting
static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;

/**
 * Returns the process-wide shared event loop group together with the channel class
 * matching its transport, creating both on first use.
 * <p>
 * Lazy initialization happens under the class lock (the method is {@code static
 * synchronized}), so concurrent callers observe exactly one instance. Only the first
 * caller's {@code conf} is consulted; later invocations return the cached pair and
 * ignore their configuration argument.
 *
 * @param conf configuration used to build the group on first invocation
 * @return the shared event loop group and its socket channel class
 */
private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
    getGlobalEventLoopGroup(Configuration conf) {
  Pair<EventLoopGroup, Class<? extends Channel>> group = GLOBAL_EVENT_LOOP_GROUP;
  if (group == null) {
    group = createEventLoopGroup(conf);
    GLOBAL_EVENT_LOOP_GROUP = group;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Create global event loop group "
          + group.getFirst().getClass().getSimpleName());
    }
  }
  return group;
}

/**
 * Builds a fresh Netty event loop group and the socket channel class that matches
 * its transport, driven by {@code conf}.
 * <p>
 * When {@code hbase.rpc.client.nativetransport} is enabled and the JVM runs on
 * Linux, the native epoll transport is used; otherwise the portable NIO transport
 * is selected. {@code hbase.rpc.client.threads.max} bounds the thread count, with
 * {@code 0} letting Netty size the pool from the number of available cores.
 *
 * @param conf configuration supplying thread-count and native-transport settings
 * @return a new event loop group paired with its channel class
 */
private static Pair<EventLoopGroup, Class<? extends Channel>> createEventLoopGroup(
    Configuration conf) {
  // 0 lets Netty decide the thread count based on the number of cores.
  int threadCount = conf.getInt(CLIENT_MAX_THREADS, 0);

  // Native transport is opt-in; it was noted as not extensively tested (and not
  // obviously stable) at the time this was implemented. It only applies on Linux.
  boolean useEpoll = conf.getBoolean(USE_NATIVE_TRANSPORT, false) && JVM.isLinux();

  if (useEpoll) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Create EpollEventLoopGroup with maxThreads = " + threadCount);
    }
    return new Pair<EventLoopGroup, Class<? extends Channel>>(
        new EpollEventLoopGroup(threadCount,
            Threads.newDaemonThreadFactory("AsyncRpcChannel")),
        EpollSocketChannel.class);
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Create NioEventLoopGroup with maxThreads = " + threadCount);
  }
  return new Pair<EventLoopGroup, Class<? extends Channel>>(
      new NioEventLoopGroup(threadCount,
          Threads.newDaemonThreadFactory("AsyncRpcChannel")),
      NioSocketChannel.class);
}


/** /**
* Constructor for tests * Constructor for tests
Expand All @@ -106,23 +150,16 @@ protected void initChannel(SocketChannel ch) throws Exception {
LOG.debug("Starting async Hbase RPC client"); LOG.debug("Starting async Hbase RPC client");
} }


// Max amount of threads to use. 0 lets Netty decide based on amount of cores Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass;
int maxThreads = conf.getInt(CLIENT_MAX_THREADS, 0); this.useGlobalEventLoopGroup = conf.getBoolean(USE_GLOBAL_EVENT_LOOP_GROUP, true);

if (useGlobalEventLoopGroup) {
// Config to enable native transport. Does not seem to be stable at time of implementation eventLoopGroupAndChannelClass = getGlobalEventLoopGroup(configuration);
// although it is not extensively tested.
boolean epollEnabled = conf.getBoolean(USE_NATIVE_TRANSPORT, false);

// Use the faster native epoll transport mechanism on linux if enabled
Class<? extends Channel> socketChannelClass;
if (epollEnabled && JVM.isLinux()) {
socketChannelClass = EpollSocketChannel.class;
this.eventLoopGroup =
new EpollEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel"));
} else { } else {
socketChannelClass = NioSocketChannel.class; eventLoopGroupAndChannelClass = createEventLoopGroup(configuration);
this.eventLoopGroup = }
new NioEventLoopGroup(maxThreads, Threads.newDaemonThreadFactory("AsyncRpcChannel")); if (LOG.isDebugEnabled()) {
LOG.debug("Use " + (useGlobalEventLoopGroup ? "global" : "individual") + " event loop group "
+ eventLoopGroupAndChannelClass.getFirst().getClass().getSimpleName());
} }


this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration)); this.connections = new PoolMap<>(getPoolType(configuration), getPoolSize(configuration));
Expand All @@ -133,7 +170,8 @@ protected void initChannel(SocketChannel ch) throws Exception {


// Configure the default bootstrap. // Configure the default bootstrap.
this.bootstrap = new Bootstrap(); this.bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup).channel(socketChannelClass) bootstrap.group(eventLoopGroupAndChannelClass.getFirst())
.channel(eventLoopGroupAndChannelClass.getSecond())
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.option(ChannelOption.TCP_NODELAY, tcpNoDelay) .option(ChannelOption.TCP_NODELAY, tcpNoDelay)
.option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive) .option(ChannelOption.SO_KEEPALIVE, tcpKeepAlive)
Expand Down Expand Up @@ -176,6 +214,9 @@ public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddre
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket, Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
InetSocketAddress addr) throws IOException, InterruptedException { InetSocketAddress addr) throws IOException, InterruptedException {
if (pcrc == null) {
pcrc = new PayloadCarryingRpcController();
}
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket); final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);


Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType); Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
Expand Down Expand Up @@ -236,6 +277,8 @@ public void operationComplete(Future<Message> future) throws Exception {
} }
} }


private boolean closed = false;

/** /**
* Close netty * Close netty
*/ */
Expand All @@ -245,12 +288,18 @@ public void close() {
} }


synchronized (connections) { synchronized (connections) {
if (closed) {
return;
}
closed = true;
for (AsyncRpcChannel conn : connections.values()) { for (AsyncRpcChannel conn : connections.values()) {
conn.close(null); conn.close(null);
} }
} }

// do not close global EventLoopGroup.
eventLoopGroup.shutdownGracefully(); if (!useGlobalEventLoopGroup) {
bootstrap.group().shutdownGracefully();
}
} }


/** /**
Expand Down Expand Up @@ -287,10 +336,6 @@ public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
*/ */
private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location, private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress location,
User ticket) throws StoppedRpcClientException, FailedServerException { User ticket) throws StoppedRpcClientException, FailedServerException {
if (this.eventLoopGroup.isShuttingDown() || this.eventLoopGroup.isShutdown()) {
throw new StoppedRpcClientException();
}

// Check if server is failed // Check if server is failed
if (this.failedServers.isFailedServer(location)) { if (this.failedServers.isFailedServer(location)) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
Expand All @@ -305,6 +350,9 @@ private AsyncRpcChannel createRpcChannel(String serviceName, InetSocketAddress l


AsyncRpcChannel rpcChannel; AsyncRpcChannel rpcChannel;
synchronized (connections) { synchronized (connections) {
if (closed) {
throw new StoppedRpcClientException();
}
rpcChannel = connections.get(hashCode); rpcChannel = connections.get(hashCode);
if (rpcChannel == null) { if (rpcChannel == null) {
rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location); rpcChannel = new AsyncRpcChannel(this.bootstrap, this, ticket, serviceName, location);
Expand Down

0 comments on commit d1619bc

Please sign in to comment.