From 6c6bec9b1b2b07b5dfb5000297c11911eda5389d Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Wed, 1 Oct 2014 11:50:48 +0200 Subject: [PATCH 01/11] New method in the Tuple interface "boolean isTuple()" for easier handling of TickTuples. --- storm-core/src/jvm/backtype/storm/tuple/Tuple.java | 9 +++++++-- storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 8 +++++++- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java index 113b3003ccd..b3f5e56abcc 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java +++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java @@ -153,7 +153,7 @@ public interface Tuple { * Gets the id of the component that created this tuple. */ public String getSourceComponent(); - + /** * Gets the id of the task that created this tuple. */ @@ -163,7 +163,12 @@ public interface Tuple { * Gets the id of the stream that this tuple was emitted to. */ public String getSourceStreamId(); - + + /** + * Returns if this tuple is a tick tuple or not. + */ + public boolean isTick(); + /** * Gets the message id that associated with this tuple. */ diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java index 818eff15567..7ff2c8c6b08 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java +++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java @@ -17,6 +17,7 @@ */ package backtype.storm.tuple; +import backtype.storm.Constants; import backtype.storm.generated.GlobalStreamId; import backtype.storm.task.GeneralTopologyContext; import backtype.storm.utils.IndifferentAccessMap; @@ -212,7 +213,12 @@ public int getSourceTask() { public String getSourceStreamId() { return streamId; } - + + public boolean isTick() { + return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && + this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); + } + public MessageId getMessageId() { return id; } From 73e54a8f8b0ef9a62955c7c1cee20925d359772b Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Wed, 1 Oct 2014 11:51:42 +0200 Subject: [PATCH 02/11] KafkaBolt no longer tries to map/process/send Tick Tuples to Kafka. --- external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java index b6c3de487c4..20257667190 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java @@ -89,6 +89,10 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(Tuple input) { + if (input.isTick()) { + return; // Do not try to send ticks to Kafka + } + K key = null; V message = null; String topic = null; From 8888ae631360ad124af9c480d2f8b621b9c51727 Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Wed, 1 Oct 2014 11:53:17 +0200 Subject: [PATCH 03/11] Use isTick in all relevant places to avoid code duplication. --- .../src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java index 4dfccc65a1f..da4c1a5fe42 100644 --- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java @@ -299,7 +299,7 @@ private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) { @Override public void execute(Tuple tuple) { - if(tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) { + if(tuple.isTick()) { long now = System.currentTimeMillis(); if(now - _lastRotate > _messageTimeoutMs) { _batches.rotate(); From 59b2a42af4419e9217849add60e62a0c863c1ece Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Wed, 29 Oct 2014 15:50:18 +0100 Subject: [PATCH 04/11] Replaced TupleHelpers in the examples with the new tuple.isTick() --- .../starter/bolt/AbstractRankerBolt.java | 3 +- .../storm/starter/bolt/RollingCountBolt.java | 3 +- .../jvm/storm/starter/util/TupleHelpers.java | 33 ------------------- 3 files changed, 2 insertions(+), 37 deletions(-) delete mode 100644 examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java index cc5c0e77233..83c2cfc1a73 100644 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java +++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java @@ -26,7 +26,6 @@ import backtype.storm.tuple.Values; import org.apache.log4j.Logger; import storm.starter.tools.Rankings; -import storm.starter.util.TupleHelpers; import java.util.HashMap; import java.util.Map; @@ -78,7 +77,7 @@ protected Rankings getRankings() { */ @Override public final void execute(Tuple tuple, BasicOutputCollector collector) { - if (TupleHelpers.isTickTuple(tuple)) { + if (tuple.isTick()) { getLogger().debug("Received tick tuple, triggering emit of current rankings"); emitRankings(collector); } diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java index f83906cea90..f023c0b9231 100644 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java +++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java @@ -28,7 +28,6 @@ import org.apache.log4j.Logger; import storm.starter.tools.NthLastModifiedTimeTracker; import storm.starter.tools.SlidingWindowCounter; -import storm.starter.util.TupleHelpers; import java.util.HashMap; import java.util.Map; @@ -95,7 +94,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(Tuple tuple) { - if (TupleHelpers.isTickTuple(tuple)) { + if (tuple.isTick()) { LOG.debug("Received tick tuple, triggering emit of current window counts"); emitCurrentWindowCounts(); } diff --git a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java b/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java deleted file mode 100644 index 4ea669ed2e1..00000000000 --- a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package storm.starter.util; - -import backtype.storm.Constants; -import backtype.storm.tuple.Tuple; - -public final class TupleHelpers { - - private TupleHelpers() { - } - - public static boolean isTickTuple(Tuple tuple) { - return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals( - Constants.SYSTEM_TICK_STREAM_ID); - } - -} From cd47f1d6e4165f8e3393aa5be01d02f4148c3216 Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Thu, 30 Oct 2014 10:27:05 +0100 Subject: [PATCH 05/11] Added missing ack for the tick --- external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 + 1 file changed, 1 insertion(+) diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java index 20257667190..7de25db2071 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java @@ -90,6 +90,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(Tuple input) { if (input.isTick()) { + collector.ack(input); return; // Do not try to send ticks to Kafka } From 71bd4a435b882120f7542fcae678a52e20600539 Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Tue, 9 Dec 2014 13:53:05 +0100 Subject: [PATCH 06/11] Fixed the tests in storm-starter that do not use the actual TupleImpl but mock everything themselves --- .../jvm/storm/starter/tools/MockTupleHelpers.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java index b253350ef4d..3180fd3da4e 100644 --- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java +++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java @@ -19,6 +19,7 @@ import backtype.storm.Constants; import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.TupleImpl; import static org.mockito.Mockito.*; @@ -28,13 +29,22 @@ private MockTupleHelpers() { } public static Tuple mockTickTuple() { - return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); + Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); + when(tuple.isTick()).thenReturn(true); + return tuple; } public static Tuple mockTuple(String componentId, String streamId) { Tuple tuple = mock(Tuple.class); when(tuple.getSourceComponent()).thenReturn(componentId); when(tuple.getSourceStreamId()).thenReturn(streamId); + when(tuple.isTick()).thenReturn(isTick(componentId, streamId)); return tuple; } + + private static boolean isTick(String componentId, String streamId) { + return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && + streamId.equals(Constants.SYSTEM_TICK_STREAM_ID); + } + } From 4e7a7e102723deb0f827946db28a88f7276b113e Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Tue, 9 Dec 2014 13:56:11 +0100 Subject: [PATCH 07/11] Code cleanup in mocking storm-starter tests --- .../test/jvm/storm/starter/tools/MockTupleHelpers.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java index 3180fd3da4e..374288e4d99 100644 --- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java +++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java @@ -29,9 +29,7 @@ private MockTupleHelpers() { } public static Tuple mockTickTuple() { - Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); - when(tuple.isTick()).thenReturn(true); - return tuple; + return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID); } public static Tuple mockTuple(String componentId, String streamId) { From 401ebebdf8b201b87ff944cc3c2217cc30acd1fc Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Tue, 9 Dec 2014 14:00:33 +0100 Subject: [PATCH 08/11] Code cleanup in mocking storm-starter tests --- .../test/jvm/storm/starter/tools/MockTupleHelpers.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java index 374288e4d99..9e8629ccd64 100644 --- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java +++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java @@ -19,7 +19,6 @@ import backtype.storm.Constants; import backtype.storm.tuple.Tuple; -import backtype.storm.tuple.TupleImpl; import static org.mockito.Mockito.*; @@ -39,9 +38,9 @@ public static Tuple mockTuple(String componentId, String streamId) { when(tuple.isTick()).thenReturn(isTick(componentId, streamId)); return tuple; } - + private static boolean isTick(String componentId, String streamId) { - return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && + return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && streamId.equals(Constants.SYSTEM_TICK_STREAM_ID); } From 4d2804aacb0a0abb60b5760a2f8bb3eb657ab67b Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Thu, 11 Dec 2014 12:35:01 +0100 Subject: [PATCH 09/11] Resolve NPE that can occur if there is no SourceComponent in a Tuple --- storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java index 7ff2c8c6b08..40ad11c3c78 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java +++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java @@ -215,8 +215,8 @@ public String getSourceStreamId() { } public boolean isTick() { - return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && - this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID); + return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) && + Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId()); } public MessageId getMessageId() { From 59fb8ded9ef892792879efc94b12ea46ff924c41 Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Mon, 15 Dec 2014 10:56:05 +0100 Subject: [PATCH 10/11] Refactor to move the isTick to a utility class --- .../starter/bolt/AbstractRankerBolt.java | 3 +- .../storm/starter/bolt/RollingCountBolt.java | 3 +- .../storm/starter/tools/MockTupleHelpers.java | 6 ---- .../src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 +-- .../src/jvm/backtype/storm/tuple/Tuple.java | 5 --- .../jvm/backtype/storm/tuple/TupleImpl.java | 5 --- .../jvm/backtype/storm/utils/TupleUtils.java | 35 +++++++++++++++++++ .../trident/topology/TridentBoltExecutor.java | 3 +- 8 files changed, 43 insertions(+), 21 deletions(-) create mode 100644 storm-core/src/jvm/backtype/storm/utils/TupleUtils.java diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java index 83c2cfc1a73..64ceb29b48a 100644 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java +++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java @@ -24,6 +24,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.TupleUtils; import org.apache.log4j.Logger; import storm.starter.tools.Rankings; @@ -77,7 +78,7 @@ protected Rankings getRankings() { */ @Override public final void execute(Tuple tuple, BasicOutputCollector collector) { - if (tuple.isTick()) { + if (TupleUtils.isTick(tuple)) { getLogger().debug("Received tick tuple, triggering emit of current rankings"); emitRankings(collector); } diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java index f023c0b9231..31f7ee2ba91 100644 --- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java +++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java @@ -25,6 +25,7 @@ import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; +import backtype.storm.utils.TupleUtils; import org.apache.log4j.Logger; import storm.starter.tools.NthLastModifiedTimeTracker; import storm.starter.tools.SlidingWindowCounter; @@ -94,7 +95,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(Tuple tuple) { - if (tuple.isTick()) { + if (TupleUtils.isTick(tuple)) { LOG.debug("Received tick tuple, triggering emit of current window counts"); emitCurrentWindowCounts(); } diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java index 9e8629ccd64..eeaeeae3e13 100644 --- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java +++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java @@ -35,13 +35,7 @@ public static Tuple mockTuple(String componentId, String streamId) { Tuple tuple = mock(Tuple.class); when(tuple.getSourceComponent()).thenReturn(componentId); when(tuple.getSourceStreamId()).thenReturn(streamId); - when(tuple.isTick()).thenReturn(isTick(componentId, streamId)); return tuple; } - private static boolean isTick(String componentId, String streamId) { - return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && - streamId.equals(Constants.SYSTEM_TICK_STREAM_ID); - } - } diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java index 7de25db2071..35c0da6ebdd 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java @@ -22,6 +22,7 @@ import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichBolt; import backtype.storm.tuple.Tuple; +import backtype.storm.utils.TupleUtils; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; @@ -89,8 +90,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(Tuple input) { - if (input.isTick()) { - collector.ack(input); + if (TupleUtils.isTick(input)) { return; // Do not try to send ticks to Kafka } diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java index b3f5e56abcc..c644feca2cd 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java +++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java @@ -164,11 +164,6 @@ public interface Tuple { */ public String getSourceStreamId(); - /** - * Returns if this tuple is a tick tuple or not. - */ - public boolean isTick(); - /** * Gets the message id that associated with this tuple. */ diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java index 40ad11c3c78..78293276b73 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java +++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java @@ -214,11 +214,6 @@ public String getSourceStreamId() { return streamId; } - public boolean isTick() { - return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) && - Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId()); - } - public MessageId getMessageId() { return id; } diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java new file mode 100644 index 00000000000..f9fb2c07c0b --- /dev/null +++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package backtype.storm.utils; + +import backtype.storm.Constants; +import backtype.storm.tuple.Tuple; + +public final class TupleUtils { + + private TupleUtils() { + // No instantiation + } + + public static boolean isTick(Tuple tuple) { + return tuple != null + && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent()) + && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId()); + } + +} diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java index da4c1a5fe42..41741a1a683 100644 --- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java @@ -34,6 +34,7 @@ import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values; import backtype.storm.utils.RotatingMap; +import backtype.storm.utils.TupleUtils; import backtype.storm.utils.Utils; import java.io.Serializable; import java.util.Arrays; @@ -299,7 +300,7 @@ private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) { @Override public void execute(Tuple tuple) { - if(tuple.isTick()) { + if (TupleUtils.isTick(tuple)) { long now = System.currentTimeMillis(); if(now - _lastRotate > _messageTimeoutMs) { _batches.rotate(); From 1ca5f7659ea807f1ae83694788e889ac5cae6669 Mon Sep 17 00:00:00 2001 From: Niels Basjes Date: Mon, 15 Dec 2014 10:58:49 +0100 Subject: [PATCH 11/11] Reducing the differences --- .../test/jvm/storm/starter/tools/MockTupleHelpers.java | 1 - storm-core/src/jvm/backtype/storm/tuple/Tuple.java | 4 ++-- storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 3 +-- .../src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +- 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java index eeaeeae3e13..b253350ef4d 100644 --- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java +++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java @@ -37,5 +37,4 @@ public static Tuple mockTuple(String componentId, String streamId) { when(tuple.getSourceStreamId()).thenReturn(streamId); return tuple; } - } diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java index c644feca2cd..113b3003ccd 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java +++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java @@ -153,7 +153,7 @@ public interface Tuple { * Gets the id of the component that created this tuple. */ public String getSourceComponent(); - + /** * Gets the id of the task that created this tuple. */ @@ -163,7 +163,7 @@ public interface Tuple { * Gets the id of the stream that this tuple was emitted to. */ public String getSourceStreamId(); - + /** * Gets the message id that associated with this tuple. */ diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java index 78293276b73..818eff15567 100644 --- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java +++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java @@ -17,7 +17,6 @@ */ package backtype.storm.tuple; -import backtype.storm.Constants; import backtype.storm.generated.GlobalStreamId; import backtype.storm.task.GeneralTopologyContext; import backtype.storm.utils.IndifferentAccessMap; @@ -213,7 +212,7 @@ public int getSourceTask() { public String getSourceStreamId() { return streamId; } - + public MessageId getMessageId() { return id; } diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java index 41741a1a683..a23e5559e56 100644 --- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java +++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java @@ -300,7 +300,7 @@ private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) { @Override public void execute(Tuple tuple) { - if (TupleUtils.isTick(tuple)) { + if(TupleUtils.isTick(tuple)) { long now = System.currentTimeMillis(); if(now - _lastRotate > _messageTimeoutMs) { _batches.rotate();