New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-21915] Optimize Execution#finishPartitionsAndUpdateConsumers #15382
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 0e09d89 (Sat Aug 28 13:07:31 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Thesharing Could you share the number of the improvements with this change?
for (IntermediateResultPartition finishedPartition : newlyFinishedResults) { | ||
final IntermediateResultPartition[] allPartitionsOfNewlyFinishedResults = | ||
finishedPartition.getIntermediateResult().getPartitions(); | ||
|
||
for (IntermediateResultPartition partition : allPartitionsOfNewlyFinishedResults) { | ||
updatePartitionConsumers(partition); | ||
for (ConsumerVertexGroup consumerVertexGroup : partition.getConsumers()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It may be out of the scope of this PR. But I think it's better to rename partition.getConsumers()
to partition.getConsumerGroups()
. Also ExecutionVertex#getConsumedPartitions(...)
should be renamed to ExecutionVertex#getConsumedPartitionGroups(...)
. This will make their invocations easier to understand.
@@ -486,8 +487,8 @@ void notifyPartitionDataAvailable(ResultPartitionID partitionId) { | |||
partition.markDataProduced(); | |||
} | |||
|
|||
void cachePartitionInfo(PartitionInfo partitionInfo) { | |||
getCurrentExecutionAttempt().cachePartitionInfo(partitionInfo); | |||
void cachePartitionInfo(Collection<PartitionInfo> partitionInfos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cachePartitionInfo
-> cachePartitionInfos
@@ -1025,8 +1035,8 @@ private void finishCancellation(boolean releasePartitions) { | |||
handlePartitionCleanup(releasePartitions, releasePartitions); | |||
} | |||
|
|||
void cachePartitionInfo(PartitionInfo partitionInfo) { | |||
partitionInfos.add(partitionInfo); | |||
void cachePartitionInfo(Collection<PartitionInfo> partitionInfos) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cachePartitionInfo
-> cachePartitionInfos
Due to FLINK-22017, now blocking partitions will be individually consumable once it's finished. finishPartitionAndUpdateConsumer will be called every partition is finished. The proposed optimization is no longer valid. Furthermore, this function is called only when there is intra-region edges in the graph. In this case the downstream vertices are DEPLOYING/RUNNING when the upstream vertices are FINISHED. The scenario is rare. Thus for now we just close it. If there's a new idea about it, we'd like to reopen it. |
What is the purpose of the change
Based on the scheduler benchmark PartitionReleaseInBatchJobBenchmark introduced in FLINK-20612, we find that there's another procedure that has O(N^2) computation complexity: Execution#finishPartitionsAndUpdateConsumers.
Once an execution is finished, it will finish all its BLOCKING partitions and update the partition info to all consumer vertices. The procedure can be illustrated as the following pseudo code:
This procedure has O(N^2) complexity in total.
Based on FLINK-21326, the consumed partitions are grouped if they are connected to the same consumer vertices. Therefore, we can update partition info of the entire ConsumedPartitionGroup in batch, rather than one by one. This will decrease the complexity from O(N^2) to O(N).
Brief change log
Verifying this change
Since this optimization does not change the original logic of Execution#finishPartitionsAndUpdateConsumers, we believe that this change is already covered by existing tests, like ExecutionPartitionLifecycleTest, DefaultExecutionGraphDeploymentTest, and etc.
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (yes / no)Documentation