Skip to content
Permalink
Browse files
JIRA-1200
closes #83
  • Loading branch information
Maja Kabiljo committed Sep 18, 2018
1 parent 239ea8f commit 6128d66eba5b1dfc0a5e047a057c33d00abac6e7
Showing 1 changed file with 32 additions and 5 deletions.
@@ -38,6 +38,7 @@
import org.apache.giraph.conf.BooleanConfOption;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.counters.GiraphHadoopCounter;
import org.apache.giraph.function.Predicate;
import org.apache.giraph.graph.TaskInfo;
import org.apache.giraph.master.MasterInfo;
@@ -131,6 +132,16 @@ public class NettyClient {
public static final AttributeKey<SaslNettyClient> SASL =
AttributeKey.valueOf("saslNettyClient");
/*end[HADOOP_NON_SECURE]*/

/** Group name for netty counters */
public static final String NETTY_COUNTERS_GROUP = "Netty counters";
/** How many network requests were resent because they took too long */
public static final String NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME =
"Network requests resent for timeout";
/** How many network requests were resent because channel failed */
public static final String NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME =
"Network requests resent for channel failure";

/** Class logger */
private static final Logger LOG = Logger.getLogger(NettyClient.class);
/** Context used to report progress */
@@ -206,6 +217,11 @@ public class NettyClient {
/** Flow control policy used */
private final FlowControl flowControl;

/** How many network requests were resent because they took too long */
private final GiraphHadoopCounter networkRequestsResentForTimeout;
/** How many network requests were resent because channel failed */
private final GiraphHadoopCounter networkRequestsResentForChannelFailure;

/**
* Only constructor
*
@@ -242,6 +258,15 @@ public NettyClient(Mapper<?, ?, ?, ?>.Context context,
flowControl = new NoOpFlowControl(this);
}

networkRequestsResentForTimeout =
new GiraphHadoopCounter(context.getCounter(
NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_TIMEOUT_NAME));
networkRequestsResentForChannelFailure =
new GiraphHadoopCounter(context.getCounter(
NETTY_COUNTERS_GROUP,
NETWORK_REQUESTS_RESENT_FOR_CHANNEL_FAILURE_NAME));

maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
waitTimeBetweenConnectionRetriesMs =
@@ -966,17 +991,18 @@ public boolean apply(RequestInfo requestInfo) {
(writeFuture.isDone() && !writeFuture.isSuccess()))) ||
(requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
}
});
}, networkRequestsResentForTimeout);
}

/**
* Resend requests which satisfy predicate
*
* @param shouldResendRequestPredicate Predicate to use to check whether
* @param shouldResendRequestPredicate Predicate to use to check whether
* request should be resent
* @param counter Counter to increment for every resent network request
*/
private void resendRequestsWhenNeeded(
Predicate<RequestInfo> shouldResendRequestPredicate) {
Predicate<RequestInfo> shouldResendRequestPredicate,
GiraphHadoopCounter counter) {
// Check if there are open requests which have been sent a long time ago,
// and if so, resend them.
List<ClientRequestId> addedRequestIds = Lists.newArrayList();
@@ -1006,6 +1032,7 @@ private void resendRequestsWhenNeeded(
addedRequestIds.add(entry.getKey());
addedRequestInfos.add(new RequestInfo(
requestInfo.getDestinationAddress(), requestInfo.getRequest()));
counter.increment();
}
}

@@ -1093,7 +1120,7 @@ public boolean apply(RequestInfo requestInfo) {
return requestInfo.getDestinationAddress().equals(
channel.remoteAddress());
}
});
}, networkRequestsResentForChannelFailure);
}

/**

0 comments on commit 6128d66

Please sign in to comment.