[FLINK-21450][runtime] Support LocalRecovery by AdaptiveScheduler #21981
I'm not a fan of passing around execution graphs, and would rather see a dedicated structure for our purposes that lives in the AdaptiveScheduler.
This would avoid some edge cases, like local recovery breaking down unnecessarily when CreatingWithExecutionGraph failed.
Thanks for the update @rkhachatryan, this starts looking really good! I've left a few questions, PTAL
Thanks for the update @rkhachatryan; this looks great! I've added a few more minor comments, PTAL.
My biggest concern is whether the integration test correctly stresses the AdaptiveScheduler code path.
Please prepare the PR for merging (fixing the commit history + moving StateSizeEstimates out of the PR as discussed offline).
…match
Currently, a wrong allocation fails the task, causing a restart, which eventually allows the allocation to be fixed by picking the right TM. This prevents the test from failing and hides the wrong allocation.
Thanks a lot for the thorough review @dmvk! I've cleaned up the commit history and I think all concerns are now resolved, PTAL.
Thanks for updating the PR @rkhachatryan. Great stuff!
🎉 💪
…rces AdaptiveScheduler state
The previous ExecutionGraph will be used in a subsequent commit to allocate workloads more optimally by taking previous allocations into account.
…uler and SlotAssigner
Slot assignments are computed and consumed by SlotAllocator. This is expressed implicitly by extending VertexParallelism. This change tries to make that explicit, while still allowing slots to be assigned to something other than Slot Sharing Groups. It does so by:
1. Introducing JobSchedulingPlan, computed and consumed by SlotAllocator. It couples VertexParallelism with slot assignments
2. Introducing the determineParallelismAndCalculateAssignment method in addition to determineParallelism, specifically for assignments
3. Pushing the polymorphism of state assignments from VertexParallelism into the JobSchedulingPlan (slot assignment target)
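To make the shape described in this commit message more concrete, here is a minimal sketch of a plan object that couples per-vertex parallelism with slot assignments whose target type is left open. It is an illustration only: `SchedulingPlanSketch`, `Assignment`, `targetAs` and the use of plain strings for vertex and slot ids are all hypothetical simplifications, not the actual Flink classes or signatures.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for the idea behind JobSchedulingPlan.
final class SchedulingPlanSketch {

    // One slot assigned to one target; the target type is deliberately open
    // (e.g. a slot sharing group, or anything else the allocator understands).
    static final class Assignment {
        final String slotId;
        final Object target;

        Assignment(String slotId, Object target) {
            this.slotId = slotId;
            this.target = target;
        }

        // Typed access to the target; throws ClassCastException on a mismatch.
        <T> T targetAs(Class<T> type) {
            return type.cast(target);
        }
    }

    // Vertex id -> parallelism (assumed representation for this sketch).
    final Map<String, Integer> vertexParallelism;
    final List<Assignment> assignments;

    SchedulingPlanSketch(Map<String, Integer> vertexParallelism, List<Assignment> assignments) {
        // Defensive copies so the plan is immutable once computed.
        this.vertexParallelism =
                Collections.unmodifiableMap(new LinkedHashMap<>(vertexParallelism));
        this.assignments = Collections.unmodifiableList(new ArrayList<>(assignments));
    }
}
```

In this sketch the allocator would both produce and consume the plan, so callers that only need parallelism never have to look at the assignment targets.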
What is the purpose of the change
Adjust slot assignment by Adaptive Scheduler
to try to re-use previous allocations
so that TMs can use Local Recovery.
Contributed mostly by @dmvk.
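As a rough illustration of what "re-using previous allocations" can mean, the sketch below greedily places each execution group on the free slot that previously held the most local state for it, and gives any leftover groups the remaining slots. This is a simplified stand-in and not the PR's StateLocalitySlotAssigner: the class name `LocalityAssignmentSketch`, the string ids, and the `previousState` map (group -> slot id -> bytes of local state) are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical greedy sketch of state-locality-aware slot assignment.
final class LocalityAssignmentSketch {

    private static final class Candidate {
        final String group;
        final String slot;
        final long localStateBytes;

        Candidate(String group, String slot, long localStateBytes) {
            this.group = group;
            this.slot = slot;
            this.localStateBytes = localStateBytes;
        }
    }

    static Map<String, String> assign(
            List<String> groups,
            List<String> freeSlots,
            Map<String, Map<String, Long>> previousState) {
        if (freeSlots.size() < groups.size()) {
            throw new IllegalArgumentException("not enough free slots");
        }
        Set<String> remainingSlots = new LinkedHashSet<>(freeSlots);

        // Score every (group, slot) pair that has local state on a still-free slot.
        List<Candidate> candidates = new ArrayList<>();
        for (String group : groups) {
            Map<String, Long> perSlot =
                    previousState.getOrDefault(group, Collections.<String, Long>emptyMap());
            for (Map.Entry<String, Long> e : perSlot.entrySet()) {
                if (remainingSlots.contains(e.getKey()) && e.getValue() > 0) {
                    candidates.add(new Candidate(group, e.getKey(), e.getValue()));
                }
            }
        }
        // Largest local state first, so the most expensive recoveries stay local.
        candidates.sort(
                Comparator.comparingLong((Candidate c) -> c.localStateBytes).reversed());

        Map<String, String> assignment = new LinkedHashMap<>();
        for (Candidate c : candidates) {
            // Take the pair only if both the group and the slot are still unassigned.
            if (!assignment.containsKey(c.group) && remainingSlots.remove(c.slot)) {
                assignment.put(c.group, c.slot);
            }
        }
        // Groups without any usable local state get whatever slots are left.
        Iterator<String> leftovers = remainingSlots.iterator();
        for (String group : groups) {
            if (!assignment.containsKey(group)) {
                assignment.put(group, leftovers.next());
            }
        }
        return assignment;
    }
}
```

Sorting all scored pairs is what dominates the cost of this greedy approach; a real assigner may weigh state differently or use another matching strategy.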
The main differences from the original contribution:
- ExecutionGraph is passed from the previous state explicitly (currently, the WaitingForResources stage, which triggers the computation, doesn't have the graph)
- SlotAssigner: the split into two methods is removed, mostly for consistency (the two methods mostly duplicated each other). That results in a higher asymptotic complexity of StateLocalitySlotAssigner (O(mnlog*mnlog) vs O(mnlog))
Brief change log
Verifying this change
LocalRecoveryITCase
SlotSharingSlotAllocatorTest.testStickyAllocation
StateLocalitySlotAssignerTest
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (yes / no)
Documentation