Skip to content
Permalink
Browse files
GIRAPH-1218
closes #101
  • Loading branch information
dlogothetis committed May 8, 2019
1 parent fff0d34 commit e827a5668a94d13800201191ebf7d34642728771
Showing 1 changed file with 62 additions and 24 deletions.
@@ -19,9 +19,11 @@
package org.apache.giraph.graph;

import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.IntConfOption;
import org.apache.giraph.job.ClientThriftServer;
import org.apache.giraph.job.JobProgressTracker;
import org.apache.giraph.master.MasterProgress;
import org.apache.giraph.utils.ThreadUtils;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.log4j.Logger;

@@ -42,12 +44,21 @@
import java.util.concurrent.RejectedExecutionException;

/**
* Wrapper around JobProgressTracker which retires to connect and swallows
* Wrapper around JobProgressTracker which retries to connect and swallows
* exceptions so app wouldn't crash if something goes wrong with progress
* reports.
*/
public class RetryableJobProgressTrackerClient
implements JobProgressTrackerClient {
/** Conf option for number of retries */
public static final IntConfOption RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES =
new IntConfOption("giraph.job.progress.client.num.retries", 1,
"Number of times to retry a failed operation");
/** Conf option for wait time between retries */
public static final IntConfOption
RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS =
new IntConfOption("giraph.job.progress.client.retries.wait", 1000,
"Time (msec) to wait between retries");
/** Class logger */
private static final Logger LOG =
Logger.getLogger(RetryableJobProgressTrackerClient.class);
@@ -57,6 +68,10 @@
private ThriftClientManager clientManager;
/** Job progress tracker */
private JobProgressTracker jobProgressTracker;
/** Cached value for number of retries */
private int numRetries;
/** Cached value for wait time between retries */
private int retryWaitMsec;

/**
* Constructor
@@ -66,6 +81,8 @@
public RetryableJobProgressTrackerClient(GiraphConfiguration conf) throws
ExecutionException, InterruptedException {
this.conf = conf;
numRetries = RETRYABLE_JOB_PROGRESS_CLIENT_NUM_RETRIES.get(conf);
retryWaitMsec = RETRYABLE_JOB_PROGRESS_CLIENT_RETRY_WAIT_MS.get(conf);
resetConnection();
}

@@ -110,7 +127,7 @@ public synchronized void mapperStarted() {
public void run() {
jobProgressTracker.mapperStarted();
}
});
}, numRetries);
}

@Override
@@ -120,7 +137,7 @@ public synchronized void logInfo(final String logLine) {
public void run() {
jobProgressTracker.logInfo(logLine);
}
});
}, numRetries);
}

@Override
@@ -131,7 +148,7 @@ public synchronized void logError(final String logLine,
public void run() {
jobProgressTracker.logError(logLine, exByteArray);
}
});
}, numRetries);
}

@Override
@@ -141,7 +158,7 @@ public synchronized void logFailure(final String reason) {
public void run() {
jobProgressTracker.logFailure(reason);
}
});
}, numRetries);
}

@Override
@@ -151,7 +168,7 @@ public synchronized void updateProgress(final WorkerProgress workerProgress) {
public void run() {
jobProgressTracker.updateProgress(workerProgress);
}
});
}, numRetries);
}

@Override
@@ -161,50 +178,71 @@ public void updateMasterProgress(final MasterProgress masterProgress) {
public void run() {
jobProgressTracker.updateMasterProgress(masterProgress);
}
});
}, numRetries);
}

/**
* Execute Runnable, if disconnected try to connect again and retry
*
* @param runnable Runnable to execute
* @param numRetries Number of retries
*/
private void executeWithRetry(Runnable runnable) {
private void executeWithRetry(Runnable runnable, int numRetries) {
try {
runnable.run();
} catch (RuntimeTTransportException | RejectedExecutionException te) {
if (LOG.isDebugEnabled()) {
LOG.debug(te.getClass() + " occurred while talking to " +
"JobProgressTracker server, trying to reconnect", te);
"JobProgressTracker server, trying to reconnect", te);
}
try {
for (int i = 0; i < numRetries; i++) {
try {
clientManager.close();
ThreadUtils.trySleep(retryWaitMsec);
retry(runnable);
break; // If the retry succeeded, we simply break from the loop

// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
if (LOG.isDebugEnabled()) {
LOG.debug(
"Exception occurred while trying to close client manager", e);
if (LOG.isInfoEnabled()) {
LOG.info("Exception occurred while talking to " +
"JobProgressTracker server after retry " + i +
" of " + numRetries, e);
}
}
resetConnection();
runnable.run();
// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
if (LOG.isInfoEnabled()) {
LOG.info("Exception occurred while talking to " +
"JobProgressTracker server, giving up", e);
}
}
// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
if (LOG.isInfoEnabled()) {
LOG.info("Exception occurred while talking to " +
"JobProgressTracker server, giving up", e);
"JobProgressTracker server, giving up", e);
}
}
}

/**
* Executes a single retry by closing the existing {@link #clientManager}
* connection, re-initializing it, and then executing the passed instance
* of {@link Runnable}.
*
* @param runnable Instance of {@link Runnable} to execute.
* @throws ExecutionException
* @throws InterruptedException
*/
private void retry(Runnable runnable) throws ExecutionException,
InterruptedException {
try {
clientManager.close();
// CHECKSTYLE: stop IllegalCatch
} catch (Exception e) {
// CHECKSTYLE: resume IllegalCatch
if (LOG.isDebugEnabled()) {
LOG.debug(
"Exception occurred while trying to close client manager", e);
}
}
resetConnection();
runnable.run();
}
}

0 comments on commit e827a56

Please sign in to comment.