diff --git a/docs/build_docs.sh b/docs/build_docs.sh index f9ffc5a02c9b6..aa08f4c21f156 100755 --- a/docs/build_docs.sh +++ b/docs/build_docs.sh @@ -27,16 +27,18 @@ DIR="`pwd`" # We need at least bundler to proceed if [ "`command -v bundle`" == "" ]; then - echo "WARN: Could not find bundle." - echo "Attempting to install locally. If this doesn't work, please install with 'gem install bundler'." + RUBYGEM_BINDIR="" - # Adjust the PATH to discover the locally installed Ruby gem - if which ${RUBY} >/dev/null && which gem >/dev/null; then - export PATH="$(${RUBY} -rubygems -e 'puts Gem.user_dir')/bin:$PATH" - fi + # Adjust the PATH to discover locally installed ruby gem binaries + export PATH="$(${RUBY} -e 'puts Gem.user_dir')/bin:$PATH" - # install bundler locally - ${GEM} install --user-install bundler + if [ "`command -v bundle`" == "" ]; then + echo "WARN: Could not find bundle." + echo "Attempting to install locally. If this doesn't work, please install with 'gem install bundler'." + + # install bundler locally + ${GEM} install --user-install --no-format-executable bundler + fi fi # Install Ruby dependencies locally @@ -57,7 +59,6 @@ while getopts "pi" opt; do ;; i) [[ `${RUBY} -v` =~ 'ruby 1' ]] && echo "Error: building the docs with the incremental option requires at least ruby 2.0" && exit 1 - bundle install --path .rubydeps JEKYLL_CMD="liveserve --baseurl= --watch --incremental" ;; esac diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md index d96798323ceed..d82092afc6e96 100644 --- a/docs/dev/stream/operators/process_function.md +++ b/docs/dev/stream/operators/process_function.md @@ -269,4 +269,56 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): } {% endhighlight %} - \ No newline at end of file + + +## Optimisations + +### Timer Coalescing + +Every timer registered at the `TimerService` via `registerEventTimeTimer()` or +`registerProcessingTimeTimer()` will be stored on heap and enqueued for execution. There is, +however, a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the +worst case, every key may have a timer for each upcoming millisecond. Even if you do not do any +processing for outdated timers in `onTimer` (as above), this may put a significant burden on the +Flink runtime. + +Since there is only one timer per key and timestamp, however, you may coalesce timers by reducing the +timer resolution. For a timer resolution of 1 second (event or processing time), for example, you +can round down the target time to full seconds and therefore allow the timer to fire at most 1 +second earlier but not later than with millisecond accuracy. As a result, there would be at most +one timer for each combination of key and timestamp: + +
+
+{% highlight java %} +long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000; +ctx.timerService().registerProcessingTimeTimer(coalescedTime); +{% endhighlight %} +
+ +
+{% highlight scala %} +val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000 +ctx.timerService.registerProcessingTimeTimer(coalescedTime) +{% endhighlight %} +
+
+ +Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce +these timers with the next watermark by using the current one: + +
+
+{% highlight java %} +long coalescedTime = ctx.timerService().currentWatermark() + 1; +ctx.timerService().registerEventTimeTimer(coalescedTime); +{% endhighlight %} +
+ +
+{% highlight scala %} +val coalescedTime = ctx.timerService.currentWatermark + 1 +ctx.timerService.registerEventTimeTimer(coalescedTime) +{% endhighlight %} +
+