diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 43ef8f413ef..eaf20746f74 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -198,7 +198,6 @@ topology.executor.send.buffer.size: 1024 #individual messages topology.transfer.buffer.size: 1024 # batched topology.tick.tuple.freq.secs: null topology.worker.shared.thread.pool.size: 4 -topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy" topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy" topology.sleep.spout.wait.strategy.time.ms: 1 topology.error.throttle.interval.secs: 10 diff --git a/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java new file mode 100644 index 00000000000..dab94055d71 --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/FastWordCountTopology.java @@ -0,0 +1,193 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.*; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.ThreadLocalRandom; + +/** + * WordCount but teh spout does not stop, and the bolts are implemented in + * java. This can show how fast the word count can run. + */ +public class FastWordCountTopology { + public static class FastRandomSentenceSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + Random _rand; + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _rand = ThreadLocalRandom.current(); + } + + @Override + public void nextTuple() { + String[] sentences = new String[]{ "the cow jumped over the moon", "an apple a day keeps the doctor away", + "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature" }; + String sentence = sentences[_rand.nextInt(sentences.length)]; + _collector.emit(new Values(sentence), sentence); + } + + @Override + public void ack(Object id) { + //Ignored + } + + @Override + public void fail(Object id) { + _collector.emit(new Values(id), id); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("sentence")); + } + } + + public static class SplitSentence extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String sentence = tuple.getString(0); + for (String word: sentence.split("\\s+")) { + collector.emit(new Values(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static class WordCount extends BaseBasicBolt { + Map counts = new HashMap(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) + count = 0; + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static void printMetrics(Nimbus.Client client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts: summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named "+name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + long failed = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec: info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map failedMap = stats.get_failed().get(":all-time"); + Map ackedMap = stats.get_acked().get(":all-time"); + Map avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key: ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal/acked; + System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); + } + + public static void kill(Nimbus.Client client, String name) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(name, opts); + } + + public static void main(String[] args) throws Exception { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new FastRandomSentenceSpout(), 4); + + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); + + String name = "wc-test"; + if (args != null && args.length > 0) { + name = args[0]; + } + + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); + + Map clusterConf = Utils.readStormConfig(); + clusterConf.putAll(Utils.readCommandLineOpts()); + Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + //Sleep for 5 mins + for (int i = 0; i < 10; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, name); + } + kill(client, name); + } +} diff --git a/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java new file mode 100644 index 00000000000..5df0688030e --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/InOrderDeliveryTest.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.StormSubmitter; +import backtype.storm.generated.*; +import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.FailedException; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.topology.base.BaseRichSpout; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; +import backtype.storm.utils.NimbusClient; +import backtype.storm.utils.Utils; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +public class InOrderDeliveryTest { + public static class InOrderSpout extends BaseRichSpout { + SpoutOutputCollector _collector; + int _base = 0; + int _i = 0; + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _base = context.getThisTaskIndex(); + } + + @Override + public void nextTuple() { + Values v = new Values(_base, _i); + _collector.emit(v, "ACK"); + _i++; + } + + @Override + public void ack(Object id) { + //Ignored + } + + @Override + public void fail(Object id) { + //Ignored + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("c1", "c2")); + } + } + + public static class Check extends BaseBasicBolt { + Map expected = new HashMap(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + Integer c1 = tuple.getInteger(0); + Integer c2 = tuple.getInteger(1); + Integer exp = expected.get(c1); + if (exp == null) exp = 0; + if (c2.intValue() != exp.intValue()) { + System.out.println(c1+" "+c2+" != "+exp); + throw new FailedException(c1+" "+c2+" != "+exp); + } + exp = c2 + 1; + expected.put(c1, exp); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + //Empty + } + } + + public static void printMetrics(Nimbus.Client client, String name) throws Exception { + ClusterSummary summary = client.getClusterInfo(); + String id = null; + for (TopologySummary ts: summary.get_topologies()) { + if (name.equals(ts.get_name())) { + id = ts.get_id(); + } + } + if (id == null) { + throw new Exception("Could not find a topology named "+name); + } + TopologyInfo info = client.getTopologyInfo(id); + int uptime = info.get_uptime_secs(); + long acked = 0; + long failed = 0; + double weightedAvgTotal = 0.0; + for (ExecutorSummary exec: info.get_executors()) { + if ("spout".equals(exec.get_component_id())) { + SpoutStats stats = exec.get_stats().get_specific().get_spout(); + Map failedMap = stats.get_failed().get(":all-time"); + Map ackedMap = stats.get_acked().get(":all-time"); + Map avgLatMap = stats.get_complete_ms_avg().get(":all-time"); + for (String key: ackedMap.keySet()) { + if (failedMap != null) { + Long tmp = failedMap.get(key); + if (tmp != null) { + failed += tmp; + } + } + long ackVal = ackedMap.get(key); + double latVal = avgLatMap.get(key) * ackVal; + acked += ackVal; + weightedAvgTotal += latVal; + } + } + } + double avgLatency = weightedAvgTotal/acked; + System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec: "+(((double)acked)/uptime+" failed: "+failed)); + } + + public static void kill(Nimbus.Client client, String name) throws Exception { + KillOptions opts = new KillOptions(); + opts.set_wait_secs(0); + client.killTopologyWithOpts(name, opts); + } + + public static void main(String[] args) throws Exception { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new InOrderSpout(), 8); + builder.setBolt("count", new Check(), 8).fieldsGrouping("spout", new Fields("c1")); + + Config conf = new Config(); + conf.registerMetricsConsumer(backtype.storm.metric.LoggingMetricsConsumer.class); + + String name = "in-order-test"; + if (args != null && args.length > 0) { + name = args[0]; + } + + conf.setNumWorkers(1); + StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology()); + + Map clusterConf = Utils.readStormConfig(); + clusterConf.putAll(Utils.readCommandLineOpts()); + Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient(); + + //Sleep for 50 mins + for (int i = 0; i < 50; i++) { + Thread.sleep(30 * 1000); + printMetrics(client, name); + } + kill(client, name); + } +} diff --git a/pom.xml b/pom.xml index 7f525bb763f..f1973c75926 100644 --- a/pom.xml +++ b/pom.xml @@ -191,7 +191,7 @@ 1.11 4.3.3 0.2.4 - 2.10.4 + 3.3.2 0.9.0 16.0.1 3.9.0.Final @@ -542,7 +542,7 @@ ${clojure.tools.cli.version} - com.googlecode.disruptor + com.lmax disruptor ${disruptor.version} diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 7dc5c030ea7..f72452eb466 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -205,7 +205,7 @@ compile - com.googlecode.disruptor + com.lmax disruptor diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj index 746fde38733..e4780df07da 100644 --- a/storm-core/src/clj/backtype/storm/daemon/executor.clj +++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj @@ -225,8 +225,7 @@ (str "executor" executor-id "-send-queue") (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :claim-strategy :single-threaded - :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) + :producer-type :single-threaded) ] (recursive-map :worker worker diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj index 781a9599c01..fc265081557 100644 --- a/storm-core/src/clj/backtype/storm/daemon/worker.clj +++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj @@ -186,8 +186,7 @@ ;; TODO: this depends on the type of executor (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))])) + (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS))])) (into {}) )) @@ -228,8 +227,7 @@ (let [assignment-versions (atom {}) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) - (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY)) + (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors) receive-queue-map (->> executor-receive-queue-map diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj index 25e0050cd49..f8db4afdcbe 100644 --- a/storm-core/src/clj/backtype/storm/disruptor.clj +++ b/storm-core/src/clj/backtype/storm/disruptor.clj @@ -16,39 +16,21 @@ (ns backtype.storm.disruptor (:import [backtype.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback]) - (:import [com.lmax.disruptor MultiThreadedClaimStrategy SingleThreadedClaimStrategy - BlockingWaitStrategy SleepingWaitStrategy YieldingWaitStrategy - BusySpinWaitStrategy]) + (:import [com.lmax.disruptor.dsl ProducerType]) (:require [clojure [string :as str]]) (:require [clojure [set :as set]]) (:use [clojure walk]) (:use [backtype.storm util log])) -(def CLAIM-STRATEGY - {:multi-threaded (fn [size] (MultiThreadedClaimStrategy. (int size))) - :single-threaded (fn [size] (SingleThreadedClaimStrategy. (int size)))}) +(def PRODUCER-TYPE + {:multi-threaded ProducerType/MULTI + :single-threaded ProducerType/SINGLE}) -(def WAIT-STRATEGY - {:block (fn [] (BlockingWaitStrategy.)) - :yield (fn [] (YieldingWaitStrategy.)) - :sleep (fn [] (SleepingWaitStrategy.)) - :spin (fn [] (BusySpinWaitStrategy.))}) - -(defn- mk-wait-strategy - [spec] - (if (keyword? spec) - ((WAIT-STRATEGY spec)) - (-> (str spec) new-instance))) - -;; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue), as sometimes the consumer stays blocked even when there's an item on the queue. -;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message -;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue, -;; unblocking the consumer (defnk disruptor-queue - [^String queue-name buffer-size timeout :claim-strategy :multi-threaded :wait-strategy :block] + [^String queue-name buffer-size timeout :producer-type :multi-threaded] (DisruptorQueue. queue-name - ((CLAIM-STRATEGY claim-strategy) buffer-size) - (mk-wait-strategy wait-strategy) timeout)) + (PRODUCER-TYPE producer-type) buffer-size + timeout)) (defn clojure-handler [afn] diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index c886e401f05..d7399fb7d1e 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -1361,9 +1361,11 @@ public class Config extends HashMap { /** + * @deprecated this is no longer supported * Configure the wait strategy used for internal queuing. Can be used to tradeoff latency * vs. throughput */ + @Deprecated public static final String TOPOLOGY_DISRUPTOR_WAIT_STRATEGY="topology.disruptor.wait.strategy"; public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class; @@ -1532,7 +1534,7 @@ public class Config extends HashMap { * vs. CPU usage */ public static final String TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS="topology.disruptor.wait.timeout.millis"; - public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.NotNullPosIntegerValidator; + public static final Object TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT_MILLIS_SCHEMA = ConfigValidation.NotNullIntegerValidator; /** * Which implementation of {@link backtype.storm.codedistributor.ICodeDistributor} should be used by storm for code diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java index fd9dae7bf03..96245f1e657 100644 --- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java +++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java @@ -185,6 +185,24 @@ public void validateField(String pd, String name, Object field) */ public static Object MapsValidator = listFv(Map.class, true); + /** + * Validates a non null Integer + */ + public static Object NotNullIntegerValidator = new FieldValidator() { + @Override + public void validateField(String name, Object o) throws IllegalArgumentException { + final long i; + if (o instanceof Number && + (i = ((Number)o).longValue()) == ((Number)o).doubleValue()) { + if (i <= Integer.MAX_VALUE && i >= Integer.MIN_VALUE) { + return; + } + } + + throw new IllegalArgumentException("Field " + name + " must be an Integer"); + } + }; + /** * Validates a non null Integer > 0 */ diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java index 72590e5a464..1d6466af723 100644 --- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - *

+ * * http://www.apache.org/licenses/LICENSE-2.0 - *

+ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -18,67 +18,73 @@ package backtype.storm.utils; import com.lmax.disruptor.AlertException; -import com.lmax.disruptor.ClaimStrategy; import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.InsufficientCapacityException; +import com.lmax.disruptor.LiteBlockingWaitStrategy; import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.Sequence; import com.lmax.disruptor.SequenceBarrier; -import com.lmax.disruptor.SingleThreadedClaimStrategy; +import com.lmax.disruptor.TimeoutBlockingWaitStrategy; +import com.lmax.disruptor.TimeoutException; import com.lmax.disruptor.WaitStrategy; +import com.lmax.disruptor.dsl.ProducerType; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.HashMap; -import java.util.Map; import backtype.storm.metric.api.IStatefulObject; - /** * A single consumer queue that uses the LMAX Disruptor. They key to the performance is * the ability to catch up to the producer by processing tuples in batches. */ public class DisruptorQueue implements IStatefulObject { - static final Object FLUSH_CACHE = new Object(); - static final Object INTERRUPT = new Object(); + private static final Object FLUSH_CACHE = new Object(); + private static final Object INTERRUPT = new Object(); + private static final String PREFIX = "disruptor-"; - RingBuffer _buffer; - Sequence _consumer; - SequenceBarrier _barrier; + private final RingBuffer _buffer; + private final Sequence _consumer; + private final SequenceBarrier _barrier; - // TODO: consider having a threadlocal cache of this variable to speed up reads? - volatile boolean consumerStartedFlag = false; - ConcurrentLinkedQueue _cache = new ConcurrentLinkedQueue(); + private volatile boolean consumerStartedFlag = false; + private final ConcurrentLinkedQueue _cache = new ConcurrentLinkedQueue(); private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock(); private final Lock readLock = cacheLock.readLock(); private final Lock writeLock = cacheLock.writeLock(); - private static String PREFIX = "disruptor-"; private String _queueName = ""; - private long _waitTimeout; - private final QueueMetrics _metrics; private DisruptorBackpressureCallback _cb = null; private int _highWaterMark = 0; private int _lowWaterMark = 0; private boolean _enableBackpressure = false; - public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, long timeout) { + public DisruptorQueue(String queueName, ProducerType type, int size, long timeout) { this._queueName = PREFIX + queueName; - _buffer = new RingBuffer(new ObjectEventFactory(), claim, wait); + WaitStrategy wait = null; + if (timeout <= 0) { + wait = new LiteBlockingWaitStrategy(); + } else { + wait = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); + } + + _buffer = RingBuffer.create(type, new ObjectEventFactory(), size, wait); _consumer = new Sequence(); _barrier = _buffer.newBarrier(); - _buffer.setGatingSequences(_consumer); + _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); - if (claim instanceof SingleThreadedClaimStrategy) { + if (type == ProducerType.SINGLE) { consumerStartedFlag = true; } else { // make sure we flush the pending messages in cache first @@ -88,8 +94,6 @@ public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait, throw new RuntimeException("This code should be unreachable!", e); } } - - _waitTimeout = timeout; } public String getName() { @@ -97,7 +101,9 @@ public String getName() { } public void consumeBatch(EventHandler handler) { - consumeBatchToCursor(_barrier.getCursor(), handler); + if (_metrics.population() > 0) { + consumeBatchWhenAvailable(handler); + } } public void haltWithInterrupt() { @@ -107,9 +113,12 @@ public void haltWithInterrupt() { public void consumeBatchWhenAvailable(EventHandler handler) { try { final long nextSequence = _consumer.get() + 1; - final long availableSequence = - _waitTimeout == 0L ? _barrier.waitFor(nextSequence) : _barrier.waitFor(nextSequence, _waitTimeout, - TimeUnit.MILLISECONDS); + long availableSequence = 0; + try { + availableSequence = _barrier.waitFor(nextSequence); + } catch (TimeoutException te) { + availableSequence = _barrier.getCursor(); + } if (availableSequence >= nextSequence) { consumeBatchToCursor(availableSequence, handler); @@ -121,12 +130,11 @@ public void consumeBatchWhenAvailable(EventHandler handler) { } } - private void consumeBatchToCursor(long cursor, EventHandler handler) { for (long curr = _consumer.get() + 1; curr <= cursor; curr++) { try { MutableObject mo = _buffer.get(curr); - Object o = mo.o; + Object o = mo.getObject(); mo.setObject(null); if (o == FLUSH_CACHE) { Object c = null; @@ -175,7 +183,6 @@ public void tryPublish(Object obj) throws InsufficientCapacityException { } public void publish(Object obj, boolean block) throws InsufficientCapacityException { - boolean publishNow = consumerStartedFlag; if (!publishNow) { @@ -210,13 +217,12 @@ private void publishDirect(Object obj, boolean block) throws InsufficientCapacit try { _cb.highWaterMark(); } catch (Exception e) { - throw new RuntimeException("Exception during calling highWaterMark callback!"); + throw new RuntimeException("Exception during calling highWaterMark callback!", e); } } } public void consumerStarted() { - consumerStartedFlag = true; // Use writeLock to make sure all pending cache add opearation completed diff --git a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java index d5cb7dbf7d7..2bd9bb1b3a7 100644 --- a/storm-core/src/jvm/backtype/storm/utils/MutableObject.java +++ b/storm-core/src/jvm/backtype/storm/utils/MutableObject.java @@ -18,7 +18,7 @@ package backtype.storm.utils; public class MutableObject { - Object o = null; + private Object o = null; public MutableObject() { @@ -28,11 +28,11 @@ public MutableObject(Object o) { this.o = o; } - public void setObject(Object o) { + public synchronized void setObject(Object o) { this.o = o; } - public Object getObject() { + public synchronized Object getObject() { return o; } } diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java index 197744f8ec4..9a0f5a7d40f 100644 --- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java +++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueBackpressureTest.java @@ -20,9 +20,8 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; -import com.lmax.disruptor.MultiThreadedClaimStrategy; +import com.lmax.disruptor.dsl.ProducerType; import org.junit.Assert; import org.junit.Test; import junit.framework.TestCase; @@ -30,7 +29,6 @@ import org.slf4j.LoggerFactory; public class DisruptorQueueBackpressureTest extends TestCase { - private static final Logger LOG = LoggerFactory.getLogger(DisruptorQueueBackpressureTest.class); private final static int MESSAGES = 100; @@ -38,7 +36,6 @@ public class DisruptorQueueBackpressureTest extends TestCase { private final static double HIGH_WATERMARK = 0.6; private final static double LOW_WATERMARK = 0.2; - @Test public void testBackPressureCallback() throws Exception { @@ -109,7 +106,6 @@ public void lowWaterMark() throws Exception { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, new MultiThreadedClaimStrategy( - queueSize), new BlockingWaitStrategy(), 10L); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L); } } diff --git a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java index ddc0982b655..d9154a5fc6f 100644 --- a/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java +++ b/storm-core/test/jvm/backtype/storm/utils/DisruptorQueueTest.java @@ -20,10 +20,9 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; -import com.lmax.disruptor.BlockingWaitStrategy; import com.lmax.disruptor.EventHandler; +import com.lmax.disruptor.dsl.ProducerType; import com.lmax.disruptor.InsufficientCapacityException; -import com.lmax.disruptor.MultiThreadedClaimStrategy; import org.junit.Assert; import org.junit.Test; import junit.framework.TestCase; @@ -83,25 +82,53 @@ public void onEvent(Object obj, long sequence, boolean endOfBatch) messageConsumed.get()); } + @Test + public void testInOrder() throws InterruptedException { + final AtomicBoolean allInOrder = new AtomicBoolean(true); + + DisruptorQueue queue = createQueue("consumerHang", 1024); + Runnable producer = new IncProducer(queue, 1024*1024); + Runnable consumer = new Consumer(queue, new EventHandler() { + long _expected = 0; + @Override + public void onEvent(Object obj, long sequence, boolean endOfBatch) + throws Exception { + if (_expected != ((Number)obj).longValue()) { + allInOrder.set(false); + System.out.println("Expected "+_expected+" but got "+obj); + } + _expected++; + } + }); + + run(producer, consumer, 1000, 1); + Assert.assertTrue("Messages delivered out of order", + allInOrder.get()); + } private void run(Runnable producer, Runnable consumer) throws InterruptedException { + run(producer, consumer, 10, PRODUCER_NUM); + } - Thread[] producerThreads = new Thread[PRODUCER_NUM]; - for (int i = 0; i < PRODUCER_NUM; i++) { + private void run(Runnable producer, Runnable consumer, int sleepMs, int producerNum) + throws InterruptedException { + + Thread[] producerThreads = new Thread[producerNum]; + for (int i = 0; i < producerNum; i++) { producerThreads[i] = new Thread(producer); producerThreads[i].start(); } Thread consumerThread = new Thread(consumer); consumerThread.start(); - Thread.sleep(10); - for (int i = 0; i < PRODUCER_NUM; i++) { + Thread.sleep(sleepMs); + for (int i = 0; i < producerNum; i++) { producerThreads[i].interrupt(); } consumerThread.interrupt(); - for (int i = 0; i < PRODUCER_NUM; i++) { + for (int i = 0; i < producerNum; i++) { producerThreads[i].join(TIMEOUT); assertFalse("producer "+i+" is still alive", producerThreads[i].isAlive()); } @@ -109,6 +136,27 @@ private void run(Runnable producer, Runnable consumer) assertFalse("consumer is still alive", consumerThread.isAlive()); } + private static class IncProducer implements Runnable { + private DisruptorQueue queue; + private long _max; + + IncProducer(DisruptorQueue queue, long max) { + this.queue = queue; + this._max = max; + } + + @Override + public void run() { + try { + for (long i = 0; i < _max; i++) { + queue.publish(i, false); + } + } catch (InsufficientCapacityException e) { + return; + } + } + }; + private static class Producer implements Runnable { private String msg; private DisruptorQueue queue; @@ -153,7 +201,6 @@ public void run() { }; private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, new MultiThreadedClaimStrategy( - queueSize), new BlockingWaitStrategy(), 10L); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L); } }