From cae541795c632f5ba4799b24a730018ec75ffb1b Mon Sep 17 00:00:00 2001 From: bchambers Date: Tue, 19 Apr 2016 11:34:25 -0700 Subject: [PATCH 1/4] Remove unused generic arguments in ReduceFnRunnerTest. --- .../beam/sdk/util/ReduceFnRunnerTest.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 1939b20feed2e..5eccb0409306a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -500,8 +500,8 @@ public void testPaneInfoAllStatesAfterWatermark() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) .withTrigger(Repeatedly.forever(AfterFirst.of( - AfterPane.elementCountAtLeast(2), - AfterWatermark.pastEndOfWindow()))) + AfterPane.elementCountAtLeast(2), + AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); @@ -552,8 +552,8 @@ public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) .withTrigger(Repeatedly.forever(AfterFirst.of( - AfterPane.elementCountAtLeast(2), - AfterWatermark.pastEndOfWindow()))) + AfterPane.elementCountAtLeast(2), + AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); @@ -604,8 +604,8 @@ public void testPaneInfoFinalAndOnTime() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) .withTrigger( - Repeatedly.forever(AfterPane.elementCountAtLeast(2)) - .orFinally(AfterWatermark.pastEndOfWindow())) + Repeatedly.forever(AfterPane.elementCountAtLeast(2)) + .orFinally(AfterWatermark.pastEndOfWindow())) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); @@ -890,7 +890,7 @@ public void testDropDataMultipleWindowsFinishedTrigger() throws Exception { ReduceFnTester tester = ReduceFnTester.combining( WindowingStrategy.of( SlidingWindows.of(Duration.millis(100)).every(Duration.millis(30))) - .withTrigger(AfterWatermark.pastEndOfWindow()) + .withTrigger(AfterWatermark.pastEndOfWindow()) .withAllowedLateness(Duration.millis(1000)), new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of()); @@ -1024,7 +1024,7 @@ public void testEmptyOnTimeFromOrFinally() throws Exception { .forever( AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(5))) - .orFinally(AfterWatermark.pastEndOfWindow()), + .orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(25)))), @@ -1074,7 +1074,7 @@ public void testProcessingTime() throws Exception { .forever( AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(5))) - .orFinally(AfterWatermark.pastEndOfWindow()), + .orFinally(AfterWatermark.pastEndOfWindow()), Repeatedly.forever( AfterProcessingTime.pastFirstElementInPane().plusDelayOf( new Duration(25)))), From 4ab1c175f188e03f0ef2a5b6b2019c1e0ba27260 Mon Sep 17 00:00:00 2001 From: bchambers Date: Tue, 19 Apr 2016 12:43:52 -0700 Subject: [PATCH 2/4] Add test for empty ON_TIME and no empty final pane Add a test that we get an empty `ON_TIME` pane, and don't get the empty final pane when using accumulation mode with the only if non-empty `ClosingBehavior`. --- .../beam/sdk/util/ReduceFnRunnerTest.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 5eccb0409306a..e723d821efe41 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -547,6 +547,32 @@ public void testPaneInfoAllStatesAfterWatermark() throws Exception { WindowMatchers.isSingleWindowedValue(emptyIterable(), 9, 0, 10))); } + @Test + public void noEmptyPanes() throws Exception { + ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger(Repeatedly.forever(AfterFirst.of( + AfterPane.elementCountAtLeast(2), + AfterWatermark.pastEndOfWindow()))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)) + .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); + + tester.advanceInputWatermark(new Instant(0)); + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), + TimestampedValue.of(2, new Instant(2))); + tester.advanceInputWatermark(new Instant(20)); + tester.advanceInputWatermark(new Instant(250)); + + List>> output = tester.extractOutput(); + assertThat(output, contains( + // Trigger with 2 elements + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), + // Trigger for the empty on time pane. + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); + } + @Test public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( From 5d03f6e7d464ebcf3ff8763059a697d752e645dd Mon Sep 17 00:00:00 2001 From: bchambers Date: Tue, 19 Apr 2016 13:17:11 -0700 Subject: [PATCH 3/4] fixup! Add a test with FIRE_ALWAYS --- .../beam/sdk/util/ReduceFnRunnerTest.java | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index e723d821efe41..b266794fa9bb2 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -548,7 +548,7 @@ public void testPaneInfoAllStatesAfterWatermark() throws Exception { } @Test - public void noEmptyPanes() throws Exception { + public void noEmptyPanesFinalIfNonEmpty() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) .withTrigger(Repeatedly.forever(AfterFirst.of( @@ -569,10 +569,38 @@ public void noEmptyPanes() throws Exception { assertThat(output, contains( // Trigger with 2 elements WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), - // Trigger for the empty on time pane. + // Trigger for the empty on time pane WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); } + @Test + public void noEmptyPanesFinalAlways() throws Exception { + ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( + WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) + .withTrigger(Repeatedly.forever(AfterFirst.of( + AfterPane.elementCountAtLeast(2), + AfterWatermark.pastEndOfWindow()))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) + .withAllowedLateness(Duration.millis(100)) + .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); + + tester.advanceInputWatermark(new Instant(0)); + tester.injectElements( + TimestampedValue.of(1, new Instant(1)), + TimestampedValue.of(2, new Instant(2))); + tester.advanceInputWatermark(new Instant(20)); + tester.advanceInputWatermark(new Instant(250)); + + List>> output = tester.extractOutput(); + assertThat(output, contains( + // Trigger with 2 elements + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 1, 0, 10), + // Trigger for the empty on time pane + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10), + // Trigger for the final pane + WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); + } + @Test public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining( From a0c9409e82c3095f1bce31614f224bb6442b5d9b Mon Sep 17 00:00:00 2001 From: bchambers Date: Tue, 19 Apr 2016 14:21:46 -0700 Subject: [PATCH 4/4] fixup! Checkstyle --- .../test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index b266794fa9bb2..65b5ee68fd7a9 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -600,7 +600,7 @@ public void noEmptyPanesFinalAlways() throws Exception { // Trigger for the final pane WindowMatchers.isSingleWindowedValue(containsInAnyOrder(1, 2), 9, 0, 10))); } - + @Test public void testPaneInfoAllStatesAfterWatermarkAccumulating() throws Exception { ReduceFnTester, IntervalWindow> tester = ReduceFnTester.nonCombining(