Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-4460] Allow ProcessFunction on non-keyed streams #3438

Merged
merged 5 commits into from
Mar 6, 2017

Conversation

aljoscha
Copy link
Contributor

@aljoscha aljoscha commented Mar 1, 2017

This is in preparation for side outputs, which will only work on ProcessFunction. We still want side outputs on non-keyed streams so we have to make ProcessFunction available there.

See this ML thread for reference: https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 1, 2017

R: @uce or @rmetzger for review, please

@rmetzger
Copy link
Contributor

rmetzger commented Mar 3, 2017

I looked over the changes and didn't find anything critical. The only thing that made me thinking was the boxed Long type for the timestamp(). I assume you decided for this approach to signal timestamp unavailability using null. The Java documentation does not recommend to rely on autoboxing for performance critical code: http://docs.oracle.com/javase/1.5.0/docs/guide/language/autoboxing.html

Tests, Scala API were done. I assume that we don't need to explicitly mention support for the process function on non-keyed streams.

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 4, 2017

@rmetzger Yes, it's unfortunate that in our model not all elements always have a timestamp. The other alternative is throwing an exception when trying to access a non-existing timestamp.

@rmetzger
Copy link
Contributor

rmetzger commented Mar 4, 2017

In addition to throwing an exception, we should also expose element.hasTimestamp() to offer our users a clean way of checking for timestamps.
Lets see what @uce or other reviewers think about this.

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 4, 2017

I think the discussion of timestamps and additional interfaces is orthogonal to this PR: KeyedProcessOperator is a renaming of the pre-existing ProcessOperator and the new ProcessOperator is a simplification that does away with timers. The interface for timestamps exists in the current code base, if we want to change that we should open other Jira issues.

*
* @param <I> Type of the input elements.
* @param <O> Type of the output elements.
*/
@PublicEvolving
public interface ProcessFunction<I, O> extends Function {
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi, changing form interface to class is incompatible on the user side. Can't ProcessFunction just extend RichFunction?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that we need a default implementation for onTimer(long, OnTimerContext, Collector) (see below).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hi @wenlong88 in the ML discussion (https://lists.apache.org/thread.html/f3fe7d68986877994ad6b66173f40e72fc454420720a74ea5a834cc2@%3Cdev.flink.apache.org%3E) we decided to make ProcessFunction available on non-keyed streams as well to allow using side outputs there. This requires making the onTimer() method abstract, otherwise every user would always have to implement it. We marked ProcessFunction as @PublicEvolcing just for such cases; it's still a very young API and we didn't know exactly what was going to be needed in the end.

Copy link
Contributor

@uce uce left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool change! I'm OK with the change from interface to abstract class. Do we need to update the documentation for any of the changes? If yes, I would make this part of this PR.

I had some inline comments that you can have a look at before merging. Other than that, +1 to merge this.

* @return The transformed {@link DataStream}.
*/
@Internal
public <R> SingleOutputStreamOperator<R> process(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this internal method only exposed as public for the Scala API? If yes, I'm wondering if it makes sense to call transform manually in the Scala DataStream API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's exposed for that. The pattern, so far, is for methods to also expose a public method that takes a TypeInformation because we get the TypeInformation from the context bound in the Scala API.

Calling transform() manually is an option but if we do that we would basically not base the Scala API on the Java API anymore and we would have code that instantiates the Stream Operators in both the Java and Scala API. For example, right now we have the code for instantiating a flat map operator in (Java)DataStream while (Scala)DataStream.flatMap() calls that method.

What do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense to keep it like that. The benefits to base the Scala API on top of the Java API instead of duplicating it are very persuasive, too. 😄 So +1 to keep it as is. 👍 I was just wondering whether users would be confused by this.

*
* <p><b>NOTE:</b> A {@code ProcessFunction} is always a
* {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
* {@link org.apache.flink.api.common.functions.RuntimeContext} as always available and setup and
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo: as => is?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixing

*
* @param <I> Type of the input elements.
* @param <O> Type of the output elements.
*/
@PublicEvolving
public interface ProcessFunction<I, O> extends Function {
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem is that we need a default implementation for onTimer(long, OnTimerContext, Collector) (see below).

*
* @param <I> Type of the input elements.
* @param <O> Type of the output elements.
*/
@PublicEvolving
public interface ProcessFunction<I, O> extends Function {
public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing serialVersionUID

@uce
Copy link
Contributor

uce commented Mar 6, 2017

@rmetzger @aljoscha I would agree with Aljoscha that your point is independent of this PR. Is there an issue for 2.0 to track this?

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 6, 2017

@uce There is not issue for 2.0 to track this because I don't think there is consensus about always having timestamps.

@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 6, 2017

@uce There is some documentation that says that ProcessFunction is only available on keyed streams. I'll change that.

This is in preparation of allowing ProcessFunction on DataStream because
we will use it to allow side outputs from the ProcessFunction Context.
Introduce new ProcessOperator for this. Rename the pre-existing
ProcessOperator to KeyedProcessOperator.
…thod

This is in preparation of allowing CoProcessFunction on a non-keyed
connected stream.  we will use it to allow side outputs from the
ProcessFunction Context.
Introduce new CoProcessOperator for this. Rename the pre-existing
CoProcessOperator to KeyedCoProcessOperator.
@aljoscha aljoscha force-pushed the jira-4460-process-for-everyone branch from a26accf to 746c1ef Compare March 6, 2017 13:04
@asfgit asfgit merged commit 746c1ef into apache:master Mar 6, 2017
@aljoscha aljoscha deleted the jira-4460-process-for-everyone branch March 6, 2017 15:53
@aljoscha
Copy link
Contributor Author

aljoscha commented Mar 6, 2017

Merged

@wenlong88
Copy link
Contributor

thanks for explaination, I have such concern because we have just suggested our users to use processFunction to implement their jobs, they need to change their code too when we sync the cimmit.after all, it is really nice to have timer in more scenarios.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
5 participants