[FLINK-19552][coordination] Consider only available input location preferences for slot profile in pipelined region scheduling #13730
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Automated Checks
Last check on commit d4a0257 (Wed Oct 21 14:20:09 UTC 2020)
Warnings:
Mention the bot in a comment to re-run the automated checks.
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
I am also working on an integration test. I will try the following graph of disjoint pipelines:
Thanks for creating this PR @azagrebin. The changes look good to me. +1 for merging.
...ntime/src/main/java/org/apache/flink/runtime/scheduler/OneSlotPerExecutionSlotAllocator.java
Thanks for opening this PR @azagrebin
I have a few minor comments for it.
The one I want to confirm is whether we can throw an exception when a bulk cannot be created correctly, instead of just skipping the scheduling of the slot request check as in e98f87a.
...time/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocator.java
...test/java/org/apache/flink/runtime/scheduler/DefaultSyncPreferredLocationsRetrieverTest.java
.../src/test/java/org/apache/flink/runtime/scheduler/AvailableInputsLocationsRetrieverTest.java
d4a0257 to 2f7b394
I have tested the change again with the failing benchmark job, and the JobManager doesn't exit with a FATAL error anymore.
No unexpected exceptions.
Thanks for the fix!
…cutionSlotAllocatorFactory's
…eferences for slot profile in pipelined region scheduling
The pipelined region scheduling strategy schedules regions once all their blocking input dependencies are ready. The SlotSharingGroups of the region can include executions of other regions which are not scheduled yet, including their dependencies. Hence we should not wait for those other, unavailable input dependencies, so that the scheduling of the current region is not blocked.
The new SlotSharingExecutionSlotAllocator creates the DefaultSyncPreferredLocationsRetriever, where the original InputsLocationsRetriever is wrapped with the AvailableInputsLocationsRetriever. It makes the InputsLocationsRetriever return only completed input location futures; others are filtered out. This allows DefaultSyncPreferredLocationsRetriever to return a completed future, which makes it synchronous and non-blocking.
This closes apache#13730.
2f7b394 to c313c12
The behavior is still fine with the hotfix removed.
@flinkbot run azure
What is the purpose of the change
The pipelined region scheduling strategy schedules regions once all their blocking input dependencies are ready. The SlotSharingGroups of the region can include executions of other regions which are not scheduled yet, including their dependencies. Hence we should not wait for those other, unavailable input dependencies, so that the scheduling of the current region is not blocked.
Brief change log
The PR moves PreferredLocationsRetriever creation from SchedulerBase to the ExecutionSlotAllocatorFactories. Then the legacy allocator continues to use the DefaultPreferredLocationsRetriever.
The new SlotSharingExecutionSlotAllocator creates the DefaultSyncPreferredLocationsRetriever, where the original InputsLocationsRetriever is wrapped with the AvailableInputsLocationsRetriever. It makes the InputsLocationsRetriever return only completed input location futures; others are filtered out. This allows DefaultSyncPreferredLocationsRetriever to return a completed future, which makes it synchronous and non-blocking.
Verifying this change
unit tests
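For readers unfamiliar with the wrapping idea in the change log, the following is a minimal sketch of it, not the code from this PR. The names LocationsRetriever and AvailableOnlyRetriever are hypothetical stand-ins, vertex IDs and TaskManager locations are simplified to String, and the extra check that drops exceptionally completed futures is an assumption of the sketch rather than something the PR description states.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

public class AvailableLocationsSketch {

    /** Hypothetical, simplified stand-in for an inputs-locations retriever. */
    interface LocationsRetriever {
        Optional<CompletableFuture<String>> getTaskManagerLocation(String vertexId);
    }

    /**
     * Hypothetical wrapper: passes through only location futures that are
     * already completed, so a caller never has to wait on them.
     */
    static final class AvailableOnlyRetriever implements LocationsRetriever {
        private final LocationsRetriever delegate;

        AvailableOnlyRetriever(LocationsRetriever delegate) {
            this.delegate = delegate;
        }

        @Override
        public Optional<CompletableFuture<String>> getTaskManagerLocation(String vertexId) {
            // Assumption of this sketch: failed or cancelled futures are dropped too.
            return delegate.getTaskManagerLocation(vertexId)
                    .filter(f -> f.isDone() && !f.isCompletedExceptionally());
        }
    }

    public static void main(String[] args) {
        Map<String, CompletableFuture<String>> locations = new HashMap<>();
        locations.put("scheduled", CompletableFuture.completedFuture("tm-1"));
        locations.put("pending", new CompletableFuture<>()); // producer not placed yet

        LocationsRetriever original = id -> Optional.ofNullable(locations.get(id));
        LocationsRetriever available = new AvailableOnlyRetriever(original);

        // The completed location is kept; the pending one is filtered out.
        System.out.println(available.getTaskManagerLocation("scheduled").isPresent());
        System.out.println(available.getTaskManagerLocation("pending").isPresent());
    }
}
```

Because every future that survives the filter is already done, a consumer can read its value immediately (for example with join()) without blocking, which is the property the synchronous retriever relies on.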