From c8147a0f7a963b262c5291c033e1e0ff80d695c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=B7=98=E6=B1=9F?= Date: Mon, 20 Feb 2017 17:54:54 +0800 Subject: [PATCH] [FLINK-5830] [distributed coordination] Handle OutOfMemory and fatal errors during process async message in akka rpc actor This closes #3360 --- .../java/org/apache/flink/util/ExceptionUtils.java | 13 +++++++++++++ .../apache/flink/runtime/rpc/akka/AkkaRpcActor.java | 6 ++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 7167a0b9e3de32..ca81465c3609ae 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -112,6 +112,19 @@ public static void rethrowIfFatalError(Throwable t) { } } + /** + * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM + * or an out-of-memory error. See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a + * definition of fatal errors. + * + * @param t The Throwable to check and rethrow. + */ + public static void rethrowIfFatalErrorOrOOM(Throwable t) { + if (isJvmFatalError(t) || t instanceof OutOfMemoryError) { + throw (Error) t; + } + } + /** * Adds a new exception as a {@link Throwable#addSuppressed(Throwable) suppressed exception} * to a prior exception, or returns the new exception, if no prior exception exists. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 264ba96ab21be7..99f8211a8c1d11 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.rpc.akka.messages.RunAsync; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -271,8 +272,9 @@ else if (runAsync.getDelay() == 0) { // run immediately try { runAsync.getRunnable().run(); - } catch (final Throwable e) { - LOG.error("Caught exception while executing runnable in main thread.", e); + } catch (Throwable t) { + LOG.error("Caught exception while executing runnable in main thread.", t); + ExceptionUtils.rethrowIfFatalErrorOrOOM(t); } } else {