diff --git a/docs/content/docs/learn-flink/event_driven.md b/docs/content/docs/learn-flink/event_driven.md index e03d4598e20b5..616b967fe99c2 100644 --- a/docs/content/docs/learn-flink/event_driven.md +++ b/docs/content/docs/learn-flink/event_driven.md @@ -141,12 +141,12 @@ public void processElement( long eventTime = fare.getEventTime(); TimerService timerService = ctx.timerService(); - if (eventTime <= timerService.currentWatermark()) { + // Round up eventTime to the end of the window containing this event. + long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); + + if (endOfWindow <= timerService.currentWatermark()) { // This event is late; its window has already been triggered. } else { - // Round up eventTime to the end of the window containing this event. - long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1); - // Schedule a callback for when the window has been completed. timerService.registerEventTimeTimer(endOfWindow); @@ -236,7 +236,7 @@ Shown above is a static `OutputTag` that can be referenced both when e late events in the `processElement` method of the `PseudoWindow`: ```java -if (eventTime <= timerService.currentWatermark()) { +if (endOfWindow <= timerService.currentWatermark()) { // This event is late; its window has already been triggered. ctx.output(lateFares, fare); } else {