Group tasks into partitions which all execute on the same worker #1559

Closed

Conversation

sjperkins
Member

This is a very basic implementation of an idea that I think would be useful. I'm opening the PR to start a discussion and decide whether it is worth taking further.

Worker restrictions on Client.get, Client.compute, Client.persist etc. are hard physical restrictions.

It would be useful to allow some flexibility by instead grouping tasks into a partition whose tasks all execute on the same worker. This would give the scheduler flexibility in choosing which worker to schedule the first task of the partition on. Once the first task is scheduled, all remaining tasks in that partition would have to execute on that worker, ensuring data locality.

In the following pseudo-code, for example, A, B and C each have four chunks of size 250:

A = dask.array<ones, shape=(1000,), dtype=float64, chunks=(250,250,250,250)>
B = dask.array<ones, shape=(1000,), dtype=float64, chunks=(250,250,250,250)>
C = dask.array<ones, shape=(1000,), dtype=float64, chunks=(250,250,250,250)>

D = A + B + C

When submitting the task graph representing D, it would be useful to assign the first chunk in each array to a partition, the second to another and so on, to ensure data locality.

At present, this is specified as another restriction in Client.get:

f = client.get({'x': 1, 'y': 2}, ['x', 'y'], partitions={'x': 'a', 'y': 'a'})
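As a fuller illustration of the idea for the D example above (a sketch only, built on the partitions= keyword proposed in this PR; the key-to-partition mapping is hypothetical, the rest uses the public dask/distributed API):

import dask.array as da
from distributed import Client

client = Client()  # assumes a scheduler and workers are already running

A = da.ones(1000, chunks=250)
B = da.ones(1000, chunks=250)
C = da.ones(1000, chunks=250)
D = A + B + C

dsk = dict(D.__dask_graph__())   # low-level graph: {(name, i): task, ...}
keys = D.__dask_keys__()         # output chunk keys of D

# Pin every task producing chunk i (of A, B, C or D) to the same partition,
# so once the first of them is scheduled, the rest follow it to that worker.
partitions = {k: 'part-%d' % k[1] for k in dsk if isinstance(k, tuple)}

f = client.get(dsk, keys, partitions=partitions)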

I would like to fold the partitions functionality into worker_restrictions, but the worker/hostname handling is sticky and I didn't want to mess with it too much. Something like workers=['Alice', 'tcp://123.456.789.10.11.12:8080/', 'parta'], where parta is treated as a partition if it does not match any worker name or host name.

I could also see this being renamed to task_group, for example.

I'd welcome your input on this -- I'd imagine integrating this with the rest of the scheduling functionality would be tricky.

All tasks within the same partition are executed on the same worker.
@mrocklin
Member

My first impressions after reading your description, but not yet your implementation, are that I'm curious whether there are other ways to achieve this same behavior. What is your final objective here? Do you need these tasks to be executed on the same machines, or are you just looking to mostly align datasets for more efficient computation in the future?

For example, another possible solution to this would be for the task scheduler to do a little bit of look-ahead when assigning tasks to workers and try to see which tasks might be related in the future. This isn't necessarily easier than the approach that you have laid out here, but does demonstrate that there might be other solutions worth investigating.

@mrocklin
Member

This problem might also be resolvable by using diamond fusion, i.e. the ave_width= keyword in fuse.

Something like

with dask.set_options(fuse_ave_width=5):
    ...

This would lump the related tasks in A, B, and C into single tasks and just compute D directly. You would lose parallelism, and wouldn't have access to A, B, and C afterwards, but it would probably work well today.
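Roughly, with the current dask API (a sketch; dask.optimization.fuse and its ave_width parameter are real, but this is not code from this PR):

import dask.array as da
from dask.optimization import fuse

A = da.ones(1000, chunks=250)
B = da.ones(1000, chunks=250)
C = da.ones(1000, chunks=250)
D = A + B + C

# Fuse linear chains and narrow diamonds of tasks whose average width stays
# below ave_width, collapsing the per-chunk tasks of A, B and C into the
# per-chunk tasks of D.
fused, dependencies = fuse(dict(D.__dask_graph__()), keys=D.__dask_keys__(), ave_width=5)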

@sjperkins
Member Author

@mrocklin Thanks for nudging me towards the optimisation features. You're right, I can probably get 90% of the way by tending the graph. I'd glanced over them at some point, but never in depth.

I still think there's some intersection between what align in #1002 and task partitioning could achieve. I think one of the pros of partitioning is that the alignment/grouping is specified at graph/task submission, rather than as a secondary step.

@mrocklin
Member

I still think there's some intersection between what align in #1002 and task partitioning could achieve. I think one of the pros of partitioning is that the alignment/grouping is specified at graph/task submission, rather than as a secondary step.

Yes, I agree, and I see value in this. However, anything this deep in the scheduler inevitably becomes fairly expensive to do correctly. For example, what happens if the worker goes down, what happens if two tasks in the same partition run at the same time on different workers, how does this interact with work stealing, how will this interact with future features, and so on. This isn't to say that we shouldn't pursue this, just that it'll be tricky.

Also, I'm hopeful that in some moderate future we might be able to solve this in a more automatic way that doesn't require as much input from the user.

One approach here would be to use the task priorities provided by dask.order more carefully. The dask.order function currently assigns a numeric priority to each task that is probably already pretty close to what you want (the same chunks in A, B, and C will all have very close priorities). When we get a new batch of tasks we assign them somewhat randomly among the idle workers. If we were to change this allocation of tasks just after update_graph then this might achieve most of what you want without much cost. We would be competing against other interests though, like tasks with actually different priorities.
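For illustration (a sketch; dask.order.order is the real function, while the co-location step it would feed into is hypothetical):

import dask.array as da
from dask.order import order

A = da.ones(1000, chunks=250)
B = da.ones(1000, chunks=250)
C = da.ones(1000, chunks=250)
D = A + B + C

dsk = dict(D.__dask_graph__())
priorities = order(dsk)  # maps each task key to an integer priority

# Corresponding chunks of A, B and C get nearby priorities, so sorting by
# priority groups exactly the tasks that a partition-aware assignment would
# want to co-locate on one worker.
for key in sorted(priorities, key=priorities.get):
    print(priorities[key], key)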

@mrocklin
Member

This PR has gone stale. Closing for now.

@mrocklin closed this Apr 15, 2019