Occupancy is a measure of expected work on a worker, given a couple of assumptions: if there were one thread on that worker and if network communication and compute could not overlap, occupancy would give us the expected time this worker requires to compute all of its keys. It is an estimate of busyness.
Occupancy is currently used for two mechanisms:

- `Scheduler.worker_objective`, to sort workers from least busy to most busy. Typically we select the least busy worker for a new task assignment.
- Work balancing / work stealing, a background rebalancing mechanism that reassigns tasks to ensure that occupancy is uniformly distributed across all workers.
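As a toy illustration of the first mechanism, here is a standalone sketch (a hypothetical `Worker` model, not the actual `distributed` classes) of picking the least busy worker by occupancy:

```python
# Hypothetical standalone model of least-busy worker selection,
# mimicking how Scheduler.worker_objective ranks candidates.
from dataclasses import dataclass


@dataclass
class Worker:
    name: str
    occupancy: float = 0.0  # estimated seconds of queued work


def least_busy(workers: list[Worker]) -> Worker:
    # A new task goes to the worker with the smallest occupancy
    return min(workers, key=lambda ws: ws.occupancy)


workers = [Worker("a", 3.0), Worker("b", 0.5), Worker("c", 7.2)]
print(least_busy(workers).name)  # → b
```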
This measure currently has a couple of flaws:

1. The `worker_objective` tries to compute an "earliest start time" for a given key and to minimize it. Considering runtimes exclusively is not sufficient, since it neglects task priorities: a task assigned to a worker with very high occupancy and thousands of tasks queued up should still have a chance to cut the line. In fact, we rely on this mechanism heavily to reduce memory pressure, and looking only at the expected start time may even work against this goal. See "Respect priority in earliest start time heuristic" #5253 for a deeper discussion.
2. The way occupancy is calculated intentionally allows occupancies to be inaccurate for a short amount of time, since the current implementation does not allow a fast / near constant-time way of propagating the effects of updated timing measurements to the cluster. It also does not account for live changes to tasks held in memory, i.e. both compute and network occupancy drift apart. This drift has to be corrected manually in `reevaluate_occupancy`. Therefore, drastic changes to the number of workers and/or timing measurements take time to propagate to the entire cluster, which, until corrected, can cause very poor scheduling (and stealing) decisions, e.g. "Root-ish tasks all schedule onto one worker" #6573 (comment).
3. Occupancy is subject to errors for unknown tasks. Particularly together with 2.), this can cause oscillating behavior with stealing.
To address problem 1.) we'd need to fundamentally rework how we schedule. Some of the possibilities are discussed in #5253 without a conclusion, due to performance concerns.

Problems 2.) and 3.) are mostly about the implementation itself, and there are already small incremental improvements available (see #7020). I believe we can overcome these limitations entirely (though not for free) while paving the way for a solution to 1.) at the same time.
Drop-in replacement?
Assuming that the variance of durations within a `TaskGroup` is not too large, a suitable approximation of CPU occupancy can be computed by counting the number of tasks per group assigned to a worker.

Using the `duration_average` we can calculate an accurate occupancy in O(TG) instead of O(T), where TG is the number of TaskGroups currently assigned to a worker and T is the number of tasks assigned to it. I expect TG to typically be <10 and consider iterating over this collection to be essentially constant time.

(Note: this entire argument can be made for `TaskPrefix`es as well if performance doesn't play out, but I believe we should start measuring `duration_average` on the TaskGroup level. That's a bit off-topic.)
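A minimal standalone sketch of this counting argument, with made-up group names and duration averages:

```python
# Sketch of the O(TG) occupancy estimate: instead of summing a duration
# for every one of T processing tasks, keep a count per TaskGroup and
# multiply by that group's duration_average. All names are illustrative.
duration_average = {"inc": 0.5, "sum": 2.0}   # seconds per task, per group
task_group_counts = {"inc": 1000, "sum": 3}   # tasks of each group on this worker

occupancy = sum(
    duration_average[group] * count for group, count in task_group_counts.items()
)
print(occupancy)  # 1000 * 0.5 + 3 * 2.0 = 506.0
```

Two dictionary entries are iterated here instead of 1003 tasks, which is the whole point of the approximation.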
Pseudo code
```python
from distributed.scheduler import TaskState, WorkerState, TaskGroup


class WorkerState:
    ...
    # Counts per TaskGroup name, referencing the scheduler's task_groups
    task_groups: dict[str, int]
    needs_what: dict[TaskState, int]

    # Replace all scheduler updates of ws.processing with this.
    def add_to_processing(self, ts: TaskState) -> None:
        self.task_groups[ts.group.name] += 1
        self.processing[ts] = None
        for dts in ts.dependencies:
            if self not in dts.who_has:
                self.need_replica(dts)

    # This will effectively replace Scheduler.get_comm_cost.
    # Comm cost is always called in
    # - move_task_request
    # - _set_duration_estimate
    # - add_to_processing
    # - _reevaluate_occupancy_worker
    def need_replica(self, ts: TaskState) -> None:
        if ts not in self.needs_what:
            self.needs_what[ts] = 1
            self._network_occ += ts.get_nbytes()
        else:
            self.needs_what[ts] += 1

    def dont_need_replica(self, ts: TaskState) -> None:
        if ts in self.needs_what:
            self.needs_what[ts] -= 1
            if not self.needs_what[ts]:
                del self.needs_what[ts]
                self._network_occ -= ts.get_nbytes()

    def remove_from_processing(self, ts: TaskState) -> None:
        self.task_groups[ts.group.name] -= 1
        del self.processing[ts]
        # TODO: loop over the smaller set of the two, see Scheduler.get_comm_cost
        for dts in ts.dependencies:
            if dts in self.needs_what:
                self.dont_need_replica(dts)

    @property
    def occupancy(self):
        # `task_groups` below is the scheduler's name -> TaskGroup mapping
        return sum(
            task_groups[group_name].prefix.duration_average * count
            for group_name, count in self.task_groups.items()
        ) + self._network_occ


class Scheduler:
    ...
    task_groups: dict[str, TaskGroup]

    def add_replica(self, ts: TaskState, ws: WorkerState):
        ws.dont_need_replica(ts)

    def _set_duration_estimate(self, ts: TaskState, ws: WorkerState) -> None:
        # TODO: duration while not completed is not measured by the task groups.
        # I'm sure we can find a way to accommodate this shortcoming.
        old = ws.occupancy
        ws.add_to_processing(ts)
        self.total_occupancy += ws.occupancy - old
```
This pseudo code would effectively allow us to:

- Accurately track which dependencies a worker needs to fetch. This alone could be a valuable piece of information for scheduling decisions.
- Calculate an accurate occupancy by iterating only over task groups instead of tasks.
- Properly differentiate network vs. CPU cost and not double-count dependencies.
- Inspect which task groups are already scheduled on that worker.
I believe this implementation would serve as a drop-in replacement for our existing logic and would allow us to remove `Scheduler.reevaluate_occupancy`, so occupancy calculations could no longer drift. The cost we'd pay for this is twofold:
- We'd always loop over all task dependencies when we assign a task to a worker. However, this already happens in `Scheduler._set_duration_estimate` to calculate the comm cost, so this will not add any computational complexity but rather shift it.
- We would also need to iterate over all dependencies when removing a task from a worker. This is also already a requirement, at least for tasks finishing successfully (see `Scheduler._add_to_memory`), to compute follow-up transitions.

The biggest hit is the occupancy itself, since it would be computed on demand. The most performance-critical section is `worker_objective`, which already isn't constant time since it loops over a task's dependencies to compute transfer cost. Is the number of task groups on a worker significantly larger than the number of dependencies of a task? Even if it is, I would be surprised if the removal of `Scheduler.reevaluate_occupancy` did not amortize this additional cost.
If we are willing to allow some drift in occupancy, we could also cache the workers' occupancies. The cache would be invalidated whenever any task group measurement changes significantly, or by a similar trigger.
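A hypothetical sketch of that caching idea (an illustrative standalone class, not dask code): recompute occupancy lazily and drop the cached value whenever the per-group bookkeeping changes.

```python
# Illustrative cache-on-read occupancy: any mutation of the per-group
# counts invalidates the cached sum, so reads between mutations are O(1).
class CachedOccupancy:
    def __init__(self) -> None:
        self.task_groups: dict[str, int] = {}       # tasks per group
        self.duration_average: dict[str, float] = {}  # seconds per task
        self._occupancy_cache: float | None = None

    def add_task(self, group: str, duration_average: float) -> None:
        self.duration_average[group] = duration_average
        self.task_groups[group] = self.task_groups.get(group, 0) + 1
        self._occupancy_cache = None  # invalidate on any change

    @property
    def occupancy(self) -> float:
        if self._occupancy_cache is None:
            # O(TG) recomputation, only when the cache is stale
            self._occupancy_cache = sum(
                self.duration_average[g] * n for g, n in self.task_groups.items()
            )
        return self._occupancy_cache
```

In the real scheduler the invalidation trigger would also have to cover updated `duration_average` measurements, which is where the tolerated drift comes in.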
`worker_objective` also includes a loop over the dependencies, which could leverage the newly introduced `WorkerState.needs_what`.
Further progress towards problem 1.)

With this implementation we could more easily investigate scheduling based on task groups / prefixes, and could pivot towards a "uniform number of tasks per group" scheduling approach that no longer accounts for durations at all, removing the need for `unknown_durations`. For instance, if we could define an ordering of task groups, we could infer whether a task may cut the line and make progress towards 1.)
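As an illustrative sketch of such an objective (a hypothetical helper, not dask code), a task could go to the worker currently running the fewest tasks of its group, ignoring durations entirely:

```python
# "Uniform number of tasks per group" selection: balance counts per
# TaskGroup across workers instead of estimated runtimes.
def pick_worker(group: str, per_worker_counts: dict[str, dict[str, int]]) -> str:
    # Choose the worker with the fewest tasks of this group assigned
    return min(per_worker_counts, key=lambda w: per_worker_counts[w].get(group, 0))


counts = {"w1": {"inc": 5}, "w2": {"inc": 2}, "w3": {"inc": 9}}
print(pick_worker("inc", counts))  # → w2
```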
cc @hendrikmakait @gjoseph92 @wence- @crusaderky