[FLINK-30631][runtime] Limit the max number of subpartitions consumed by each downstream task #21646
Conversation
Force-pushed from 50715d0 to 4129ede
zhuzhurk
left a comment
Thanks for opening this PR. @wanglijie95
I only have one comment regarding the first commit.
...ache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java
@zhuzhurk Thanks for the review, I've addressed the comment; please take a look.
zhuzhurk
left a comment
LGTM.
…ism and input infos
…by each downstream task
Force-pushed from 8f0f55a to 81f6e2b
@flinkbot run azure
…by each downstream task This closes apache#21646
What is the purpose of the change
In the current implementation (FLINK-25035), when the upstream vertex parallelism is much greater than the downstream vertex parallelism, the downstream tasks may end up with a very large number of channels. For example, consider A -> B with an all-to-all edge and a max parallelism of 1000: if the parallelism of A is 1000 and the parallelism of B is decided to be 1, the only subtask of B will consume 1000 * 1000 subpartitions. Processing this many channels causes significant overhead.
In this ticket, we temporarily address this issue by limiting the maximum number of subpartitions consumed by each downstream task. The ultimate solution is to support a single channel consuming multiple subpartitions.
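The parallelism lower bound implied by such a limit can be sketched as follows. This is a hypothetical, simplified illustration (the class, method names, and the example cap of 32768 are assumptions, not Flink's actual implementation): for an all-to-all edge where each upstream task produces one subpartition per possible downstream task, it computes the minimum downstream parallelism that keeps the number of subpartitions per downstream task under a cap.

```java
public class SubpartitionLimitSketch {

    /**
     * Hypothetical helper (not Flink's real API): for an all-to-all edge where
     * each upstream task produces maxParallelism subpartitions, return the
     * minimum downstream parallelism such that no downstream task consumes
     * more than maxSubpartitionsPerTask subpartitions.
     */
    static int minParallelismForLimit(int upstreamParallelism,
                                      int maxParallelism,
                                      int maxSubpartitionsPerTask) {
        // Total subpartitions produced across all upstream tasks.
        long totalSubpartitions = (long) upstreamParallelism * maxParallelism;
        // Ceiling division: enough downstream tasks so each stays under the cap.
        long minParallelism =
                (totalSubpartitions + maxSubpartitionsPerTask - 1) / maxSubpartitionsPerTask;
        // Clamp to the valid parallelism range [1, maxParallelism].
        return (int) Math.min(Math.max(minParallelism, 1), maxParallelism);
    }

    public static void main(String[] args) {
        // Example from the description: A -> B, all-to-all, max parallelism 1000.
        // Without a limit, a single B subtask could consume 1000 * 1000 = 1,000,000
        // subpartitions; with an assumed cap of 32768, B needs at least 31 subtasks.
        System.out.println(minParallelismForLimit(1000, 1000, 32768));
    }
}
```

With such a lower bound folded into the parallelism decision, the scheduler can no longer pick a downstream parallelism so small that individual tasks are overwhelmed by channel-management overhead.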
Verifying this change
Unit tests:
- DefaultVertexParallelismAndInputInfosDeciderTest#testEvenlyDistributeDataWithMaxSubpartitionLimitation
- DefaultVertexParallelismAndInputInfosDeciderTest#testDecideParallelismWithMaxSubpartitionLimitation

Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation