Skip to content

Commit

Permalink
DRILL-3081: Populate connection name as late as possible so RPC error…
Browse files Browse the repository at this point in the history
… messages are reported correctly.
  • Loading branch information
parthchandra authored and jacques-n committed May 15, 2015
1 parent f0b3671 commit 4b0b3a6
Show file tree
Hide file tree
Showing 11 changed files with 65 additions and 27 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener; import io.netty.util.concurrent.GenericFutureListener;


import java.net.SocketAddress;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;


Expand All @@ -46,7 +47,7 @@


public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite> public abstract class BasicClient<T extends EnumLite, R extends RemoteConnection, HANDSHAKE_SEND extends MessageLite, HANDSHAKE_RESPONSE extends MessageLite>
extends RpcBus<T, R> { extends RpcBus<T, R> {
final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(getClass()); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BasicClient.class);


// The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For // The percentage of time that should pass before sending a ping message to ensure server doesn't time us out. For
// example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15 // example, if timeout is set to 30 seconds and we set percentage to 0.5, then if no write has happened within 15
Expand Down Expand Up @@ -101,7 +102,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
} }


pipe.addLast("message-handler", new InboundHandler(connection)); pipe.addLast("message-handler", new InboundHandler(connection));
pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName())); pipe.addLast("exception-handler", new RpcExceptionHandler(connection));
} }
}); // }); //


Expand All @@ -110,6 +111,12 @@ protected void initChannel(SocketChannel ch) throws Exception {
// } // }
} }


public R initRemoteConnection(SocketChannel channel){
local=channel.localAddress();
remote=channel.remoteAddress();
return null;
};

private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK); private static final OutboundRpcMessage PING_MESSAGE = new OutboundRpcMessage(RpcMode.PING, 0, 0, Acks.OK);


/** /**
Expand Down Expand Up @@ -200,12 +207,14 @@ public void operationComplete(ChannelFuture future) throws Exception {
// So there is no point propagating the interruption as failure immediately. // So there is no point propagating the interruption as failure immediately.
long remainingWaitTimeMills = 120000; long remainingWaitTimeMills = 120000;
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();

// logger.debug("Connection operation finished. Success: {}", future.isSuccess()); // logger.debug("Connection operation finished. Success: {}", future.isSuccess());
while(true) { while(true) {
try { try {
future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS); future.get(remainingWaitTimeMills, TimeUnit.MILLISECONDS);
if (future.isSuccess()) { if (future.isSuccess()) {
SocketAddress remote = future.channel().remoteAddress();
SocketAddress local = future.channel().localAddress();
setAddresses(remote, local);
// send a handshake on the current thread. This is the only time we will send from within the event thread. // send a handshake on the current thread. This is the only time we will send from within the event thread.
// We can do this because the connection will not be backed up. // We can do this because the connection will not be backed up.
send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true); send(handshakeSendHandler, connection, handshakeType, handshakeValue, responseClass, true);
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ protected Response handle(ServerConnection connection, int rpcType, ByteBuf pBod


@Override @Override
public ServerConnection initRemoteConnection(SocketChannel channel) { public ServerConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
return new ServerConnection(connectionName, channel, alloc); return new ServerConnection(connectionName, channel, alloc);
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -85,11 +85,11 @@ protected void initChannel(SocketChannel ch) throws Exception {


if (rpcMapping.hasTimeout()) { if (rpcMapping.hasTimeout()) {
pipe.addLast(TIMEOUT_HANDLER, pipe.addLast(TIMEOUT_HANDLER,
new LogggingReadTimeoutHandler(connection.getName(), rpcMapping.getTimeout())); new LogggingReadTimeoutHandler(connection, rpcMapping.getTimeout()));
} }


pipe.addLast("message-handler", new InboundHandler(connection)); pipe.addLast("message-handler", new InboundHandler(connection));
pipe.addLast("exception-handler", new RpcExceptionHandler(connection.getName())); pipe.addLast("exception-handler", new RpcExceptionHandler(connection));


connect = true; connect = true;
// logger.debug("Server connection initialization completed."); // logger.debug("Server connection initialization completed.");
Expand All @@ -101,19 +101,19 @@ protected void initChannel(SocketChannel ch) throws Exception {
// } // }
} }


private class LogggingReadTimeoutHandler extends ReadTimeoutHandler { private class LogggingReadTimeoutHandler<C extends RemoteConnection> extends ReadTimeoutHandler {


private final String name; private final C connection;
private final int timeoutSeconds; private final int timeoutSeconds;
public LogggingReadTimeoutHandler(String name, int timeoutSeconds) { public LogggingReadTimeoutHandler(C connection, int timeoutSeconds) {
super(timeoutSeconds); super(timeoutSeconds);
this.name = name; this.connection = connection;
this.timeoutSeconds = timeoutSeconds; this.timeoutSeconds = timeoutSeconds;
} }


@Override @Override
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception { protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", name, logger.info("RPC connection {} timed out. Timeout was set to {} seconds. Closing connection.", connection.getName(),
timeoutSeconds); timeoutSeconds);
super.readTimedOut(ctx); super.readTimedOut(ctx);
} }
Expand Down Expand Up @@ -178,6 +178,8 @@ public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutc


@Override @Override
public C initRemoteConnection(SocketChannel channel) { public C initRemoteConnection(SocketChannel channel) {
local = channel.localAddress();
remote = channel.remoteAddress();
return null; return null;
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public abstract class RemoteConnection implements ConnectionThrottle, AutoClosea
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class); static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RemoteConnection.class);
private final Channel channel; private final Channel channel;
private final WriteManager writeManager; private final WriteManager writeManager;
private final String name; private String name;
private final String clientName;


public boolean inEventLoop(){ public boolean inEventLoop(){
return channel.eventLoop().inEventLoop(); return channel.eventLoop().inEventLoop();
Expand All @@ -42,7 +43,7 @@ public boolean inEventLoop(){
public RemoteConnection(SocketChannel channel, String name) { public RemoteConnection(SocketChannel channel, String name) {
super(); super();
this.channel = channel; this.channel = channel;
this.name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), name); this.clientName = name;
this.writeManager = new WriteManager(); this.writeManager = new WriteManager();
channel.pipeline().addLast(new BackPressureHandler()); channel.pipeline().addLast(new BackPressureHandler());
channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() { channel.closeFuture().addListener(new GenericFutureListener<Future<? super Void>>() {
Expand All @@ -57,6 +58,9 @@ public void operationComplete(Future<? super Void> future) throws Exception {
} }


public String getName() { public String getName() {
if(name == null){
name = String.format("%s <--> %s (%s)", channel.localAddress(), channel.remoteAddress(), clientName);
}
return name; return name;
} }


Expand Down
32 changes: 24 additions & 8 deletions exec/java-exec/src/main/java/org/apache/drill/exec/rpc/RpcBus.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@


import java.io.Closeable; import java.io.Closeable;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -67,10 +68,19 @@ protected void handle(C connection, int rpcType, ByteBuf pBody, ByteBuf dBody, R


protected final RpcConfig rpcConfig; protected final RpcConfig rpcConfig;


protected volatile SocketAddress local;
protected volatile SocketAddress remote;


public RpcBus(RpcConfig rpcConfig) { public RpcBus(RpcConfig rpcConfig) {
this.rpcConfig = rpcConfig; this.rpcConfig = rpcConfig;
} }


protected void setAddresses(SocketAddress remote, SocketAddress local){
this.remote = remote;
this.local = local;
}

<SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType, <SEND extends MessageLite, RECEIVE extends MessageLite> DrillRpcFuture<RECEIVE> send(C connection, T rpcType,
SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) { SEND protobufBody, Class<RECEIVE> clazz, ByteBuf... dataBodies) {
DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>(); DrillRpcFutureImpl<RECEIVE> rpcFuture = new DrillRpcFutureImpl<RECEIVE>();
Expand Down Expand Up @@ -133,21 +143,27 @@ public <SEND extends MessageLite, RECEIVE extends MessageLite> void send(RpcOutc


public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> { public class ChannelClosedHandler implements GenericFutureListener<ChannelFuture> {


final InetSocketAddress local;
final InetSocketAddress remote;
final C clientConnection; final C clientConnection;
private final Channel channel;


public ChannelClosedHandler(C clientConnection, InetSocketAddress local, InetSocketAddress remote) { public ChannelClosedHandler(C clientConnection, Channel channel) {
this.local = local; this.channel = channel;
this.remote = remote;
this.clientConnection = clientConnection; this.clientConnection = clientConnection;
} }


@Override @Override
public void operationComplete(ChannelFuture future) throws Exception { public void operationComplete(ChannelFuture future) throws Exception {
String msg = String.format("Channel closed %s <--> %s.", local, remote); String msg;
if(local!=null) {
msg = String.format("Channel closed %s <--> %s.", local, remote);
}else{
msg = String.format("Channel closed %s <--> %s.", future.channel().localAddress(), future.channel().remoteAddress());
}

if (RpcBus.this.isClient()) { if (RpcBus.this.isClient()) {
logger.info(String.format(msg)); if(local != null) {
logger.info(String.format(msg));
}
} else { } else {
queue.channelClosed(new ChannelClosedException(msg)); queue.channelClosed(new ChannelClosedException(msg));
} }
Expand All @@ -158,7 +174,7 @@ public void operationComplete(ChannelFuture future) throws Exception {
} }


protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) { protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel channel, C clientConnection) {
return new ChannelClosedHandler(clientConnection, channel.localAddress(), channel.remoteAddress()); return new ChannelClosedHandler(clientConnection, channel);
} }


private class ResponseSenderImpl implements ResponseSender { private class ResponseSenderImpl implements ResponseSender {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,23 +19,24 @@


import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import org.eclipse.jetty.io.Connection;


public class RpcExceptionHandler implements ChannelHandler{ public class RpcExceptionHandler<C extends RemoteConnection> implements ChannelHandler{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class); static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RpcExceptionHandler.class);


private final String name; private final C connection;


public RpcExceptionHandler(String name) { public RpcExceptionHandler(C connection){
this.name = name; this.connection = connection;
} }


@Override @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){ if(!ctx.channel().isOpen() || cause.getMessage().equals("Connection reset by peer")){
logger.warn("Exception occurred with closed channel. Connection: {}", name, cause); logger.warn("Exception occurred with closed channel. Connection: {}", connection.getName(), cause);
return; return;
}else{ }else{
logger.error("Exception in RPC communication. Connection: {}. Closing connection.", name, cause); logger.error("Exception in RPC communication. Connection: {}. Closing connection.", connection.getName(), cause);
ctx.close(); ctx.close();
} }
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@


public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{ public class ControlClient extends BasicClient<RpcType, ControlConnection, BitControlHandshake, BitControlHandshake>{


static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class); private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ControlClient.class);


private final ControlMessageHandler handler; private final ControlMessageHandler handler;
private final DrillbitEndpoint remoteEndpoint; private final DrillbitEndpoint remoteEndpoint;
Expand All @@ -66,6 +66,7 @@ public void connect(RpcConnectionHandler<ControlConnection> connectionHandler) {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
@Override @Override
public ControlConnection initRemoteConnection(SocketChannel channel) { public ControlConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
this.connection = new ControlConnection("control client", channel, this.connection = new ControlConnection("control client", channel,
(RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator); (RpcBus<RpcType, ControlConnection>) (RpcBus<?, ?>) this, allocator);
return connection; return connection;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch,


@Override @Override
public ControlConnection initRemoteConnection(SocketChannel channel) { public ControlConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
return new ControlConnection("control server", channel, this, allocator); return new ControlConnection("control server", channel, this, allocator);
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public DataClient(DrillbitEndpoint remoteEndpoint, BootStrapContext context, Dat


@Override @Override
public DataClientConnection initRemoteConnection(SocketChannel channel) { public DataClientConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
this.connection = new DataClientConnection(channel, this); this.connection = new DataClientConnection(channel, this);
return connection; return connection;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch,


@Override @Override
public BitServerConnection initRemoteConnection(SocketChannel channel) { public BitServerConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
return new BitServerConnection(channel, context.getAllocator()); return new BitServerConnection(channel, context.getAllocator());
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public BufferAllocator getAllocator() {


@Override @Override
public UserClientConnection initRemoteConnection(SocketChannel channel) { public UserClientConnection initRemoteConnection(SocketChannel channel) {
super.initRemoteConnection(channel);
return new UserClientConnection(channel); return new UserClientConnection(channel);
} }


Expand Down

0 comments on commit 4b0b3a6

Please sign in to comment.