From e1df0f2cdae123df301d2b8cebf42931852fe6b8 Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Tue, 5 Apr 2016 15:12:16 -0700 Subject: [PATCH 1/2] Formatting fiddles --- .../cloud/dataflow/sdk/util/MergingActiveWindowSet.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java index 8fee332fe748..4a19bbd4c036 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/MergingActiveWindowSet.java @@ -44,16 +44,15 @@ /** * An {@link ActiveWindowSet} for merging {@link WindowFn} implementations. - *

+ * *

The underlying notion of {@link MergingActiveWindowSet} is that of representing equivalence * classes of merged windows as a mapping from the merged "super-window" to a set of * state address windows in which some state has been persisted. The mapping need not * contain EPHEMERAL windows, because they are created and merged without any persistent state. * Each window must be a state address window for at most one window, so the mapping is * invertible. - *

+ * *

The states of a non-expired window are treated as follows: - *

*

- *

+ * *

To illustrate why an ACTIVE window need not be amongst its own state address windows, * consider two active windows W1 and W2 that are merged to form W12. Further writes may be * applied to either of W1 or W2, since a read of W12 implies reading both of W12 and merging From 65874af3abf35eed66976b47f9712eec4e06304c Mon Sep 17 00:00:00 2001 From: Mark Shields Date: Tue, 5 Apr 2016 21:20:06 -0700 Subject: [PATCH 2/2] Also reject closed pre-merged. --- .../dataflow/sdk/util/ReduceFnRunner.java | 26 +++++++++++++++---- .../dataflow/sdk/util/ReduceFnRunnerTest.java | 5 ++-- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java index 05c5d7de3287..db3c3a39f918 100644 --- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java +++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunner.java @@ -326,6 +326,15 @@ private void collectAndMergeWindows(Iterable> values) thro @SuppressWarnings("unchecked") W window = (W) untypedWindow; + ReduceFn.Context directContext = + contextFactory.base(window, StateStyle.DIRECT); + if (triggerRunner.isClosed(directContext.state())) { + // This window has already been closed. Ignore it now. + // We'll revisit it again in processElement, at which point will increment the + // accumulator. + continue; + } + // For backwards compat with pre 1.4 only. if (activeWindows.isActive(window)) { Set stateAddressWindows = activeWindows.readStateAddresses(window); @@ -341,9 +350,6 @@ private void collectAndMergeWindows(Iterable> values) thro } // Add this window as NEW if it is not currently ACTIVE or MERGED. - // If we had already seen this window and closed its trigger, then the - // window will not be ACTIVE or MERGED. It will then be added as NEW here, - // and fall into the merging logic as usual. activeWindows.ensureWindowExists(window); } } @@ -445,6 +451,17 @@ private Collection processElement(WindowedValue value) throws Excepti ReduceFn.Context directContext = contextFactory.base(window, StateStyle.DIRECT); + if (triggerRunner.isClosed(directContext.state())) { + // This window has already been closed. Reject it now. + droppedDueToClosedWindow.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since original window is no longer active at inputWatermark:{}; outputWatermark:{}", + value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + continue; + } + W active = activeWindows.mergeResultWindow(window); Preconditions.checkState(active != null, "Window %s has no mergeResultWindow", window); windows.add(active); @@ -462,13 +479,12 @@ private Collection processElement(WindowedValue value) throws Excepti for (W window : windows) { ReduceFn.ProcessValueContext directContext = contextFactory.forValue( window, value.getValue(), value.getTimestamp(), StateStyle.DIRECT); - if (triggerRunner.isClosed(directContext.state())) { // This window has already been closed. droppedDueToClosedWindow.addValue(1L); WindowTracing.debug( "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since window is no longer active at inputWatermark:{}; outputWatermark:{}", + + "since merged window is no longer active at inputWatermark:{}; outputWatermark:{}", value.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), timerInternals.currentOutputWatermarkTime()); continue; diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java index b2d246e77461..3e552d4bfd76 100644 --- a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/util/ReduceFnRunnerTest.java @@ -839,8 +839,7 @@ public void testMergingWithClosedRepresentative() throws Exception { } /** - * If an element for a closed session window ends up being merged into other still-open - * session windows, the resulting session window is not 'poisoned'. + * An element for a closed window does not contribute to merging of still open windows. */ @Test public void testMergingWithClosedDoesNotPoison() throws Exception { @@ -873,7 +872,7 @@ public void testMergingWithClosedDoesNotPoison() throws Exception { output.get(0).getPane(), equalTo(PaneInfo.createPane(true, true, Timing.EARLY, 0, 0))); assertThat(output.get(1), - isSingleWindowedValue(containsInAnyOrder(1, 2, 3), + isSingleWindowedValue(containsInAnyOrder(1, 3), 1, // timestamp 1, // window start 13)); // window end