Skip to content

Commit

Permalink
[FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal …
Browse files Browse the repository at this point in the history
…errors during process async message in akka rpc actor

This closes apache#3360
  • Loading branch information
淘江 authored and StephanEwen committed Mar 9, 2017
1 parent 3cc3e3e commit 27fe460
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 2 deletions.
13 changes: 13 additions & 0 deletions flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
Expand Up @@ -112,6 +112,19 @@ public static void rethrowIfFatalError(Throwable t) {
}
}

/**
* Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM
* or an out-of-memory error. See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a
* definition of fatal errors.
*
* @param t The Throwable to check and rethrow.
*/
public static void rethrowIfFatalErrorOrOOM(Throwable t) {
if (isJvmFatalError(t) || t instanceof OutOfMemoryError) {
throw (Error) t;
}
}

/**
* Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception}
* to a prior exception, or returns the new exception, if no prior exception exists.
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.flink.runtime.rpc.akka.messages.RunAsync;

import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -271,8 +272,9 @@ else if (runAsync.getDelay() == 0) {
// run immediately
try {
runAsync.getRunnable().run();
} catch (final Throwable e) {
LOG.error("Caught exception while executing runnable in main thread.", e);
} catch (Throwable t) {
LOG.error("Caught exception while executing runnable in main thread.", t);
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
}
}
else {
Expand Down

0 comments on commit 27fe460

Please sign in to comment.