From 48c55c84cc7a39dc1edf6cc8d6b196dd94b9c8b0 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Fri, 18 Sep 2015 16:33:39 -0500 Subject: [PATCH 1/6] STORM-350: Upgrade to newer version of disruptor --- conf/defaults.yaml | 1 - .../storm/starter/FastWordCountTopology.java | 185 ++++++++++++++++++ pom.xml | 4 +- storm-core/pom.xml | 2 +- .../clj/backtype/storm/daemon/executor.clj | 3 +- .../src/clj/backtype/storm/daemon/worker.clj | 6 +- .../src/clj/backtype/storm/disruptor.clj | 32 +-- storm-core/src/jvm/backtype/storm/Config.java | 11 +- .../backtype/storm/utils/DisruptorQueue.java | 62 +++--- .../utils/DisruptorQueueBackpressureTest.java | 8 +- .../storm/utils/DisruptorQueueTest.java | 65 +++++- 11 files changed, 295 insertions(+), 84 deletions(-) create mode 100644 examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 7166bd44142..f4c189c13a8 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -210,7 +210,6 @@ topology.executor.send.buffer.size: 1024 #individual messages topology.transfer.buffer.size: 1024 # batched topology.tick.tuple.freq.secs: null topology.worker.shared.thread.pool.size: 4 -topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy" topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy" topology.sleep.spout.wait.strategy.time.ms: 1 topology.error.throttle.interval.secs: 10 diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java new file mode 100644 index 00000000000..c50c994cfd8 --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.*; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +/** + * WordCount but teh spout does not stop, and the bolts are implemented in + * java. This can show how fast the word count can run. + */ +public class FastWordCountTopology { + public static class FastRandomSentenceSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + Random _rand; + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _rand = ThreadLocalRandom.current(); + } + + @Override + public void nextTuple() { + String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", + "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; + String sentence = sentences[_rand.nextInt(sentences.length)]; + _collector.emit(new Values(sentence), sentence); + } + + @Override + public void ack(Object id) { + //Ignored + } + + @Override + public void fail(Object id) { + _collector.emit(new Values(id), id); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } + } + + public static class SplitSentence extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String sentence = tuple.getString(0); + for (String word: sentence.split("\\s+")) { + collector.emit(new Values(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static class WordCount extends BaseBasicBolt { + Map counts = new HashMap(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) + count = 0; + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static void printMetrics(Nimbus.Client client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts: summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named "+name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec: info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map ackedMap = stats.get_acked().get(":all-time"); + Map avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key: ackedMap.keySet()) { + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal/acked; + System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime)); + } + + public static void kill(Nimbus.Client client, String name) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(name, opts); + } + + public static void main(String[] args) throws Exception { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new FastRandomSentenceSpout(), 4); + + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); + + String name = "wc-test"; + if (args != null && args.length > 0) { + name = args[0]; + } + + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); + + Map clusterConf = Utils.readStormConfig(); + clusterConf.putAll(Utils.readCommandLineOpts()); + Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + //Sleep for 5 mins + for (int i = 0; i < 10; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, name); + } + kill(client, name); + } +} diff --git a/pom.xml b/pom.xml index 01a12fab9c1..008a98842bf 100644 --- a/pom.xml +++ b/pom.xml @@ -191,7 +191,7 @@ 1.11 4.3.3 0.2.4 - 2.10.4 + 3.3.2 0.9.0 16.0.1 3.9.0.Final @@ -549,7 +549,7 @@ ${clojure.tools.cli.version} - com.googlecode.disruptor + com.lmax disruptor ${disruptor.version} diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 940ab453142..87a5804864b 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -205,7 +205,7 @@ compile - com.googlecode.disruptor + com.lmax disruptor diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj index fd8b886084a..512f6fd3816 100644 --- a/storm-core/src/clj/backtype/storm/daemon/executor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj @@ -225,8 +225,7 @@ (str "executor" executor-id "-send-queue") (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :claim-strategy :single-threaded - :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) + :producer-type :single-threaded) ] (recursive-map :worker worker diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 2b74f6974d8..355c2f616f0 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -190,8 +190,7 @@ ;; TODO: this depends on the type of executor (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))])) + (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS))])) (into {}) )) @@ -232,8 +231,7 @@ (let [assignment-versions (atom {}) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) + (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors) receive-queue-map (->> executor-receive-queue-map diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj index 25e0050cd49..f8db4afdcbe 100644 --- a/storm-core/src/clj/backtype/storm/disruptor.clj +++ b/storm-core/src/clj/backtype/storm/disruptor.clj @@ -16,39 +16,21 @@ (ns backtype.storm.disruptor (:import [backtype.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback]) - (:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy - BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy - BusySpinWaitStrategy]) + (:import [com.lmax.disruptor.dsl ProducerType]) (:require [clojure [string :as str]]) (:require [clojure [set :as set]]) (:use [clojure walk]) (:use [backtype.storm util log])) -(def CLAIM-STRATEGY - {:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size))) - :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))}) +(def PRODUCER-TYPE + {:multi-threaded ProducerType/MULTI + :single-threaded ProducerType/SINGLE}) -(def WAIT-STRATEGY - {:block (fn [] (BlockingWaitStrategy.)) - :yield (fn [] (YieldingWaitStrategy.)) - :sleep (fn [] (SleepingWaitStrategy.)) - :spin (fn [] (BusySpinWaitStrategy.))}) - -(defn- mk-wait-strategy - [spec] - (if (keyword? spec) - ((WAIT-STRATEGY spec)) - (-> (str spec) new-instance))) - -;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue. -;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message -;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue, -;; unblocking the consumer (defnk disruptor-queue - [^String queue-name buffer-size timeout :claim-strategy :multi-threaded :wait-strategy :block] + [^String queue-name buffer-size timeout :producer-type :multi-threaded] (DisruptorQueue. queue-name - ((CLAIM-STRATEGY claim-strategy) buffer-size) - (mk-wait-strategy wait-strategy) timeout)) + (PRODUCER-TYPE producer-type) buffer-size + timeout)) (defn clojure-handler [afn] diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index cabdc73493c..565ef23bd3f 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1479,10 +1479,12 @@ public class Config extends HashMap { @isInteger public static final String TOPOLOGY_TICK_TUPLE_FREQ_SECS="topology.tick.tuple.freq.secs"; - /** - * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency - * vs. throughput - */ + /** + * @deprecated this is no longer supported + * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency + * vs. throughput + */ + @Deprecated @isString public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy"; @@ -1658,7 +1660,6 @@ public class Config extends HashMap { * vs. CPU usage */ @isInteger - @isPositiveNumber @NotNull public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis"; diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java index fa073e7142b..097ccefc9cc 100644 --- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java @@ -18,53 +18,52 @@ package backtype.storm.utils; import com.lmax.disruptor.AlertException; -import com.lmax.disruptor.ClaimStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.InsufficientCapacityException; +import com.lmax.disruptor.LiteBlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; -import com.lmax.disruptor.SingleThreadedClaimStrategy; +import com.lmax.disruptor.TimeoutBlockingWaitStrategy; +import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.ProducerType; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.HashMap; -import java.util.Map; import backtype.storm.metric.api.IStatefulObject; import backtype.storm.metric.internal.RateTracker; - /** * A single consumer queue that uses the LMAX Disruptor. They key to the performance is * the ability to catch up to the producer by processing tuples in batches. */ public class DisruptorQueue implements IStatefulObject { - static final Object FLUSH_CACHE = new Object(); - static final Object INTERRUPT = new Object(); + private static final Object FLUSH_CACHE = new Object(); + private static final Object INTERRUPT = new Object(); + private static final String PREFIX = "disruptor-"; - RingBuffer _buffer; - Sequence _consumer; - SequenceBarrier _barrier; + private final RingBuffer _buffer; + private final Sequence _consumer; + private final SequenceBarrier _barrier; - // TODO: consider having a threadlocal cache of this variable to speed up reads? - volatile boolean consumerStartedFlag = false; - ConcurrentLinkedQueue _cache = new ConcurrentLinkedQueue(); + private volatile boolean consumerStartedFlag = false; + private final ConcurrentLinkedQueue _cache = new ConcurrentLinkedQueue(); private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); private final Lock readLock = cacheLock.readLock(); private final Lock writeLock = cacheLock.writeLock(); - private static String PREFIX = "disruptor-"; private String _queueName = ""; - private long _waitTimeout; - private final QueueMetrics _metrics; private DisruptorBackpressureCallback _cb = null; private int _highWaterMark = 0; @@ -72,15 +71,22 @@ public class DisruptorQueue implements IStatefulObject { private boolean _enableBackpressure = false; private volatile boolean _throttleOn = false; - public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, long timeout) { + public DisruptorQueue(String queueName, ProducerType type, int size, long timeout) { this._queueName = PREFIX + queueName; - _buffer = new RingBuffer(new ObjectEventFactory(), claim, wait); + WaitStrategy wait = null; + if (timeout <= 0) { + wait = new LiteBlockingWaitStrategy(); + } else { + wait = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); + } + + _buffer = RingBuffer.create(type, new ObjectEventFactory(), size, wait); _consumer = new Sequence(); _barrier = _buffer.newBarrier(); - _buffer.setGatingSequences(_consumer); + _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); - if (claim instanceof SingleThreadedClaimStrategy) { + if (type == ProducerType.SINGLE) { consumerStartedFlag = true; } else { // make sure we flush the pending messages in cache first @@ -90,8 +96,6 @@ public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, throw new RuntimeException("This code should be unreachable!", e); } } - - _waitTimeout = timeout; } public String getName() { @@ -109,9 +113,12 @@ public void haltWithInterrupt() { public void consumeBatchWhenAvailable(EventHandler handler) { try { final long nextSequence = _consumer.get() + 1; - final long availableSequence = - _waitTimeout == 0L ? _barrier.waitFor(nextSequence) : _barrier.waitFor(nextSequence, _waitTimeout, - TimeUnit.MILLISECONDS); + long availableSequence = 0; + try { + availableSequence = _barrier.waitFor(nextSequence); + } catch (TimeoutException te) { + availableSequence = _barrier.getCursor(); + } if (availableSequence >= nextSequence) { consumeBatchToCursor(availableSequence, handler); @@ -123,7 +130,6 @@ public void consumeBatchWhenAvailable(EventHandler handler) { } } - private void consumeBatchToCursor(long cursor, EventHandler handler) { for (long curr = _consumer.get() + 1; curr <= cursor; curr++) { try { @@ -180,7 +186,6 @@ public void tryPublish(Object obj) throws InsufficientCapacityException { } public void publish(Object obj, boolean block) throws InsufficientCapacityException { - boolean publishNow = consumerStartedFlag; if (!publishNow) { @@ -218,13 +223,12 @@ private void publishDirect(Object obj, boolean block) throws InsufficientCapacit _throttleOn = true; } } catch (Exception e) { - throw new RuntimeException("Exception during calling highWaterMark callback!"); + throw new RuntimeException("Exception during calling highWaterMark callback!", e); } } } public void consumerStarted() { - consumerStartedFlag = true; // Use writeLock to make sure all pending cache add opearation completed diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java index 197744f8ec4..9a0f5a7d40f 100644 --- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java +++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java @@ -20,9 +20,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.MultiThreadedClaimStrategy; +import com.lmax.disruptor.dsl.ProducerType; import org.junit.Assert; import org.junit.Test; import junit.framework.TestCase; @@ -30,7 +29,6 @@ import org.slf4j.LoggerFactory; public class DisruptorQueueBackpressureTest extends TestCase { - private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueBackpressureTest.class); private final static int MESSAGES = 100; @@ -38,7 +36,6 @@ public class DisruptorQueueBackpressureTest extends TestCase { private final static double HIGH_WATERMARK = 0.6; private final static double LOW_WATERMARK = 0.2; - @Test public void testBackPressureCallback() throws Exception { @@ -109,7 +106,6 @@ public void lowWaterMark() throws Exception { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, new MultiThreadedClaimStrategy( - queueSize), new BlockingWaitStrategy(), 10L); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L); } } diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java index ddc0982b655..d9154a5fc6f 100644 --- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java +++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java @@ -20,10 +20,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.InsufficientCapacityException; -import com.lmax.disruptor.MultiThreadedClaimStrategy; import org.junit.Assert; import org.junit.Test; import junit.framework.TestCase; @@ -83,25 +82,53 @@ public void onEvent(Object obj, long sequence, boolean endOfBatch) messageConsumed.get()); } + @Test + public void testInOrder() throws InterruptedException { + final AtomicBoolean allInOrder = new AtomicBoolean(true); + + DisruptorQueue queue = createQueue("consumerHang", 1024); + Runnable producer = new IncProducer(queue, 1024*1024); + Runnable consumer = new Consumer(queue, new EventHandler() { + long _expected = 0; + @Override + public void onEvent(Object obj, long sequence, boolean endOfBatch) + throws Exception { + if (_expected != ((Number)obj).longValue()) { + allInOrder.set(false); + System.out.println("Expected "+_expected+" but got "+obj); + } + _expected++; + } + }); + + run(producer, consumer, 1000, 1); + Assert.assertTrue("Messages delivered out of order", + allInOrder.get()); + } private void run(Runnable producer, Runnable consumer) throws InterruptedException { + run(producer, consumer, 10, PRODUCER_NUM); + } - Thread[] producerThreads = new Thread[PRODUCER_NUM]; - for (int i = 0; i < PRODUCER_NUM; i++) { + private void run(Runnable producer, Runnable consumer, int sleepMs, int producerNum) + throws InterruptedException { + + Thread[] producerThreads = new Thread[producerNum]; + for (int i = 0; i < producerNum; i++) { producerThreads[i] = new Thread(producer); producerThreads[i].start(); } Thread consumerThread = new Thread(consumer); consumerThread.start(); - Thread.sleep(10); - for (int i = 0; i < PRODUCER_NUM; i++) { + Thread.sleep(sleepMs); + for (int i = 0; i < producerNum; i++) { producerThreads[i].interrupt(); } consumerThread.interrupt(); - for (int i = 0; i < PRODUCER_NUM; i++) { + for (int i = 0; i < producerNum; i++) { producerThreads[i].join(TIMEOUT); assertFalse("producer "+i+" is still alive", producerThreads[i].isAlive()); } @@ -109,6 +136,27 @@ private void run(Runnable producer, Runnable consumer) assertFalse("consumer is still alive", consumerThread.isAlive()); } + private static class IncProducer implements Runnable { + private DisruptorQueue queue; + private long _max; + + IncProducer(DisruptorQueue queue, long max) { + this.queue = queue; + this._max = max; + } + + @Override + public void run() { + try { + for (long i = 0; i < _max; i++) { + queue.publish(i, false); + } + } catch (InsufficientCapacityException e) { + return; + } + } + }; + private static class Producer implements Runnable { private String msg; private DisruptorQueue queue; @@ -153,7 +201,6 @@ public void run() { }; private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, new MultiThreadedClaimStrategy( - queueSize), new BlockingWaitStrategy(), 10L); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L); } } From 41b35ea9f2a2d452a59f5376c416b2beb05de748 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 23 Sep 2015 08:43:41 -0500 Subject: [PATCH 2/6] Fixed null reads from disruptor. --- storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 6 ++++-- storm-core/src/jvm/backtype/storm/utils/MutableObject.java | 6 +++--- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java index 097ccefc9cc..cd32625fbae 100644 --- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java @@ -103,7 +103,9 @@ public String getName() { } public void consumeBatch(EventHandler handler) { - consumeBatchToCursor(_barrier.getCursor(), handler); + if (_metrics.population() > 0) { + consumeBatchWhenAvailable(handler); + } } public void haltWithInterrupt() { @@ -134,7 +136,7 @@ private void consumeBatchToCursor(long cursor, EventHandler handler) { for (long curr = _consumer.get() + 1; curr <= cursor; curr++) { try { MutableObject mo = _buffer.get(curr); - Object o = mo.o; + Object o = mo.getObject(); mo.setObject(null); if (o == FLUSH_CACHE) { Object c = null; diff --git a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java index d5cb7dbf7d7..2bd9bb1b3a7 100644 --- a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java +++ b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java @@ -18,7 +18,7 @@ package backtype.storm.utils; public class MutableObject { - Object o = null; + private Object o = null; public MutableObject() { @@ -28,11 +28,11 @@ public MutableObject(Object o) { this.o = o; } - public void setObject(Object o) { + public synchronized void setObject(Object o) { this.o = o; } - public Object getObject() { + public synchronized Object getObject() { return o; } } From 7e0b08ef2201db91cb97fec979cd98faf10dbd31 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Thu, 24 Sep 2015 13:45:36 -0500 Subject: [PATCH 3/6] Added in an in-order test case. --- .../storm/starter/FastWordCountTopology.java | 12 +- .../storm/starter/InOrderDeliveryTest.java | 175 ++++++++++++++++++ 2 files changed, 185 insertions(+), 2 deletions(-) create mode 100644 examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java index c50c994cfd8..dab94055d71 100644 --- a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java +++ b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java @@ -127,13 +127,21 @@ public static void printMetrics(Nimbus.Client client, String name) throws Except TopologyInfo info = client.getTopologyInfo(id); int uptime = info.get_uptime_secs(); long acked = 0; + long failed = 0; double weightedAvgTotal = 0.0; for (ExecutorSummary exec: info.get_executors()) { if ("spout".equals(exec.get_component_id())) { SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map failedMap = stats.get_failed().get(":all-time"); Map ackedMap = stats.get_acked().get(":all-time"); Map avgLatMap = stats.get_complete_ms_avg().get(":all-time"); for (String key: ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } long ackVal = ackedMap.get(key); double latVal = avgLatMap.get(key) * ackVal; acked += ackVal; @@ -142,7 +150,7 @@ public static void printMetrics(Nimbus.Client client, String name) throws Except } } double avgLatency = weightedAvgTotal/acked; - System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime)); + System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); } public static void kill(Nimbus.Client client, String name) throws Exception { @@ -169,7 +177,7 @@ public static void main(String[] args) throws Exception { } conf.setNumWorkers(1); - StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); Map clusterConf = Utils.readStormConfig(); clusterConf.putAll(Utils.readCommandLineOpts()); diff --git a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java new file mode 100644 index 00000000000..5df0688030e --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.*; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.FailedException; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class InOrderDeliveryTest { + public static class InOrderSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + int _base = 0; + int _i = 0; + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _base = context.getThisTaskIndex(); + } + + @Override + public void nextTuple() { + Values v = new Values(_base, _i); + _collector.emit(v, "ACK"); + _i++; + } + + @Override + public void ack(Object id) { + //Ignored + } + + @Override + public void fail(Object id) { + //Ignored + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("c1", "c2")); + } + } + + public static class Check extends BaseBasicBolt { + Map expected = new HashMap(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + Integer c1 = tuple.getInteger(0); + Integer c2 = tuple.getInteger(1); + Integer exp = expected.get(c1); + if (exp == null) exp = 0; + if (c2.intValue() != exp.intValue()) { + System.out.println(c1+" "+c2+" != "+exp); + throw new FailedException(c1+" "+c2+" != "+exp); + } + exp = c2 + 1; + expected.put(c1, exp); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + //Empty + } + } + + public static void printMetrics(Nimbus.Client client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts: summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named "+name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + long failed = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec: info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map failedMap = stats.get_failed().get(":all-time"); + Map ackedMap = stats.get_acked().get(":all-time"); + Map avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key: ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal/acked; + System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); + } + + public static void kill(Nimbus.Client client, String name) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(name, opts); + } + + public static void main(String[] args) throws Exception { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new InOrderSpout(), 8); + builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1")); + + Config conf = new Config(); + conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); + + String name = "in-order-test"; + if (args != null && args.length > 0) { + name = args[0]; + } + + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); + + Map clusterConf = Utils.readStormConfig(); + clusterConf.putAll(Utils.readCommandLineOpts()); + Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + //Sleep for 50 mins + for (int i = 0; i < 50; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, name); + } + kill(client, name); + } +} From acaa3b9c4b5dd28dc63229b8717749f3293b0adb Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 5 Oct 2015 13:50:51 -0500 Subject: [PATCH 4/6] Addressed some review comments --- storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java index cd32625fbae..9d4261a24e7 100644 --- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From 945db1a6e92c0c5d6d3c95be103ab3241a82a74f Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Wed, 14 Oct 2015 10:47:23 -0500 Subject: [PATCH 5/6] Fixed issue with disruptor queue timeout. Also updated it to use AtomicReference so debugging checks can be simpler. --- .../backtype/storm/utils/DisruptorQueue.java | 40 +++++++++++-------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java index 9d4261a24e7..33cb5bfacc1 100644 --- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java @@ -38,6 +38,10 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.concurrent.atomic.AtomicReference; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import backtype.storm.metric.api.IStatefulObject; import backtype.storm.metric.internal.RateTracker; @@ -47,11 +51,12 @@ * the ability to catch up to the producer by processing tuples in batches. */ public class DisruptorQueue implements IStatefulObject { + private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueue.class); private static final Object FLUSH_CACHE = new Object(); private static final Object INTERRUPT = new Object(); private static final String PREFIX = "disruptor-"; - private final RingBuffer _buffer; + private final RingBuffer> _buffer; private final Sequence _consumer; private final SequenceBarrier _barrier; @@ -115,16 +120,13 @@ public void haltWithInterrupt() { public void consumeBatchWhenAvailable(EventHandler handler) { try { final long nextSequence = _consumer.get() + 1; - long availableSequence = 0; - try { - availableSequence = _barrier.waitFor(nextSequence); - } catch (TimeoutException te) { - availableSequence = _barrier.getCursor(); - } + long availableSequence = _barrier.waitFor(nextSequence); if (availableSequence >= nextSequence) { consumeBatchToCursor(availableSequence, handler); } + } catch (TimeoutException te) { + //Ignored } catch (AlertException e) { throw new RuntimeException(e); } catch (InterruptedException e) { @@ -135,10 +137,12 @@ public void consumeBatchWhenAvailable(EventHandler handler) { private void consumeBatchToCursor(long cursor, EventHandler handler) { for (long curr = _consumer.get() + 1; curr <= cursor; curr++) { try { - MutableObject mo = _buffer.get(curr); - Object o = mo.getObject(); - mo.setObject(null); - if (o == FLUSH_CACHE) { + AtomicReference mo = _buffer.get(curr); + Object o = mo.getAndSet(null); + + if (o == null) { + LOG.error("NULL found in {}:{}", this.getName(), cursor); + } else if (o == FLUSH_CACHE) { Object c = null; while (true) { c = _cache.poll(); @@ -164,7 +168,6 @@ private void consumeBatchToCursor(long cursor, EventHandler handler) { throw new RuntimeException(e); } } - //TODO: only set this if the consumer cursor has changed? _consumer.set(cursor); } @@ -214,8 +217,8 @@ private void publishDirect(Object obj, boolean block) throws InsufficientCapacit } else { id = _buffer.tryNext(1); } - final MutableObject m = _buffer.get(id); - m.setObject(obj); + final AtomicReference m = _buffer.get(id); + Object old = m.getAndSet(obj); _buffer.publish(id); _metrics.notifyArrivals(1); if (_enableBackpressure && _cb != null && _metrics.population() >= _highWaterMark) { @@ -228,6 +231,9 @@ private void publishDirect(Object obj, boolean block) throws InsufficientCapacit throw new RuntimeException("Exception during calling highWaterMark callback!", e); } } + if (old != null) { + LOG.warn("Tuple was overwritten in {}:{}", getName(), id); + } } public void consumerStarted() { @@ -266,10 +272,10 @@ public DisruptorQueue setEnableBackpressure(boolean enableBackpressure) { return this; } - public static class ObjectEventFactory implements EventFactory { + public static class ObjectEventFactory implements EventFactory> { @Override - public MutableObject newInstance() { - return new MutableObject(); + public AtomicReference newInstance() { + return new AtomicReference(); } } From 1be78e79a44319ec32f3d45852a284ca6e9cb895 Mon Sep 17 00:00:00 2001 From: "Robert (Bobby) Evans" Date: Mon, 26 Oct 2015 13:17:42 -0500 Subject: [PATCH 6/6] Fixed disable of event logger --- storm-core/src/jvm/backtype/storm/Config.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index 565ef23bd3f..ad745972b33 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1256,7 +1256,7 @@ public class Config extends HashMap { * event logging will be disabled.

*/ @isInteger - @isPositiveNumber + @isPositiveNumber(includeZero = true) public static final String TOPOLOGY_EVENTLOGGER_EXECUTORS = "topology.eventlogger.executors"; /**