From 7b7a8b31d3eb32d0feae16055590409db37bd9d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stig=20Rohde=20D=C3=B8ssing?= Date: Tue, 16 May 2017 21:27:41 +0200 Subject: [PATCH] STORM-2516: Fix timing issues with testPrepareLateTupleStreamWithoutBuilder --- .../windowing/WaterMarkEventGenerator.java | 28 ++++++++++++------- .../topology/WindowedBoltExecutorTest.java | 26 ++++++++--------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java index ef81d6ee393..29c6244ab3f 100644 --- a/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java +++ b/storm-client/src/jvm/org/apache/storm/windowing/WaterMarkEventGenerator.java @@ -15,12 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.windowing; -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.topology.FailedException; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +package org.apache.storm.windowing; import java.util.Map; import java.util.Set; @@ -30,6 +26,11 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.topology.FailedException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Tracks tuples across input streams and periodically emits watermark events. @@ -46,15 +47,22 @@ public class WaterMarkEventGenerator implements Runnable { private final ScheduledExecutorService executorService; private final int interval; private ScheduledFuture executorFuture; - private long lastWaterMarkTs = 0; + private volatile long lastWaterMarkTs; - public WaterMarkEventGenerator(WindowManager windowManager, int interval, - int eventTsLag, Set inputStreams) { + /** + * Creates a new WatermarkEventGenerator. + * @param windowManager The window manager this generator will submit watermark events to + * @param intervalMs The generator will check if it should generate a watermark event with this interval + * @param eventTsLagMs The max allowed lag behind the last watermark event before an event is considered late + * @param inputStreams The input streams this generator is expected to handle + */ + public WaterMarkEventGenerator(WindowManager windowManager, int intervalMs, + int eventTsLagMs, Set inputStreams) { this.windowManager = windowManager; streamToTs = new ConcurrentHashMap<>(); executorService = Executors.newSingleThreadScheduledExecutor(); - this.interval = interval; - this.eventTsLag = eventTsLag; + this.interval = intervalMs; + this.eventTsLag = eventTsLagMs; this.inputStreams = inputStreams; } diff --git a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java index a051a6b258b..76fdf338e48 100644 --- a/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java +++ b/storm-client/test/jvm/org/apache/storm/topology/WindowedBoltExecutorTest.java @@ -28,7 +28,6 @@ import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.TupleImpl; import org.apache.storm.tuple.Values; -import org.apache.storm.utils.Time; import org.apache.storm.windowing.TupleWindow; import org.junit.Before; import org.junit.Test; @@ -52,7 +51,7 @@ * Unit tests for {@link WindowedBoltExecutor} */ public class WindowedBoltExecutorTest { - + private WindowedBoltExecutor executor; private TestWindowedBolt testWindowedBolt; @@ -124,8 +123,8 @@ public void testExecuteWithoutTs() throws Exception { @Test public void testExecuteWithTs() throws Exception { - long[] timstamps = {603, 605, 607, 618, 626, 636}; - for (long ts : timstamps) { + long[] timestamps = {603, 605, 607, 618, 626, 636}; + for (long ts : timestamps) { executor.execute(getTuple("s1", new Fields("ts"), new Values(ts))); } //Thread.sleep(120); @@ -172,7 +171,7 @@ public void testPrepareLateTupleStreamWithoutTs() throws Exception { @Test - public void testPrepareLateTUpleStreamWithoutBuilder() throws Exception { + public void testPrepareLateTupleStreamWithoutBuilder() throws Exception { Map conf = new HashMap<>(); conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 100000); conf.put(Config.TOPOLOGY_BOLTS_WINDOW_LENGTH_DURATION_MS, 20); @@ -209,20 +208,21 @@ public void testExecuteWithLateTupleStream() throws Exception { conf.put(Config.TOPOLOGY_BOLTS_SLIDING_INTERVAL_DURATION_MS, 10); conf.put(Config.TOPOLOGY_BOLTS_LATE_TUPLE_STREAM, "$late"); conf.put(Config.TOPOLOGY_BOLTS_TUPLE_TIMESTAMP_MAX_LAG_MS, 5); - conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 10); + //Trigger manually to avoid timing issues + conf.put(Config.TOPOLOGY_BOLTS_WATERMARK_EVENT_INTERVAL_MS, 1_000_000); executor.prepare(conf, context, outputCollector); - long[] timstamps = {603, 605, 607, 618, 626, 636, 600}; - List tuples = new ArrayList<>(timstamps.length); + long[] timestamps = {603, 605, 607, 618, 626, 636, 600}; + List tuples = new ArrayList<>(timestamps.length); - executor.waterMarkEventGenerator.run(); - for (long ts : timstamps) { + for (long ts : timestamps) { Tuple tuple = getTuple("s1", new Fields("ts"), new Values(ts)); tuples.add(tuple); executor.execute(tuple); - Time.sleep(10); - } - + + //Update the watermark to this timestamp + executor.waterMarkEventGenerator.run(); + } System.out.println(testWindowedBolt.tupleWindows); Tuple tuple = tuples.get(tuples.size() - 1); Mockito.verify(outputCollector).emit("$late", Arrays.asList(tuple), new Values(tuple));