From 4f9d5c013da12284353a2411ef386c07ef210214 Mon Sep 17 00:00:00 2001 From: Govind Menon Date: Fri, 28 Apr 2017 16:11:19 -0500 Subject: [PATCH] STORM-1642: Rethrow exception on serialization error and kill worker --- .../netty/NettyUncaughtExceptionHandler.java | 13 +++++++--- .../messaging/netty/StormServerHandler.java | 21 +++++++++++++--- .../src/jvm/org/apache/storm/utils/Utils.java | 25 +++++++++++-------- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java index fd48bdcb574..87fae1e8372 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/NettyUncaughtExceptionHandler.java @@ -25,11 +25,18 @@ public class NettyUncaughtExceptionHandler implements Thread.UncaughtExceptionHa private static final Logger LOG = LoggerFactory.getLogger(NettyUncaughtExceptionHandler.class); @Override public void uncaughtException(Thread t, Throwable e) { + try { + LOG.error("Uncaught exception in netty " + e.getCause()); + } catch (Throwable err) { + // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it + } + try { Utils.handleUncaughtException(e); - } catch (Error error) { - LOG.info("Received error in netty thread.. terminating server..."); - Runtime.getRuntime().exit(1); + } catch (Throwable throwable) { + LOG.error("Exception thrown while handling uncaught exception " + throwable.getCause()); } + LOG.info("Received error in netty thread.. terminating server..."); + Runtime.getRuntime().exit(1); } } diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java index fbec9659fe3..1e060ada224 100644 --- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java +++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java @@ -26,10 +26,16 @@ import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; public class StormServerHandler extends SimpleChannelUpstreamHandler { private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class); + private static final Set allowedExceptions = new HashSet<>(Arrays.asList(new Class[] {IOException.class})); IServer server; private AtomicInteger failure_count; private Channel channel; @@ -67,8 +73,17 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { - LOG.error("server errors in handling the request", e.getCause()); - Utils.handleUncaughtException(e.getCause()); - server.closeChannel(e.getChannel()); + try { + LOG.error("server errors in handling the request", e.getCause()); + } catch (Throwable err) { + // Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it + } + try { + Utils.handleUncaughtException(e.getCause(), allowedExceptions); + } catch (Error error) { + LOG.info("Received error in netty thread.. terminating server..."); + Runtime.getRuntime().exit(1); + } + } } diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index 26043d1586d..d9b6684eec9 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -99,7 +99,7 @@ public class Utils { public static final Logger LOG = LoggerFactory.getLogger(Utils.class); public static final String DEFAULT_STREAM_ID = "default"; - + private static final Set defaultAllowedExceptions = new HashSet<>(); public static final String FILE_PATH_SEPARATOR = System.getProperty("file.separator"); private static ThreadLocal threadSer = new ThreadLocal(); @@ -528,7 +528,11 @@ public static boolean isZkAuthenticationConfiguredTopology(Map c } public static void handleUncaughtException(Throwable t) { - if (t != null && t instanceof Error) { + handleUncaughtException(t, defaultAllowedExceptions); + } + + public static void handleUncaughtException(Throwable t, Set allowedExceptions) { + if (t != null) { if (t instanceof OutOfMemoryError) { try { System.err.println("Halting due to Out Of Memory Error..." + Thread.currentThread().getName()); @@ -536,17 +540,16 @@ public static void handleUncaughtException(Throwable t) { //Again we don't want to exit because of logging issues. } Runtime.getRuntime().halt(-1); - } else { - //Running in daemon mode, we would pass Error to calling thread. - throw (Error) t; } - } else if (t instanceof Exception) { - System.err.println("Uncaught Exception detected. Leave error log and ignore... Exception: " + t); - System.err.println("Stack trace:"); - StringWriter sw = new StringWriter(); - t.printStackTrace(new PrintWriter(sw)); - System.err.println(sw.toString()); } + + if(allowedExceptions.contains(t.getClass())) { + LOG.info("Swallowing {} {}", t.getClass(), t); + return; + } + + //Running in daemon mode, we would pass Error to calling thread. + throw new Error(t); } public static byte[] thriftSerialize(TBase t) {