From b8bfdf05ae9bfb546a223c5f638016a3e18f41bb Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Sun, 18 Dec 2016 18:33:13 +0100 Subject: [PATCH] [FLINK-5363] Fire timers when window state is currently empty Before this change, when a Trigger sets a timer and that timer fires in the future at a point when there is currently no data in the window state, then that timer is being ignored. This was a problem for some users because they manually set cleanup timers and they need to be called because the trigger needs to cleanup some state. (For normal time windows this is not a problem, but for special cases built on top of GlobalWindows the old behaviour was leading to problems.) --- .../windowing/triggers/ContinuousEventTimeTrigger.java | 9 +++++---- .../runtime/operators/windowing/WindowOperator.java | 10 ++++++++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java index f3b3e4f5eed23..cb62d02e7a68f 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java @@ -77,10 +77,11 @@ public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws return TriggerResult.FIRE; } - ReducingState fireTimestamp = ctx.getPartitionedState(stateDesc); - if (fireTimestamp.get().equals(time)) { - fireTimestamp.clear(); - fireTimestamp.add(time + interval); + ReducingState fireTimestampState = ctx.getPartitionedState(stateDesc); + Long fireTimestamp = fireTimestampState.get(); + if (fireTimestamp != null && fireTimestamp.equals(time)) { + fireTimestampState.clear(); + fireTimestampState.add(time + interval); ctx.registerEventTimeTimer(time + interval); return TriggerResult.FIRE; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index edcd8339c306e..91947156a3b0b 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -331,6 +331,10 @@ public void onEventTime(InternalTimer timer) throws Exception { context.key = timer.getKey(); context.window = timer.getNamespace(); + // Call the trigger early, even if the window is already empty. Some Triggers + // might have to clean up state. + TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); + AppendingState windowState; MergingWindowSet mergingWindows = null; @@ -357,7 +361,6 @@ public void onEventTime(InternalTimer timer) throws Exception { return; } - TriggerResult triggerResult = context.onEventTime(timer.getTimestamp()); if (triggerResult.isFire()) { fire(context.window, contents); } @@ -372,6 +375,10 @@ public void onProcessingTime(InternalTimer timer) throws Exception { context.key = timer.getKey(); context.window = timer.getNamespace(); + // Call the trigger early, even if the window is already empty. Some Triggers + // might have to clean up state. + TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); + AppendingState windowState; MergingWindowSet mergingWindows = null; @@ -395,7 +402,6 @@ public void onProcessingTime(InternalTimer timer) throws Exception { return; } - TriggerResult triggerResult = context.onProcessingTime(timer.getTimestamp()); if (triggerResult.isFire()) { fire(context.window, contents); }