From 79ce22af1328056f37aeadb3401c0172560f50c1 Mon Sep 17 00:00:00 2001
From: Bowen Li
Date: Fri, 20 Apr 2018 23:46:57 -0700
Subject: [PATCH 1/2] [FLINK-6719] Add details about fault-tolerance of timers to ProcessFunction docs

---
 docs/dev/stream/operators/process_function.md | 39 +++++++++++++++----
 1 file changed, 31 insertions(+), 8 deletions(-)

diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index 1ed4edf7c9488..8260231c23fce 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -271,16 +271,39 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
 
-## Optimizations
+## Timers
 
-### Timer Coalescing
+Every timer registered via `registerEventTimeTimer()` or `registerProcessingTimeTimer()` will be stored in the `TimerService`
+and enqueued for execution.
 
-Every timer registered at the `TimerService` via `registerEventTimeTimer()` or
-`registerProcessingTimeTimer()` will be stored on the Java heap and enqueued for execution. There is,
-however, a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the
-worst case, every key may have a timer for each upcoming millisecond. Even if you do not do any
-processing for outdated timers in `onTimer`, this may put a significant burden on the
-Flink runtime.
+Invocations of `onTimer()` and `processElement()` are always synchronized, so that users don't have to worry about
+concurrent modification of state.
+
+Note that there is a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the worst case,
+every key may have a timer for each upcoming millisecond. Even if you do not do any processing for outdated timers in `onTimer`,
+this may put a significant burden on the Flink runtime.
+
+### Fault Tolerance
+
+Timers registered within `ProcessFunction` are fault tolerant. They are synchronously checkpointed by Flink, regardless of
+configurations of state backends. (Therefore, a large number of timers can significantly increase checkpointing time. See optimizations
+section for advice to reduce the number of timers.)
+
+Upon restoring, timers that are checkpointed from the previous job will be restored on whatever new instance is responsible for that key.
+
+#### Processing Time Timers
+
+For processing time timers, the firing time of a timer is an absolute point in wall-clock time.
+
+It's important to note that if a job isn't running at time t (when the timer is supposed to fire), then on restore from either
+a checkpoint or a savepoint, that timer is fired immediately.
+
+#### Event Time Timers
+
+For event time timers, a restored event time timer from either checkpoint or savepoint will fire when the watermark on the
+new machine surpasses the timer's timestamp.
+
+### Optimizations - Timer Coalescing
 
 Since there is only one timer per key and timestamp, however, you may coalesce timers by reducing
 the timer resolution. For a timer resolution of 1 second (event or processing time), for example, you

From 5c96676c71819fddaa5a2b432aacc0c17697bcce Mon Sep 17 00:00:00 2001
From: Bowen Li
Date: Mon, 30 Apr 2018 10:17:59 -0700
Subject: [PATCH 2/2] Update process_function.md

---
 docs/dev/stream/operators/process_function.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index 8260231c23fce..37c361f9d6932 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -286,8 +286,8 @@ this may put a significant burden on the Flink runtime.
 ### Fault Tolerance
 
 Timers registered within `ProcessFunction` are fault tolerant. They are synchronously checkpointed by Flink, regardless of
-configurations of state backends. (Therefore, a large number of timers can significantly increase checkpointing time. See optimizations
-section for advice to reduce the number of timers.)
+configurations of state backends. (Therefore, a large number of timers can significantly increase checkpointing time. See the optimizations
+section for advice on how to reduce the number of timers.)
 
 Upon restoring, timers that are checkpointed from the previous job will be restored on whatever new instance is responsible for that key.
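
A note for reviewers: the coalescing trick referenced by the new "Optimizations - Timer Coalescing" heading reduces timer count because Flink keeps at most one timer per key and timestamp, so rounding timestamps down to a coarser resolution makes many registrations collapse into one timer. A minimal, framework-free Java sketch of that arithmetic (class and method names are illustrative, not part of this patch; in a real `ProcessFunction` the result would be passed to `ctx.timerService().registerProcessingTimeTimer()`):

```java
public class TimerCoalescing {

    /** Round a millisecond timestamp down to the given resolution (also in ms). */
    static long coalesce(long timestampMs, long resolutionMs) {
        return (timestampMs / resolutionMs) * resolutionMs;
    }

    public static void main(String[] args) {
        long resolution = 1000L; // 1-second resolution, as in the docs' example
        // Three events arriving within the same second...
        long[] eventTimes = {1_524_000_000_123L, 1_524_000_000_456L, 1_524_000_000_789L};
        for (long t : eventTimes) {
            // ...all yield the same coalesced timestamp, so registering a timer
            // for each of them results in a single stored timer per key.
            System.out.println(coalesce(t, resolution));
        }
    }
}
```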