From 0d1e40bfdd828d32d9e9af7618209aeebb37a1fa Mon Sep 17 00:00:00 2001 From: Anishek Agarwal Date: Tue, 23 Jun 2015 09:48:09 +0530 Subject: [PATCH 1/2] Ability to emit from the spout to a specific stream, by default it will emit to the default stream. --- external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java | 2 +- .../src/jvm/storm/kafka/PartitionManager.java | 9 +++++++-- .../storm-kafka/src/jvm/storm/kafka/SpoutConfig.java | 1 + 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java index f3bc3ea499b..4419f742c34 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java @@ -138,7 +138,7 @@ public void nextTuple() { try { // in case the number of managers decreased _currPartitionIndex = _currPartitionIndex % managers.size(); - EmitState state = managers.get(_currPartitionIndex).next(_collector); + EmitState state = managers.get(_currPartitionIndex).next(_collector, _spoutConfig.emitStreamId); if (state != EmitState.EMITTED_MORE_LEFT) { _currPartitionIndex = (_currPartitionIndex + 1) % managers.size(); } diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 00ab981e0b4..90d07300d98 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -23,6 +23,7 @@ import backtype.storm.metric.api.MeanReducer; import backtype.storm.metric.api.ReducedMetric; import backtype.storm.spout.SpoutOutputCollector; +import backtype.storm.utils.Utils; import com.google.common.collect.ImmutableMap; import kafka.javaapi.consumer.SimpleConsumer; import kafka.javaapi.message.ByteBufferMessageSet; @@ -125,8 +126,12 @@ public Map getMetricsDataMap() { return ret; } - //returns false if it's reached the end of current batch public EmitState next(SpoutOutputCollector collector) { + return this.next(collector, Utils.DEFAULT_STREAM_ID); + } + + //returns false if it's reached the end of current batch + public EmitState next(SpoutOutputCollector collector, String emitStreamId) { if (_waitingToEmit.isEmpty()) { fill(); } @@ -138,7 +143,7 @@ public EmitState next(SpoutOutputCollector collector) { Iterable> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg); if (tups != null) { for (List tup : tups) { - collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset)); + collector.emit(emitStreamId, tup, new KafkaMessageId(_partition, toEmit.offset)); } break; } else { diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java index 61d0b355667..c706daef680 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java @@ -29,6 +29,7 @@ public class SpoutConfig extends KafkaConfig implements Serializable { // setting for how often to save the current kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000; + public String emitStreamId; // Exponential back-off retry settings. These are used when retrying messages after a bolt // calls OutputCollector.fail(). From 2be0d1671747bf0c5d1a8e084fdeb8cb7c20c340 Mon Sep 17 00:00:00 2001 From: Anishek Agarwal Date: Sat, 27 Jun 2015 23:08:34 +0530 Subject: [PATCH 2/2] STORM-917 :: Ability to emit from the spout to a specific stream, by default it will emit to the default stream. --- external/storm-kafka/README.md | 2 ++ .../storm-kafka/src/jvm/storm/kafka/PartitionManager.java | 4 ---- external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java | 5 ++++- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md index 1e9cc120ca3..00aaf88bb7c 100644 --- a/external/storm-kafka/README.md +++ b/external/storm-kafka/README.md @@ -70,6 +70,8 @@ In addition to these parameters, SpoutConfig contains the following fields that public long retryInitialDelayMs = 0; public double retryDelayMultiplier = 1.0; public long retryDelayMaxMs = 60 * 1000; + // by default the spout emits to the default storm stream. Provide a different stream id to change it. + public String emitStreamId = Utils.DEFAULT_STREAM_ID; ``` Core KafkaSpout only accepts an instance of SpoutConfig. diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java index 90d07300d98..499547481fd 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java @@ -126,10 +126,6 @@ public Map getMetricsDataMap() { return ret; } - public EmitState next(SpoutOutputCollector collector) { - return this.next(collector, Utils.DEFAULT_STREAM_ID); - } - //returns false if it's reached the end of current batch public EmitState next(SpoutOutputCollector collector, String emitStreamId) { if (_waitingToEmit.isEmpty()) { diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java index c706daef680..1175ac28b1f 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java @@ -17,6 +17,8 @@ */ package storm.kafka; +import backtype.storm.utils.Utils; + import java.io.Serializable; import java.util.List; @@ -29,7 +31,8 @@ public class SpoutConfig extends KafkaConfig implements Serializable { // setting for how often to save the current kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000; - public String emitStreamId; + // by default the spout emits to the default storm stream. Provide a different stream id to change it. + public String emitStreamId = Utils.DEFAULT_STREAM_ID; // Exponential back-off retry settings. These are used when retrying messages after a bolt // calls OutputCollector.fail().