Skip to content
Permalink
Browse files
GIRAPH-1205
closes #88
  • Loading branch information
Maja Kabiljo committed Oct 18, 2018
1 parent 843cd6e commit 5e44c4e4be97e93f5d13cdc7bd52b7374635398c
Showing 1 changed file with 20 additions and 6 deletions.
@@ -141,6 +141,9 @@ public class NettyClient {
/** How many network requests were resent because channel failed */
public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
"Network requests resent for channel failure";
/** How many network requests were resent because connection failed */
public static final String NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME =
"Network requests resent for connection or request failure";

/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
@@ -221,6 +224,8 @@ public class NettyClient {
private final GiraphHadoopCounter networkRequestsResentForTimeout;
/** How many network requests were resent because channel failed */
private final GiraphHadoopCounter networkRequestsResentForChannelFailure;
/** How many network requests were resent because connection failed */
private final GiraphHadoopCounter networkRequestsResentForConnectionFailure;

/**
* Only constructor
@@ -266,6 +271,10 @@ public NettyClient(Mapper<?, ?, ?, ?>.Context context,
new GiraphHadoopCounter(context.getCounter(
NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));
networkRequestsResentForConnectionFailure =
new GiraphHadoopCounter(context.getCounter(
NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));

maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
@@ -984,14 +993,19 @@ private void checkRequestsForProblems() {
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
ChannelFuture writeFuture = requestInfo.getWriteFuture();
// If not connected anymore, request failed, or the request is taking
// too long, re-establish and resend
return (writeFuture != null && (!writeFuture.channel().isActive() ||
(writeFuture.isDone() && !writeFuture.isSuccess()))) ||
(requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
// If the request is taking too long, re-establish and resend
return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
}
}, networkRequestsResentForTimeout);
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
ChannelFuture writeFuture = requestInfo.getWriteFuture();
// If not connected anymore or request failed re-establish and resend
return writeFuture != null && (!writeFuture.channel().isActive() ||
(writeFuture.isDone() && !writeFuture.isSuccess()));
}
}, networkRequestsResentForConnectionFailure);
}

/**

0 comments on commit 5e44c4e

Please sign in to comment.