
Co-assign neighboring tasks to neighboring workers #4892

Closed
mrocklin opened this issue Jun 8, 2021 · 19 comments · Fixed by #4967

@mrocklin (Member) commented Jun 8, 2021

Edit: This proposal is now defunct(ish). Read below for the updated proposal.

Sometimes we assign closely related tasks to different machines, resulting in unnecessary communication. The simplest case is a pair of siblings:

   c
  / \
 /   \
a     b

Because neither a nor b has dependencies, they are assigned to workers somewhat arbitrarily. However, because c requires them both, one or the other will have to move across the network. This causes excess communication, which can bog down the system.
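
For concreteness, here is a minimal way to produce this shape with dask.delayed (an illustrative example, not taken from the reports above):

import dask

@dask.delayed
def load(i):            # a and b: root tasks with no dependencies
    return i

@dask.delayed
def combine(x, y):      # c: depends on both a and b
    return x + y

a, b = load(1), load(2)
c = combine(a, b)
# If a and b are scheduled on different workers, one of their results must be
# transferred over the network before c can run.
c.compute()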

This has come up several times, notably in #2602, ocean-transport/coiled_collaboration#3, and #4864. There are a few different things we could do to address this. This issue covers one part of a solution: assigning sibling tasks to the same worker.

Straightforward (but slow) solution

@fjetter provided a solution in #4864 (comment), which involves iterating up through all dependents of a task and then back down through those tasks' dependencies in order to find workers that already hold sibling tasks. This doesn't scale in the general case (we have to navigate through k² links, where k is the number of dependencies/dependents), but it might be acceptable in a restricted case.
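
A rough sketch of that traversal (a hypothetical helper; the attribute names mirror the scheduler's TaskState, but this is not the code from #4864):

def workers_holding_siblings(ts):
    # Walk up to each dependent, then back down to its dependencies, collecting
    # the workers that sibling tasks have already been assigned to.
    # This visits up to k * k links for k dependents/dependencies.
    workers = set()
    for dependent in ts.dependents:             # up
        for sibling in dependent.dependencies:  # and back down
            if sibling is ts:
                continue
            if sibling.processing_on is not None:
                workers.add(sibling.processing_on)
    return workers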

Restricted case

In particular we seem to be interested in the following conditions:

  1. The task has no dependencies (otherwise we would schedule wherever those are)
  2. The task has very few dependents, possibly only one
  3. That dependent has very few dependencies, maybe less than five? (ten?)
  4. There are many more tasks than workers, so that we're comfortable losing some concurrency here
  5. The task we're thinking about is sizeable, and not something that we would want to move relative to its computation time

I suspect that there is an order in which we could check these conditions that is fast in all of the cases where it matters. Additionally, if we have several dependencies to check, then we might consider just accepting the location of the very first dependency that is already assigned and skipping the rest.
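
One possible ordering of those checks, cheapest first with early exits (a sketch only; the thresholds, the total_nthreads comparison, and the omission of check 5 about data size versus compute time are assumptions):

def is_restricted_case(ts, total_nthreads, max_sibling_deps=5):
    if ts.dependencies:                   # 1. must have no dependencies
        return False
    if len(ts.dependents) != 1:           # 2. very few dependents (here: exactly one)
        return False
    (dependent,) = ts.dependents
    if len(dependent.dependencies) > max_sibling_deps:  # 3. dependent has few dependencies
        return False
    return len(ts.group) > total_nthreads  # 4. many more tasks than workers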

@mrocklin (Member Author) commented Jun 8, 2021

I hit send on this too early. I've updated the original comment.

@TomAugspurger (Member) commented:

There was a (failed) attempt at something like this in #2940. I didn't post it, but I vaguely recall that the approach really slowed down scheduling.

@mrocklin (Member Author) commented Jun 8, 2021

Yeah, the implementation you had hit the full N^2 case. See https://github.com/dask/distributed/pull/2940/files#r311737887

I'm hopeful that if we restrict things suitably we can avoid a significant slowdown.

@mrocklin (Member Author) commented Jun 8, 2021

Conversely, though, the question for the approach in this issue is whether this set of constraints will still let us solve important problems. It could be that although these constraints make things fast, they exclude important cases.

For example, this wouldn't work with a rechunk situation (although, to be fair, that graph is fully connected in just two layers, and so not a case where we would want to schedule on the same node as our sibling).

@mrocklin (Member Author) commented Jun 8, 2021

OK, @gjoseph92 and I were talking about this for a while. I'll try to summarize some of the thoughts that went around there.

First, Gabe brought up the same concern I brought up before with Tom around checking for siblings, namely uncles/aunts, cousins, second cousins twice removed, and so on. Graphs get messy and so restrictive rules are less fun.

We then went on a journey thinking about changing decide_worker behavior in the case where we are assigning many tasks (where "many" is defined relative to the number of workers/cores that we have). If we're in this situation, and if we assume that tasks are being cycled through in the order determined by dask.order (which I think should be true?), then we might do better by adding a bit of inertia to our choice of worker. This would be a change in our policy:

  • Today: we choose the worker with least occupancy. This results in tasks that are neighbors in terms of dask.order being very likely to be on different machines. This is good if the graph is generic/asymmetric, because the task ordered 0 and the task ordered 1 are the two most important tasks to start, and so they should both run as soon as possible
  • Proposed: we try to allocate batches of neighboring tasks (neighboring in dask.order space) to similar workers. For example if we have 1000 tasks and 10 workers then we might allocate tasks ordered 0..99 to worker 1 and tasks ordered 100..199 to worker 2. This is good if the graph is highly symmetric, such as is the case with I/O tasks that show up in update_graph sometimes. This is good because task 0 and task 1 are very likely to have shared dependents (directly or transitively (grandparents))
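
As a toy illustration of the proposed batching (pure arithmetic, not scheduler code; workers are indexed from zero here):

n_tasks, n_workers = 1000, 10
block = n_tasks // n_workers                     # 100 consecutive tasks per worker
assignment = {i: i // block for i in range(n_tasks)}
assert assignment[0] == assignment[99] == 0      # tasks 0..99 on the first worker
assert assignment[100] == assignment[199] == 1   # tasks 100..199 on the next worker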

To be clear, the proposed bulk/batch assignment behavior is bad in many cases but good in some others. How do we identify the situation?

First we thought about identifying this in update_graph, by looking for a case where we had many tasks with no dependencies. Unfortunately, this breaks in situations like from_zarr where there is a single dependency claimed by thousands of dependents. It's also somewhat odd to change a policy in update_graph.

Where we ended up

So we still want to support a batch/bulk task assignment policy (where neighboring tasks are assigned to the same worker rather than spread around), and we think that we can identify this situation deep within decide_worker by comparing the size of the task's group with the number of threads in the cluster, and by checking that the number of dependencies of that group is quite small.

If those hold then we want to glom together neighboring tasks rather than spread them apart.

One way to achieve that glomming together would be for each TaskGroup to keep track of the most recent worker to which a task of that group was assigned. Then, in decide_worker, we would figure out whichever worker we would normally choose, and if it's not much better than the previously assigned worker, we would use the previously assigned worker.

ws = decide_worker(ts, ...)                      # the worker we would normally choose
if not_much_better(ws, ts.group._last_assigned_worker):
    ws = ts.group._last_assigned_worker          # stick with the previous worker
else:
    ts.group._last_assigned_worker = ws          # otherwise move on and remember the new one

The definition of "much better" here would have to depend on the ratio of tasks in that group to cores in the cluster. This ensures that we still respect general load allocation (if a worker is busy for other reasons then it won't be used here) while providing enough inertia that we only shift workers somewhat slowly.
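
As one hedged reading of what not_much_better could look like (the ratio-based cutoff here is an assumption that anticipates the sketch in a later comment below; it is not the final implementation):

import math

def not_much_better(ws, last_ws, ts, total_nthreads):
    # Return True if the fresh candidate `ws` is not worth switching to.
    if last_ws is None:
        return False
    # Expected number of tasks from this group per thread in the cluster.
    ratio = math.ceil(len(ts.group) / total_nthreads)
    # Keep the previous worker unless the candidate's occupancy advantage exceeds
    # roughly `ratio` slots' worth of this task's typical runtime.
    return last_ws.occupancy - ws.occupancy < ratio * ts.prefix.duration_average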

cc @fjetter

mrocklin changed the title from "Co-assign sibling tasks" to "Co-assign neighboring tasks to neighboring workers" on Jun 8, 2021
@fjetter (Member) commented Jun 9, 2021

I don't mind the batched assignment and think it's a good idea in general, as long as we can pull it off with minimal state and clean APIs. I wouldn't want to end up with something like Worker.gather_dep / Worker.select_keys_for_gather again; imho, that feels overly complicated. However, our entire state machine currently works with individual task transitions, which is why batched assignments do not fit the concept nicely. The _last_assigned_worker might be just enough to pull this off, I don't know.

To make this somewhat stateless, one could structure the batch assignment more explicitly, similar to how the worker handles execution/fetch. We'd introduce a new Assigning state for tasks, so the transition path becomes waiting -> assigning -> processing, where the two transitions would not do much. However, this intermediate state would give us a transition barrier at which to accumulate tasks, and it would mark a natural boundary between the single-task state machine and bulk processing.

def transition_waiting_assigned(self, ts: TaskState):
    self.to_be_assigned.add(ts)
    ts.state = "assigning"

def transition_assigned_processing(self, ts: TaskState, ws: WorkerState):
    ts.processing_on = ws
    submit_task_to_worker(ts, ws)
    ...

def periodic_callback_assign_worker(self):
    assignments = decide_worker(self.to_be_assigned, self.workers.values())
    for ts, ws in assignments:
        transition_assigned_processing(ts, ws)

def decide_worker(tasks: Set[TaskState], workers: Set[WorkerState]) -> List[Tuple[TaskState, WorkerState]]:
    # This function performs the actual optimization given the input above, and this
    # input alone (please do not mutate anything here). We do not keep any additional
    # state; we only optimize on the provided information. The input (e.g. the allowed
    # workers) could be further reduced by applying filters up front (based on
    # occupancy / taking the n smallest, allowed resources, etc.). The optimization
    # could be task-group based, account for occupancy, apply some generic weighted
    # bin packing, or just magic, but it would not require additional state; e.g.
    # `_last_assigned_worker` would be a local variable within this decision.
    ...

I'm sure we can find something where we can leverage synergies if we're optimizing a bunch of tasks at the same time. I would argue this can be done in O(#Tasks * #Dependents) which would be sufficient, wouldn't it?
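
A rough sketch of what such a stateless bulk decide_worker might look like (hypothetical; attribute names like priority, dependents, occupancy, and prefix.duration_average are approximations of the scheduler's state, and the heuristic itself is only one option):

def decide_worker_bulk(tasks, workers):
    # For each task (in priority order) prefer a worker already chosen for a task
    # that shares a dependent; otherwise take the currently least-loaded worker.
    # No state survives the call; roughly O(#tasks * #dependents).
    load = {ws: ws.occupancy for ws in workers}
    chosen_for = {}   # dependent TaskState -> worker picked for one of its inputs
    assignments = []
    for ts in sorted(tasks, key=lambda t: t.priority):
        ws = None
        for dep in ts.dependents:
            if dep in chosen_for:
                ws = chosen_for[dep]
                break
        if ws is None:
            ws = min(load, key=load.get)
        for dep in ts.dependents:
            chosen_for.setdefault(dep, ws)
        load[ws] += max(ts.prefix.duration_average, 0)  # rough occupancy bump
        assignments.append((ts, ws))
    return assignments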

We might also want to peek at our most recent rebalancing algorithm, which, I believe, is a related problem algorithmically speaking.

The reason I believe we can pull this off without holding on to state is that most of the assignments where this matters, or at least the ones where I suspect the biggest impact, happen during the same stimulus.


Taking a step back, this is roughly what I would envision being solved by speculative task assignment: if we had already assigned the dependencies, the decision would become simpler. I'm wondering if we should try to investigate this approach, since it is also not straightforward how this speculative assignment would work. FWIW, we could implement the speculative assignment on the scheduler side without actually implementing it on the worker. For this use case it would still give us benefits, and it would allow us to implement this incrementally, letting us deal with stealing and worker states at a later point in time.

@mrocklin (Member Author) commented Jun 9, 2021

I don't think that code complexity will be a big issue here. I think that we can accomplish this by storing the most recently assigned worker and making a ten-line change to decide_worker:

def decide_worker(...):
    ws = ...  # current best guess for worker

    # Group is large relative to cluster size and there are few dependencies
    if len(ts._group) > self._total_nthreads and sum(map(len, ts._group.dependencies)) < 5:
        old = ts._group._last_worker  # what we chose last time (may be None initially)
        ratio = math.ceil(len(ts._group) / self._total_nthreads)  # how many tasks we expect per worker

        # Can we still fit a few more of these tasks on the old worker?
        if old is not None and ts._prefix._duration_average + old.occupancy < ws.occupancy + ratio * ts._prefix._duration_average:
            ws = old
        else:
            ts._group._last_worker = ws
    ...

That's probably optimistic, but I think that something similar might work.

@mrocklin (Member Author) commented Jun 9, 2021

@gjoseph92 my understanding is that you're trying this path out. Is that correct?

@mrocklin (Member Author) commented Jun 9, 2021

It's also worth noting that all of this is built on the assumption that we're iterating over tasks in a way that respects dask.order. It might be worth verifying that assumption at some point. (or not, if this is very easy to test)

@gjoseph92 (Collaborator) commented:

@mrocklin yes, I'm working on this.

I think that a clever metric for when to switch to a new worker (like ts._prefix._duration_average + old.occupancy < ws.occupancy + ratio * ts._prefix._duration_average in your example) might not be better than just counting how many tasks have been assigned to last_worker, and when that's exceeded our ratio, finding a new worker. If we trust that our initial checks prove we're in a situation with lots of root-like tasks which saturate the cluster (and that we're indeed being called in priority order), then occupancy doesn't really matter: we're going to have to schedule all the tasks anyway, and the workers will all be over-saturated with tasks. Especially while we're still learning _duration_average (say, on the second task submission), trying to estimate occupancy seems likely to do something we don't expect.
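
A minimal sketch of that counting approach (illustrative only; the attribute names echo the _last_worker_tasks_left idea used in #4899 further below, not the actual patch):

import math

# Inside decide_worker, after the usual candidate `ws` has been picked:
group = ts.group
if group.last_worker is None or group.last_worker_tasks_left <= 0:
    # Move on: remember the new worker and reset the countdown to roughly
    # (tasks in the group) / (threads in the cluster).
    group.last_worker = ws
    group.last_worker_tasks_left = math.ceil(len(group) / total_nthreads)
else:
    ws = group.last_worker            # keep packing tasks onto the current worker
group.last_worker_tasks_left -= 1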

> all of this is built on the assumption that we're iterating over tasks in a way that respects dask.order

The main way I think this will fail is if the tasks aren't strictly root tasks but have a few small dependencies (like zarr arrays). If not all tasks require all of those dependencies, then decide_worker will be called in whatever order those dependencies finish, which may not follow priority.

@mrocklin (Member Author) commented Jun 9, 2021

> I think that a clever metric for when to switch to a new worker (like ts._prefix._duration_average + old.occupancy < ws.occupancy + ratio * ts._prefix._duration_average in your example) might not be better than just counting how many tasks have been assigned to last_worker, and when that's exceeded our ratio, finding a new worker

I don't disagree. However, then we would need to keep track of how many tasks we've assigned to each worker. We're already doing this implicitly with occupancy, so I'm reusing that counter by multiplying by the duration_average (which is what gets added to occupancy each time we add a task).

I think that what I wrote down achieves the result that you want, but tracks fewer new things.

@mrocklin (Member Author) commented Jun 9, 2021

Regardless, I think that the thing to do now is to try this out and see how it performs. I'm hopeful that this is a small enough change that running this experiment takes a small number of hours.

@gjoseph92 (Collaborator) commented:

> I think that what I wrote down achieves the result that you want, but tracks fewer new things.

Unfortunately ts._prefix._duration_average is -1 for all the root sum tasks in the example from #4864. I'd expect this to be the case in general, since we should be scheduling root tasks much faster than they complete—when we're scheduling the second root task moments after scheduling the first, we wouldn't have any timing information about that task prefix yet.

@gjoseph92 (Collaborator) commented:

Here are initial results from #4899. I'm using a _last_worker_tasks_left field on the TaskGroup to keep track of when to switch to a new worker for the reason above.

Performance report: https://cloud.coiled.io/gjoseph92/reports/16. The task stream looks great; no transfers just like in #4864.

I'm confused why worker memory usage increases like this, as though tasks aren't being released from memory:

[screenshot: worker memory usage climbing steadily over time]

The dashboard indicates tasks are being released as usual though:

[screenshot: dashboard task-state plot showing tasks being released as usual]

@mrocklin (Member Author) commented Jun 9, 2021

Those look like excellent initial results? :)

@mrocklin (Member Author) commented Jun 9, 2021

I'm curious whether any of the Pangeo workloads benefit from this as well. Is it too early to ping Julius?

@gjoseph92 (Collaborator) commented:

I think it is. I'll try running some Pangeo cases once this is further along, but I think this monotonically-increasing-memory issue is important to track down first.

@mrocklin (Member Author) commented Jun 9, 2021

> I'm confused why worker memory usage increases like this, as though tasks aren't being released from memory:

This feels like an orthogonal problem to me. We can see that tasks are being assigned to workers well. Extra memory sticking around seems to be a lower level issue to me.

@gjoseph92 (Collaborator) commented:

Using jemalloc on macOS (with DYLD_INSERT_LIBRARIES=$(brew --prefix jemalloc)/lib/libjemalloc.dylib mprof run --multiprocess test.py) gives this—so yup, it was the same malloc issue as usual:

[plot: memory profile of the run under jemalloc]

mrocklin pushed a commit that referenced this issue Jun 30, 2021
In `decide_worker`, rather than spreading out root tasks as much as possible, schedule consecutive (by priority order) root(ish) tasks on the same worker. This ensures the dependencies of a reduction start out on the same worker, reducing future data transfer.

Closes #4892

Closes #2602