[FLINK-13248] [runtime] Adding processing of downstream messages for blocking operators #9383

Merged
merged 4 commits into from Aug 30, 2019

Conversation

@AHeise
Contributor

commented Aug 7, 2019

What is the purpose of the change

  • This PR provides a basic building block to address a kind of live lock that appears when chained AsyncWaitOperators block the processing of mailbox events.

Brief change log

  • Separating life-cycle methods from Mailbox into new interface.
  • Adding view onto Mailbox bound to a specific operator in the operator chain.
  • Scoping MailboxExecutor to an individual operator instead of task level.
  • Using the newly scoped MailboxExecutor to implement yieldToDownstream.
  • Exposing MailboxExecutor through a stream operator factory.
  • Allowing the DataStream API to use factories for transformations.
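
The change log above can be illustrated with a minimal, single-threaded sketch. It is not the code from this PR: the class names (`TaskMailbox`, `OperatorMailboxExecutor`, `Mail`), the method `tryYieldToDownstream`, and the rule that "downstream" means a strictly greater chain index are all assumptions made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

/** Hypothetical sketch of per-operator mailbox views; not Flink's actual API. */
final class MailboxViewSketch {

    /** A letter tagged with the chain index of the operator that enqueued it. */
    static final class Mail {
        final int operatorIndex;
        final Runnable action;

        Mail(int operatorIndex, Runnable action) {
            this.operatorIndex = operatorIndex;
            this.action = action;
        }
    }

    /** Task-level queue shared by all operators of one chain (not thread-safe). */
    static final class TaskMailbox {
        private final Deque<Mail> queue = new ArrayDeque<>();

        void put(Mail mail) {
            queue.addLast(mail);
        }

        /** Removes and returns the oldest mail whose index is greater than minExclusive, or null. */
        Mail tryTakeDownstreamOf(int minExclusive) {
            for (Iterator<Mail> it = queue.iterator(); it.hasNext();) {
                Mail mail = it.next();
                if (mail.operatorIndex > minExclusive) {
                    it.remove();
                    return mail;
                }
            }
            return null;
        }

        int size() {
            return queue.size();
        }
    }

    /** Executor scoped to one operator: a view onto the shared task mailbox. */
    static final class OperatorMailboxExecutor {
        private final TaskMailbox mailbox;
        private final int operatorIndex;

        OperatorMailboxExecutor(TaskMailbox mailbox, int operatorIndex) {
            this.mailbox = mailbox;
            this.operatorIndex = operatorIndex;
        }

        void execute(Runnable action) {
            mailbox.put(new Mail(operatorIndex, action));
        }

        /** Runs one pending downstream letter, if any, and reports whether one ran. */
        boolean tryYieldToDownstream() {
            Mail mail = mailbox.tryTakeDownstreamOf(operatorIndex);
            if (mail == null) {
                return false;
            }
            mail.action.run();
            return true;
        }
    }

    public static void main(String[] args) {
        TaskMailbox mailbox = new TaskMailbox();
        OperatorMailboxExecutor head = new OperatorMailboxExecutor(mailbox, 0);
        OperatorMailboxExecutor tail = new OperatorMailboxExecutor(mailbox, 1);
        List<String> log = new ArrayList<>();

        head.execute(() -> log.add("head"));
        tail.execute(() -> log.add("tail"));

        // The head operator is blocked and yields: only the tail's letter may run.
        while (head.tryYieldToDownstream()) {
            // keep draining downstream letters
        }
        System.out.println(log + ", pending=" + mailbox.size());
    }
}
```

In this sketch the head operator's own letter stays in the queue while it yields, so a blocked operator never re-enters its own actions.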

Verifying this change

  • Added tests to test the Mailbox view.
  • Adjusted tests to support the new MailboxExecutor scope.

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable
@flinkbot

commented Aug 7, 2019

Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
to review your pull request. We will use this comment to track the progress of the review.

Automated Checks

Last check on commit 8dca786 (Fri Sep 06 09:07:59 UTC 2019)

Warnings:

  • No documentation files were touched! Remember to keep the Flink docs up to date!

Mention the bot in a comment to re-run the automated checks.

Review Progress

  • 1. The [description] looks good.
  • 2. There is [consensus] that the contribution should go into Flink.
  • 3. Needs [attention] from.
  • 4. The change fits into the overall [architecture].
  • 5. Overall code [quality] is good.

Please see the Pull Request Review Guide for a full explanation of the review process.


The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.

Bot commands

The @flinkbot bot supports the following commands:

  • @flinkbot approve description to approve one or more aspects (aspects: description, consensus, architecture and quality)
  • @flinkbot approve all to approve all aspects
  • @flinkbot approve-until architecture to approve everything until architecture
  • @flinkbot attention @username1 [@username2 ..] to require somebody's attention
  • @flinkbot disapprove architecture to remove an approval you gave earlier
@flinkbot

commented Aug 7, 2019

CI report:

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 6 times, most recently from 2493723 to f3f0fe6 Aug 14, 2019

@AHeise AHeise changed the title from "WIP [FLINK-13248][runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops" to "[FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops" Aug 15, 2019

@1u0
Contributor

left a comment

You are removing the ExecutorService from the MailboxExecutorImpl implementation in the last commit.
Are there any limitations/reasons to do so?

Initially, this functionality was added for convenience, to have futures that complete through a mailbox letter.
So far, this has been used for checkpoint triggers via the mailbox (in a different branch that is not PRed yet).

@1u0
Contributor

left a comment

Implementation wise, it looks like the main (task level) mailbox (TaskMailboxImpl) became an implicit delegate of mailbox of operator at index 0.

Instead, I think, it still could be a proper mailbox that can allow executing any letters. In particular, some timer triggers and checkpoints are operator neutral. They still could progress independently of operators.
This may require adjusting the implementation so that it also executes letters that are not addressed directly to downstream operators. (Such letters could also be enqueued as Mail with operator index MAX_INT.)

@AHeise

Contributor Author

commented Aug 15, 2019

Implementation wise, it looks like the main (task level) mailbox (TaskMailboxImpl) became an implicit delegate of mailbox of operator at index 0.

Instead, I think, it still could be a proper mailbox that can allow executing any letters. In particular, some timer triggers and checkpoints are operator neutral. They still could progress independently of operators.
This may require adjusting the implementation so that it also executes letters that are not addressed directly to downstream operators. (Such letters could also be enqueued as Mail with operator index MAX_INT.)

Yes, MAX_INT makes more sense than 0 in any case. That was actually my original intent.

@AHeise

Contributor Author

commented Aug 15, 2019

You are removing the ExecutorService from the MailboxExecutorImpl implementation in the last commit.
Are there any limitations/reasons to do so?

Initially, this functionality was added for convenience, to have futures that complete through a mailbox letter.
So far, this has been used for checkpoint triggers via the mailbox (in a different branch that is not PRed yet).

The basic idea of removing the ExecutorService interface is to get rid of the life-cycle methods. Since each Mailbox view is wrapped into a MailboxExecutor, each Mailbox view would also need to have life-cycle methods, which is a total mess.

Instead, we just support the Executor interface, which is enough to support the nice CompletableFuture usages.
The MailboxProcessor will then invoke the life-cycle methods on TaskMailbox; both are owned by the task.
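
As a rough illustration of why the plain `java.util.concurrent.Executor` interface is enough for `CompletableFuture` usage, here is a minimal sketch. `QueueingExecutor` and its `drain` method are hypothetical stand-ins for a mailbox-backed executor and the task's processing loop; they are not the classes from this PR.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical sketch: an Executor without any life-cycle methods. */
final class MailboxAsExecutorSketch {

    /** Queues commands as letters; offers execute() only, no shutdown or awaitTermination. */
    static final class QueueingExecutor implements Executor {
        // Not thread-safe; this sketch enqueues and drains on a single thread.
        private final Queue<Runnable> letters = new ArrayDeque<>();

        @Override
        public void execute(Runnable command) {
            letters.add(command);
        }

        /** Called by the owner of the queue to run all pending letters; returns how many ran. */
        int drain() {
            int ran = 0;
            Runnable letter;
            while ((letter = letters.poll()) != null) {
                letter.run();
                ran++;
            }
            return ran;
        }
    }

    public static void main(String[] args) {
        QueueingExecutor mailboxExecutor = new QueueingExecutor();
        StringBuilder out = new StringBuilder();

        CompletableFuture<Integer> result = new CompletableFuture<>();
        // The callback is enqueued as a letter instead of running on the completing thread.
        result.thenAcceptAsync(value -> out.append("got ").append(value), mailboxExecutor);

        result.complete(42);
        System.out.println("before drain: '" + out + "'");
        mailboxExecutor.drain();
        System.out.println("after drain: '" + out + "'");
    }
}
```

The life cycle (when letters are drained, when the queue is closed) stays with whoever owns the queue, which matches the split described above.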

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch from f3f0fe6 to c6ee151 Aug 15, 2019

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 2 times, most recently from d0e4fbf to 741386a Aug 16, 2019

@AHeise

Contributor Author

commented Aug 16, 2019

Implementation wise, it looks like the main (task level) mailbox (TaskMailboxImpl) became an implicit delegate of mailbox of operator at index 0.
Instead, I think, it still could be a proper mailbox that can allow executing any letters. In particular, some timer triggers and checkpoints are operator neutral. They still could progress independently of operators.
This may require adjusting the implementation so that it also executes letters that are not addressed directly to downstream operators. (Such letters could also be enqueued as Mail with operator index MAX_INT.)

Yes, MAX_INT makes more sense than 0 in any case. That was actually my original intent.

After some discussion, we moved it to -1 instead and use MAX_INT only for priority mail.
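
One possible reading of this numbering, sketched under assumptions: task-level letters get priority -1, each operator uses its chain index, and priority mail gets `Integer.MAX_VALUE`. The constant names and the strict comparison in `eligibleDuringYield` are hypothetical; the PR discussion does not spell out the exact rule.

```java
/** Hypothetical sketch of the priority numbering discussed above. */
final class MailPrioritySketch {

    /** Assumed priority of operator-neutral, task-level letters. */
    static final int TASK_LEVEL = -1;

    /** Assumed priority of priority mail. */
    static final int PRIORITY_MAIL = Integer.MAX_VALUE;

    /** Assumption: a yielding operator only runs letters with a strictly greater priority. */
    static boolean eligibleDuringYield(int mailPriority, int yieldingOperatorIndex) {
        return mailPriority > yieldingOperatorIndex;
    }

    public static void main(String[] args) {
        int head = 0;
        int tail = 2;
        System.out.println("task-level while head yields: " + eligibleDuringYield(TASK_LEVEL, head));
        System.out.println("priority mail while tail yields: " + eligibleDuringYield(PRIORITY_MAIL, tail));
        System.out.println("operator 1 while head yields: " + eligibleDuringYield(1, head));
    }
}
```

Under these assumptions, -1 keeps task-level letters out of every operator's yield loop, while MAX_INT mail is eligible no matter which operator yields.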

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch from 741386a to 05e27c0 Aug 18, 2019

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 5 times, most recently from 50a91cb to 2fba64d Aug 19, 2019

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 2 times, most recently from ffe70dd to b925c47 Aug 26, 2019

@AHeise

Contributor Author

commented Aug 26, 2019

@flinkbot run travis

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 2 times, most recently from 7815bc0 to 5821301 Aug 26, 2019

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 2 times, most recently from 9e6aa38 to 355736b Aug 27, 2019

@pnowojski
Contributor

left a comment

Almost LGTM (except for the last commit), modulo a couple of comments.

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 2 times, most recently from 85a77bc to e311962 Aug 28, 2019

@pnowojski
Contributor

left a comment

Code LGTM. I have one last minor comment about the commit structure.

Besides that, I think the commit names are a bit confusing:

[FLINK-13248] [runtime] Enabling custom factories for

This is not a runtime change, but a datastream or streaming one.

Regarding:

[FLINK-13248] [runtime] Separating mailbox into taskmailbox (once per task) and mailbox (view on taskmailbox, once per operator

I think it is also a bit confusing, as it doesn't reference yielding to downstream or priorities. This one also deserves more description in the commit message. What do you think about something along these lines:

[FLINK-13248][runtime] Implement per operator priorities for mailbox actions and yieldToDownstream concept

This commit introduces separate Mailboxes per operator each handling enqueuing actions/letters and yielding with different priorities. Yielding execution, yields only for down stream actions, which in principle allows for having yielding operators in the middle of an operator chain (and not only as a head).

?

Optionally, I would expand the description of the commit that exposes StreamOperatorFactory instances on the DataStream API.

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch 4 times, most recently from 6e4b34a to 8e4f5b9 Aug 28, 2019

@AHeise

Contributor Author

commented Aug 29, 2019

@flinkbot run travis

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch from 8e4f5b9 to 8c630ec Aug 29, 2019

Arvid Heise added 3 commits Aug 28, 2019
Arvid Heise
[FLINK-13248][runtime] Implement per operator priorities for mailbox actions and yieldToDownstream concept

This commit introduces separate mailboxes per operator each handling enqueuing actions/letters and yielding with different priorities. Yielding execution, yields only for down stream actions, which in principle allows for having yielding operators in the middle of an operator chain (and not only as a head).
Arvid Heise
[FLINK-13248] [datastream/streaming] Enabling custom factories for one input stream operators to be passed in DataStream

Also enabled StreamOperatorTestHarness and InputStreamTaskTestHarness to work with factories. In the future, instead of passing operators directly in the DataStream API, factories should be used instead.
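
A minimal sketch of why passing a factory rather than an operator instance helps: the factory can receive resources that exist only once the task starts, such as a per-operator executor. The interfaces `Operator` and `OperatorFactory` and the method `instantiate` are hypothetical and do not mirror Flink's actual signatures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

/** Hypothetical sketch of an operator factory that receives a runtime-provided executor. */
final class OperatorFactorySketch {

    /** Stand-in for a one-input operator. */
    interface Operator<IN> {
        void processElement(IN element);
    }

    /** Stand-in for a factory; the runtime hands in the executor when it builds the operator. */
    interface OperatorFactory<IN> {
        Operator<IN> create(Executor mailboxExecutor);
    }

    /** Stand-in for the runtime side that instantiates the operator at task start. */
    static <IN> Operator<IN> instantiate(OperatorFactory<IN> factory, Executor runtimeExecutor) {
        return factory.create(runtimeExecutor);
    }

    public static void main(String[] args) {
        List<String> sink = new ArrayList<>();
        // Runs each command immediately; a stand-in for a per-operator mailbox executor.
        Executor direct = Runnable::run;

        OperatorFactory<String> factory =
                executor -> element -> executor.execute(() -> sink.add(element.toUpperCase()));

        Operator<String> operator = instantiate(factory, direct);
        operator.processElement("a");
        System.out.println(sink);
    }
}
```

The user code only describes how to build the operator; which executor it gets is decided by the runtime at instantiation time.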

@AHeise AHeise force-pushed the AHeise:FLINK-13248 branch from 8c630ec to 8dca786 Aug 29, 2019

@AHeise AHeise changed the title from "[FLINK-13248] [runtime] Adding processing of downstream messages in AsyncWaitOperator's wait loops" to "[FLINK-13248] [runtime] Adding processing of downstream messages for blocking operators" Aug 29, 2019

@pnowojski
Contributor

left a comment

LGTM :) @AHeise, please ping me once Travis is green; I will merge it then.

@AHeise

Contributor Author

commented Aug 29, 2019

@flinkbot run travis

@pnowojski pnowojski merged commit ccc7eb4 into apache:master Aug 30, 2019
