Skip to content
Permalink
Browse files
GIRAPH-1163
closes #52
  • Loading branch information
Maja Kabiljo committed Oct 12, 2017
1 parent 83d06d9 commit 2e7ce47dfc59e772a9fcc8577bbc6b14f9311bf3
Show file tree
Hide file tree
Showing 7 changed files with 45 additions and 10 deletions.
@@ -19,6 +19,7 @@
package org.apache.giraph.graph;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
@@ -93,11 +94,12 @@ public void run(Context context) throws IOException, InterruptedException {
// CHECKSTYLE: stop IllegalCatch
} catch (RuntimeException e) {
// CHECKSTYLE: resume IllegalCatch
byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
LOG.error("Caught an unrecoverable exception " + e.getMessage(), e);
graphTaskManager.getJobProgressTracker().logError(
"Exception occurred on mapper " +
graphTaskManager.getConf().getTaskPartition() + ": " +
ExceptionUtils.getStackTrace(e));
ExceptionUtils.getStackTrace(e), exByteArray);
graphTaskManager.zooKeeperCleanup();
graphTaskManager.workerFailureCleanup();
throw new IllegalStateException(
@@ -65,6 +65,7 @@
import org.apache.giraph.worker.WorkerContext;
import org.apache.giraph.worker.WorkerObserver;
import org.apache.giraph.worker.WorkerProgress;
import org.apache.giraph.writable.kryo.KryoWritableWrapper;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -1115,8 +1116,9 @@ public void uncaughtException(final Thread t, final Throwable e) {
LOG.fatal(
"uncaughtException: OverrideExceptionHandler on thread " +
t.getName() + ", msg = " + e.getMessage() + ", exiting...", e);
jobProgressTracker.logError(ExceptionUtils.getStackTrace(e));

byte [] exByteArray = KryoWritableWrapper.convertToByteArray(e);
jobProgressTracker.logError(ExceptionUtils.getStackTrace(e),
exByteArray);
zooKeeperCleanup();
workerFailureCleanup();
} finally {
@@ -39,7 +39,7 @@ public void logInfo(String logLine) {
}

@Override
public void logError(String logLine) {
public void logError(String logLine, byte [] exByteArray) {
}

@Override
@@ -124,11 +124,12 @@ public void run() {
}

@Override
public synchronized void logError(final String logLine) {
public synchronized void logError(final String logLine,
final byte [] exByteArray) {
executeWithRetry(new Runnable() {
@Override
public void run() {
jobProgressTracker.logError(logLine);
jobProgressTracker.logError(logLine, exByteArray);
}
});
}
@@ -224,7 +224,8 @@ public void logInfo(String logLine) {
}

@Override
public void logError(String logLine) {
public void
logError(String logLine, byte [] exByteArray) {
LOG.error(logLine);
}

@@ -43,13 +43,17 @@ public interface JobProgressTracker {
void logInfo(String logLine);

/**
* Call this when you want to log an error line from any mapper to command
* line
* Call this when you want to log an error line and exception
* object from any mapper to command line
*
* KryoWritableWrapper.convertFromByteArray can be used to
* get exception object back
*
* @param logLine Line to log
* @param exByteArray Exception byte array
*/
@ThriftMethod
void logError(String logLine);
void logError(String logLine, byte [] exByteArray);

/**
* Notify that job is failing
@@ -120,4 +120,29 @@ public static <T> T unwrapIfNeeded(Writable value) {
public static <T> T wrapAndCopy(T object) {
return WritableUtils.createCopy(new KryoWritableWrapper<>(object)).get();
}

/**
* Converting the object to byte array.
* @param object Object
* @param <T> Type
* @return byte array
*/
public static <T> byte [] convertToByteArray(T object) {
KryoWritableWrapper<T> wrapper =
new KryoWritableWrapper<>(object);
return WritableUtils.toByteArray(wrapper);
}

/**
* Converting from byte array
* @param arr byte array
* @param <T> type
* @return original object
*/
public static <T> T convertFromByteArray(byte [] arr) {
KryoWritableWrapper<T> wrapper =
new KryoWritableWrapper<>();
WritableUtils.fromByteArray(arr, wrapper);
return wrapper.get();
}
}

0 comments on commit 2e7ce47

Please sign in to comment.