Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-7789][DataStream API] Add handler for Async IO operator timeouts #5983

Closed
wants to merge 2 commits into from

Conversation

kisimple
Copy link

What is the purpose of the change

Currently Async IO operator does not provide a mechanism to handle timeouts. This PR fixs the problem by adding an enhanced AsyncFunction, named TimeoutAwareAsyncFunction.

Brief change log

  • Add a new interface, TimeoutAwareAsyncFunction, which extends the AsyncFunction
  • Change RichAsyncFunction to implement TimeoutAwareAsyncFunction instead of AsyncFunction
  • AsyncWaitOperator will invoke TimeoutAwareAsyncFunction#timeout when asyncInvoke times out

Verifying this change

  • Add tests to AsyncWaitOperatorTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): ( no )
  • The public API, i.e., is any changed class annotated with @Public(Evolving): ( yes )
  • The serializers: ( no )
  • The runtime per-record code paths (performance sensitive): ( yes )
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: ( no )
  • The S3 file system connector: ( no )

Documentation

  • Does this pull request introduce a new feature? ( yes )
  • If yes, how is the feature documented? ( docs / JavaDocs )

@kisimple
Copy link
Author

cc @tillrohrmann @kl0u

Copy link
Contributor

@pnowojski pnowojski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your contribution! I left one higher level comment/question.

* @exception Exception in case of a user code error. An exception will make the task fail and
* trigger fail-over process.
*/
void timeout(IN input, ResultFuture<OUT> resultFuture) throws Exception;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't it be enough to add such timeout() method to AsyncFunction with default implementation that fails the ResultFuture? I mean instead of adding new interface and deprecating AsyncFunction?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your review. The deprecated AsyncFunction is a Java Interface which can not have a method body due to Java grammars. However, your comment make me realize that I just forgot about the Scala API for AsyncFunction, so there is more work need to be done.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can not we add default method here (Java 8 feature)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for that I did not pay so much attention to new features. I will try and if it works I will close this PR and open a new one. Thanks:)

Copy link
Contributor

@pnowojski pnowojski May 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem :) Please CC me if you open next PR for this issue.

@kisimple kisimple closed this May 29, 2018
@pnowojski
Copy link
Contributor

This PR superseded by #6091

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
3 participants