From f197cb442d6c9b324bb8b270148f8cc90d3b8d06 Mon Sep 17 00:00:00 2001 From: a4566 <103030084+a4566@users.noreply.github.com> Date: Mon, 18 Nov 2024 10:42:49 +0800 Subject: [PATCH] fix:Event latency does not equal window triggering eventTime <= timerService.currentWatermark() This code can only indicate a delayed event, not that the window has been triggered. --- docs/content/docs/learn-flink/event_driven.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docs/content/docs/learn-flink/event_driven.md b/docs/content/docs/learn-flink/event_driven.md index e03d4598e20b5..616b967fe99c2 100644 --- a/docs/content/docs/learn-flink/event_driven.md +++ b/docs/content/docs/learn-flink/event_driven.md @@ -141,12 +141,12 @@ public void processElement( long eventTime = fare.getEventTime(); TimerService timerService = ctx.timerService(); - if (eventTime <= timerService.currentWatermark()) { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + if (endOfWindow <= timerService.currentWatermark()) { // This event is late; its window has already been triggered. } else { - // Round up eventTime to the end of the window containing this event. - long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); - // Schedule a callback for when the window has been completed. timerService.registerEventTimeTimer(endOfWindow); @@ -236,7 +236,7 @@ Shown above is a static `OutputTag` that can be referenced both when e late events in the `processElement` method of the `PseudoWindow`: ```java -if (eventTime <= timerService.currentWatermark()) { +if (endOfWindow <= timerService.currentWatermark()) { // This event is late; its window has already been triggered. ctx.output(lateFares, fare); } else {