From 0e7bf5d9244ba4ea529c9037e680c9bfdf2069f9 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 4 Jul 2016 14:32:42 +0200 Subject: [PATCH 1/2] [FLINK-4134] [Runtime] Bugfix: multiple late elements that fall in one session accidentally created a triggering empty window --- .../runtime/operators/windowing/EvictingWindowOperator.java | 1 + .../streaming/runtime/operators/windowing/WindowOperator.java | 1 + .../runtime/operators/windowing/WindowOperatorTest.java | 2 ++ 3 files changed, 4 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index d82fc85cb6a16..3f2c6a3904072 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -133,6 +133,7 @@ public void merge(W mergeResult, // check if the window is already inactive if (isLate(actualWindow)) { LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness."); + mergingWindows.retireWindow(actualWindow); continue; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index f06fd33b3b8f7..09926bb2e458d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -335,6 +335,7 @@ public void merge(W mergeResult, // drop if the window is already late if (isLate(actualWindow)) { LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness."); + //mergingWindows.retireWindow(actualWindow); continue; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 8ba6da2719624..ba335ee826983 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -1477,6 +1477,8 @@ public void testDropDueToLatenessSessionZeroLateness() throws Exception { // this is dropped as late testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000)); + // this is also dropped as late (we test that they are not accidentally merged) + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10100)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500)); testHarness.processWatermark(new Watermark(20000)); From e82c5a623854d76be958176027d44e948810e609 Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Mon, 4 Jul 2016 14:36:43 +0200 Subject: [PATCH 2/2] FLINK-4134] [Runtime] Bugfix: multiple late elements that fall in one session accidentally created a triggering empty window --- .../streaming/runtime/operators/windowing/WindowOperator.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 09926bb2e458d..b6ca564edcb79 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -335,7 +335,7 @@ public void merge(W mergeResult, // drop if the window is already late if (isLate(actualWindow)) { LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness."); - //mergingWindows.retireWindow(actualWindow); + mergingWindows.retireWindow(actualWindow); continue; }