From 62d4528b4f20c83d0f7237d31b1f8af4c38377d8 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 14 Jun 2016 16:04:10 -0700 Subject: [PATCH 1/4] Add test for ReduceFnRunner GC time overflow --- .../beam/sdk/util/ReduceFnRunnerTest.java | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 0df4bc6f2eec8..5aa15b9d739b1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -51,6 +51,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; @@ -226,6 +227,33 @@ public void testOnElementCombiningDiscarding() throws Exception { tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); } + /** + * Tests that the garbage collection time for a fixed window does not overflow + * the end of time. + */ + @Test + public void testFixedWindowEndOfTimeGarbageCollection() throws Exception { + // Test basic execution of a trigger using a non-combining window set and accumulating mode. 
+ ReduceFnTester tester = + ReduceFnTester.combining( + FixedWindows.of(Duration.millis(10)), + Never.ever(), + AccumulationMode.DISCARDING_FIRED_PANES, + new Sum.SumIntegerFn().asKeyedFn(), + VarIntCoder.of(), + Duration.standardDays(365)); + + // Insert one element where the window it is in ends prior to the end of the global windo, but + // the GC time is past the end of the global window + tester.injectElements( + TimestampedValue.of( + 0, new Instant(GlobalWindow.INSTANCE.maxTimestamp().minus(Duration.millis(15))))); + + // Should fire ON_TIME pane and there will be a checkState that the cleanup time + // is prior to timestamp max value + tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp()); + } + @Test public void testOnElementCombiningAccumulating() throws Exception { // Test basic execution of a trigger using a non-combining window set and accumulating mode. From d741cb72e92e5ad4c137eff70bbc82c4d748bdd7 Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 14 Jun 2016 16:10:09 -0700 Subject: [PATCH 2/4] Fix overflow in ReduceFnRunner garbage collection times --- .../org/apache/beam/sdk/util/ReduceFnRunner.java | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index 34208da3accba..7d9137ab12703 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -936,16 +936,17 @@ private void cancelEndOfWindowAndGarbageCollectionTimers( } /** - * Return when {@code window} should be garbage collected. If the window is the GlobalWindow, - * that will be the end of the window. Otherwise, add the allowed lateness to the end of - * the window. + * Return when {@code window} should be garbage collected. 
If the window's expiration time is on + * or after the end of the global window, it will be truncated to the end of the global window. */ private Instant garbageCollectionTime(W window) { - Instant maxTimestamp = window.maxTimestamp(); - if (maxTimestamp.isBefore(GlobalWindow.INSTANCE.maxTimestamp())) { - return maxTimestamp.plus(windowingStrategy.getAllowedLateness()); + if (window + .maxTimestamp() + .isAfter( + GlobalWindow.INSTANCE.maxTimestamp().minus(windowingStrategy.getAllowedLateness()))) { + return GlobalWindow.INSTANCE.maxTimestamp(); } else { - return maxTimestamp; + return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness()); } } From 63c40e49e8ce2c79d4a5e7c027856398254a639f Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Tue, 14 Jun 2016 19:15:36 -0700 Subject: [PATCH 3/4] fixup! Add test for ReduceFnRunner GC time overflow --- .../org/apache/beam/sdk/WindowMatchers.java | 5 ++ .../beam/sdk/util/ReduceFnRunnerTest.java | 60 +++++++++++++++---- 2 files changed, 55 insertions(+), 10 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java index b47c32c4940d6..7a5e2fbba9f36 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java @@ -47,6 +47,11 @@ public static Matcher> isWindowedValue( return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything()); } + public static Matcher> isWindowedValue( + Matcher valueMatcher) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything()); + } + public static Matcher> isSingleWindowedValue( T value, long timestamp, long windowStart, long windowEnd) { return WindowMatchers.isSingleWindowedValue( diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 
5aa15b9d739b1..b7ec540784344 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util; import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue; +import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -60,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; @@ -80,6 +82,7 @@ import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -228,30 +231,67 @@ public void testOnElementCombiningDiscarding() throws Exception { } /** - * Tests that the garbage collection time for a fixed window does not overflow - * the end of time. + * Tests that the garbage collection time for a fixed window does not overflow the end of time. */ @Test public void testFixedWindowEndOfTimeGarbageCollection() throws Exception { + + Duration allowedLateness = Duration.standardDays(365); + Duration windowSize = Duration.millis(10); + WindowFn windowFn = FixedWindows.of(windowSize); + + // This timestamp falls into a window where the end of the window is before the end of the + // global window - the "end of time" - yet its expiration time is after. 
+ final Instant elementTimestamp = + GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1); + + IntervalWindow window = Iterables.getOnlyElement( + windowFn.assignWindows( + windowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException(); + } + @Override + public Instant timestamp() { + return elementTimestamp; + } + + @Override + public Collection windows() { + throw new UnsupportedOperationException(); + } + })); + + assertTrue( + window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp())); + assertTrue( + window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp())); + // Test basic execution of a trigger using a non-combining window set and accumulating mode. ReduceFnTester tester = ReduceFnTester.combining( - FixedWindows.of(Duration.millis(10)), - Never.ever(), + windowFn, + AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()).buildTrigger(), AccumulationMode.DISCARDING_FIRED_PANES, new Sum.SumIntegerFn().asKeyedFn(), VarIntCoder.of(), - Duration.standardDays(365)); + allowedLateness); - // Insert one element where the window it is in ends prior to the end of the global windo, but - // the GC time is past the end of the global window - tester.injectElements( - TimestampedValue.of( - 0, new Instant(GlobalWindow.INSTANCE.maxTimestamp().minus(Duration.millis(15))))); + tester.injectElements(TimestampedValue.of(13, elementTimestamp)); // Should fire ON_TIME pane and there will be a checkState that the cleanup time // is prior to timestamp max value + tester.advanceInputWatermark(window.maxTimestamp()); + + // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner) + assertThat(tester.extractOutput(), emptyIterable()); + + tester.injectElements(TimestampedValue.of(42, elementTimestamp)); + + // Now the final pane should fire, demonstrating that the GC time was truncated 
+ tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp()); + assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55)))); } @Test From 37088d804a6f7aaa80ee8e500cbc6b1f47ae957a Mon Sep 17 00:00:00 2001 From: Kenneth Knowles Date: Wed, 15 Jun 2016 21:21:56 -0700 Subject: [PATCH 4/4] fixup! Fix overflow in ReduceFnRunner garbage collection times --- .../java/org/apache/beam/sdk/util/ReduceFnRunner.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java index 7d9137ab12703..864e8e7ad0c70 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReduceFnRunner.java @@ -940,10 +940,14 @@ private void cancelEndOfWindowAndGarbageCollectionTimers( * or after the end of the global window, it will be truncated to the end of the global window. */ private Instant garbageCollectionTime(W window) { - if (window + + // If the end of the window + allowed lateness is beyond the "end of time" aka the end of the + // global window, then we truncate it. The conditional subtracts the allowed lateness from the + // end of time because adding it to the end of the window might overflow the maximum Instant. + if (GlobalWindow.INSTANCE .maxTimestamp() - .isAfter( - GlobalWindow.INSTANCE.maxTimestamp().minus(windowingStrategy.getAllowedLateness()))) { + .minus(windowingStrategy.getAllowedLateness()) + .isBefore(window.maxTimestamp())) { return GlobalWindow.INSTANCE.maxTimestamp(); } else { return window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());