From 3a6dde76be043fcdc30a5e19168080cff6cda9e9 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 9 Dec 2016 11:16:01 -0800 Subject: [PATCH 1/2] Only increase counters in LateDataDroppingDoFnRunner once, even if iterated multiple times. --- .../sdk/util/LateDataDroppingDoFnRunner.java | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java index 3dfa06474e..721953a644 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java @@ -112,20 +112,30 @@ public WindowedValue apply(BoundedWindow window) { }); }}); + Iterable> concatElements = Iterables.concat(windowsExpandedElements); + + // Bump the counter separately since we don't want multiple iterations to + // increase it multiple times. + for (WindowedValue input : concatElements) { + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + if (canDropDueToExpiredWindow(window)) { + // The element is too late for this window. + droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since too far behind inputWatermark:{}; outputWatermark:{}", + input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + } + } + Iterable> nonLateElements = Iterables.filter( - Iterables.concat(windowsExpandedElements), + concatElements, new Predicate>() { @Override public boolean apply(WindowedValue input) { BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); if (canDropDueToExpiredWindow(window)) { - // The element is too late for this window. 
- droppedDueToLateness.addValue(1L); - WindowTracing.debug( - "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since too far behind inputWatermark:{}; outputWatermark:{}", - input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); return false; } else { return true; From 9b96b97b6eaf8f0b4d280144fdebbe55b82384f8 Mon Sep 17 00:00:00 2001 From: Sam Whittle Date: Fri, 9 Dec 2016 12:39:49 -0800 Subject: [PATCH 2/2] Improve LateDataDroppingDoFnRunnerTest to include reiteration. --- .../dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java index c951d4c9a1..b654b31b1c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java @@ -77,6 +77,9 @@ public void testLateDataFilter() throws Exception { createDatum(18, 18L)); assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class))); assertEquals(1, droppedDueToLateness.sum); + // Ensure that reiterating returns the same results and doesn't increment the counter again. + assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class))); + assertEquals(1, droppedDueToLateness.sum); } private WindowedValue createDatum(T element, long timestampMillis) { @@ -112,4 +115,3 @@ public String getName() { } } } -