Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4460] Allow ProcessFunction on non-keyed streams #3438

Merged
merged 5 commits into from
Mar 6, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions docs/dev/stream/process_function.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,32 +32,35 @@ The `ProcessFunction` is a low-level stream processing operation, giving access
all (acyclic) streaming applications:

- events (stream elements)
- state (fault tolerant, consistent)
- timers (event time and processing time)
- state (fault tolerant, consistent, only on keyed stream)
- timers (event time and processing time, only on keyed stream)

The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
by being invoked for each event received in the input stream(s).

For fault tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the
`RuntimeContext`, similar to the way other stateful functions can access keyed state. Like all functions with keyed state,
the `ProcessFunction` needs to be applied onto a `KeyedStream`:
```java
stream.keyBy("id").process(new MyProcessFunction())
```
`RuntimeContext`, similar to the way other stateful functions can access keyed state.

The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
Every call to the function `processElement(...)` gets a `Context` object with gives access to the element's
event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
event-/processing-time instants. When a timer's particular time is reached, the `onTimer(...)` method is
called. During that call, all states are again scoped to the key with which the timer was created, allowing
timers to perform keyed state manipulation as well.

<span class="label label-info">Note</span> If you want to access keyed state and timers you have
to apply the `ProcessFunction` on a keyed stream:

{% highlight java %}
stream.keyBy(...).process(new MyProcessFunction())
{% endhighlight %}


## Low-level Joins

To realize low-level operations on two inputs, applications can use `CoProcessFunction`. It relates to `ProcessFunction`
in the same way that `CoFlatMapFunction` relates to `FlatMapFunction`: the function is bound to two different inputs and
gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
To realize low-level operations on two inputs, applications can use `CoProcessFunction`. This
function is bound to two different inputs and gets individual calls to `processElement1(...)` and
`processElement2(...)` for records from the two different inputs.

Implementing a low level join typically follows this pattern:

Expand All @@ -82,8 +85,8 @@ The following example maintains counts per key, and emits a key/count pair whene
- Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
and emits the key/count if they match (i.e., no further update occurred during that minute)

*Note:* This simple example could have been implemented with session windows. We use `ProcessFunction` here to illustrate
the basic pattern it provides.
<span class="label label-info">Note</span> This simple example could have been implemented with
session windows. We use `ProcessFunction` here to illustrate the basic pattern it provides.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
Expand Down Expand Up @@ -207,7 +210,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
// initialize or retrieve/update the state

val current: CountWithTimestamp = state.value match {
case null =>
case null =>
CountWithTimestamp(key, 1, ctx.timestamp)
case CountWithTimestamp(key, count, time) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
Expand All @@ -222,7 +225,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long

override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
out.collect((key, count))
case _ =>
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
import org.apache.flink.streaming.api.operators.co.CoStreamMap;
import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
import org.apache.flink.streaming.api.transformations.TwoInputTransformation;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -243,10 +243,6 @@ CoFlatMapFunction.class, false, true, getType1(), getType2(),
* function can also query the time and set timers. When reacting to the firing of set timers
* the function can directly emit elements and/or register yet more timers.
*
* <p>A {@link RichCoProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
Expand Down Expand Up @@ -274,10 +270,6 @@ CoProcessFunction.class, false, true, getType1(), getType2(),
* this function can also query the time and set timers. When reacting to the firing of set
* timers the function can directly emit elements and/or register yet more timers.
*
* <p>A {@link RichCoProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param coProcessFunction The {@link CoProcessFunction} that is called for each element
* in the stream.
*
Expand All @@ -290,13 +282,13 @@ public <R> SingleOutputStreamOperator<R> process(
CoProcessFunction<IN1, IN2, R> coProcessFunction,
TypeInformation<R> outputType) {

if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
throw new UnsupportedOperationException("A CoProcessFunction can only be applied" +
"when both input streams are keyed.");
}
TwoInputStreamOperator<IN1, IN2, R> operator;

CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
inputStream1.clean(coProcessFunction));
if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
} else {
operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
}

return transform("Co-Process", outputType, operator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.TimestampExtractor;
Expand All @@ -59,6 +60,7 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.StreamFilter;
import org.apache.flink.streaming.api.operators.StreamFlatMap;
import org.apache.flink.streaming.api.operators.StreamMap;
Expand Down Expand Up @@ -555,6 +557,60 @@ public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMappe

}

/**
* Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
* <p>The function will be called for every element in the input streams and can produce zero
* or more output elements.
*
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
*
* @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
processFunction,
ProcessFunction.class,
false,
true,
getType(),
Utils.getCallLocationName(),
true);

return process(processFunction, outType);
}

/**
* Applies the given {@link ProcessFunction} on the input stream, thereby
* creating a transformed output stream.
*
* <p>The function will be called for every element in the input streams and can produce zero
* or more output elements.
*
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
* @param outputType {@link TypeInformation} for the result type of the function.
*
* @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Internal
public <R> SingleOutputStreamOperator<R> process(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this internal method only exposed as public for the Scala API? If yes, I'm wondering if it makes sense to call transform manually in the Scala DataStream API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's exposed for that. The pattern, so far, is for methods to also expose a public method that takes a TypeInformation because we get the TypeInformation from the context bound in the Scala API.

Calling transform() manually is an option but if we do that we would basically not base the Scala API on the Java API anymore and we would have code that instantiates the Stream Operators in both the Java and Scala API. For example, right now we have the code for instantiating a flat map operator in (Java)DataStream while (Scala)DataStream.flatMap() calls that method.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to keep it like that. The benefits to base the Scala API on top of the Java API instead of duplicating it are very persuasive, too. 😄 So +1 to keep it as is. 👍 I was just wondering whether users would be confused by this.

ProcessFunction<T, R> processFunction,
TypeInformation<R> outputType) {

ProcessOperator<T, R> operator = new ProcessOperator<>(clean(processFunction));

return transform("Process", outputType, operator);
}

/**
* Applies a Filter transformation on a {@link DataStream}. The
* transformation calls a {@link FilterFunction} for each element of the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.RichProcessFunction;
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
Expand All @@ -41,7 +40,7 @@
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.ProcessOperator;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.operators.StreamGroupedFold;
import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
import org.apache.flink.streaming.api.transformations.OneInputTransformation;
Expand Down Expand Up @@ -181,17 +180,14 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
* function, this function can also query the time and set timers. When reacting to the firing
* of set timers the function can directly emit elements and/or register yet more timers.
*
* <p>A {@link RichProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
*
* @param <R> The type of elements emitted by the {@code ProcessFunction}.
*
* @return The transformed {@link DataStream}.
*/
@Override
@PublicEvolving
public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {

Expand All @@ -216,10 +212,6 @@ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFu
* function, this function can also query the time and set timers. When reacting to the firing
* of set timers the function can directly emit elements and/or register yet more timers.
*
* <p>A {@link RichProcessFunction}
* can be used to gain access to features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param processFunction The {@link ProcessFunction} that is called for each element
* in the stream.
* @param outputType {@link TypeInformation} for the result type of the function.
Expand All @@ -228,13 +220,14 @@ public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFu
*
* @return The transformed {@link DataStream}.
*/
@Override
@Internal
public <R> SingleOutputStreamOperator<R> process(
ProcessFunction<T, R> processFunction,
TypeInformation<R> outputType) {

ProcessOperator<KEY, T, R> operator =
new ProcessOperator<>(clean(processFunction));
KeyedProcessOperator<KEY, T, R> operator =
new KeyedProcessOperator<>(clean(processFunction));

return transform("Process", outputType, operator);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,37 @@
package org.apache.flink.streaming.api.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;

/**
* A function that processes elements of a stream.
*
* <p>The function will be called for every element in the input stream and can produce
* zero or more output. The function can also query the time and set timers. When
* reacting to the firing of set timers the function can emit yet more elements.
* <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
* is invoked. This can produce zero or more elements as output. Implementations can also
* query the time and set timers through the provided {@link Context}. For firing timers
* {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
* zero or more elements as output and register further timers.
*
* <p>The function will be called for every element in the input stream and can produce
* zero or more output elements. Contrary to the
* {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
* the time (both event and processing) and set timers, through the provided {@link Context}.
* When reacting to the firing of set timers the function can directly emit a result, and/or
* register a timer that will trigger an action in the future.
* <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
* available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
*
* <p><b>NOTE:</b> A {@code ProcessFunction} is always a
* {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
* {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
* teardown methods can be implemented. See
* {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
* and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
*
* @param <I> Type of the input elements.
* @param <O> Type of the output elements.
*/
@PublicEvolving
public interface ProcessFunction<I, O> extends Function {
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi, changing form interface to class is incompatible on the user side. Can't ProcessFunction just extend RichFunction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that we need a default implementation for onTimer(long, OnTimerContext, Collector) (see below).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @wenlong88 in the ML discussion (https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E) we decided to make ProcessFunction available on non-keyed streams as well to allow using side outputs there. This requires making the onTimer() method abstract, otherwise every user would always have to implement it. We marked ProcessFunction as @PublicEvolcing just for such cases; it's still a very young API and we didn't know exactly what was going to be needed in the end.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing serialVersionUID


private static final long serialVersionUID = 1L;

/**
* Process one element from the input stream.
Expand All @@ -59,7 +66,7 @@ public interface ProcessFunction<I, O> extends Function {
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void processElement(I value, Context ctx, Collector<O> out) throws Exception;
public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

/**
* Called when a timer set using {@link TimerService} fires.
Expand All @@ -74,36 +81,36 @@ public interface ProcessFunction<I, O> extends Function {
* @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
* to fail and may trigger recovery.
*/
void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}

/**
* Information available in an invocation of {@link #processElement(Object, Context, Collector)}
* or {@link #onTimer(long, OnTimerContext, Collector)}.
*/
interface Context {
public abstract class Context {

/**
* Timestamp of the element currently being processed or timestamp of a firing timer.
*
* <p>This might be {@code null}, for example if the time characteristic of your program
* is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
*/
Long timestamp();
public abstract Long timestamp();

/**
* A {@link TimerService} for querying time and registering timers.
*/
TimerService timerService();
public abstract TimerService timerService();
}

/**
* Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
*/
interface OnTimerContext extends Context {
public abstract class OnTimerContext extends Context {
/**
* The {@link TimeDomain} of the firing timer.
*/
TimeDomain timeDomain();
public abstract TimeDomain timeDomain();
}

}
Loading