Skip to content
Permalink
Browse files
GIRAPH-1213
closes #96
  • Loading branch information
Maja Kabiljo committed Dec 11, 2018
1 parent bb8dab5 commit aa740b54bd914a0246e2a9280459e33c233de9b3
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 7 deletions.
@@ -82,6 +82,7 @@
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.util.AttributeKey;
/*end[HADOOP_NON_SECURE]*/
import io.netty.util.concurrent.BlockingOperationException;
import io.netty.util.concurrent.DefaultEventExecutorGroup;
import io.netty.util.concurrent.EventExecutorGroup;

@@ -755,7 +756,11 @@ private Channel getNextChannel(InetSocketAddress remoteServer) {
int reconnectFailures = 0;
while (reconnectFailures < maxConnectionFailures) {
ChannelFuture connectionFuture = bootstrap.connect(remoteServer);
ProgressableUtils.awaitChannelFuture(connectionFuture, context);
try {
ProgressableUtils.awaitChannelFuture(connectionFuture, context);
} catch (BlockingOperationException e) {
LOG.warn("getNextChannel: Failed connecting to " + remoteServer, e);
}
if (connectionFuture.isSuccess()) {
if (LOG.isInfoEnabled()) {
LOG.info("getNextChannel: Connected to " + remoteServer + "!");
@@ -1052,7 +1057,8 @@ private void resendRequestsWhenNeeded(
writeFuture.channel().isActive() +
", future done = " + writeFuture.isDone() + ", " +
"success = " + writeFuture.isSuccess() + ", " +
"cause = " + writeFuture.cause();
"cause = " + writeFuture.cause() + ", " +
"channelId = " + writeFuture.channel().hashCode();
}
LOG.warn("checkRequestsForProblems: Problem with request id " +
entry.getKey() + ", " + logMessage + ", " +
@@ -1080,6 +1086,11 @@ private void resendRequestsWhenNeeded(
LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
}
writeRequestToChannel(requestInfo);
if (LOG.isInfoEnabled()) {
LOG.info("checkRequestsForProblems: Request " + requestId +
" was resent through channelId=" +
requestInfo.getWriteFuture().channel().hashCode());
}
}
addedRequestIds.clear();
addedRequestInfos.clear();
@@ -1147,8 +1158,11 @@ private void checkRequestsAfterChannelFailure(final Channel channel) {
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
return requestInfo.getDestinationAddress().equals(
channel.remoteAddress());
if (requestInfo.getWriteFuture() == null ||
requestInfo.getWriteFuture().channel() == null) {
return false;
}
return requestInfo.getWriteFuture().channel().equals(channel);
}
}, networkRequestsResentForChannelFailure, true);
}
@@ -1163,7 +1177,8 @@ public boolean apply(RequestInfo requestInfo) {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isDone() && !future.isSuccess()) {
LOG.error("Request failed", future.cause());
LOG.error("Channel failed channelId=" + future.channel().hashCode(),
future.cause());
checkRequestsAfterChannelFailure(future.channel());
}
}
@@ -106,8 +106,9 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
LOG.warn("exceptionCaught: Channel failed with " +
"remote address " + ctx.channel().remoteAddress(), cause);
LOG.warn("exceptionCaught: Channel channelId=" +
ctx.channel().hashCode() + " failed with remote address " +
ctx.channel().remoteAddress(), cause);
}
}

0 comments on commit aa740b5

Please sign in to comment.