Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions docs/content/docs/learn-flink/event_driven.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,12 +141,12 @@ public void processElement(
long eventTime = fare.getEventTime();
TimerService timerService = ctx.timerService();

if (eventTime <= timerService.currentWatermark()) {
// Round up eventTime to the end of the window containing this event.
long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

if (endOfWindow <= timerService.currentWatermark()) {
// This event is late; its window has already been triggered.
} else {
// Round up eventTime to the end of the window containing this event.
long endOfWindow = (eventTime - (eventTime % durationMsec) + durationMsec - 1);

// Schedule a callback for when the window has been completed.
timerService.registerEventTimeTimer(endOfWindow);

Expand Down Expand Up @@ -236,7 +236,7 @@ Shown above is a static `OutputTag<TaxiFare>` that can be referenced both when e
late events in the `processElement` method of the `PseudoWindow`:

```java
if (eventTime <= timerService.currentWatermark()) {
if (endOfWindow <= timerService.currentWatermark()) {
// This event is late; its window has already been triggered.
ctx.output(lateFares, fare);
} else {
Expand Down