Skip to content

Commit

Permalink
HBASE-15798 Add Async RpcChannels to all RpcClients
Browse files Browse the repository at this point in the history
Signed-off-by: stack <stack@apache.org>
  • Loading branch information
jurmous authored and saintstack committed May 10, 2016
1 parent 3b74b6f commit a11091c
Show file tree
Hide file tree
Showing 11 changed files with 483 additions and 227 deletions.
Expand Up @@ -270,6 +270,27 @@ public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final Us
return new BlockingRpcChannelImplementation(this, sn, ticket, defaultOperationTimeout);
}

/**
* Configure a payload carrying controller
* @param controller to configure
* @param channelOperationTimeout timeout for operation
* @return configured payload controller
*/
static PayloadCarryingRpcController configurePayloadCarryingRpcController(
RpcController controller, int channelOperationTimeout) {
PayloadCarryingRpcController pcrc;
if (controller != null && controller instanceof PayloadCarryingRpcController) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()) {
pcrc.setCallTimeout(channelOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(channelOperationTimeout);
}
return pcrc;
}

/**
* Takes an Exception and the address we were trying to connect to and return an IOException with
* the input exception as the cause. The new exception provides the stack trace of the place where
Expand Down Expand Up @@ -321,16 +342,9 @@ protected BlockingRpcChannelImplementation(final AbstractRpcClient rpcClient,
@Override
public Message callBlockingMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType) throws ServiceException {
PayloadCarryingRpcController pcrc;
if (controller != null && controller instanceof PayloadCarryingRpcController) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()) {
pcrc.setCallTimeout(channelOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(channelOperationTimeout);
}
PayloadCarryingRpcController pcrc = configurePayloadCarryingRpcController(
controller,
channelOperationTimeout);

return this.rpcClient.callBlockingMethod(md, pcrc, param, returnType, this.ticket, this.isa);
}
Expand Down
Expand Up @@ -51,14 +51,15 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
final Message responseDefaultType;

private final MessageConverter<M,T> messageConverter;
final long startTime;
final long rpcTimeout;
private final IOExceptionConverter exceptionConverter;

final long rpcTimeout;

// For only the request
private final CellScanner cellScanner;
private final int priority;

final MetricsConnection clientMetrics;
final MetricsConnection.CallStats callStats;

/**
Expand All @@ -71,13 +72,15 @@ public class AsyncCall<M extends Message, T> extends Promise<T> {
* @param cellScanner cellScanner containing cells to send as request
* @param responseDefaultType the default response type
* @param messageConverter converts the messages to what is the expected output
* @param exceptionConverter converts exceptions to expected format. Can be null
* @param rpcTimeout timeout for this call in ms
* @param priority for this request
* @param metrics MetricsConnection to which the metrics are stored for this request
*/
public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodDescriptor
md, Message param, CellScanner cellScanner, M responseDefaultType, MessageConverter<M, T>
messageConverter, IOExceptionConverter exceptionConverter, long rpcTimeout, int priority,
MetricsConnection.CallStats callStats) {
MetricsConnection metrics) {
super(channel.getEventExecutor());
this.channel = channel;

Expand All @@ -90,13 +93,15 @@ public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodD
this.messageConverter = messageConverter;
this.exceptionConverter = exceptionConverter;

this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = rpcTimeout;

this.priority = priority;
this.cellScanner = cellScanner;

this.callStats = callStats;
this.callStats = MetricsConnection.newCallStats();
callStats.setStartTime(EnvironmentEdgeManager.currentTime());

this.clientMetrics = metrics;
}

/**
Expand All @@ -105,7 +110,7 @@ public AsyncCall(AsyncRpcChannelImpl channel, int connectId, Descriptors.MethodD
* @return start time for the call
*/
public long getStartTime() {
return this.startTime;
return this.callStats.getStartTime();
}

@Override
Expand All @@ -122,9 +127,14 @@ public String toString() {
* @param cellBlockScanner to set
*/
public void setSuccess(M value, CellScanner cellBlockScanner) {
callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - callStats.getStartTime());

if (LOG.isTraceEnabled()) {
long callTime = EnvironmentEdgeManager.currentTime() - startTime;
LOG.trace("Call: " + method.getName() + ", callTime: " + callTime + "ms");
LOG.trace("Call: " + method.getName() + ", callTime: " + callStats.getCallTimeMs() + "ms");
}

if (clientMetrics != null) {
clientMetrics.updateRpc(method, param, callStats);
}

try {
Expand Down
Expand Up @@ -27,7 +27,6 @@
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;

/**
* Interface for Async Rpc Channels
Expand All @@ -45,15 +44,13 @@ public interface AsyncRpcChannel {
* @param exceptionConverter for converting exceptions
* @param rpcTimeout timeout for request
* @param priority for request
* @param callStats collects stats of the call
* @return Promise for the response Message
*/

<R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request,final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats);
final Message request, final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter,
IOExceptionConverter exceptionConverter, long rpcTimeout, int priority);


/**
Expand Down
Expand Up @@ -30,7 +30,6 @@
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.GenericFutureListener;

import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
Expand All @@ -52,7 +51,6 @@
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Future;
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
Expand Down Expand Up @@ -297,14 +295,16 @@ public void run(Timeout timeout) throws Exception {
* @param priority for request
* @return Promise for the response Message
*/
@Override
public <R extends Message, O> Future<O> callMethod(
final Descriptors.MethodDescriptor method,
final Message request,final CellScanner cellScanner,
R responsePrototype, MessageConverter<R, O> messageConverter, IOExceptionConverter
exceptionConverter, long rpcTimeout, int priority, MetricsConnection.CallStats callStats) {
exceptionConverter, long rpcTimeout, int priority) {
final AsyncCall<R, O> call = new AsyncCall<>(this, client.callIdCnt.getAndIncrement(),
method, request, cellScanner, responsePrototype, messageConverter, exceptionConverter,
rpcTimeout, priority, callStats);
rpcTimeout, priority, client.metrics);

synchronized (pendingCalls) {
if (closed) {
call.setFailure(new ConnectException());
Expand Down
Expand Up @@ -27,6 +27,7 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollSocketChannel;
Expand Down Expand Up @@ -59,7 +60,6 @@
import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.ResponseFutureListener;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
Expand Down Expand Up @@ -103,7 +103,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
@VisibleForTesting
static Pair<EventLoopGroup, Class<? extends Channel>> GLOBAL_EVENT_LOOP_GROUP;

private synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
synchronized static Pair<EventLoopGroup, Class<? extends Channel>>
getGlobalEventLoopGroup(Configuration conf) {
if (GLOBAL_EVENT_LOOP_GROUP == null) {
GLOBAL_EVENT_LOOP_GROUP = createEventLoopGroup(conf);
Expand Down Expand Up @@ -241,8 +241,8 @@ protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);

final Future<Message> promise = connection.callMethod(md, param, pcrc.cellScanner(), returnType,
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(), pcrc.getPriority(),
callStats);
getMessageConverterWithRpcController(pcrc), null, pcrc.getCallTimeout(),
pcrc.getPriority());

pcrc.notifyOnCancel(new RpcCallback<Object>() {
@Override
Expand Down Expand Up @@ -289,19 +289,11 @@ private void callMethod(final Descriptors.MethodDescriptor md,
final AsyncRpcChannel connection;
try {
connection = createRpcChannel(md.getService().getName(), addr, ticket);
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();

ResponseFutureListener<Message> listener =
new ResponseFutureListener<Message>() {
@Override
public void operationComplete(Future<Message> future) throws Exception {
cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
if (metrics != null) {
metrics.updateRpc(md, param, cs);
}
if (LOG.isTraceEnabled()) {
LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
if (!future.isSuccess()) {
Throwable cause = future.cause();
if (cause instanceof IOException) {
Expand All @@ -325,10 +317,9 @@ public void operationComplete(Future<Message> future) throws Exception {
}
}
};
cs.setStartTime(EnvironmentEdgeManager.currentTime());
connection.callMethod(md, param, pcrc.cellScanner(), returnType,
getMessageConverterWithRpcController(pcrc), null,
pcrc.getCallTimeout(), pcrc.getPriority(), cs)
pcrc.getCallTimeout(), pcrc.getPriority())
.addListener(listener);
} catch (StoppedRpcClientException|FailedServerException e) {
pcrc.setFailed(e);
Expand Down Expand Up @@ -360,6 +351,11 @@ public void close() {
}
}

@Override
public EventLoop getEventExecutor() {
return this.bootstrap.group().next();
}

/**
* Create a cell scanner
*
Expand All @@ -382,10 +378,17 @@ public ByteBuffer buildCellBlock(CellScanner cells) throws IOException {
return ipcUtil.buildCellBlock(this.codec, this.compressor, cells);
}

@Override
public AsyncRpcChannel createRpcChannel(String serviceName, ServerName sn, User user)
throws StoppedRpcClientException, FailedServerException {
return this.createRpcChannel(serviceName,
new InetSocketAddress(sn.getHostname(), sn.getPort()), user);
}

/**
* Creates an RPC client
*
* @param serviceName name of servicce
* @param serviceName name of service
* @param location to connect to
* @param ticket for current user
* @return new RpcChannel
Expand Down Expand Up @@ -452,6 +455,7 @@ public void cancelConnections(ServerName sn) {

/**
* Remove connection from pool
* @param connection to remove
*/
public void removeConnection(AsyncRpcChannel connection) {
int connectionHashCode = connection.hashCode();
Expand All @@ -469,17 +473,8 @@ public void removeConnection(AsyncRpcChannel connection) {
}
}

/**
* Creates a "channel" that can be used by a protobuf service. Useful setting up
* protobuf stubs.
*
* @param sn server name describing location of server
* @param user which is to use the connection
* @param rpcTimeout default rpc operation timeout
*
* @return A rpc channel that goes via this rpc client instance.
*/
public RpcChannel createRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
@Override
public RpcChannel createProtobufRpcChannel(final ServerName sn, final User user, int rpcTimeout) {
return new RpcChannelImplementation(this, sn, user, rpcTimeout);
}

Expand Down Expand Up @@ -507,21 +502,20 @@ protected RpcChannelImplementation(final AsyncRpcClient rpcClient,
@Override
public void callMethod(Descriptors.MethodDescriptor md, RpcController controller,
Message param, Message returnType, RpcCallback<Message> done) {
PayloadCarryingRpcController pcrc;
if (controller != null) {
pcrc = (PayloadCarryingRpcController) controller;
if (!pcrc.hasCallTimeout()) {
pcrc.setCallTimeout(channelOperationTimeout);
}
} else {
pcrc = new PayloadCarryingRpcController();
pcrc.setCallTimeout(channelOperationTimeout);
}
PayloadCarryingRpcController pcrc =
configurePayloadCarryingRpcController(controller, channelOperationTimeout);

this.rpcClient.callMethod(md, pcrc, param, returnType, this.ticket, this.isa, done);
}
}

/**
* Get a new timeout on this RPC client
* @param task to run at timeout
* @param delay for the timeout
* @param unit time unit for the timeout
* @return Timeout
*/
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
return WHEEL_TIMER.newTimeout(task, delay, unit);
}
Expand Down
Expand Up @@ -41,7 +41,7 @@ public interface MessageConverter<M,O> {
MessageConverter<Message,Message> NO_CONVERTER = new MessageConverter<Message, Message>() {
@Override
public Message convert(Message msg, CellScanner cellScanner) throws IOException {
return null;
return msg;
}
};
}

0 comments on commit a11091c

Please sign in to comment.