Skip to content
Permalink
Browse files
JIRA-1211
closes #93
  • Loading branch information
Maja Kabiljo committed Nov 9, 2018
1 parent f69f77d commit d41221966cece5ed8c029a7e941b5d621486ddb1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 4 deletions.
@@ -94,6 +94,7 @@
import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_EXECUTION_THREADS;
import static org.apache.giraph.conf.GiraphConstants.NETTY_CLIENT_USE_EXECUTION_HANDLER;
import static org.apache.giraph.conf.GiraphConstants.NETTY_MAX_CONNECTION_FAILURES;
import static org.apache.giraph.conf.GiraphConstants.RESEND_TIMED_OUT_REQUESTS;
import static org.apache.giraph.conf.GiraphConstants.WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS;
import static org.apache.giraph.conf.GiraphConstants.WAITING_REQUEST_MSECS;

@@ -188,6 +189,11 @@ public class NettyClient {
private final long waitTimeBetweenConnectionRetriesMs;
/** Maximum number of milliseconds for a request */
private final int maxRequestMilliseconds;
/**
* Whether a request which timed out should be resent. When false, a
* timed-out request is not resent; an IllegalStateException is thrown
* instead, in order to fail the job. Set from RESEND_TIMED_OUT_REQUESTS.
*/
private final boolean resendTimedOutRequests;
/** Waiting interval for checking outstanding requests msecs */
private final int waitingRequestMsecs;
/** Timed logger for printing request debugging */
@@ -278,6 +284,7 @@ public NettyClient(Mapper<?, ?, ?, ?>.Context context,
NETWORK_REQUESTS_RESENT_FOR_CONNECTION_FAILURE_NAME));

maxRequestMilliseconds = MAX_REQUEST_MILLISECONDS.get(conf);
resendTimedOutRequests = RESEND_TIMED_OUT_REQUESTS.get(conf);
maxConnectionFailures = NETTY_MAX_CONNECTION_FAILURES.get(conf);
waitTimeBetweenConnectionRetriesMs =
WAIT_TIME_BETWEEN_CONNECTION_RETRIES_MS.get(conf);
@@ -997,7 +1004,7 @@ public boolean apply(RequestInfo requestInfo) {
// If the request is taking too long, re-establish and resend
return requestInfo.getElapsedMsecs() > maxRequestMilliseconds;
}
}, networkRequestsResentForTimeout);
}, networkRequestsResentForTimeout, resendTimedOutRequests);
resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override
public boolean apply(RequestInfo requestInfo) {
@@ -1006,18 +1013,21 @@ public boolean apply(RequestInfo requestInfo) {
return writeFuture != null && (!writeFuture.channel().isActive() ||
(writeFuture.isDone() && !writeFuture.isSuccess()));
}
}, networkRequestsResentForConnectionFailure);
}, networkRequestsResentForConnectionFailure, true);
}

/**
* Resend requests which satisfy predicate
* @param shouldResendRequestPredicate Predicate to use to check whether
* request should be resent
* @param counter Counter to increment for every resent network request
* @param resendProblematicRequest Whether to resend a problematic request,
* or fail the job if such a request is found
*/
private void resendRequestsWhenNeeded(
Predicate<RequestInfo> shouldResendRequestPredicate,
GiraphHadoopCounter counter) {
GiraphHadoopCounter counter,
boolean resendProblematicRequest) {
// Check if there are open requests which have been sent a long time ago,
// and if so, resend them.
List<ClientRequestId> addedRequestIds = Lists.newArrayList();
@@ -1028,6 +1038,11 @@ private void resendRequestsWhenNeeded(
RequestInfo requestInfo = entry.getValue();
// If request should be resent
if (shouldResendRequestPredicate.apply(requestInfo)) {
if (!resendProblematicRequest) {
throw new IllegalStateException("Problem with request id " +
entry.getKey() + " for " + requestInfo.getDestinationAddress() +
", failing the job");
}
ChannelFuture writeFuture = requestInfo.getWriteFuture();
String logMessage;
if (writeFuture == null) {
@@ -1135,7 +1150,7 @@ public boolean apply(RequestInfo requestInfo) {
return requestInfo.getDestinationAddress().equals(
channel.remoteAddress());
}
}, networkRequestsResentForChannelFailure);
}, networkRequestsResentForChannelFailure, true);
}

/**
@@ -690,6 +690,15 @@ public interface GiraphConstants {
new IntConfOption("giraph.maxRequestMilliseconds", MINUTES.toMillis(10),
"Milliseconds for a request to complete (or else resend)");

/**
* Whether to resend a request which timed out (true, the default), or to
* fail the job when a timeout happens (false).
*/
BooleanConfOption RESEND_TIMED_OUT_REQUESTS =
new BooleanConfOption("giraph.resendTimedOutRequests", true,
"Whether to resend request which timed out or fail the job if " +
"timeout happens");

/** Netty max connection failures */
IntConfOption NETTY_MAX_CONNECTION_FAILURES =
new IntConfOption("giraph.nettyMaxConnectionFailures", 1000,

0 comments on commit d412219

Please sign in to comment.