From 36046191aba31e6cff22eabbac71164877ffeb17 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 5 Jan 2017 14:08:02 -0600 Subject: [PATCH 1/2] STORM-2278: Allow max number of disruptor queue flusher threads to be configurable --- .../jvm/org/apache/storm/utils/DisruptorQueue.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index e80d746c21b..6801df3931d 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -63,6 +63,18 @@ public class DisruptorQueue implements IStatefulObject { private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); + private static int getNumFlusherPoolThreads() { + int numThreads = 100; + try { + String threads = System.getProperty("num_flusher_pool_threads", "100"); + numThreads = Integer.parseInt(threads); + } catch (Exception e) { + LOG.warn("Error while parsing number of flusher pool threads", e); + } + LOG.debug("Reading num_flusher_pool_threads Flusher pool threads: {}", numThreads); + return numThreads; + } + private static class FlusherPool { private static final String THREAD_PREFIX = "disruptor-flush"; private Timer _timer = new Timer(THREAD_PREFIX + "-trigger", true); @@ -71,7 +83,7 @@ private static class FlusherPool { private HashMap _tt = new HashMap<>(); public FlusherPool() { - _exec = new ThreadPoolExecutor(1, 100, 10, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadPoolExecutor.DiscardPolicy()); + _exec = new ThreadPoolExecutor(1, getNumFlusherPoolThreads(), 10, TimeUnit.SECONDS, new ArrayBlockingQueue(1024), new ThreadPoolExecutor.DiscardPolicy()); ThreadFactory threadFactory = new ThreadFactoryBuilder() .setDaemon(true) .setNameFormat(THREAD_PREFIX + "-task-pool") From 107795c01e5af6880d71bd044adfb5b0be04dc2e Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 9 Jan 2017 15:53:18 -0600 Subject: [PATCH 2/2] STORM-2278: Addressed review comments --- storm-core/src/jvm/org/apache/storm/Config.java | 12 +++++++++++- .../jvm/org/apache/storm/utils/DisruptorQueue.java | 11 +++++++++-- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index eef3e79b929..92f81dc89c1 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -2265,7 +2265,17 @@ public class Config extends HashMap { @isString public static final String CLIENT_JAR_TRANSFORMER = "client.jartransformer.class"; - + /** + * This is a config that is not likely to be used. Internally the disruptor queue will batch entries written + * into the queue. A background thread pool will flush those batches if they get too old. By default that + * pool can grow rather large, and sacrifice some CPU time to keep the latency low. In some cases you may + * want the queue to be smaller so there is less CPU used, but the latency will increase in some situations. + * This configs is on a per cluster bases, if you want to control this on a per topology bases you need to set + * the java System property for the worker "num_flusher_pool_threads" to the value you want. + */ + @isInteger + public static final String STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE = "storm.worker.disruptor.flusher.max.pool.size"; + /** * The plugin to be used for resource isolation */ diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index 6801df3931d..4b3aa065f62 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -31,6 +31,7 @@ import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.dsl.ProducerType; +import org.apache.storm.Config; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.metric.internal.RateTracker; import org.slf4j.Logger; @@ -62,11 +63,17 @@ public class DisruptorQueue implements IStatefulObject { private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); - + private static int getNumFlusherPoolThreads() { int numThreads = 100; try { - String threads = System.getProperty("num_flusher_pool_threads", "100"); + Map conf = Utils.readStormConfig(); + numThreads = Utils.getInt(conf.get(Config.STORM_WORKER_DISRUPTOR_FLUSHER_MAX_POOL_SIZE), numThreads); + } catch (Exception e) { + LOG.warn("Error while trying to read system config", e); + } + try { + String threads = System.getProperty("num_flusher_pool_threads", String.valueOf(numThreads)); numThreads = Integer.parseInt(threads); } catch (Exception e) { LOG.warn("Error while parsing number of flusher pool threads", e);