diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java index 3dfa06474e..721953a644 100644 --- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java +++ b/sdk/src/main/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunner.java @@ -112,20 +112,30 @@ public WindowedValue apply(BoundedWindow window) { }); }}); + Iterable> concatElements = Iterables.concat(windowsExpandedElements); + + // Bump the counter separately since we don't want multiple iterations to + // increase it multiple times. + for (WindowedValue input : concatElements) { + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + if (canDropDueToExpiredWindow(window)) { + // The element is too late for this window. + droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " + + "since too far behind inputWatermark:{}; outputWatermark:{}", + input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), + timerInternals.currentOutputWatermarkTime()); + } + } + Iterable> nonLateElements = Iterables.filter( - Iterables.concat(windowsExpandedElements), + concatElements, new Predicate>() { @Override public boolean apply(WindowedValue input) { BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); if (canDropDueToExpiredWindow(window)) { - // The element is too late for this window. - droppedDueToLateness.addValue(1L); - WindowTracing.debug( - "ReduceFnRunner.processElement: Dropping element at {} for key:{}; window:{} " - + "since too far behind inputWatermark:{}; outputWatermark:{}", - input.getTimestamp(), key, window, timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); return false; } else { return true; diff --git a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java index c951d4c9a1..b654b31b1c 100644 --- a/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java +++ b/sdk/src/test/java/com/google/cloud/dataflow/sdk/util/LateDataDroppingDoFnRunnerTest.java @@ -77,6 +77,9 @@ public void testLateDataFilter() throws Exception { createDatum(18, 18L)); assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class))); assertEquals(1, droppedDueToLateness.sum); + // Ensure that reiterating returns the same results and doesn't increment the counter again. + assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class))); + assertEquals(1, droppedDueToLateness.sum); } private WindowedValue createDatum(T element, long timestampMillis) { @@ -112,4 +115,3 @@ public String getName() { } } } -