[FLINK-17014][runtime] Implement PipelinedRegionSchedulingStrategy #11770
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit 2577ed1 (Thu Apr 16 08:55:57 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process.
The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
The PipelinedRegionSchedulingStrategy implementation looks neat!
2577ed1 to 395043e
395043e to d8568c4
private TestingSchedulerOperations testingSchedulerOperation;

private int parallelism = 2;
Consider making this static final or converting it to a local variable.
Ok.
@Override
public Iterable<TestingSchedulingExecutionVertex> getVertices() {
    return regionVertices.values();
Consider wrapping it in Collections.unmodifiableCollection() for immutability or returning a copy. Same in getConsumedResults().
Iterable is generally considered unmodifiable, so I don't think this is really needed.
I don't think that's true; see Iterator#remove().
You are right. Will wrap it with Collections.unmodifiableCollection().
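The point about Iterator#remove() can be illustrated with a small sketch. The class below is a hypothetical stand-in (RegionVerticesDemo and its String vertices are not from the PR); it only assumes that the backing field is a HashMap, whose values() view supports removal through its iterator.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Hypothetical stand-in for the reviewed test class; names are illustrative only.
final class RegionVerticesDemo {
    private final Map<Integer, String> regionVertices = new HashMap<>();

    RegionVerticesDemo() {
        regionVertices.put(1, "v1");
        regionVertices.put(2, "v2");
    }

    // Leaky variant: callers receive a live view of the map's values.
    Iterable<String> getVerticesLeaky() {
        return regionVertices.values();
    }

    // Guarded variant: a read-only view over the same values.
    Iterable<String> getVerticesGuarded() {
        return Collections.unmodifiableCollection(regionVertices.values());
    }

    int size() {
        return regionVertices.size();
    }

    // Tries to remove the first element via the iterator; false if the view rejects it.
    static boolean tryRemoveFirst(Iterable<String> vertices) {
        Iterator<String> it = vertices.iterator();
        it.next();
        try {
            it.remove();
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        RegionVerticesDemo demo = new RegionVerticesDemo();
        System.out.println(tryRemoveFirst(demo.getVerticesGuarded())); // false
        System.out.println(demo.size());                               // 2
        System.out.println(tryRemoveFirst(demo.getVerticesLeaky()));   // true
        System.out.println(demo.size());                               // 1
    }
}
```

Returning a copy would also protect the internal map, at the cost of one allocation per call; the unmodifiable wrapper stays a view and reflects later changes to the map.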
@@ -78,6 +83,31 @@ public TestingSchedulingResultPartition getResultPartition(final IntermediateRes
        return resultPartition;
    }

    @Override
    public Iterable<SchedulingPipelinedRegion> getAllPipelinedRegions() {
        return vertexRegions.values().stream().collect(Collectors.toSet());
I think that's the same as new HashSet<>(vertexRegions.values())
Ok. Let's simplify it.
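As a quick check of the suggested simplification, the sketch below compares both forms on a small map. RegionSetDemo and the String keys and values are hypothetical; the actual map in the PR holds vertex IDs and regions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical example; the String keys and values stand in for vertex IDs and regions.
final class RegionSetDemo {
    // Several vertices can belong to one region, so values() may contain duplicates.
    static Map<String, String> sampleVertexRegions() {
        Map<String, String> vertexRegions = new HashMap<>();
        vertexRegions.put("v1", "regionA");
        vertexRegions.put("v2", "regionA");
        vertexRegions.put("v3", "regionB");
        return vertexRegions;
    }

    // Original form: deduplicate through a stream.
    static Set<String> viaStream(Map<String, String> vertexRegions) {
        return vertexRegions.values().stream().collect(Collectors.toSet());
    }

    // Suggested form: deduplicate with the HashSet copy constructor.
    static Set<String> viaCopy(Map<String, String> vertexRegions) {
        return new HashSet<>(vertexRegions.values());
    }

    public static void main(String[] args) {
        Map<String, String> vertexRegions = sampleVertexRegions();
        // Both sets contain regionA and regionB exactly once.
        System.out.println(viaStream(vertexRegions).equals(viaCopy(vertexRegions)));
    }
}
```

One small difference: Collectors.toSet() makes no guarantee about the concrete Set type it returns, whereas the constructor form always yields a HashSet.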
    return vertexRegions.get(vertexId);
}

void generatePipelinedRegions() {
Consider invoking this method lazily in getAllPipelinedRegions() and getPipelinedRegionOfVertex() instead of relying on the client (test) to invoke it.
Yes, that would be better.
The only problem is that the regions cannot be refreshed if the topology changes after getAllPipelinedRegions() has been invoked, so I will clear the cached regions on any topology change.
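The lazy-generation idea with cache clearing could look roughly like the sketch below. Everything here is hypothetical: LazyRegionTopology, addVertex, and the rule that each vertex forms its own region are simplifications for illustration, not the PR's TestingSchedulingTopology.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified topology: every vertex forms its own "region".
final class LazyRegionTopology {
    private final List<String> vertices = new ArrayList<>();
    private Set<String> cachedRegions; // null means "not computed yet or invalidated"
    private int generationCount;       // counts recomputations, for demonstration only

    // Any topology change clears the cache so the next read recomputes the regions.
    void addVertex(String vertexId) {
        vertices.add(vertexId);
        cachedRegions = null;
    }

    // Regions are generated lazily on first access after a change.
    Set<String> getAllPipelinedRegions() {
        if (cachedRegions == null) {
            cachedRegions = generatePipelinedRegions();
        }
        return Collections.unmodifiableSet(cachedRegions);
    }

    private Set<String> generatePipelinedRegions() {
        generationCount++;
        Set<String> regions = new HashSet<>();
        for (String vertexId : vertices) {
            regions.add("region-of-" + vertexId);
        }
        return regions;
    }

    int getGenerationCount() {
        return generationCount;
    }

    public static void main(String[] args) {
        LazyRegionTopology topology = new LazyRegionTopology();
        topology.addVertex("v1");
        topology.getAllPipelinedRegions();
        topology.getAllPipelinedRegions(); // served from the cache
        topology.addVertex("v2");          // invalidates the cache
        System.out.println(topology.getAllPipelinedRegions().size()); // 2
        System.out.println(topology.getGenerationCount());            // 2
    }
}
```

With this shape, callers no longer need to remember to call the generation method, and repeated reads between topology changes do not recompute anything.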
d8568c4 to 17d9f5e
17d9f5e to a47976d
Will merge it once Travis is green.
What is the purpose of the change
Implement PipelinedRegionSchedulingStrategy, which schedules tasks at the granularity of pipelined regions. For more details, see https://cwiki.apache.org/confluence/display/FLINK/FLIP-119+Pipelined+Region+Scheduling.
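To give a rough feel for what scheduling at region granularity means, here is a toy sketch. It is not the PR's implementation: RegionSchedulingSketch, its methods, and the readiness rule used here (a region is scheduled once all of its upstream regions have finished) are assumptions made for illustration; FLIP-119 describes the actual design.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model (hypothetical): regions are identified by name and scheduled as whole units.
final class RegionSchedulingSketch {
    // regionId -> upstream regions it depends on (assumed dependency model)
    private final Map<String, Set<String>> upstream = new LinkedHashMap<>();
    private final Set<String> finished = new HashSet<>();
    private final Set<String> scheduled = new HashSet<>();

    void addRegion(String regionId, String... upstreamRegionIds) {
        upstream.put(regionId, new HashSet<>(Arrays.asList(upstreamRegionIds)));
    }

    // Schedules every unscheduled region whose upstream regions have all finished.
    List<String> scheduleReadyRegions() {
        List<String> newlyScheduled = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : upstream.entrySet()) {
            String regionId = entry.getKey();
            if (!scheduled.contains(regionId) && finished.containsAll(entry.getValue())) {
                scheduled.add(regionId);
                newlyScheduled.add(regionId);
            }
        }
        return newlyScheduled;
    }

    // Marks a region as finished and schedules whatever became ready as a result.
    List<String> onRegionFinished(String regionId) {
        finished.add(regionId);
        return scheduleReadyRegions();
    }

    public static void main(String[] args) {
        RegionSchedulingSketch sketch = new RegionSchedulingSketch();
        sketch.addRegion("source");
        sketch.addRegion("join", "source");
        System.out.println(sketch.scheduleReadyRegions());     // [source]
        System.out.println(sketch.onRegionFinished("source")); // [join]
    }
}
```

The point of the sketch is only the unit of scheduling: all tasks of a region are deployed together once the region is ready, rather than vertex by vertex.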
Brief change log
Verifying this change
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation