Skip to content
This repository has been archived by the owner on Oct 30, 2023. It is now read-only.

Commit

Permalink
GIRAPH-1103: Another try to fix jobs getting stuck after channel failure
Browse files Browse the repository at this point in the history
Summary:
With GIRAPH-1087 we see jobs stuck after channel failure less often, but it still happens. There are several additional issues I found: requests failing to send in the first place, so they never get retried, and callbacks for channel failures not always being triggered.
Added a thread which will periodically check on open requests even when we are not waiting on all open requests (since in many places we don't), removed the check that the request was sent when retrying it, and added some thread utils while at it.

Test Plan: Before the change, failure rate of a particular job was about 1 in 50. Had over 200 successful runs with this change.

Differential Revision: https://reviews.facebook.net/D61719
  • Loading branch information
Maja Kabiljo committed Aug 25, 2016
1 parent 693a71c commit a15675c
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 106 deletions.
Expand Up @@ -385,6 +385,18 @@ public void channelUnregistered(ChannelHandlerContext ctx) throws
checkRequestsAfterChannelFailure(ctx.channel()); checkRequestsAfterChannelFailure(ctx.channel());
} }
}); });

// Start a thread which will observe if there are any problems with open
// requests
ThreadUtils.startThread(new Runnable() {
@Override
public void run() {
while (true) {
ThreadUtils.trySleep(waitingRequestMsecs);
checkRequestsForProblems();
}
}
}, "open-requests-observer");
} }


/** /**
Expand Down Expand Up @@ -715,11 +727,7 @@ private Channel getNextChannel(InetSocketAddress remoteServer) {
" on attempt " + reconnectFailures + " out of " + " on attempt " + reconnectFailures + " out of " +
maxConnectionFailures + " max attempts, sleeping for 5 secs", maxConnectionFailures + " max attempts, sleeping for 5 secs",
connectionFuture.cause()); connectionFuture.cause());
try { ThreadUtils.trySleep(5000);
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.warn("getNextChannel: Unexpected interrupted exception", e);
}
} }
throw new IllegalStateException("getNextChannel: Failed to connect " + throw new IllegalStateException("getNextChannel: Failed to connect " +
"to " + remoteServer + " in " + reconnectFailures + "to " + remoteServer + " in " + reconnectFailures +
Expand Down Expand Up @@ -758,7 +766,6 @@ public Long doSend(int destTaskId, WritableRequest request) {
} }
/*end[HADOOP_NON_SECURE]*/ /*end[HADOOP_NON_SECURE]*/


Channel channel = getNextChannel(remoteServer);
RequestInfo newRequestInfo = new RequestInfo(remoteServer, request); RequestInfo newRequestInfo = new RequestInfo(remoteServer, request);
if (registerRequest) { if (registerRequest) {
request.setClientId(myTaskInfo.getTaskId()); request.setClientId(myTaskInfo.getTaskId());
Expand All @@ -780,12 +787,22 @@ public Long doSend(int destTaskId, WritableRequest request) {
", size " + request.getSerializedSize() + ", size " + request.getSerializedSize() +
" bytes. Check netty buffer size."); " bytes. Check netty buffer size.");
} }
ChannelFuture writeFuture = channel.write(request); writeRequestToChannel(newRequestInfo);
newRequestInfo.setWriteFuture(writeFuture);
writeFuture.addListener(logErrorListener);
return requestId; return requestId;
} }


/**
 * Send the request held by the given request info over a channel obtained
 * for its destination address, and remember the resulting write future on
 * the request info so that later checks can inspect the state of the write.
 *
 * If obtaining the channel throws, no write future is recorded on the
 * request info.
 *
 * @param requestInfo Request info carrying the request and its destination
 */
private void writeRequestToChannel(RequestInfo requestInfo) {
  // Channel is looked up on every call, so a re-sent request may go out on
  // a different channel than the original attempt
  ChannelFuture pendingWrite =
      getNextChannel(requestInfo.getDestinationAddress())
          .write(requestInfo.getRequest());
  requestInfo.setWriteFuture(pendingWrite);
  pendingWrite.addListener(logErrorListener);
}

/** /**
* Handle receipt of a message. Called by response handler. * Handle receipt of a message. Called by response handler.
* *
Expand Down Expand Up @@ -866,7 +883,6 @@ public void logAndSanityCheck() {
logInfoAboutOpenRequests(); logInfoAboutOpenRequests();
// Make sure that waiting doesn't kill the job // Make sure that waiting doesn't kill the job
context.progress(); context.progress();
checkRequestsForProblems();
} }


/** /**
Expand Down Expand Up @@ -946,9 +962,9 @@ public boolean apply(RequestInfo requestInfo) {
ChannelFuture writeFuture = requestInfo.getWriteFuture(); ChannelFuture writeFuture = requestInfo.getWriteFuture();
// If not connected anymore, request failed, or the request is taking // If not connected anymore, request failed, or the request is taking
// too long, re-establish and resend // too long, re-establish and resend
return !writeFuture.channel().isActive() || return (writeFuture != null && (!writeFuture.channel().isActive() ||
(writeFuture.isDone() && !writeFuture.isSuccess()) || (writeFuture.isDone() && !writeFuture.isSuccess()))) ||
(requestInfo.getElapsedMsecs() > maxRequestMilliseconds); (requestInfo.getElapsedMsecs() > maxRequestMilliseconds);
} }
}); });
} }
Expand All @@ -969,21 +985,23 @@ private void resendRequestsWhenNeeded(
for (Map.Entry<ClientRequestId, RequestInfo> entry : for (Map.Entry<ClientRequestId, RequestInfo> entry :
clientRequestIdRequestInfoMap.entrySet()) { clientRequestIdRequestInfoMap.entrySet()) {
RequestInfo requestInfo = entry.getValue(); RequestInfo requestInfo = entry.getValue();
ChannelFuture writeFuture = requestInfo.getWriteFuture();
// Request wasn't sent yet
if (writeFuture == null) {
continue;
}
// If request should be resent // If request should be resent
if (shouldResendRequestPredicate.apply(requestInfo)) { if (shouldResendRequestPredicate.apply(requestInfo)) {
ChannelFuture writeFuture = requestInfo.getWriteFuture();
String logMessage;
if (writeFuture == null) {
logMessage = "wasn't sent successfully";
} else {
logMessage = "connected = " +
writeFuture.channel().isActive() +
", future done = " + writeFuture.isDone() + ", " +
"success = " + writeFuture.isSuccess() + ", " +
"cause = " + writeFuture.cause();
}
LOG.warn("checkRequestsForProblems: Problem with request id " + LOG.warn("checkRequestsForProblems: Problem with request id " +
entry.getKey() + " connected = " + entry.getKey() + ", " + logMessage + ", " +
writeFuture.channel().isActive() +
", future done = " + writeFuture.isDone() + ", " +
"success = " + writeFuture.isSuccess() + ", " +
"cause = " + writeFuture.cause() + ", " +
"elapsed time = " + requestInfo.getElapsedMsecs() + ", " + "elapsed time = " + requestInfo.getElapsedMsecs() + ", " +
"destination = " + writeFuture.channel().remoteAddress() + "destination = " + requestInfo.getDestinationAddress() +
" " + requestInfo); " " + requestInfo);
addedRequestIds.add(entry.getKey()); addedRequestIds.add(entry.getKey());
addedRequestInfos.add(new RequestInfo( addedRequestInfos.add(new RequestInfo(
Expand All @@ -1001,14 +1019,10 @@ private void resendRequestsWhenNeeded(
" completed prior to sending the next request"); " completed prior to sending the next request");
clientRequestIdRequestInfoMap.remove(requestId); clientRequestIdRequestInfoMap.remove(requestId);
} }
InetSocketAddress remoteServer = requestInfo.getDestinationAddress();
Channel channel = getNextChannel(remoteServer);
if (LOG.isInfoEnabled()) { if (LOG.isInfoEnabled()) {
LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo); LOG.info("checkRequestsForProblems: Re-issuing request " + requestInfo);
} }
ChannelFuture writeFuture = channel.write(requestInfo.getRequest()); writeRequestToChannel(requestInfo);
requestInfo.setWriteFuture(writeFuture);
writeFuture.addListener(logErrorListener);
} }
addedRequestIds.clear(); addedRequestIds.clear();
addedRequestInfos.clear(); addedRequestInfos.clear();
Expand All @@ -1035,11 +1049,7 @@ private static InetSocketAddress resolveAddress(
LOG.warn("resolveAddress: Failed to resolve " + address + LOG.warn("resolveAddress: Failed to resolve " + address +
" on attempt " + resolveAttempts + " of " + " on attempt " + resolveAttempts + " of " +
maxResolveAddressAttempts + " attempts, sleeping for 5 seconds"); maxResolveAddressAttempts + " attempts, sleeping for 5 seconds");
try { ThreadUtils.trySleep(5000);
Thread.sleep(5000);
} catch (InterruptedException e) {
LOG.warn("resolveAddress: Interrupted.", e);
}
address = new InetSocketAddress(hostOrIp, address = new InetSocketAddress(hostOrIp,
address.getPort()); address.getPort());
} }
Expand Down Expand Up @@ -1080,7 +1090,7 @@ private void checkRequestsAfterChannelFailure(final Channel channel) {
resendRequestsWhenNeeded(new Predicate<RequestInfo>() { resendRequestsWhenNeeded(new Predicate<RequestInfo>() {
@Override @Override
public boolean apply(RequestInfo requestInfo) { public boolean apply(RequestInfo requestInfo) {
return requestInfo.getWriteFuture().channel().remoteAddress().equals( return requestInfo.getDestinationAddress().equals(
channel.remoteAddress()); channel.remoteAddress());
} }
}); });
Expand Down
Expand Up @@ -20,6 +20,7 @@


import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants; import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.worker.WorkerProgress; import org.apache.giraph.worker.WorkerProgress;
import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Job;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
Expand Down Expand Up @@ -72,7 +73,7 @@ public void init(GiraphConfiguration conf, GiraphJobObserver jobObserver) {
* Start the thread which writes progress periodically * Start the thread which writes progress periodically
*/ */
private void startWriterThread() { private void startWriterThread() {
writerThread = new Thread(new Runnable() { writerThread = ThreadUtils.startThread(new Runnable() {
@Override @Override
public void run() { public void run() {
while (!finished) { while (!finished) {
Expand All @@ -89,19 +90,12 @@ public void run() {
break; break;
} }
} }
try { if (!ThreadUtils.trySleep(UPDATE_MILLISECONDS)) {
Thread.sleep(UPDATE_MILLISECONDS);
} catch (InterruptedException e) {
if (LOG.isInfoEnabled()) {
LOG.info("Progress thread interrupted");
}
break; break;
} }
} }
} }
}); }, "progress-writer");
writerThread.setDaemon(true);
writerThread.start();
} }


@Override @Override
Expand All @@ -119,28 +113,20 @@ private void jobGotAllMappers() {
GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf); GiraphConstants.MAX_ALLOWED_JOB_TIME_MS.get(conf);
if (maxAllowedJobTimeMs > 0) { if (maxAllowedJobTimeMs > 0) {
// Start a thread which will kill the job if running for too long // Start a thread which will kill the job if running for too long
Thread killThread = new Thread(new Runnable() { ThreadUtils.startThread(new Runnable() {
@Override @Override
public void run() { public void run() {
try { if (ThreadUtils.trySleep(maxAllowedJobTimeMs)) {
Thread.sleep(maxAllowedJobTimeMs);
try { try {
LOG.warn("Killing job because it took longer than " + LOG.warn("Killing job because it took longer than " +
maxAllowedJobTimeMs + " milliseconds"); maxAllowedJobTimeMs + " milliseconds");
job.killJob(); job.killJob();
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Failed to kill job", e); LOG.warn("Failed to kill job", e);
} }
} catch (InterruptedException e) {
if (LOG.isDebugEnabled()) {
LOG.debug("Thread checking for jobs max allowed time " +
"interrupted");
}
} }
} }
}); }, "job-runtime-observer");
killThread.setDaemon(true);
killThread.start();
} }
} }


Expand Down
Expand Up @@ -82,21 +82,15 @@ private void joinJMapThread() {
*/ */
public void startJMapThread() { public void startJMapThread() {
stop = false; stop = false;
thread = new Thread(new Runnable() { thread = ThreadUtils.startThread(new Runnable() {
@Override @Override
public void run() { public void run() {
while (!stop) { while (!stop) {
JMap.heapHistogramDump(linesToPrint, liveObjectsOnly); JMap.heapHistogramDump(linesToPrint, liveObjectsOnly);
try { ThreadUtils.trySleep(sleepMillis);
Thread.sleep(sleepMillis);
} catch (InterruptedException e) {
LOG.info("JMap histogram sleep interrupted", e);
}
} }
} }
}); }, "jmap-dumper");
thread.setDaemon(true);
thread.start();
} }


@Override @Override
Expand Down
Expand Up @@ -89,26 +89,19 @@ private void joinSupervisorThread() {
public void startSupervisorThread() { public void startSupervisorThread() {
stop = false; stop = false;
final Runtime runtime = Runtime.getRuntime(); final Runtime runtime = Runtime.getRuntime();
thread = new Thread(new Runnable() { thread = ThreadUtils.startThread(new Runnable() {
@Override @Override
public void run() { public void run() {
try { while (!stop) {
while (!stop) { long potentialMemory = (runtime.maxMemory() -
long potentialMemory = (runtime.maxMemory() - runtime.totalMemory()) + runtime.freeMemory();
runtime.totalMemory()) + runtime.freeMemory(); if (potentialMemory / MB < minFreeMemory) {
if (potentialMemory / MB < minFreeMemory) { JMap.heapHistogramDump(linesToPrint);
JMap.heapHistogramDump(linesToPrint);
}
Thread.sleep(sleepMillis);
} }
} catch (InterruptedException e) { ThreadUtils.trySleep(sleepMillis);
LOG.warn("JMap histogram sleep interrupted", e);
} }
} }
}); }, "ReactiveJMapHistoDumperSupervisorThread");
thread.setName("ReactiveJMapHistoDumperSupervisorThread");
thread.setDaemon(true);
thread.start();
} }


@Override @Override
Expand Down
37 changes: 37 additions & 0 deletions giraph-core/src/main/java/org/apache/giraph/utils/ThreadUtils.java
Expand Up @@ -17,6 +17,8 @@
*/ */
package org.apache.giraph.utils; package org.apache.giraph.utils;


import org.apache.log4j.Logger;

import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;


import java.util.concurrent.Callable; import java.util.concurrent.Callable;
Expand All @@ -28,6 +30,8 @@
* Utility class for thread related functions. * Utility class for thread related functions.
*/ */
public class ThreadUtils { public class ThreadUtils {
/** Logger */
private static final Logger LOG = Logger.getLogger(ThreadUtils.class);


/** /**
* Utility class. Do not inherit or create objects. * Utility class. Do not inherit or create objects.
Expand Down Expand Up @@ -86,4 +90,37 @@ public static <T> Future<T> submitToExecutor(
return executorService.submit( return executorService.submit(
new LogStacktraceCallable<>(callable, uncaughtExceptionHandler)); new LogStacktraceCallable<>(callable, uncaughtExceptionHandler));
} }

/**
 * Create a daemon thread with the given name around the runnable and start
 * it immediately.
 *
 * @param runnable Runnable to execute in the new thread
 * @param threadName Name to give to the new thread
 * @return The already started daemon thread
 */
public static Thread startThread(Runnable runnable, String threadName) {
  final Thread daemon = new Thread(runnable, threadName);
  // Daemon flag has to be set before the thread is started
  daemon.setDaemon(true);
  daemon.start();
  return daemon;
}

/**
 * Sleep for the specified number of milliseconds. An interruption is not
 * propagated to the caller: it is logged at info level and reported
 * through the return value instead. The thread's interrupted status is not
 * restored here, so callers which want to stop on interruption have to act
 * on the returned value.
 *
 * @param millis How long to sleep for, in milliseconds
 * @return True if the sleep completed, false if the thread was interrupted
 *         while sleeping
 */
public static boolean trySleep(long millis) {
  boolean sleptFully = true;
  try {
    Thread.sleep(millis);
  } catch (InterruptedException e) {
    // Deliberately swallowed, callers only learn about it via return value
    sleptFully = false;
    if (LOG.isInfoEnabled()) {
      LOG.info("Thread interrupted");
    }
  }
  return sleptFully;
}
} }
Expand Up @@ -23,6 +23,7 @@
import org.apache.giraph.conf.GiraphConfiguration; import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.IntConfOption; import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.utils.MemoryUtils; import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.zk.ZooKeeperExt; import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.log4j.Logger; import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.CreateMode;
Expand Down Expand Up @@ -91,7 +92,7 @@ public MemoryObserver(final ZooKeeperExt zk,


final float freeMemoryFractionForGc = final float freeMemoryFractionForGc =
FREE_MEMORY_FRACTION_FOR_GC.get(conf); FREE_MEMORY_FRACTION_FOR_GC.get(conf);
Thread thread = new Thread(new Runnable() { ThreadUtils.startThread(new Runnable() {
@Override @Override
public void run() { public void run() {


Expand All @@ -115,18 +116,12 @@ public void run() {
LOG.warn("Exception occurred", e); LOG.warn("Exception occurred", e);
} }
} }
try { if (!ThreadUtils.trySleep(MEMORY_OBSERVER_SLEEP_MS)) {
Thread.sleep(MEMORY_OBSERVER_SLEEP_MS);
} catch (InterruptedException e) {
LOG.warn("Exception occurred", e);
return; return;
} }
} }
} }
}); }, "memory-observer");
thread.setName("memory-observer");
thread.setDaemon(true);
thread.start();
} }


/** Set watcher on memory observer folder */ /** Set watcher on memory observer folder */
Expand Down

0 comments on commit a15675c

Please sign in to comment.