From bbfd7cd4050ae80d406daaed9cec882b00be1f98 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 8 Oct 2018 13:25:17 -0500 Subject: [PATCH] STORM-3249: Make sure timers shut down and so does shut down thread --- .../apache/storm/hdfs/spout/HdfsSpout.java | 2 +- .../org/apache/storm/hive/bolt/HiveBolt.java | 5 ++- .../apache/storm/hive/trident/HiveState.java | 3 +- .../src/jvm/org/apache/storm/utils/Utils.java | 43 +++++++++++-------- .../storm/blobstore/LocalFsBlobStore.java | 5 ++- .../org/apache/storm/daemon/drpc/DRPC.java | 6 +-- 6 files changed, 40 insertions(+), 24 deletions(-) diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java index 12debe514de..bdf9da379a9 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/spout/HdfsSpout.java @@ -390,7 +390,7 @@ protected void emitData(List tuple, MessageId id) { public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { LOG.info("Opening HDFS Spout"); this.conf = conf; - this.commitTimer = new Timer(); + this.commitTimer = new Timer(context.getThisTaskId() + "-commit-timer", true); this.tracker = new ProgressTracker(); this.hdfsConfig = new Configuration(); this.collector = collector; diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java index cfabbd6f235..180f41b4af0 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/bolt/HiveBolt.java @@ -79,7 +79,7 @@ public void prepare(Map conf, TopologyContext topologyContext, O new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); sendHeartBeat.set(true); - heartBeatTimer = new Timer(); + heartBeatTimer = new 
Timer(topologyContext.getThisTaskId() + "-hb-timer", true); setupHeartBeatTimer(); } catch (Exception e) { @@ -151,6 +151,9 @@ public void cleanup() { } callTimeoutPool = null; + if (heartBeatTimer != null) { + heartBeatTimer.cancel(); + } super.cleanup(); LOG.info("Hive Bolt stopped"); } diff --git a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java index a698e24f4c5..67173296376 100644 --- a/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java +++ b/external/storm-hive/src/main/java/org/apache/storm/hive/trident/HiveState.java @@ -80,7 +80,7 @@ public void prepare(Map conf, IMetricsContext metrics, int parti String timeoutName = "hive-bolt-%d"; this.callTimeoutPool = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setNameFormat(timeoutName).build()); - heartBeatTimer = new Timer(); + heartBeatTimer = new Timer("hive-hb-timer", true); setupHeartBeatTimer(); } catch (Exception e) { LOG.warn("unable to make connection to hive ", e); @@ -289,6 +289,7 @@ public void cleanup() { LOG.warn("shutdown interrupted on " + execService, ex); } } + heartBeatTimer.cancel(); callTimeoutPool = null; } diff --git a/storm-client/src/jvm/org/apache/storm/utils/Utils.java b/storm-client/src/jvm/org/apache/storm/utils/Utils.java index a0406e04037..9e9196f3b63 100644 --- a/storm-client/src/jvm/org/apache/storm/utils/Utils.java +++ b/storm-client/src/jvm/org/apache/storm/utils/Utils.java @@ -33,6 +33,7 @@ import java.io.OutputStreamWriter; import java.io.Serializable; import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; import java.net.InetAddress; import java.net.ServerSocket; import java.net.URL; @@ -300,21 +301,25 @@ public static void addShutdownHookWithForceKillIn1Sec(Runnable func) { * runtime to avoid any zombie process in case cleanup function hangs. 
*/ public static void addShutdownHookWithDelayedForceKill(Runnable func, int numSecs) { - Runnable sleepKill = new Runnable() { - @Override - public void run() { - try { - LOG.info("Halting after {} seconds", numSecs); - Time.sleepSecs(numSecs); - LOG.warn("Forcing Halt..."); - Runtime.getRuntime().halt(20); - } catch (Exception e) { - LOG.warn("Exception in the ShutDownHook", e); - } + final Thread sleepKill = new Thread(() -> { + try { + LOG.info("Halting after {} seconds", numSecs); + Time.sleepSecs(numSecs); + LOG.warn("Forcing Halt... {}", Utils.threadDump()); + Runtime.getRuntime().halt(20); + } catch (InterruptedException ie) { + //Ignored/expected... + } catch (Exception e) { + LOG.warn("Exception in the ShutDownHook", e); } - }; - Runtime.getRuntime().addShutdownHook(new Thread(func)); - Runtime.getRuntime().addShutdownHook(new Thread(sleepKill)); + }); + sleepKill.setDaemon(true); + Thread wrappedFunc = new Thread(() -> { + func.run(); + sleepKill.interrupt(); + }); + Runtime.getRuntime().addShutdownHook(wrappedFunc); + Runtime.getRuntime().addShutdownHook(sleepKill); } public static boolean isSystemId(String id) { @@ -1190,7 +1195,9 @@ public static String threadDump() { final StringBuilder dump = new StringBuilder(); final java.lang.management.ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); final java.lang.management.ThreadInfo[] threadInfos = threadMXBean.getThreadInfo(threadMXBean.getAllThreadIds(), 100); - for (java.lang.management.ThreadInfo threadInfo : threadInfos) { + for (Entry entry: Thread.getAllStackTraces().entrySet()) { + Thread t = entry.getKey(); + ThreadInfo threadInfo = threadMXBean.getThreadInfo(t.getId()); if (threadInfo == null) { //Thread died before we could get the info, skip continue; @@ -1198,6 +1205,9 @@ public static String threadDump() { dump.append('"'); dump.append(threadInfo.getThreadName()); dump.append("\" "); + if (t.isDaemon()) { + dump.append("(DAEMON)"); + } dump.append("\n lock: "); 
dump.append(threadInfo.getLockName()); dump.append(" owner: "); @@ -1205,8 +1215,7 @@ public static String threadDump() { final Thread.State state = threadInfo.getThreadState(); dump.append("\n java.lang.Thread.State: "); dump.append(state); - final StackTraceElement[] stackTraceElements = threadInfo.getStackTrace(); - for (final StackTraceElement stackTraceElement : stackTraceElements) { + for (final StackTraceElement stackTraceElement : entry.getValue()) { dump.append("\n at "); dump.append(stackTraceElement); } diff --git a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java index a72944b95a2..7724b318252 100644 --- a/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java +++ b/storm-server/src/main/java/org/apache/storm/blobstore/LocalFsBlobStore.java @@ -108,7 +108,7 @@ public void prepare(Map conf, String overrideBase, NimbusInfo ni } catch (Exception e) { e.printStackTrace(); } - timer = new Timer(); + timer = new Timer("BLOB-STORE-TIMER", true); this.leaderElector = leaderElector; } @@ -411,6 +411,9 @@ public void shutdown() { if (zkClient != null) { zkClient.close(); } + if (timer != null) { + timer.cancel();; + } } @Override diff --git a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java index e8edd7db977..2d853f39570 100644 --- a/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java +++ b/storm-server/src/main/java/org/apache/storm/daemon/drpc/DRPC.java @@ -71,7 +71,7 @@ public class DRPC implements AutoCloseable { //Waiting to be returned private final ConcurrentHashMap _requests = new ConcurrentHashMap<>(); - private final Timer _timer = new Timer(); + private final Timer timer = new Timer("DRPC-CLEANUP-TIMER", true); private final AtomicLong _ctr = new AtomicLong(0); private final IAuthorizer _auth; @@ -87,7 +87,7 @@ public 
DRPC(StormMetricsRegistry metricsRegistry, IAuthorizer auth, long timeout this.meterResultCalls = metricsRegistry.registerMeter("drpc:num-result-calls"); this.meterFailRequestCalls = metricsRegistry.registerMeter("drpc:num-failRequest-calls"); this.meterFetchRequestCalls = metricsRegistry.registerMeter("drpc:num-fetchRequest-calls"); - _timer.scheduleAtFixedRate(new TimerTask() { + timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { cleanupAll(timeoutMs, TIMED_OUT); @@ -241,7 +241,7 @@ public String executeBlocking(String functionName, String funcArgs) throws DRPCE @Override public void close() { - _timer.cancel(); + timer.cancel(); cleanupAll(0, SHUT_DOWN); } }