From d4cfb729ed72c121978476a8966e6413e31b38f1 Mon Sep 17 00:00:00 2001 From: Jungtaek Lim Date: Wed, 27 Dec 2017 11:52:19 +0900 Subject: [PATCH] STORM-2870 Properly shutdown ExecutorService in FileBasedEventLogger * extract local variable 'scheduler' to one of fields * gracefully shutdown the scheduler * address review comments * wait once, and call shutdownNow, don't wait afterwards * set daemon to true --- .../storm/metric/FileBasedEventLogger.java | 34 +++++++++++++++++-- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java b/storm-core/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java index 7697bf4d3bd..eee278be6a0 100644 --- a/storm-core/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java +++ b/storm-core/src/jvm/org/apache/storm/metric/FileBasedEventLogger.java @@ -35,8 +35,11 @@ import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + public class FileBasedEventLogger implements IEventLogger { private static final Logger LOG = LoggerFactory.getLogger(FileBasedEventLogger.class); @@ -44,6 +47,7 @@ public class FileBasedEventLogger implements IEventLogger { private Path eventLogPath; private BufferedWriter eventLogWriter; + private ScheduledExecutorService flushScheduler; private volatile boolean dirty = false; private void initLogWriter(Path logFilePath) { @@ -60,8 +64,13 @@ private void initLogWriter(Path logFilePath) { private void setUpFlushTask() { - ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); - Runnable task = new Runnable() { + ThreadFactory threadFactory = new ThreadFactoryBuilder() + .setNameFormat("event-logger-flush-%d") + .setDaemon(true) + .build(); + + flushScheduler = Executors.newSingleThreadScheduledExecutor(threadFactory); + Runnable runnable = new Runnable() { @Override public void run() { try { @@ -76,7 +85,7 @@ public void run() { } }; - scheduler.scheduleAtFixedRate(task, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); + flushScheduler.scheduleAtFixedRate(runnable, FLUSH_INTERVAL_MILLIS, FLUSH_INTERVAL_MILLIS, TimeUnit.MILLISECONDS); } private String getFirstNonNull(String... strings) { @@ -147,8 +156,27 @@ protected String buildLogMessage(EventInfo event) { public void close() { try { eventLogWriter.close(); + } catch (IOException ex) { LOG.error("Error closing event log.", ex); } + + closeFlushScheduler(); + } + + private void closeFlushScheduler() { + if (flushScheduler != null) { + flushScheduler.shutdown(); + try { + if (!flushScheduler.awaitTermination(2, TimeUnit.SECONDS)) { + flushScheduler.shutdownNow(); + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + flushScheduler.shutdownNow(); + // Preserve interrupt status + Thread.currentThread().interrupt(); + } + } } }