Redesign Cluster Managers #2235

Closed

mrocklin opened this issue Sep 5, 2018 · 18 comments


mrocklin commented Sep 5, 2018

Over the past few months we've learned more about the requirements to deploy Dask on different cluster resource managers. This has created several projects like dask-yarn, dask-jobqueue, and dask-kubernetes. These projects share some code within this repository, but also add their own constraints.
Now that we have more experience, this might be a good time to redesign things both in the external projects and in the central API.

There are a few changes that have been proposed that likely affect everyone:

  1. We would like to separate the Cluster manager object from the Scheduler in some cases
  2. We would like to optionally change how the user specifies cluster size to incorporate things like number of cores or amount of memory. In the future this might also expand to other resources like GPUs.
  3. We would like to build more UI elements around the cluster object, like JupyterLab server extensions

The first two will likely require synchronized changes to all of the downstream projects.

Separate ClusterManager, Scheduler, and Client

I'm going to start calling the object that starts and stops workers, like KubeCluster, ClusterManager from here on.

Previously the ClusterManager, Scheduler, and Client were started from within a notebook and so all lived in the same process, and could act on the same event loop thread. It didn't matter so much which part did which action. We're now considering separating all of these. This forces us to think about the kinds of communication they'll have to do, and where certain decisions and actions take place. At first glance we might assign actions like the following:

  1. ClusterManager
    • Start new workers
    • Force-kill old workers
    • Have UI that allows users to specify a desired makeup of workers, either a specific number or an Adaptive range. I'm going to call this the Worker Spec
  2. Scheduler
    • Maintain Adaptive logic, and determine how many workers should exist given current load
    • Choose workers to remove, and remove those workers politely
  3. Client: normal operation, I don't think that anything needs to change here.

Some of these actions depend on information from the others. In particular:

  • The ClusterManager needs to send the following to the Scheduler
    • The Worker Spec (like amount of desired memory or adaptive range), whenever it changes
  • The Scheduler needs to send the following to the ClusterManager
    • The desired target number of workers given current load (if in adaptive mode)
    • An announcement every time a new worker arrives (to manage the difference between launched/pending and actively running workers)
    • Requests to force-kill and remove workers after it politely kills them

This is all just a guess. I suspect that other things might arise when actually trying to build everything here.
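
To make that division concrete, here is a minimal sketch of what the ClusterManager side of such a protocol could look like. Everything in it is hypothetical: the class, method names, and message ops are illustrative, not an existing Dask API.

    # Hypothetical sketch only; none of these names exist in distributed today.
    class ClusterManager:
        """Starts and force-kills workers; leaves adaptive logic to the Scheduler."""

        def __init__(self, scheduler_address):
            self.scheduler_address = scheduler_address
            self.worker_spec = {"minimum": 0, "maximum": None}  # the "Worker Spec"
            self.workers = {}  # name -> backend handle (pod, job id, ...)

        async def set_worker_spec(self, **spec):
            # Called from UI code; forwarded to the Scheduler whenever it changes
            self.worker_spec.update(spec)
            await self.send_to_scheduler({"op": "worker-spec-changed",
                                          "spec": self.worker_spec})

        async def handle_scheduler_message(self, msg):
            # The three kinds of messages listed above, coming from the Scheduler
            if msg["op"] == "scale-to":            # target from Adaptive
                await self.scale(msg["n"])
            elif msg["op"] == "worker-arrived":    # pending -> actively running
                self.workers[msg["name"]] = msg["address"]
            elif msg["op"] == "remove-workers":    # already retired politely
                for name in msg["names"]:
                    await self.force_kill(name)

        # Backend-specific methods (Kubernetes, YARN, a job queue, ...)
        async def scale(self, n): ...
        async def force_kill(self, name): ...
        async def send_to_scheduler(self, msg): ...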

How to specify cluster size

This came up in #2208 by @guillaumeeb. Today we ask users for a desired number of workers

cluster.scale(10)

But we might instead want to allow users to specify a desired amount of memory or number of cores

cluster.scale('1TB')

And in the future people will probably also ask for more complex compositions, like some small workers and some big workers, some with GPUs, and so on.
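
One way to keep that call future-proof is to normalize whatever the user passes into a worker count inside scale itself. A rough sketch, using dask.utils.parse_bytes; the worker_memory attribute (bytes per worker) and the scale_up hand-off are assumptions, not an existing interface:

    from dask.utils import parse_bytes

    def scale(self, n_or_memory):
        """Accept either a worker count or a memory string like '1TB'."""
        if isinstance(n_or_memory, str):
            target = parse_bytes(n_or_memory)       # '1TB' -> 1_000_000_000_000
            n = -(-target // self.worker_memory)    # ceiling division by bytes/worker
        else:
            n = n_or_memory
        return self.scale_up(n)  # hand off to backend-specific scaling (name illustrative)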

If we now have to establish a communication protocol between the ClusterManager and the Scheduler/Adaptive then it might be an interesting challenge to make that future-proof.

Further discussion on this topic should probably remain in #2208 , but I wanted to mention it here.

Shared UI

@ian-r-rose has done some excellent work on the JupyterLab extension for Dask. He has also mentioned thoughts on how to include the ClusterManager within something like that as a server extension. This would optionally move the ClusterManager outside of the notebook and into the JupyterLab sidebar. A number of people have asked for this in the past. If we're going to redesign things, I thought we should also include UI in that process to make sure we capture any constraints it imposes.

Organization

Should the ClusterManager object continue to be a part of this repository or should it be spun out? There are probably costs and benefits both ways.

cc @jcrist (dask-yarn) @jacobtomlinson (dask-kubernetes) @jhamman @guillaumeeb @lesteve (dask-jobqueue) @ian-r-rose (dask-labextension) for feedback.

@mrocklin

Just a gentle ping here. It would be good to get the group to start thinking about this problem. I think that some moderate energy here might be a better use of time than patching around the current system.

@guillaumeeb

Some thoughts here:

  • I don't think we should spin ClusterManager out of this repository. It will need tight integration with the Scheduler, and I currently don't see any benefit in doing that.
  • ClusterManager will probably need the ability to start a Scheduler, as it does with workers.
  • As discussed in Scale by number of cores or amount of memory #2208, specifying different worker pools might be useful.
  • The Scheduler must be aware of these pools and of their characteristics (CPU, memory, GPU...).

Concerning the ClusterManager implementation, for the moment I don't see the need for a major change in how things are done from the abstract class/implementation point of view:

  • We would still need scale and adapt functions.
  • We could define an __init__ that takes a Scheduler address as a parameter, plus other kwargs like adaptive options, a worker key (see Using Cluster._worker_key function both in scale or adaptive #2256), a worker pools spec...
  • Define some start_scheduler method?
  • And then we need to put in place the communication layer between the ClusterManager and the Scheduler (a rough sketch follows below).
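
Putting those bullet points together, the abstract class might look roughly like the following. The class name and signatures are illustrative only, not a concrete proposal:

    from abc import ABC, abstractmethod

    class ClusterManagerBase(ABC):
        """Illustrative sketch of the interface described above."""

        def __init__(self, scheduler_address=None, adaptive_options=None,
                     worker_key=None, worker_pools=None, **kwargs):
            self.scheduler_address = scheduler_address
            self.adaptive_options = adaptive_options or {}
            self.worker_key = worker_key
            self.worker_pools = worker_pools or {}

        @abstractmethod
        def start_scheduler(self):
            """Start a remote Scheduler when no address was given."""

        @abstractmethod
        def scale(self, n, pool=None):
            """Scale a given worker pool to n workers."""

        def adapt(self, minimum=0, maximum=None, **options):
            """Hand the adaptive range over to Scheduler-side Adaptive logic."""
            raise NotImplementedError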

@mrocklin

I don't think we should spin ClusterManager out of this repository. It will need tight integration with the Scheduler, and I currently don't see any benefit in doing that.

Can you expand on this? I think that several groups find it valuable to keep "the thing that launches workers" on a separate machine from the scheduler.

@guillaumeeb

Was just answering your question above:

Should the ClusterManager object continue to be a part of this repository or should it be spun out?

Sorry if I did not make myself clear enough. I totally agree that separating the ClusterManager and the Scheduler is valuable.

@mrocklin

Ah, I see. Got it.


jhamman commented Sep 18, 2018

This all sounds reasonable to me. It would be useful for me to see the reasons why we can't keep using the current configuration spelled out in more detail. In dask-jobqueue, I think we have this configuration:

ClusterManager
  - LocalCluster
    - Scheduler
Client

where in each ClusterManager we have a LocalCluster (with no workers), and attached to that is a Scheduler. The Client then exists somewhere else.
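
For illustration, that nesting looks roughly like the following in code; the class name is made up and only distributed.LocalCluster is a real API:

    from distributed import LocalCluster

    class JobQueueLikeCluster:
        """Illustrative: a ClusterManager that embeds a worker-less LocalCluster."""

        def __init__(self, **kwargs):
            # The Scheduler lives inside this LocalCluster; workers are
            # submitted separately to the batch system by this class.
            self.local_cluster = LocalCluster(n_workers=0, **kwargs)

        @property
        def scheduler_address(self):
            return self.local_cluster.scheduler_address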

  1. Do I have this right?
  2. What is the benefit of pulling the scheduler out to the same level as the ClusterManager and Client?

@dhirschfeld

In my custom deployment both the scheduler and workers are spun up remotely - there is no LocalCluster.

My CustomClusterManager instantiates (or attaches to) a remote scheduler and can start/stop workers remotely. CustomClusterManager.client is a Client instance connecting the user to the remote scheduler.

Users create and manage a cluster with cluster = CustomClusterManager(*args, **kwargs)
and submit jobs with cluster.client.submit/map/compute

I'm not familiar with the current LocalCluster implementation, but I don't think it caters for this use case (a scheduler spun up remotely from the user's Client process)?
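
For illustration, the pattern described here might look roughly like this; CustomClusterManager and its scale hook are deployment-specific placeholders, and only distributed.Client is the standard API:

    from distributed import Client

    class CustomClusterManager:
        """Sketch: attach to an already-running remote scheduler."""

        def __init__(self, scheduler_address, **kwargs):
            self.scheduler_address = scheduler_address
            self.client = Client(scheduler_address)  # ordinary Client connection

        def scale(self, n):
            # Deployment-specific: ask the remote resource manager for n workers
            raise NotImplementedError

        def close(self):
            self.client.close()

    # Usage as described above:
    #   cluster = CustomClusterManager("tcp://scheduler.example.com:8786")
    #   future = cluster.client.submit(lambda x: x + 1, 41)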

@mrocklin

@jhamman some reasons to separate the scheduler from the "thing that controls starting and stopping workers"

  1. You want to separate the lifecycle of the dask cluster from the lifecycle of a notebook kernel session. For example you might want to manage the cluster with a JupyterLab plugin separately from the notebook.
  2. You are using Jupyter from a place that is not on your normal computational node pool (like a login node). You want your scheduler and workers to be close to each other on the network of computational nodes, even if your client is somewhat further away.

@guillaumeeb

How should we move forward here? Should we try to design something, with rough class modelling (ClusterManager, Adaptive, Scheduler, Client) and their interactions?


mrocklin commented Oct 2, 2018 via email


mturok commented Oct 16, 2018

@mrocklin Is there any thought to making the CPU/memory per worker part of the client request, rather than the cluster specification? That is, I might have one request which needs more memory and another which needs less (or GPUs, or some other spec).

Or is the intent that I would start a different cluster for each request?

@guillaumeeb

@mturok there are some discussions about this: #2118 and #2208 (comment). The currently accepted solution is to have a set of different worker pools.

Users would then need to be able to submit tasks to a particular pool through the Dask APIs.

@mrocklin

@mturok maybe you're referring to something more fine-grained, like the following? http://distributed.dask.org/en/latest/resources.html

Ideally adaptive deployments would look at the resource requirements of the current set of tasks when requesting workers from a cluster manager. This is an open problem though. I know of groups that have done this sort of thing in-house (the adaptive class is not hard to modify/subclass) but there is no drop-in solution in Dask today. (though of course there could be with moderate effort)
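
For reference, the fine-grained resource mechanism linked above works roughly like this today; note that it constrains where tasks run but does not by itself feed back into adaptive scaling (the scheduler address below is a placeholder):

    from distributed import Client

    def increment(x):
        return x + 1

    # Workers advertise abstract resources at startup, e.g.:
    #   dask-worker tcp://scheduler-address:8786 --resources "GPU=1"
    client = Client("tcp://scheduler-address:8786")

    # This task will only run on workers that advertised a GPU resource
    future = client.submit(increment, 41, resources={"GPU": 1})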

@guillaumeeb

Just cross-posting some findings from other discussions here, to consolidate the ClusterManager approach:

@guillaumeeb

I'm wondering: should we consolidate the ClusterManager/Adaptive/Scheduler refactor needs somewhere other than this issue?
Currently working on this for dask-jobqueue, I would like, for example, to take the first post of @mrocklin, propose some modifications, and add some information on what I think needs to be done and where.

We could do this in the project wiki for example, but I see it is not used. I would like to avoid copy-pasting the entire design each time a modification is identified. Any ideas?

@dhirschfeld

Maybe a PR with just a design document. The PR can serve as a discussion forum and you can add commits as the discussion shapes the design.

The actual implementation could be done in a separate PR to keep the discussion around implementation details separate from the design discussion.

The design document doesn't need to be merged at the end if it has served its purpose. It may well be a good basis for some docs though.

@jacobtomlinson

I would be tempted to say this has been done. Thoughts @mrocklin?


mrocklin commented Jun 4, 2020

Yup. Thanks for flagging @jacobtomlinson

@mrocklin mrocklin closed this as completed Jun 4, 2020