[FLINK-4460] Update doc: ProcessFunction now possible on DataStream
aljoscha committed Mar 6, 2017
1 parent 06740fb commit 746c1ef
Showing 1 changed file with 18 additions and 15 deletions.
33 changes: 18 additions & 15 deletions docs/dev/stream/process_function.md
@@ -32,32 +32,35 @@ The `ProcessFunction` is a low-level stream processing operation, giving access
 all (acyclic) streaming applications:

 - events (stream elements)
-- state (fault tolerant, consistent)
-- timers (event time and processing time)
+- state (fault tolerant, consistent, only on keyed stream)
+- timers (event time and processing time, only on keyed stream)

 The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
 by being invoked for each event received in the input stream(s).

 For fault tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the
-`RuntimeContext`, similar to the way other stateful functions can access keyed state. Like all functions with keyed state,
-the `ProcessFunction` needs to be applied onto a `KeyedStream`:
-```java
-stream.keyBy("id").process(new MyProcessFunction())
-```
+`RuntimeContext`, similar to the way other stateful functions can access keyed state.

 The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
 Every call to the function `processElement(...)` gets a `Context` object which gives access to the element's
 event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
-event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
+event-/processing-time instants. When a timer's particular time is reached, the `onTimer(...)` method is
 called. During that call, all states are again scoped to the key with which the timer was created, allowing
 timers to perform keyed state manipulation as well.

+<span class="label label-info">Note</span> If you want to access keyed state and timers you have
+to apply the `ProcessFunction` on a keyed stream:
+
+{% highlight java %}
+stream.keyBy(...).process(new MyProcessFunction())
+{% endhighlight %}
+

 ## Low-level Joins

-To realize low-level operations on two inputs, applications can use `CoProcessFunction`. It relates to `ProcessFunction`
-in the same way that `CoFlatMapFunction` relates to `FlatMapFunction`: the function is bound to two different inputs and
-gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
+To realize low-level operations on two inputs, applications can use `CoProcessFunction`. This
+function is bound to two different inputs and gets individual calls to `processElement1(...)` and
+`processElement2(...)` for records from the two different inputs.

 Implementing a low level join typically follows this pattern:

@@ -82,8 +85,8 @@ The following example maintains counts per key, and emits a key/count pair whene
 - Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
 and emits the key/count if they match (i.e., no further update occurred during that minute)

-*Note:* This simple example could have been implemented with session windows. We use `ProcessFunction` here to illustrate
-the basic pattern it provides.
+<span class="label label-info">Note</span> This simple example could have been implemented with
+session windows. We use `ProcessFunction` here to illustrate the basic pattern it provides.

 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -207,7 +210,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
 // initialize or retrieve/update the state

 val current: CountWithTimestamp = state.value match {
-case null =>
+case null =>
 CountWithTimestamp(key, 1, ctx.timestamp)
 case CountWithTimestamp(key, count, time) =>
 CountWithTimestamp(key, count + 1, ctx.timestamp)
@@ -222,7 +225,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long

 override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
 state.value match {
-case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
+case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
 out.collect((key, count))
 case _ =>
 }
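
Reviewer sketch (not part of the commit): the count-with-timeout pattern that the documented example describes can be tried without a Flink dependency. The following is a minimal plain-Java simulation, not Flink code. `CountWithTimeoutSketch`, `advanceWatermark`, and the in-memory timer map are hypothetical stand-ins for the `ProcessFunction`, the watermark, and the `TimerService`. The sketch also assumes that the timer is registered 60 seconds after each update, so the callback compares its firing time against `lastModified + 60000`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Plain-Java simulation of the pattern; none of these types are Flink classes.
final class CountWithTimeoutSketch {
    static final long TIMEOUT_MS = 60_000L;

    // Per-key state: running count and event time of the last update.
    private static final class CountWithTimestamp {
        long count;
        long lastModified;
    }

    private final Map<String, CountWithTimestamp> state = new HashMap<>();
    // Registered timers: firing time -> keys, ordered so the earliest fires first.
    private final TreeMap<Long, Set<String>> timers = new TreeMap<>();
    private final List<String> output = new ArrayList<>();

    // Stand-in for processElement(...): update the count and register a timer.
    void processElement(String key, long timestamp) {
        CountWithTimestamp current = state.computeIfAbsent(key, k -> new CountWithTimestamp());
        current.count++;
        current.lastModified = timestamp;
        timers.computeIfAbsent(timestamp + TIMEOUT_MS, t -> new LinkedHashSet<>()).add(key);
    }

    // Stand-in for event time progress: fires every timer at or before the watermark.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            Map.Entry<Long, Set<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    // Stand-in for onTimer(...): emit only if no update arrived since this timer was set.
    private void onTimer(String key, long timestamp) {
        CountWithTimestamp current = state.get(key);
        if (current != null && timestamp == current.lastModified + TIMEOUT_MS) {
            output.add(key + "=" + current.count);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        CountWithTimeoutSketch fn = new CountWithTimeoutSketch();
        fn.processElement("a", 1_000L);
        fn.processElement("a", 30_000L);
        fn.processElement("b", 40_000L);
        fn.advanceWatermark(61_000L);   // the timer set by the first "a" event is stale
        System.out.println(fn.output());
        fn.advanceWatermark(100_000L);  // both remaining timers match their last update
        System.out.println(fn.output());
    }
}
```

A stale timer is not cancelled in this sketch. It fires and is ignored because a later update moved `lastModified`, which is the same "check on callback" idea the documentation describes.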