Factor out and instrument task categorization logic - static graph analysis #6922

Open
fjetter opened this issue Aug 19, 2022 · 4 comments

@fjetter
Member

fjetter commented Aug 19, 2022

TL;DR I think we should build out our instrumentation around root-ish tasks to improve visibility, UX and enable further research in this space


When deciding which worker to schedule a task on, we're not treating all tasks equally.

Prior and ongoing work in this area includes the effort to enforce co-location and the effort to withhold tasks on the scheduler. Both efforts single out a specific class of tasks and implement special scheduling heuristics for them. These issues introduce something called a "root-ish task", which refers to nodes in the graph that are likely exhibiting a fan-out / reduce pattern. The reason these need special treatment is an artifact of us assigning not-yet-runnable tasks (i.e. dependents) to workers just in time while assigning ready tasks greedily. This can temporarily bypass the depth-first-search ordering of dask.order, which causes suboptimal scheduling and significantly higher cluster-wide memory consumption for many use cases. This behavior is commonly referred to as root task overproduction.

There are two approaches commonly discussed to fix this problem: enforcing co-location and withholding ready tasks on the scheduler.

We are approaching consensus that both solutions would address this problem, but they take almost orthogonal approaches to scheduling. Both solutions come with benefits, opportunities, shortcomings, costs and risks. The approach of task withholding is currently the most likely short-term fix for the situation since it requires comparatively few adjustments to the code base and can be implemented scheduler-side only.

A common theme between both approaches is how we detect "special" tasks. I would like to start a conversation about generalizing the approach taken for root tasks and discuss how it could be expanded.

How are root tasks detected?

Root tasks can trivially be detected with a quadratic-runtime algorithm by walking up and down the task graph, but this is not feasible to perform for every task.

The current approach instead utilizes TaskGroups to infer the root-task property:

# Group is larger than cluster with few dependencies?
# Minimize future data transfers.
if (
    valid_workers is None
    and len(tg) > self.total_nthreads * 2
    and len(tg.dependencies) < 5
    and sum(map(len, tg.dependencies)) < 5
):

The dynamic component of this if clause (valid_workers is None and len(tg) > self.total_nthreads * 2) is there to protect us from making a bad scheduling decision that would reduce our ability to parallelize.
The static components, len(tg.dependencies) < 5 and sum(map(len, tg.dependencies)) < 5, are really what we use to classify the root tasks; they are a way of describing that we're dealing with a task that has "few, relatively small dependencies".
This classification could be moved to become an actual TaskState property which has a couple of trivial benefits.

  1. We can do it once, e.g. after update_graph or after dask.order (see the sketch after this list). While this is not a costly computation, there is no need to do it at runtime.
  2. Given that this information is available all the time, we can visualize it on the dashboard and/or in other debugging/instrumentation tooling.
  3. This static information could be broadcast to workers (if there is need for it).
  4. Given that this is a static graph property, this would open an easy avenue to unit-test our task categorization logic, similar to how we nail down dask.order by providing specific examples.
  5. If this system is roughly formalized, task annotations (or a similar/parallel mechanism) could be used to inform scheduling decisions as well (up to building an entire dispatching mechanism).
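To make point 1 concrete, here is a rough sketch. classify_task_groups and the rootish_static attribute are made-up names for illustration, not existing distributed API; only the static half of the check is precomputed, while the dynamic guard would still run at scheduling time.

def classify_task_groups(task_groups, dep_limit: int = 5) -> None:
    """Run once, e.g. at the end of update_graph (hypothetical helper)."""
    for tg in task_groups:
        # Static part of the heuristic: "few, relatively small dependencies".
        tg.rootish_static = (
            len(tg.dependencies) < dep_limit
            and sum(map(len, tg.dependencies)) < dep_limit
        )
        # The dynamic part (valid_workers is None, group larger than the
        # cluster) still has to be evaluated when the task is scheduled.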

How can we use this? What's the link to STA?

2.) will be very useful from a UX perspective. Our scheduling is already relatively opaque; if we now start to withhold some tasks because they are special, it would be nice if we could visualize this (e.g. as part of the dask.order visualization, the graph bokeh dashboard, or annotations when hovering over the task stream).

From a developer's perspective, I strongly believe that this is equally helpful for talking about less trivial (non-root) root-ish task graphs.

Apart from UX, I consider 2.), 4.) and to a lesser extent also 3.) as valuable components for further research in this space. For instance, are all commonly appearing "root-ish" tasks part of the same category of subtopologies, or do they break up into further categories? Are there any topologies for which we know that we can afford exact solutions[1]?
How valuable would it be to introduce manual annotations that mark certain tasks as special (for instance, data generators, high memory use, reducers, ...)? I'm sure there is more.

Apart from worker side state machine changes (which I consider quite doable after #5736) one of the big questions remaining in STA is "which tasks should we schedule ahead of time?"
As outlined above, I consider root task withholding and STA of dependents to be symmetrical problems when it comes to this decision. I believe further research into these task classifications could inform future decisions for both approaches.

[1] For example, think of trivial map-reduce/tree-reduce patterns. If we can detect that we're operating on these topologies, it is almost trivial to implement perfect scheduling. Once we have classified a task as root-ish, we could further probe/analyze whether we are dealing with a trivial reduction and implement a special scheduling decision for it.

cc @gjoseph92

@gjoseph92
Collaborator

Thanks for writing this up. I agree that the way we categorize tasks for queuing and co-location is highly related to how we'd do it for STA.

I'm not sure that the current root task detection logic could be done statically, though. (I think there could probably be a totally different static algorithm, but I don't know what it is yet.) I would actually like to change it to make the currently-static part cluster-size-dependent.

e5175ce is the change I'd like to make. The commit message explains it:

The goal is to identify a specific situation: fan-outs where the group is larger than the cluster, but the dependencies are (much) smaller than the cluster. When this is the case, scheduling near the dependencies is pointless, since you know those workers will get filled up and the dependencies will have to get copied everywhere anyway. So you want to instead schedule in a methodical order which ends up keeping neighbors together.

But the key is really crossing that boundary of cluster size. Hence these changes:

  • total_nthreads * 2 -> total_nthreads: so long as every thread will be saturated by this group, we know every worker will need all its dependencies. The 2x requirement is too restrictive.

  • Switch magic 5 to min(5, len(self.workers)): if you have 3 workers, and your group has 3 dependencies, you actually should try to schedule near those dependencies. Then each worker only needs 1 dependency, instead of copying all 3 dependencies to all 3 workers. If you had 20 workers, duplicating the dependencies would be unavoidable (without leaving most of the workers idle). But here, it is avoidable while maintaining parallelism, so avoid it.

    I'm actually wondering if we should get rid of magic 5 entirely, and just use a cluster-size metric. Like len(self.workers) / 4 or something. If you have 1,000 workers, and a multi-thousand group has 20 dependencies, maybe you do want to copy those 20 dependencies to all workers up front. But if you only had 30 workers, you'd be much better off considering locality.

I think that what the current root-task logic is actually trying to identify is when we're crossing the cluster-size boundary: when we switch from having more threads than tasks, to more tasks than threads.
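Roughly, the adjusted check would look like this (an illustrative sketch of the idea described above, not the exact code in e5175ce):

def is_rootish_candidate(tg, total_nthreads, n_workers, valid_workers=None):
    # Cluster-size-relative thresholds instead of fixed magic numbers.
    dep_limit = min(5, n_workers)
    return (
        valid_workers is None
        and len(tg) > total_nthreads                   # was: total_nthreads * 2
        and len(tg.dependencies) < dep_limit           # was: a fixed 5
        and sum(map(len, tg.dependencies)) < dep_limit
    )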

@gjoseph92
Collaborator

I have also thought about this a lot for task queuing, though none of it made it into the current PR. I think there are two major factors we should consider more in scheduling, which would encompass the current root task detection logic, co-assignment logic, widely-shared dependencies, and STA:

1. Amortized cost / return on investment of data transfer

Generally, we want to minimize data transfers between workers. Of course, the trivial way to do this is to schedule all tasks onto one worker—then you'd never have transfers! But that is obviously a bad blanket policy.

So we have to have some way of deciding when it's "worth it" to incur the short-term cost of copying data from one worker to another, because it would open up the opportunity for some long-term gain—namely, by copying the data, we can increase the parallelism of our overall computation.

Right now, we don't ask this question in a formalized way. "Normal-mode" scheduling is actually unwilling to do this at all: if a task has 1 dependency, for example, it will only go to a worker that holds that dependency. (We tried to change this once in #4925.) Currently, work stealing and root-ish task logic are the only ways tasks can be assigned to workers that don't have any of their dependencies. This leads to problems like the dogpile.

One way I've thought about this is to amortize the cost of moving a key over all the tasks that could then run on the new worker if it were moved—basically, how much it increases the opportunity for parallelism: b4ebbee. Lots more discussion of this in #5325 / #5326.

Maybe a better way to think about it would be to try to estimate the return-on-investment of moving a key. Rather than purely looking at transfers as a cost, in some cases, they can be an investment. From that framework, some keys are a bad investment to copy: if a task only has one dependent, and it's on a multithreaded worker, copying it gains you almost no parallelism. But if a task has 100 dependents, duplicating it onto another worker doubles the parallelism of those 100 tasks. You could look at the wall-clock time you'd expect those 100 tasks to take with the current number of threads available to them, divided by the transfer time + the wall-clock time with the increased number of threads. Maybe that's your ROI.

From that perspective, the root-ish task metric (lots of tasks, few dependencies) is actually a crude way of doing this amortization. When 10k load_zarr tasks all depend on one open_zarr key, we're basically amortizing the cost of transferring the open_zarr down to 0. open_zarr gets copied to every worker, but in exchange, we massively increase parallelism—a great investment.

From this perspective, then, maybe root-ish tasks don't actually need to be special-cased? That is, in these big-fan-out cases, or cluster-size-boundary-crossing cases, the handful of dependencies that thousands of tasks share would look like good enough investments to duplicate that we'd naturally consider every worker as a candidate for those root-ish downstream tasks, not just the workers holding the few dependencies.

2. Identifying families of tasks to co-locate

The other major thing the root task scheduling does is try to pick the same worker for tasks whose outputs will be used together. If C takes both A and B as inputs, you want A and B to run on the same worker—that way, you don't have to transfer any data to run C.

This only works right now because we assign all ready tasks greedily, so we happen to iterate through them in priority order. When you switch to scheduler-side queuing, the iteration order changes and you lose this co-assignment.

It turns out that for both STA and root-task withholding, it would be valuable to have a quick way to identify "families"/groups of sibling/cousin tasks which should ideally all run on the same worker.

I think of a family as a set of tasks that are all inputs to a common downstream task. A corollary is that in most cases, all tasks in a family will have to be in memory on the same worker at the same time. But coming up with an actual metric for this is harder once you consider all-to-all graphs, widely-shared dependencies, linear chains, etc. Still, I think there's probably a pragmatic definition we could come up with, and a way we could compute it with minimal additional cost during update_graph.

This, more than root-ish-ness, is the thing I'd like to explore formalizing and statically identifying in the graph.
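As a purely illustrative sketch of one such pragmatic definition (group inputs by a shared downstream task and leave widely-shared dependencies out of any family; none of this is existing code):

from collections import defaultdict

def find_families(dependents, widely_shared_cutoff=5):
    """dependents: mapping of task key -> set of keys that consume it.
    Returns {downstream key: set of input keys forming a family}."""
    families = defaultdict(set)
    for key, deps in dependents.items():
        if len(deps) >= widely_shared_cutoff:
            continue  # widely-shared dependency: cheap/high-ROI to copy, no family
        for downstream in deps:
            families[downstream].add(key)
    return dict(families)

# If C depends on A and B: find_families({"A": {"C"}, "B": {"C"}})
# -> {"C": {"A", "B"}}, i.e. A and B should land on the same worker.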

  • For STA, this grouping is a good way to identify which tasks can be assigned ahead-of-time, and which can't. If all of a task's inputs are in the same family, then you can preemptively assign the task to whichever worker that family is already using. If some inputs don't belong to any family, we'd disregard them—not having a family would be another way of saying those tasks are low-amortized-cost, or high-ROI, to transfer to a new worker (aka widely-shared dependencies).

    But if inputs are in different families, or none are in any family, then you should wait and see which worker looks best once all inputs are actually in memory.

  • For queuing, families are a way to get co-location. You could schedule whole families to a worker at a time and even allow some oversaturation (since all family members will have to be in memory at the same time anyway). Then, you could queue excess families, not tasks, on the scheduler, and assign them wholesale as threads open up (whether on existing or new workers).

  • In both cases, families are a great way to handle rebalancing as workers join and leave.

  • In general, families could be an interesting higher-level unit of scheduling to consider.


Overall, my hypothesis is that these two special things that root-ish task logic considers would actually be good to think about for all tasks.

Furthermore, those things might actually be the underlying problems in a number of different domains! So if we had the framework to measure these things easily, solutions to a variety of other problems (widely-shared dependencies dogpile, STA, co-assignment for queued tasks, etc.) might also pop out in a generic way.

@fjetter fjetter changed the title Factor out and instrument task categorization logic? Factor out and instrument task categorization logic - static graph analysis Aug 22, 2022
@fjetter
Member Author

fjetter commented Aug 22, 2022

Thanks @gjoseph92 for your thorough writeup. I think your ideas do have some merit but are already a couple of steps ahead of what I have in mind.

  1. Amortized cost / return on investment of data transfer

Generally speaking, what you are describing is already what work stealing is trying to do. I think this argument is sound, but our problem is that, generally, we do not have this kind of information/measurement available at scheduling time.
Work stealing is doing such a cost-based assessment (you can make a case that it is the wrong time, the wrong algorithm, etc., but the concept is there already).

IIUC you are trying to make the point of "maybe we do not need static task classification"? I think what you are describing is further dynamic components that can weigh in on our scheduling decision (right now, it's basically mostly occupancy), but in this issue I wanted to discuss static graph analysis (similar to dask.order) that we could utilize further.

  2. Identifying families of tasks to co-locate

I think this is mostly a static graph property and also something I had in mind as a possible next step.

Just to repeat, I'm not saying we should base scheduling decisions exclusively on static analysis but I think the static analysis is a major component we're not using sufficiently, yet.


Overall, I think we should try to get more out of our dask.order than we are right now. If I take a look at "recent" improvements in this space, visualize already offers a couple of different colorings I haven't really spent any time with yet. These are based on a couple of internal metrics (e.g. age, freed, memorypressure, memoryincrease), see here.
We could try utilizing the same metrics and/or add additional ones in our scheduling decisions. Right now, dask.order and the scheduler TaskState are fairly separated (dask.order has very rich, internal information about the graph), but I think it is closely related to what we're doing in terms of root tasks.
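For reference, a minimal usage example of the order-based coloring (the metric-based colorings mentioned above are selected via the same keyword in recent dask versions; exact option names may vary by version):

import dask.array as da

x = da.random.random((1000, 1000), chunks=(100, 100))
y = (x + x.T).sum(axis=0)
# Color nodes by dask.order priority; the other colorings (age, freed,
# memory pressure, ...) are picked via the same color= keyword.
y.visualize(filename="order.svg", color="order")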

I think we should have a similar mechanism/visualization for root-(ish-)tasks. I think whatever we do here can also be used for further research (e.g. find groups)


@eriknw I think the dask.order stuff was created by you? Maybe you are interested in this space as well

@eriknw
Member

eriknw commented Aug 22, 2022

Thanks for the ping!

Yes, lots of things here interest me very much, and yes I've done a bunch of dask.order stuff. I'm on vacation right now, so I'll give a more thoughtful reply later.

One idea I've been playing with for a while (and discussed with some folks at SciPy) is to have another dask.order-like pass that uses the current size of the dask cluster to create an "upper bounds" value for each task. This could detect root-ish tasks more generally and is enough information to allow us to schedule more intelligently in the presence of root-ish tasks. I'll try to explain this better later. This is related to some of the "pebble games".

In regard to detecting high level patterns in the task graph to schedule better, it would be nice to know when all dependent tasks "split" an input into smaller chunks. In this case, scheduling order should prefer to do BFS so we can release the big dependency ASAP. Knowing expected relative sizes of tasks can also improve low-level task fusion.

gjoseph92 added a commit to gjoseph92/distributed that referenced this issue Aug 24, 2022
This overhauls `decide_worker` into separate methods for different
cases.

More importantly, it explicitly turns `transition_waiting_processing`
into the primary dispatch mechanism for ready tasks.

All ready tasks (deps in memory) now always get recommended to
processing, regardless of whether there are any workers in the cluster,
whether they have restrictions, whether they're root-ish, etc.

`transition_waiting_processing` then decides how to handle them
(depending on whether they're root-ish or not), and calls the
appropriate `decide_worker` method to search for a worker.

If a worker isn't available, then it recommends them off to `queued` or
`no-worker` (depending, again, on whether they're root-ish and the
WORKER_SATURATION setting).

This also updates the `no-worker` state to better match `queued`.
Before, `bulk_schedule_after_adding_worker` would send `no-worker` tasks
to `waiting`, which would then send them to `processing`. This was
weird, because in order to be in `no-worker`, they should already be ready
to run (just in need of a worker). So going straight to `processing` makes
more sense than sending a ready task back to waiting.

Finally, this adds a `SchedulerState.is_rootish` helper. Not quite the
static field on a task @fjetter wants in dask#6922, but a step in that
direction.
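
Roughly, such an is_rootish helper combines a restriction check with the TaskGroup heuristic quoted earlier in this issue; a sketch, not necessarily the exact merged code:

def is_rootish(self, ts) -> bool:
    """Whether ts looks root-ish: a large fan-out group with few, small deps."""
    if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
        return False
    tg = ts.group
    return (
        len(tg) > self.total_nthreads * 2
        and len(tg.dependencies) < 5
        and sum(map(len, tg.dependencies)) < 5
    )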