[FLINK-21326][runtime] Optimize building topology when initializing ExecutionGraph #14868
@flinkbot: Automated checks last ran on commit 85af2bb (Thu Feb 04 09:35:47 UTC 2021).
Thanks for opening this PR @Thesharing
The change generally looks good to me. I have a few minor comments.
Thanks for reviewing and providing these great suggestions. I've resolved them in the fix-up commits.
Thanks for addressing all the comments @Thesharing
The change looks good to me.
@tillrohrmann do you want to take another look?
I'll try to give it a pass by Monday. If I don't manage to, then go ahead with merging it.
Thanks for creating this PR @Thesharing. The changes go in a good direction. I had a couple of comments. Please take a look.
// sanity check
checkState(consumedPartitions.size() == inputNumber);
Does this mean that we have to add the consumed partitions in increasing order? If this is the contract, then we might wanna add a JavaDoc explaining this more explicitly.
Alternatively we could change the API so that one needs to add all Collection<ConsumedPartitionGroup> when adding an ExecutionVertexID.
Yes, this ordering requirement is redundant; there was no limitation on the order before. I prefer to remove inputNumber from the parameters, since currently in EdgeManagerBuildUtil a ConsumedPartitionGroup is added one by one per JobEdge.
/** Utilities for building {@link EdgeManager}. */
public class EdgeManagerBuildUtil {

    public static void connectVertexToResult(
Do we have some tests for this method?
As with EdgeManager, we think that since we didn't change the original logic, it's covered by existing tests. The all-to-all edges are tested by ExecutionGraphConstructionTest. The pointwise edges are tested by PointwisePatternTest.
Cool, could you add a comment to the two test classes that they effectively test EdgeManagerBuildUtil now?
Resolved.
public List<IntermediateResultPartitionID> getResultPartitions() {
    return Collections.unmodifiableList(resultPartitions);
}
We could hide the implementation detail by letting ConsumedPartitionGroup implement the methods we need to work with it directly. For example, if it implements size() and Iterable, then that should already go a long way. Maybe we also need get(int index). The same applies to the ConsumerVertexGroup.
Totally agreed. This will simplify the calls to ConsumedPartitionGroup. After discussing with @zhuzhurk, we decided to have the following methods:
iterator()
size()
getFirst() (to replace get(0))
isEmpty()
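For readers following along, the four methods listed above could look roughly like the sketch below. This is not the class from the PR: PartitionId and PartitionGroupSketch are hypothetical stand-ins for IntermediateResultPartitionID and ConsumedPartitionGroup, and the defensive copy in the constructor is an assumption made only for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for IntermediateResultPartitionID.
final class PartitionId {
    final int index;

    PartitionId(int index) {
        this.index = index;
    }
}

// Sketch only: exposes the four agreed methods and hides the backing list.
final class PartitionGroupSketch implements Iterable<PartitionId> {
    private final List<PartitionId> partitions;

    PartitionGroupSketch(List<PartitionId> partitions) {
        // Assumption: copy the input so later changes by the caller do not leak in.
        this.partitions = Collections.unmodifiableList(new ArrayList<>(partitions));
    }

    @Override
    public Iterator<PartitionId> iterator() {
        return partitions.iterator();
    }

    public int size() {
        return partitions.size();
    }

    public boolean isEmpty() {
        return partitions.isEmpty();
    }

    // Replaces get(0) at call sites; throws NoSuchElementException on an empty group.
    public PartitionId getFirst() {
        return iterator().next();
    }
}
```

With this shape, callers can use a for-each loop over the group without ever seeing the underlying List.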
Thank you so much for these patient and enlightening suggestions, @tillrohrmann. I've made several changes according to them. Would you mind reviewing it again when you have free time?
 * based on the {@link DistributionPattern}. The connection information is stored in the {@link
 * EdgeManager}.
 */
public static void connectVertexToResult(
Seems it can be package private.
Resolved.
if (parallelism % numSources == 0) {
    // same number of targets per source
    int factor = parallelism / numSources;
    sourcePartition = subTaskIndex / factor;
} else {
    // different number of targets per source
    float factor = ((float) parallelism) / numSources;
    sourcePartition = (int) (subTaskIndex / factor);
}
I think the previous code, which special-cases XXX % YYY == 0, is an unnecessary complication and we can simplify it a bit. The result should be the same, and PointwiseTest#testPointwiseConnectionSequence was added to ensure this.
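To make the claim above concrete, here is a small sketch that compares the two-branch computation quoted in the snippet with a version that drops the divisibility special case. The class and method names are hypothetical, and floatOnly is only one possible reading of the simplification; the exact form that was merged is not shown in this conversation.

```java
// Sketch only: names are illustrative and not taken from the PR.
final class PointwiseSketch {

    // The two-branch computation from the quoted snippet.
    static int twoBranch(int subTaskIndex, int parallelism, int numSources) {
        if (parallelism % numSources == 0) {
            // same number of targets per source
            int factor = parallelism / numSources;
            return subTaskIndex / factor;
        } else {
            // different number of targets per source
            float factor = ((float) parallelism) / numSources;
            return (int) (subTaskIndex / factor);
        }
    }

    // Assumed simplification: always use the float formula, without the special case.
    static int floatOnly(int subTaskIndex, int parallelism, int numSources) {
        float factor = ((float) parallelism) / numSources;
        return (int) (subTaskIndex / factor);
    }

    // Exhaustively compares both variants for all parallelisms up to maxParallelism.
    static boolean agreeUpTo(int maxParallelism) {
        for (int parallelism = 1; parallelism <= maxParallelism; parallelism++) {
            for (int numSources = 1; numSources <= parallelism; numSources++) {
                for (int subTask = 0; subTask < parallelism; subTask++) {
                    if (twoBranch(subTask, parallelism, numSources)
                            != floatOnly(subTask, parallelism, numSources)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }
}
```

The two variants can only differ in the divisible case, where the float factor is an exact small integer, so an exhaustive check over a modest range is a cheap sanity test. It does not prove equivalence for very large parallelism values, where float rounding would need separate consideration.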
Thanks for updating this PR @Thesharing. I had a few more comments. Please take a look.
this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber);

producer.getExecutionGraph().registerResultPartition(partitionId, this);
I am not a huge fan of coupling components through this kind of construct. Couldn't we register the IntermediateResultPartition where it is created (e.g. in the ExecutionVertex)?
Yes, it seems too tightly coupled. I think it's better to register all the ExecutionVertices and IntermediateResultPartitions after all of them are created.
Now we register them in ExecutionGraph#registerExecutionVerticesAndResultPartitions, and this method is called in ExecutionGraph#attachJobGraph, right after creating all the ExecutionJobVertices.
private EdgeManager getEdgeManager() {
    return producer.getExecutionGraph().getEdgeManager();
}
I think this shows that we are overly coupling the IntermediateResultPartition with the ExecutionGraph. Couldn't we give an EdgeManager to the IntermediateResultPartition when we create it? This makes the dependency explicit.
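As a rough illustration of the suggestion above, the sketch below passes the manager in through the constructor instead of reaching through producer.getExecutionGraph(). EdgeManagerStub and PartitionSketch are hypothetical placeholders, not the Flink classes, and the null check is an assumption.

```java
import java.util.Objects;

// Hypothetical placeholder for the real EdgeManager.
final class EdgeManagerStub {}

// Sketch only: the dependency is explicit in the constructor signature.
final class PartitionSketch {
    private final EdgeManagerStub edgeManager;

    PartitionSketch(EdgeManagerStub edgeManager) {
        // Assumption: fail fast if no manager is supplied.
        this.edgeManager = Objects.requireNonNull(edgeManager, "edgeManager");
    }

    EdgeManagerStub getEdgeManager() {
        // No lookup through a producer or graph object is needed anymore.
        return edgeManager;
    }
}
```

A side effect of this shape is that a unit test can construct the partition with its own manager instance, without building a whole graph first.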
Resolved. I'm wondering whether we should also pass the EdgeManager to the constructor of ExecutionVertex. I'm not sure about it.
Theoretically yes. But the ExecutionVertex is already coupled quite tightly to the ExecutionGraph. Hence, it might not make a big difference to not pass it in.
Thanks for providing these great suggestions, @tillrohrmann. They really help me improve this pull request. Would you mind reviewing it again when you have free time? Thank you in advance.
I've rebased onto the latest master branch, due to changes introduced in FLINK-21347.
I'll try to give it another pass today.
Thanks for updating this PR @Thesharing. It looks really nice now. Well done! I had a few very minor comments. I will address them myself while merging this PR.
private void registerExecutionVerticesAndResultPartitions(
        List<ExecutionJobVertex> executionJobVertices) {
    for (ExecutionJobVertex executionJobVertex : executionJobVertices) {
        for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
            executionVerticesById.put(executionVertex.getID(), executionVertex);
            resultPartitionsById.putAll(executionVertex.getProducedPartitions());
        }
    }
}
Nice, this is a very good solution :-)
// sanity check
checkState(
        consumers.isEmpty(), "Currently there has to be exactly one consumer in real jobs");
This checkState and the one above seem to be testing the same thing. I would keep only one. Ideally one with an explanation message.
@Override
public EdgeManager getEdgeManager() {
    return null;
Let's implement this method with UnsupportedOperationException
@Override
public ExecutionVertex getExecutionVertexOrThrow(ExecutionVertexID id) {
    return null;
Let's implement this method with UnsupportedOperationException
@Override
public IntermediateResultPartition getResultPartitionOrThrow(
        IntermediateResultPartitionID id) {
    return null;
Let's implement this method with UnsupportedOperationException
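For the three stub methods above, the requested change could look like the following sketch. GraphAccessorSketch and TestingGraphAccessor are hypothetical names used only for illustration, Object stands in for the real return type, and the exception message is an assumption.

```java
// Hypothetical interface standing in for the one the test class implements.
interface GraphAccessorSketch {
    Object getEdgeManager();
}

// Sketch only: a test stub that fails loudly instead of returning null.
final class TestingGraphAccessor implements GraphAccessorSketch {
    @Override
    public Object getEdgeManager() {
        // A null return would surface later as a NullPointerException far from the cause.
        throw new UnsupportedOperationException(
                "getEdgeManager is not supported by this test stub");
    }
}
```

Throwing here means that any test which unexpectedly reaches the stubbed method fails at the call site, which is usually easier to diagnose than a null propagating through the code under test.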
…mediateResultPartitionID
… POINTWISE edges follows the initial logic
Thank you, @tillrohrmann and @zhuzhurk! I believe I've learnt a lot from your suggestions. I'll start to prepare the next pull request. Thank you for these enlightening reviews!
What is the purpose of the change
This PR introduces the optimization of building topology when initializing ExecutionGraph.
The main idea is to put all the vertices that consume the same result partitions into one group (ConsumerVertexGroup), and to put all the result partitions that have the same consumer vertices into one group (ConsumedPartitionGroup).
The complexity of building topology in ExecutionGraph decreases from O(N^2) to O(N).
For more details please check FLINK-21326.
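A toy sketch of the grouping idea for an all-to-all connection is given below. It is not the EdgeManager from this PR: GroupedTopologySketch, its field names, and the use of plain integer indices instead of ID classes are all assumptions made for illustration. The point is that every consumer references the same partition group object and every partition references the same vertex group object, so the number of stored references grows with N + M rather than N * M.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: integer indices stand in for vertex and partition IDs.
final class GroupedTopologySketch {
    // consumer vertex index -> shared group of consumed partition indices
    final Map<Integer, List<Integer>> consumedPartitionsByVertex = new HashMap<>();
    // partition index -> shared group of consumer vertex indices
    final Map<Integer, List<Integer>> consumersByPartition = new HashMap<>();

    static GroupedTopologySketch connectAllToAll(int numPartitions, int numConsumers) {
        // Build each group exactly once.
        List<Integer> partitionGroup = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitionGroup.add(p);
        }
        List<Integer> vertexGroup = new ArrayList<>();
        for (int v = 0; v < numConsumers; v++) {
            vertexGroup.add(v);
        }

        GroupedTopologySketch topology = new GroupedTopologySketch();
        // Each consumer points at the same partition group instance.
        for (int v = 0; v < numConsumers; v++) {
            topology.consumedPartitionsByVertex.put(v, partitionGroup);
        }
        // Each partition points at the same vertex group instance.
        for (int p = 0; p < numPartitions; p++) {
            topology.consumersByPartition.put(p, vertexGroup);
        }
        return topology;
    }
}
```

Without grouping, an all-to-all connection would need one edge object per (partition, consumer) pair, which is where the quadratic cost comes from.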
Brief change log
Verifying this change
Since these optimizations do not change the original logic of building the topology in ExecutionGraph, we believe that this change is already covered by existing tests, such as ExecutionGraphConstructionTest, ExecutionGraphRescalingTest, PointwisePatternTest, and others.
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation