Respect task ordering when making worker assignments #4922

Open · wants to merge 7 commits into main

Conversation

mrocklin (Member)

In situations where tasks have many related tasks and few dependencies among them, we try to co-schedule those tasks onto similar workers according to their dask.ordering. We do this in the hope that it reduces the communication burden on their dependents.

# If our group is large with few dependencies
# Then assign sequential tasks to similar workers, even if occupancy isn't ideal
if len(ts._group) > nthreads * 2 and sum(map(len, ts._group._dependencies)) < 5:
Member

  • Isn't the length of all dependencies of a TG potentially very expensive? The length of a group iterates over all TaskStates in a given group. For some topologies, this would require us to iterate over all tasks (-1), wouldn't it?
  • Is there any way to reason about the numeric values here? I think I'm still lacking intuition for TGs to tell how stable this heuristic is.

Member Author

The length of a group iterates over all TaskStates in a given group

The code looks like this

    def __init__(self, ...):
        self._states = {"memory": 0, "processing": 0, ...}

    def __len__(self):
        return sum(self._states.values())

So it's not as bad as it sounds. However, iterating over a dict of even a few elements could still be a concern. If so, we could always keep a _len value around; it would be cheap to maintain.
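
A minimal sketch of that idea, assuming some transition-style hook exists to update the cached count (class and method names here are illustrative, not the actual TaskGroup implementation):

    class TaskGroup:
        def __init__(self):
            self._states = {"memory": 0, "processing": 0, "released": 0}
            self._len = 0  # cached total across all states

        def transition(self, old, new):
            # Update per-state counts; `None` means the task is entering or
            # leaving the group entirely.
            if old is not None:
                self._states[old] -= 1
            if new is not None:
                self._states[new] += 1
            if old is None and new is not None:
                self._len += 1
            elif old is not None and new is None:
                self._len -= 1

        def __len__(self):
            return self._len  # O(1) instead of summing the dict each time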

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to reason about the numeric values here? I think I'm still lacking intuition for TGs to tell how stable this heuristic is.

The 2 is because we want more than one task per worker to be allocated. If there are at least as many workers as tasks then we're unlikely to co-schedule any tasks onto the same worker, so the point is moot.

The < 5 is really saying "we want there to be almost no dependencies for the tasks in this group, but we're going to accept a common case of all tasks depending on some parameter or something like an abstract zarr file". We're looking for cases where the dependency won't significantly affect the distribution of tasks throughout the cluster. This could be len(dependencies) in (0, 1) but we figured we'd allow a couple of these just in case.

I expect that the distribution here will be bi-modal with tasks either in (0, 1) or in the hundreds or thousands. Five seemed like a good separator value in that distribution. I think that, given the distribution, this choice is stable and defensible.
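
Restating the heuristic as a standalone predicate may make the thresholds easier to see; this is only an illustration of the check quoted above (attribute names like group.dependencies are assumptions, not the scheduler's exact API):

    def looks_root_ish(group, total_nthreads):
        # Many more tasks in the group than threads in the cluster,
        # so several tasks will land on each worker ...
        many_tasks = len(group) > total_nthreads * 2
        # ... and almost no tasks across all of the groups this group
        # depends on (0 or 1 in practice; < 5 leaves a little slack).
        few_total_deps = sum(len(dep) for dep in group.dependencies) < 5
        return many_tasks and few_total_deps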

Member

The code looks like this

Right, that's state as in {Running, Memory, Released}, not state as in TaskState, and it's an aggregated dict of counts. I was a bit thrown off when I saw that, but that's perfectly fine.

I expect that the distribution here will be bi-modal with tasks either in (0, 1) or in the hundreds or thousands.

Thanks for the detailed description. I think I was thrown off by the TaskGroup semantics again. I was thinking about our typical tree reductions, where the split factor is usually something like 8 or 16. Those are the situations where one would want to group all of the dependencies for the first reduction.
However, at the group level this should amount to a trivial dependency count of one, correct?

Then, five is conservative, I agree 👍

Member Author

Perhaps state should have been called state_counts. Oh well.

Ah, it's not len(ts._group._dependencies) which is what you're describing, I think. It's sum(map(len, ts._group._dependencies)) < 5.

We're counting up all of the dependencies for all of the tasks that are like this task. So in a tree reduction, this number would likely be in the thousands for any non-trivially sized computation. It is non-zero and less than five only in cases like the following:

a1 a2 a3 a4 a5 a6 a7 a8
 \  \  \  \  /  /  /  /
            b
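
As a toy illustration of why the total-dependency count separates these shapes (plain Python dicts here, not the scheduler's TaskGroup objects):

    # Fan-in like the a1..a8 -> b diagram above: the a-group has zero
    # dependencies in total, so it is "root-ish".
    fan_in = {f"a{i}": [] for i in range(1, 9)}
    fan_in["b"] = [f"a{i}" for i in range(1, 9)]

    # Tree reduction: each reduction task depends on several inputs, so the
    # reduction group's total dependency count grows with the graph.
    tree = {f"x{i}": [] for i in range(8)}
    tree.update({f"r{i}": [f"x{2 * i}", f"x{2 * i + 1}"] for i in range(4)})

    def group_total_deps(graph, prefix):
        return sum(len(deps) for key, deps in graph.items() if key.startswith(prefix))

    print(group_total_deps(fan_in, "a"))  # 0 -> passes the < 5 check
    print(group_total_deps(tree, "r"))    # 8 -> scales with graph size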

Member Author

Really we're looking for cases where the number of dependencies, amortized over all similar tasks, is near-zero.

Member Author

This is the "ish" in "root-ish" tasks that we sometimes talk about here.

Member

Perhaps state should have been called state_counts. Oh well.

naming is hard

We're counting up all of the dependencies for all of the tasks that are like this task. So in a tree reduction, this number would likely be in the thousands for any non-trivially sized computation. It is non-zero and less than five only in cases like the following:
Really we're looking for cases where the number of dependencies, amortized over all similar tasks, is near-zero.
This is the "ish" in "root-ish" tasks that we sometimes talk about here.

I think I get it now. That's an interesting approach to gauging the local topology. What I'm currently wondering is whether this, or a closely related metric (e.g. the ratio of group dependents to dependencies), could be used to estimate whether a task has the potential to increase or decrease parallelism. That would be an interesting metric for work stealing.

Anyhow, I don't want to increase the scope here; this is a discussion we can delay. I'll let the professionals get back to work. Thanks!

Member Author

I think that it could be a useful metric for memory consuming/producing tasks.

It's also, yes, a good metric for increasing parallelism. My experience though is that we are always in a state of abundant parallelism, and that scheduling to increase parallelism is not worth considering in our domain.

Instead we should focus our scheduling decisions to reduce memory use and free intermediate tasks quickly.

This gets test_scheduler.py::test_reschedule to pass
@mrocklin (Member Author)

The test failure is distributed/tests/test_scheduler.py::test_memory, which I don't understand particularly well. Unfortunately I'm not able to make it fail locally. cc'ing @crusaderky in case he has any quick suggestions on what might be going on, or why allocating tasks differently to machines might affect that test.

@gjoseph92 (Collaborator)

Running this code (uncomment the first bit to generate the zarr array)

import xarray as xr
import dask.array as da
from distributed import Client, LocalCluster
import coiled


if __name__ == "__main__":
    cluster = LocalCluster(
        processes=True, n_workers=4, threads_per_worker=1, memory_limit=0
    )
    client = Client(cluster)

    # Write a zarr array to disk (requires 100GB free disk space!)
    # Comment this out once you've run it once.
    # data = da.zeros((12500000, 1000), chunks=(12500000, 1))
    # ds = xr.Dataset({"data": (("x", "y"), data)})
    # ds.to_zarr("test.zarr")
    # print("Saved zarr")

    # Do the same array-sum example, but from zarr.
    ds_zarr = xr.open_zarr("test.zarr")
    with coiled.performance_report("zarr-4899.html"):
        ds_zarr.sum("y").compute()

causes a lot of transfers using this branch (performance report) compared to #4899 (performance report). I believe this is because, when moving on to a new worker, this is still using the typical candidate-restricting logic—see commit message of 0fbb75e for an explanation.

@mrocklin (Member Author)

Ah, right. Would this be solved by your trick of including a few workers from the general pool into the mix?

We might also consider applying the root-ish check when we check for dependencies. If there are far fewer dependencies than tasks in this group then we just fall back to the all_workers case.

@gjoseph92 (Collaborator)

Would this be solved by your trick of including a few workers from the general pool into the mix?

Yes, but I think we should not consider dependencies at all when selecting candidates in this case:

# Previous worker is fully assigned, so pick a new worker.
# Since this is a root-like task, we should ignore the placement of its dependencies while selecting workers.
# Every worker is going to end up running this type of task eventually, and any dependencies will have to be
# transferred to all workers, so there's no gain from only considering workers where the dependencies already live.
# Indeed, we _must_ consider all workers, otherwise we would keep picking the same "new" worker(s) every time,
# since there are only N workers to choose from that actually have the dependency (where N <= n_deps).
ignore_deps_while_picking = True

So rather than picking candidates as usual and then adding a few random workers, I think we should only use random workers in this instance. The whole point of the "ish" in root-ish tasks is that it's a case where we've decided dependencies don't matter.
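
A rough sketch of that selection, assuming a flat pool of workers (the names all_workers and n_candidates are placeholders, not the scheduler's actual variables):

    import random

    def pick_root_ish_candidates(all_workers, n_candidates=20):
        # For root-ish tasks, ignore where the dependencies live and sample
        # candidates from the whole pool so every worker gets a share.
        workers = list(all_workers)
        if len(workers) <= n_candidates:
            return workers
        return random.sample(workers, n_candidates)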

@mrocklin (Member Author)

So rather than picking candidates as usual and then adding a few random workers, I think we should only use random workers in this instance. The whole point of the "ish" in root-ish tasks is that it's a case where we've decided dependencies don't matter.

There are two proposals that came out of conversation here:

  1. Mix in random workers to the candidates, probably good to do in general
  2. Consider not looking at dependencies at all in some cases. This might be ...
  • A check similar to what we do today for root-ish tasks: a large number of tasks in the group and a small number of total dependencies
  • Something fancier, like looking at the number of bytes across all dependencies, amortized over all tasks, perhaps comparing communication cost to computation cost (sketched below)

They might both make sense
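
For the fancier variant, one way to frame it, purely as an illustration (all names and the 10% threshold here are assumptions, not anything in this PR):

    def deps_negligible(dep_nbytes, n_tasks_in_group, avg_task_duration,
                        bandwidth, factor=0.1):
        # Cost of moving the group's dependencies once, spread across every
        # task in the group, compared against the cost of computing one task.
        amortized_transfer = (dep_nbytes / bandwidth) / n_tasks_in_group
        return amortized_transfer < factor * avg_task_duration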
