[FLINK-31476] AdaptiveScheduler respects minimum parallelism #22883
Conversation
I think this behavior may cause some serious problems if the new resources cannot be acquired. The scheduler should wait until the resources are acquired before taking any action, so that it does not cause downtime. I am thinking about the case where you simply want to increase the parallelism to a given new (larger) value. In that case you would set the min and the max to the same new value, and you would expect the scaling to happen only once the resources are available. It's not enough in these cases to simply raise the max, because that may lead to multiple incremental restarts as the resources become available, which can cause even more downtime. What do you think @zentol? cc @mxm
I may have misremembered what your requirements were. It shouldn't be difficult to solve this such that we don't cancel the job immediately.
We were thinking of using this in the context of autoscaling, or in general of scaling jobs up or down to a target parallelism. Theoretically there could be circumstances where, if the job requirements state a minimum that is higher than the current parallelism, we may want to cancel the job (eventually). But from a purely practical perspective, I think most people would rather have the job running with the current (smaller) parallelism than not have it at all in cases where the resources cannot be acquired. This is a rather common case in cloud environments due to various resource quotas, changing circumstances, etc. We could also make this configurable with a cancel timeout, where a negative/infinite timeout would mean that we actually wait forever for the resources without cancelling.
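For illustration only, here is a minimal sketch of how such a timeout could be expressed as a configuration option. The option name, default, and semantics below are hypothetical and are not part of this PR or of any existing Flink setting.

import java.time.Duration;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/**
 * Hypothetical sketch: how long to keep running below the requested minimum parallelism
 * before cancelling the job. A negative value would mean "wait forever, never cancel".
 */
public final class MinParallelismTimeoutOptions {

    // Illustrative name and default only; no such option exists in Flink.
    public static final ConfigOption<Duration> MIN_PARALLELISM_CANCEL_TIMEOUT =
            ConfigOptions.key("jobmanager.adaptive-scheduler.min-parallelism-cancel-timeout")
                    .durationType()
                    .defaultValue(Duration.ofSeconds(-1)) // negative: keep waiting indefinitely
                    .withDescription(
                            "Illustrative only: time to wait for the resources required by the "
                                    + "minimum parallelism before cancelling the job. "
                                    + "A negative value means wait indefinitely.");

    private MinParallelismTimeoutOptions() {}
}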
private static Map<SlotSharingGroupId, Integer> getMaxParallelismForSlotSharingGroups(
        Iterable<JobInformation.VertexInformation> vertices) {
    return getPerSlotSharingGroups(
            vertices, JobInformation.VertexInformation::getParallelism, Math::max);
Isn't this supposed to be getMaxParallelism here?
Nope, but the confusion is understandable. "Max parallelism" is an overloaded term, referring both to the maximum parallelism a job can ever run at (== the number of key groups) and to the upper-bound parallelism that the job may currently run at.
Outside of validation purposes, the actual max parallelism isn't relevant for scaling.
We should rename things to explicitly refer to lower/upper parallelism bound.
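To make that distinction concrete, here is a minimal, self-contained sketch. VertexParallelismBounds is an illustrative class, not one of the scheduler's actual types: the key-group count only caps what is valid, while scaling decisions operate on the lower/upper parallelism bounds.

/** Illustrative only: separates the two notions that are both called "max parallelism". */
final class VertexParallelismBounds {

    private final int numberOfKeyGroups; // "max parallelism" in the state/key-group sense
    private final int lowerBound;        // minimum parallelism the vertex may run at
    private final int upperBound;        // maximum parallelism the vertex may run at

    VertexParallelismBounds(int numberOfKeyGroups, int lowerBound, int upperBound) {
        if (lowerBound < 1 || lowerBound > upperBound) {
            throw new IllegalArgumentException("lowerBound must be within [1, upperBound]");
        }
        // The key-group count matters only for validation: the vertex can never be scaled
        // beyond it, but it does not drive scaling decisions itself.
        if (upperBound > numberOfKeyGroups) {
            throw new IllegalArgumentException(
                    "upperBound must not exceed the number of key groups");
        }
        this.numberOfKeyGroups = numberOfKeyGroups;
        this.lowerBound = lowerBound;
        this.upperBound = upperBound;
    }

    int getNumberOfKeyGroups() { return numberOfKeyGroups; }
    int getLowerBound() { return lowerBound; }
    int getUpperBound() { return upperBound; }
}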
gyfora left a comment
I think one test case may still be missing if I understand correctly. Otherwise the PR looks good to me!
                        newJobResourceRequirements);
                assertThat(scheduler.getState()).isSameAs(originalState);
            },
            singleThreadMainThreadExecutor))
I think we may be missing a test case verifying that the job actually scales up once the resources for the new, higher min parallelism become available. Maybe that could be added here.
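To spell out the scenario such a test would cover, here is a self-contained toy; it is not the actual AdaptiveSchedulerTest harness, and RescaleOnNewMinimumDemo and decideParallelism are purely illustrative names. The expected behavior: the job keeps running at the old parallelism until enough slots for the new minimum are available, and only then rescales.

/** Toy model of the missing scenario; not the real scheduler or its test harness. */
final class RescaleOnNewMinimumDemo {

    /** Stay at the current parallelism until enough slots for the new minimum exist. */
    static int decideParallelism(int currentParallelism, int requestedMin, int availableSlots) {
        return availableSlots >= requestedMin ? requestedMin : currentParallelism;
    }

    public static void main(String[] args) {
        int current = 2;
        int newMin = 4;

        // Resources not yet available: the job keeps running at the old parallelism.
        if (decideParallelism(current, newMin, 2) != 2) {
            throw new AssertionError("should keep the current parallelism");
        }

        // Resources arrive: the scheduler is expected to restart at the new, higher minimum.
        if (decideParallelism(current, newMin, 4) != 4) {
            throw new AssertionError("should scale up to the new minimum");
        }

        System.out.println("scales up once resources for the new minimum become available");
    }
}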
…/adaptive/allocator/SlotSharingSlotAllocator.java
                                - vertexInformation.getMinParallelism()),
                (metaInfo1, metaInfo2) ->
                        new SlotSharingGroupMetaInfo(
                                Math.min(metaInfo1.getMinLowerBound(), metaInfo2.minLowerBound),
Hi Schepler! Could you explain why it is Math.min(metaInfo1.getMinLowerBound(), metaInfo2.minLowerBound) here rather than Math.max?
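For readers following the thread, here is a self-contained sketch of the reduce pattern being asked about. SlotSharingGroupBoundsDemo and GroupBounds are illustrative names, not the actual SlotSharingSlotAllocator types, and the snippet only mirrors the merge shape visible in the excerpt; it does not answer which bound callers should rely on.

/** Simplified sketch of folding per-vertex lower bounds into per-group metadata. */
final class SlotSharingGroupBoundsDemo {

    // Keeps both the smallest and the largest lower bound of the vertices in one
    // slot sharing group; which of the two a caller should use is the question above.
    static final class GroupBounds {
        final int minLowerBound;
        final int maxLowerBound;

        GroupBounds(int minLowerBound, int maxLowerBound) {
            this.minLowerBound = minLowerBound;
            this.maxLowerBound = maxLowerBound;
        }

        GroupBounds merge(GroupBounds other) {
            return new GroupBounds(
                    Math.min(minLowerBound, other.minLowerBound),
                    Math.max(maxLowerBound, other.maxLowerBound));
        }
    }

    public static void main(String[] args) {
        // Two vertices in the same slot sharing group with minimum parallelisms 2 and 5.
        GroupBounds merged = new GroupBounds(2, 2).merge(new GroupBounds(5, 5));
        System.out.println(
                "minLowerBound=" + merged.minLowerBound
                        + ", maxLowerBound=" + merged.maxLowerBound); // prints 2 and 5
    }
}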
This PR is based on #22795 for ease of testing.
The Adaptive Scheduler now supports a minimum parallelism per vertex.
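As a closing usage illustration of the feature this PR contributes to, here is a sketch assuming the FLIP-291-style JobResourceRequirements builder; the class, builder, and setParallelismForJobVertex names reflect my understanding of that API and should be verified against the Flink release in use.

import org.apache.flink.runtime.jobgraph.JobResourceRequirements;
import org.apache.flink.runtime.jobgraph.JobVertexID;

/**
 * Sketch: declaring a per-vertex lower (minimum) and upper parallelism bound.
 * Assumes the FLIP-291-style JobResourceRequirements builder; verify the exact API.
 */
final class ResourceRequirementsExample {

    static JobResourceRequirements buildRequirements(JobVertexID vertexId) {
        // Lower bound 2, upper bound 8: the adaptive scheduler should not run this
        // vertex below 2 and may scale it up to at most 8 as slots become available.
        return JobResourceRequirements.newBuilder()
                .setParallelismForJobVertex(vertexId, 2, 8)
                .build();
    }
}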