From 7d5285545a4d4f227f7f8616c2f769f055649560 Mon Sep 17 00:00:00 2001 From: ddragan Date: Fri, 25 Sep 2015 17:58:07 +0300 Subject: [PATCH 1/2] [STORM-123], add check case for external change of 'now' value. --- .../src/jvm/storm/kafka/KafkaSpout.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java index 1743810b616..9c34c573346 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + *

* http://www.apache.org/licenses/LICENSE-2.0 - * + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -154,8 +154,8 @@ public void nextTuple() { } } - long now = System.currentTimeMillis(); - if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) { + long diffWithNow = System.currentTimeMillis() - _lastUpdateMs; + if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) { commit(); } } @@ -185,11 +185,11 @@ public void deactivate() { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - if (_spoutConfig.topicAsStreamId) { - declarer.declareStream(_spoutConfig.topic, _spoutConfig.scheme.getOutputFields()); - } else { + if (_spoutConfig.topicAsStreamId) { + declarer.declareStream(_spoutConfig.topic, _spoutConfig.scheme.getOutputFields()); + } else { declarer.declare(_spoutConfig.scheme.getOutputFields()); - } + } } private void commit() { From cc082aa93e5da92c4c3f635420fabf8bb6e60bf3 Mon Sep 17 00:00:00 2001 From: dragan Date: Fri, 25 Sep 2015 19:37:50 +0300 Subject: [PATCH 2/2] add a comment description --- external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java index 9c34c573346..d9f1c529a8e 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java +++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java @@ -155,6 +155,11 @@ public void nextTuple() { } long diffWithNow = System.currentTimeMillis() - _lastUpdateMs; + + /* + As far as the System.currentTimeMillis() is dependent on System clock, + additional check on negative value of diffWithNow in case of external changes. + */ if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) { commit(); }