Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions docs/build_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@ DIR="`pwd`"

# We need at least bundler to proceed
if [ "`command -v bundle`" == "" ]; then
echo "WARN: Could not find bundle."
echo "Attempting to install locally. If this doesn't work, please install with 'gem install bundler'."
RUBYGEM_BINDIR=""

# Adjust the PATH to discover the locally installed Ruby gem
if which ${RUBY} >/dev/null && which gem >/dev/null; then
export PATH="$(${RUBY} -rubygems -e 'puts Gem.user_dir')/bin:$PATH"
fi
# Adjust the PATH to discover locally installed ruby gem binaries
export PATH="$(${RUBY} -e 'puts Gem.user_dir')/bin:$PATH"

# install bundler locally
${GEM} install --user-install bundler
if [ "`command -v bundle`" == "" ]; then
echo "WARN: Could not find bundle."
echo "Attempting to install locally. If this doesn't work, please install with 'gem install bundler'."

# install bundler locally
${GEM} install --user-install --no-format-executable bundler
fi
fi

# Install Ruby dependencies locally
Expand All @@ -57,7 +59,6 @@ while getopts "pi" opt; do
;;
i)
[[ `${RUBY} -v` =~ 'ruby 1' ]] && echo "Error: building the docs with the incremental option requires at least ruby 2.0" && exit 1
bundle install --path .rubydeps
JEKYLL_CMD="liveserve --baseurl= --watch --incremental"
;;
esac
Expand Down
54 changes: 53 additions & 1 deletion docs/dev/stream/operators/process_function.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,56 @@ override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]):
}
{% endhighlight %}
</div>
</div>
</div>

## Optimisations

### Timer Coalescing

Every timer registered at the `TimerService` via `registerEventTimeTimer()` or
`registerProcessingTimeTimer()` will be stored on heap and enqueued for execution. There is,
however, a maximum of one timer per key and timestamp at a millisecond resolution and thus, in the
worst case, every key may have a timer for each upcoming millisecond. Even if you do not do any
processing for outdated timers in `onTimer` (as above), this may put a significant burden on the
Flink runtime.

Since there is only one timer per key and timestamp, however, you may coalesce timers by reducing the
timer resolution. For a timer resolution of 1 second (event or processing time), for example, you
can round down the target time to full seconds and therefore allow the timer to fire at most 1
second earlier but not later than with millisecond accuracy. As a result, there would be at most
one timer for each combination of key and timestamp:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
long coalescedTime = ((ctx.timestamp() + timeout) / 1000) * 1000;
ctx.timerService().registerProcessingTimeTimer(coalescedTime);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val coalescedTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
{% endhighlight %}
</div>
</div>

Since event-time timers only fire with watermarks coming in, you may also schedule and coalesce
these timers with the next watermark by using the current one:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
long coalescedTime = ctx.timerService().currentWatermark() + 1;
ctx.timerService().registerEventTimeTimer(coalescedTime);
{% endhighlight %}
</div>

<div data-lang="scala" markdown="1">
{% highlight scala %}
val coalescedTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedTime)
{% endhighlight %}
</div>
</div>