[FLINK-21332][runtime] Optimize releasing result partitions in RegionPartitionReleaseStrategy #15314
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community to review your pull request.
Automated Checks
Last check on commit f6a937c (Sat Aug 28 11:09:01 UTC 2021)
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
The bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
2dc45db to d69e745
@flinkbot run azure
I would suggest to introduce the assumption that one |
Thanks for proposing this solution 👍 I've already added the check and comments according to this assumption. Would you mind re-reviewing it once you have some free time?
zhuzhurk
left a comment
Here are some comments for the renaming commits.
        consumerRegion = new TestingSchedulingPipelinedRegion(Collections.singleton(consumer));
    }

    private void createConsumerRegionGroupExecutionViewTracker() {
Suggested change:
-    private void createConsumerRegionGroupExecutionViewTracker() {
+    private void createConsumerRegionGroupExecutionViewMaintainer() {
Resolved.
zhuzhurk
left a comment
Thanks for addressing all the comments @Thesharing
The change looks good to me except for several minor comments.
…tex#getConsumedPartitionGroup
…ateResultPartition#getConsumerVertexGroups
zhuzhurk
left a comment
LGTM.
Merging.

What is the purpose of the change
This pull request introduces an optimization for releasing result partitions in RegionPartitionReleaseStrategy.
RegionPartitionReleaseStrategy is responsible for releasing result partitions when all the downstream tasks finish.
The current implementation is:
The time complexity of releasing a single result partition is O(N^2). However, since every result partition needs to be released over the course of the entire stage, the overall time complexity is actually O(N^3).
Based on FLINK-21228, the result partitions consumed by a pipelined region are grouped. Since the result partitions in one group are isomorphic, we only need to cache the finished status of the pipelined regions and the fully consumed status of the result partition groups.
The optimized implementation is:
After the optimization, the complexity decreases from O(N^3) to O(N).
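To illustrate the caching idea described above, here is a minimal sketch in plain Java. It is not the code from this pull request: the class names `GroupExecutionView` and `GroupExecutionViewMaintainer`, the method names, and the use of plain strings as region and group ids are all assumptions made for illustration. The sketch only shows the general technique of keeping, per partition group, the set of consumer regions that have not finished yet, so that a finishing region touches just the groups it consumes instead of re-checking every consumer of every partition.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical view: remembers which consumer regions of one partition group are unfinished. */
final class GroupExecutionView {
    private final String groupId;
    private final Set<String> unfinishedRegions;

    GroupExecutionView(String groupId, Set<String> consumerRegions) {
        this.groupId = groupId;
        this.unfinishedRegions = new HashSet<>(consumerRegions);
    }

    String groupId() {
        return groupId;
    }

    void regionFinished(String region) {
        unfinishedRegions.remove(region);
    }

    void regionUnfinished(String region) {
        unfinishedRegions.add(region);
    }

    /** The group is fully consumed once no consumer region is left unfinished. */
    boolean isFinished() {
        return unfinishedRegions.isEmpty();
    }
}

/** Hypothetical maintainer: routes a region's status change to the views of the groups it consumes. */
final class GroupExecutionViewMaintainer {
    private final Map<String, List<GroupExecutionView>> viewsByRegion = new HashMap<>();

    /** Registers a partition group together with all regions that consume it. */
    void register(String groupId, Set<String> consumerRegions) {
        GroupExecutionView view = new GroupExecutionView(groupId, consumerRegions);
        for (String region : consumerRegions) {
            viewsByRegion.computeIfAbsent(region, r -> new ArrayList<>()).add(view);
        }
    }

    /** Marks a region finished and returns the ids of groups that are now fully consumed. */
    List<String> regionFinished(String region) {
        List<String> releasable = new ArrayList<>();
        for (GroupExecutionView view :
                viewsByRegion.getOrDefault(region, Collections.emptyList())) {
            view.regionFinished(region);
            if (view.isFinished()) {
                releasable.add(view.groupId());
            }
        }
        return releasable;
    }

    /** Marks a region unfinished again, for example after it is restarted. */
    void regionUnfinished(String region) {
        for (GroupExecutionView view :
                viewsByRegion.getOrDefault(region, Collections.emptyList())) {
            view.regionUnfinished(region);
        }
    }
}
```

In this sketch, a caller would register each group once, call `regionFinished` whenever a region completes, and release the partitions of every group id that is returned. The work per call is proportional to the number of groups the region consumes, which is the property the cached status is meant to provide.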
For more details, please check FLINK-21332.
Brief change log
Verifying this change
Since this optimization does not change the original logic of releasing result partitions in RegionPartitionReleaseStrategy, we believe that this change is already covered by RegionPartitionReleaseStrategyTest.
For the newly added classes ConsumerRegionGroupExecutionViewMaintainer and ConsumerRegionGroupExecutionView, we added the test case ConsumerRegionGroupExecutionViewMaintainerTest.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation