
Add SpecificationCluster #2675

Merged
merged 58 commits into from May 22, 2019

Conversation

Member

@mrocklin mrocklin commented May 9, 2019

This is intended to be a base for LocalCluster (and others) that want to
specify more heterogeneous information about workers.

Additionally, this PR does the following:

  1. Starts the use of Python 3-only code in the main codebase
  2. Cleans up a number of our intermittent testing failures (previously, nannies could survive test cleanup and send stray messages to ports, disrupting other tests)
  3. Adds a couple of new small failures; notably, silent shutdown is no longer entirely silent (working on it)

Docstring

Cluster that requires a full specification of workers

This attempts to handle much of the logistics of cleanly setting up and
tearing down a scheduler and workers, without handling any of the logic
around user inputs. It should form the base of other cluster creation
functions.

Examples

>>> spec = {
...     'my-worker': {"cls": Worker, "options": {"ncores": 1}},
...     'my-nanny': {"cls": Nanny, "options": {"ncores": 2}},
... }
>>> cluster = SpecCluster(workers=spec)
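
As a rough usage sketch (not part of the PR's docstring): connecting a Client and shutting things down should follow the same pattern as other deploy classes; treat the exact calls below as assumptions.

>>> from dask.distributed import Client
>>> client = Client(cluster)  # connect a client to the cluster built above
>>> # ... submit work as usual ...
>>> client.close()
>>> cluster.close()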
Member Author

@mrocklin mrocklin commented May 9, 2019

I'm going to try this out with some heterogeneous GPU machines. This feels like a nice base on which to rewrite and clean up LocalCluster, a prospect I'm excited about :)

Contributor

@dhirschfeld dhirschfeld commented May 16, 2019

Is the spec intended to be per-worker - e.g.:

spec = {
    'worker1': {"cls": Worker, "options": {"ncores": 1}},
    'nanny1': {"cls": Nanny, "options": {"ncores": 2}},
    'worker2': {"cls": Worker, "options": {"ncores": 1}},
    'nanny2': {"cls": Nanny, "options": {"ncores": 2}},
    'worker3': {"cls": Worker, "options": {"ncores": 1}},
    'nanny3': {"cls": Nanny, "options": {"ncores": 2}},
    ...
}

Contributor

@dhirschfeld dhirschfeld commented May 16, 2019

I'm just wondering if now is a good time to introduce the concept of "worker pools":
#2208 (comment)

e.g. you would pass pools in addition to workers and have the workers dict reference the specs defined in pools

>>> pool_specs = {
...     'default': {
...         'worker': {"cls": Worker, "options": {"ncores": 1}},
...         'nanny': {"cls": Nanny, "options": {"ncores": 2}},
...     },
...     'no-nanny': {
...         'worker': {"cls": Worker, "options": {"ncores": 1}},
...     },
... }

>>> worker_specs = {'worker1': 'default', 'worker2': 'no-nanny'}
>>> cluster = SpecCluster(workers=worker_specs, pools=pool_specs)


mrocklin added 3 commits May 16, 2019
Previously nannies could leak out in various ways
Member Author

@mrocklin mrocklin commented May 16, 2019

I'm just wondering if now is a good time to introduce the concept of "worker pools":

This is related to that issue, but is lower level. I think it would enable other people to add things like pools more easily. If this is something you'd like to explore, I encourage you to do so now; exploring it now would help guide the design.

            # If people call this frequently, we only want to run it once
            return self._correct_state_waiting
        else:
            task = asyncio.Task(self._correct_state_internal())
Member

@jcrist jcrist May 21, 2019


You shouldn't create Tasks manually, but instead use asyncio.ensure_future.
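
A minimal standalone sketch of that suggestion; correct_state_internal below is a stand-in coroutine, not the method from this diff:

import asyncio

async def correct_state_internal():
    # stand-in for the real coroutine under review
    await asyncio.sleep(0)

async def main():
    # let asyncio create and schedule the Task for us
    task = asyncio.ensure_future(correct_state_internal())
    await task

asyncio.run(main())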

        d = self.worker_spec[name]
        cls, opts = d["cls"], d.get("options", {})
        if "name" not in opts:
            opts = toolz.merge({"name": name}, opts, {"loop": self.loop})
Member

@jcrist jcrist May 21, 2019


Did you mean to include the loop in here?

Member Author

@mrocklin mrocklin May 21, 2019


Yes, ideally we want the worker to use the IOLoop used by the cluster object.

Member

@jcrist jcrist May 21, 2019


I mean that loop is only added if name is not in opts. Wouldn't you always want to pass it?

Member Author

@mrocklin mrocklin May 21, 2019


Ah, indeed. Looking at this again it looks like we do this in an async def function anyway, so IOLoop.current() should be valid regardless. I'll remove the reference to loop entirely, which should be helpful in reducing the contract too.
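
A small sketch of that point (assuming Tornado 5+): inside a coroutine that is already running on an asyncio event loop, IOLoop.current() resolves to the IOLoop wrapping that running loop, so no explicit loop argument is needed.

import asyncio
from tornado.ioloop import IOLoop

async def main():
    loop = IOLoop.current()  # the IOLoop wrapping the currently running loop
    print(loop)

asyncio.run(main())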


        if workers:
            await asyncio.wait(workers)
            for w in workers:
                w._cluster = weakref.ref(self)
Member

@jcrist jcrist May 21, 2019


What is the cluster weakref for?

Member Author

@mrocklin mrocklin May 21, 2019


There are a lot of weakrefs around now. They're useful when tracking down leaking references to things.
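
A hedged illustration with toy classes (the immediate collection below relies on CPython's reference counting): the weakref lets us ask whether the cluster is still alive without keeping it alive.

import weakref

class Cluster:
    pass

class Worker:
    pass

cluster = Cluster()
w = Worker()
w._cluster = weakref.ref(cluster)  # does not keep the cluster alive

assert w._cluster() is cluster     # dereference while the cluster still exists
del cluster
assert w._cluster() is None        # the weakref reports that it is gone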


            for w in workers:
                w._cluster = weakref.ref(self)
                if self.status == "running":
                    await w
Member

@jcrist jcrist May 21, 2019


The non-running workers are never awaited; what happens to them? They're still added to the workers dict below.

Member Author

@mrocklin mrocklin May 21, 2019


This is again a tornado/asyncio difference. I've removed the running check; I think things now work for both async def and gen.coroutine-style functions.



    async def _close(self):
        while self.status == "closing":
            await asyncio.sleep(0.1)
Member

@jcrist jcrist May 21, 2019


Instead of polling, could we have a future for the closing operation (created by the first call to _close) and just wait on that?
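
A rough sketch of that idea with invented names: the first caller creates the close task and every caller awaits the same one.

import asyncio

class Example:
    def __init__(self):
        self._close_task = None

    async def _do_close(self):
        await asyncio.sleep(0.1)  # stand-in for the real teardown work

    async def close(self):
        if self._close_task is None:
            self._close_task = asyncio.ensure_future(self._do_close())
        await self._close_task    # all callers wait on the same task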

Member Author

@mrocklin mrocklin May 21, 2019


Good thought. I'm inclined to leave this as-is for now though, if that's ok.



    def _correct_state(self):
        if self._correct_state_waiting:
            # If people call this frequently, we only want to run it once
Member

@jcrist jcrist May 21, 2019


I think this drops scale requests while a current scale request is processing:

  • Call scale
  • Spec updated
  • correct state task start, task is stored as _correct_state_waiting
  • scale returns
  • Call scale again
  • Spec updated
  • since the previous call is still in progress, state is not corrected and no new workers are started or stopped. The spec and tasks are now out of sync. Also, since there are multiple await calls in _correct_state_internal, the worker_spec can differ at different points in that function, leading to potential bugs.

One naive solution would be to have a background task that loops forever, waiting on an event:

while self.running:
    await self._spec_updated.wait()
    # update workers to match spec
    # After updating, only clear the event if things are up to date
    # If things aren't up to date, then we loop again
    if self.spec_matches_current_state():
        self._spec_updated.clear()

Then _correct_state would look like:

def _correct_state(self):
    # set the event, it's only ever cleared in the loop
    # We force synchronization here to prevent scheduling tons
    # of tasks all setting the event, this blocks until it's set.
    return self.sync(self._mark_state_updated)

async def _mark_state_updated(self):
    self._spec_updated.set()

There are likely other ways to handle this. In dask-gateway I have a task per worker/scheduler. As the spec updates, unfinished tasks are cancelled or new ones are fired. If a previous scale call is still in progress for a cluster, scale will block until that call has finished. Note that this only blocks while we update our internal task state (cancelling/firing new tasks), not until those tasks have completed.
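
A loose sketch of that task-per-worker idea (invented names, not dask-gateway's actual code); it must be called from inside a running event loop:

import asyncio

class Manager:
    def __init__(self):
        self._worker_tasks = {}  # worker name -> asyncio.Task

    async def _launch_worker(self, name):
        await asyncio.sleep(1)   # stand-in for a slow worker launch

    def update_workers(self, names):
        # cancel tasks for workers that are no longer in the desired set ...
        for name in list(self._worker_tasks):
            if name not in names:
                self._worker_tasks.pop(name).cancel()
        # ... and fire new tasks for workers that were added
        for name in names:
            if name not in self._worker_tasks:
                self._worker_tasks[name] = asyncio.ensure_future(
                    self._launch_worker(name)
                )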

Member Author

@mrocklin mrocklin May 21, 2019


Since the previous call is still in progress, state is not corrected and no new workers are started or stopped. The spec and tasks are now out of sync. Also, since there are multiple await calls in _correct_state_internal, the worker_spec can differ at different points in that function, leading to potential bugs.

So, the _correct_state_waiting attribute isn't the currently running task; it's the currently enqueued one. Once _correct_state starts running, it immediately clears this attribute. After someone calls scale there is a clean, not-yet-run _correct_state_waiting future that will run soon.
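
A condensed sketch of that coalescing behaviour, pieced together from the snippets quoted in this thread (an approximation, not the exact code):

import asyncio

class Example:
    def __init__(self):
        self._correct_state_waiting = None

    def _correct_state(self):
        if self._correct_state_waiting:
            # If people call this frequently, we only want to run it once
            return self._correct_state_waiting
        task = asyncio.ensure_future(self._correct_state_internal())
        self._correct_state_waiting = task
        return task

    async def _correct_state_internal(self):
        # clear the "enqueued" slot as soon as we start running, so a later
        # call to _correct_state enqueues a fresh task
        self._correct_state_waiting = None
        await asyncio.sleep(0)  # stand-in for the real correction work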

Member

@jcrist jcrist May 22, 2019


Since _correct_state_internal waits on the created workers, this does mean that there's no way to cancel pending workers. This is fine for LocalCluster, but would be problematic if used as a base class for other cluster managers. The following would request and start 100 workers before scaling back down afaict:

cluster.scale(100)
cluster.scale(2)

Member Author

@mrocklin mrocklin May 22, 2019


I think that this depends on what you mean by "waits on".

One approach is that for a cluster manager to reach a correct state it only has to successfully submit a request to the resource manager and receive an acknowledgement that the resource manager is handling it. We're not guaranteeing full deployment, merely that we've done our part of the job. I would expect this to almost always be fairly fast.

Separately, there is now a Client.wait_for_workers(n=10) method that might be used for full client <-> scheduler checks.
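
For example (a hedged sketch; it assumes an existing cluster object and that the method behaves as described above):

from dask.distributed import Client

client = Client(cluster)     # `cluster` is an existing cluster object
client.wait_for_workers(10)  # block until the scheduler reports 10 workers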



    async def _start(self):
        while self.status == "starting":
            await asyncio.sleep(0.01)
Member

@jcrist jcrist May 21, 2019


Same here as with closing: we could wait on the start task instead of polling.



    def __enter__(self):
        self.sync(self._correct_state)
        self.sync(self._wait_for_workers)
Member

@jcrist jcrist May 21, 2019


Does this mean that __enter__ will only complete once the initial n workers have started? What happens if we request two, one worker starts, and one fails?

Member Author

@mrocklin mrocklin May 21, 2019


Yes, this might hang. I'm not sure we ever had a test in our test suite for this case. I'll add something.

Member Author

@mrocklin mrocklin May 21, 2019


Added a test in 5e94069

Member Author

@mrocklin mrocklin commented May 21, 2019

Thanks for the review @jcrist! If you have a chance to pass through things tomorrow I would appreciate it.

Member Author

@mrocklin mrocklin commented May 22, 2019

I plan to merge this later today if there are no further comments. Tests here are pretty decent, although I'll need to overhaul adaptive. I'd like to do this in a separate PR though.

Member Author

@mrocklin mrocklin commented May 22, 2019

OK. Merging this in. I intend to be active in this area for a while, so if there are still issues please feel free to raise them. I plan to do the following:

  1. Fix up adaptive so that it moves logic into the scheduler, and makes tests here pass
  2. Try out SpecCluster with Dask-Kubernetes. I imagine that this will force some changes here.


@mrocklin mrocklin merged commit 6e0c0a6 into dask:master May 22, 2019
1 of 2 checks passed
@mrocklin mrocklin deleted the spec-cluster branch May 22, 2019
lesteve added a commit to lesteve/distributed that referenced this issue May 29, 2019
LocalCluster.__repr__ was removed in dask#2675.
mrocklin added a commit that referenced this issue May 29, 2019
LocalCluster.__repr__ was removed in #2675.
calebho added a commit to calebho/distributed that referenced this issue May 29, 2019
This is intended to be a base for LocalCluster (and others) that want to
specify more heterogeneous information about workers.

This forces the use of Python 3 and introduces more asyncio and async def handling.

This cleans up a number of intermittent testing failures and improves our testing harness hygiene.
calebho added a commit to calebho/distributed that referenced this issue May 29, 2019
LocalCluster.__repr__ was removed in dask#2675.