Add alternative SSHCluster implementation #2827

Merged: mrocklin merged 8 commits into dask:master from ssh-cluster-2 on Jul 18, 2019

Conversation

@mrocklin (Member) commented Jul 6, 2019

This is a proof of concept for two reasons:

  1. It opens up a possible alternative for SSH deployment (which was surprisingly popular in the user survey)
  2. It is the first non-local application of SpecCluster and so serves as a proof of concept for other future deployments that are mostly defined by creating a remote Worker/Scheduler object

This forced some changes in `SpecCluster`; notably, we now have an `rpc` object that makes remote calls rather than accessing the scheduler directly. Also, we're going to have to figure out how to handle all of the keyword arguments: in this case we need to pass them from Python down to the CLI, and presumably we'll also want a `dask-ssh` CLI command that has to translate the other way.

cc @jcrist @jacobtomlinson @quasiben
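
For orientation, a minimal usage sketch of what this might look like. The constructor signature here is illustrative, assuming the class lives in `distributed/deploy/ssh2.py` as in this PR, with one host running the scheduler and the rest running workers (per the discussion below):

    # Hypothetical usage sketch; names and signature are illustrative only.
    from distributed.deploy.ssh2 import SSHCluster
    from dask.distributed import Client

    # One host becomes the scheduler; the rest each run a worker.
    cluster = SSHCluster(["host1", "host2", "host3"])
    client = Client(cluster)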

@mrocklin (Member, Author) commented Jul 6, 2019

Also cc @guillaumeeb @jhamman . I could imagine rewriting Dask Jobqueue in this way as well.

@guillaumeeb (Member) commented

notably we now have an rpc object that does remote calls rather than accessing the scheduler directly

That part sounds really interesting! I'm kind of overwhelmed by high-level strategic questions at work right now, and I still have to go through `SpecCluster`... But I've not given up, and I hope I can have a closer look at all this during the summer!
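
For readers catching up, the `rpc` pattern in question looks roughly like this. A sketch; the helper function is hypothetical, but `distributed.core.rpc` and the attribute-style remote calls are the real mechanism:

    from distributed.core import rpc

    async def scheduler_info(scheduler_address):
        # Each attribute access on the rpc handle becomes a remote call to
        # the scheduler handler of the same name, sent over a comm, so no
        # direct reference to the Scheduler object is needed.
        scheduler_comm = rpc(scheduler_address)
        try:
            return await scheduler_comm.identity()
        finally:
            await scheduler_comm.close_rpc()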

@mrocklin (Member, Author) commented Jul 6, 2019 via email

@jacobtomlinson (Member) commented

This is really nice to see. I think we could remove a bunch of code from dask-kubernetes by switching to this.

How would adaptive behave in this situation? I notice there is no stop method implemented on the worker classes, and I guess there is a fixed number of hosts anyway. It could adapt within the number of hosts?

@mrocklin (Member, Author) commented Jul 8, 2019

How would adaptive behave in this situation? I notice there is no stop method implemented on the worker classes, and I guess there is a fixed number of hosts anyway.

Adaptive first asks the scheduler to kill the worker gracefully. Eventually it then calls the Worker.close method on the Worker object (where Worker is now SSHWorker or PodWorker or something similar).
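
Roughly, the shape of such a worker object looks like this (an illustrative sketch, not the PR's actual code; `SSHWorker` here stands in for any remote worker class):

    class SSHWorker:
        """Sketch: SpecCluster mainly needs an object it can await on
        startup and whose ``close`` coroutine tears the worker down."""

        def __init__(self, scheduler_address, host, **kwargs):
            self.scheduler_address = scheduler_address
            self.host = host
            self.connection = None  # e.g. an SSH connection

        async def start(self):
            # Open a connection to ``host`` and launch a dask-worker
            # process pointed at ``scheduler_address``.
            ...

        async def close(self):
            # Called once the scheduler has retired the worker gracefully.
            if self.connection is not None:
                self.connection.close()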

It could adapt within the number of hosts?

I'm not sure I fully understand this question.

@jacobtomlinson (Member) commented

It could adapt within the number of hosts?

If I provide a list of nine host addresses, I will get one scheduler and eight workers. I would then expect that it would adapt up to a maximum of eight workers. Is that also what you would expect?
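
For reference, this is the sort of bound being described, using the existing adaptive API on clusters (whether SSHCluster supports it is exactly what's in question here):

    # Adaptive scaling bounded by the eight available worker hosts.
    cluster.adapt(minimum=0, maximum=8)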

@mrocklin (Member, Author) commented Jul 8, 2019

Oh I see, right. There isn't an obvious way to add new workers.

I would probably just not support adaptive for SSHClusters to start.

@mrocklin (Member, Author) commented Jul 8, 2019

@jcrist next time I run into you I'll probably have questions about your thoughts on how we should unify keywords in Python and on the CLI. That ends up being fairly important here.
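
To make the problem concrete, a toy sketch of the Python-to-CLI direction (a hypothetical helper, not anything in this PR):

    def kwargs_to_cli(**kwargs):
        # Turn Python keyword arguments into dask-worker style CLI flags,
        # e.g. nthreads=2 -> --nthreads 2. A dask-ssh command would need
        # the reverse translation as well.
        args = []
        for key, value in kwargs.items():
            flag = "--" + key.replace("_", "-")
            if value is True:
                args.append(flag)
            elif value is not None:
                args.extend([flag, str(value)])
        return args

    # kwargs_to_cli(nthreads=2, memory_limit="4GB")
    # -> ["--nthreads", "2", "--memory-limit", "4GB"]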

@mrocklin (Member, Author) commented Jul 9, 2019

I think I'm inclined to merge this in without including it in any public import, mostly as a short-term test case and with a warning on use.

Tests are failing currently because ssh'ing into localhost isn't working trivially. My guess is that the fix to this is fairly simple, but I'm not immediately sure how to handle it. @quasiben I suspect that you can probably solve this quickly. Could you take a look?

    def validate_server_host_key(self, key_data):
        """Validate and return the server's host key"""
    
        try:
            host_key = self._validate_host_key(self._host, self._peer_addr,
                                               self._port, key_data)
        except ValueError as exc:
>           raise HostKeyNotVerifiable(str(exc)) from None
E           asyncssh.misc.HostKeyNotVerifiable: Host key is not trusted

@quasiben (Member) commented Jul 9, 2019

Sure, I'll take a look now

@quasiben (Member) commented Jul 9, 2019

My first suspicion is that travis-ci sometimes has inconsistent behavior with regard to localhost vs 127.0.0.1. Mind if I take over this PR and get the CI sorted out?

@mrocklin (Member, Author) commented Jul 9, 2019

Mind if I take over this PR and get the CI sorted out?

You are always welcome to push to any PR of mine.

@quasiben (Member) commented Jul 9, 2019

Ah, never mind -- the error is a host key trust issue, so we probably just need to pass in another flag or something.

You are always welcome to push to any PR of mine.

Great!

@quasiben (Member) commented Jul 9, 2019

@mrocklin I pushed a commit to resolve the issue. This is a bit of a security issue (disabling host key validation) -- I added some options/comments for when things are improved.

@jacobtomlinson (Member) commented

@quasiben defaulting this to False makes me nervous. I would be tempted to default to True and just set it to False in the tests.

Also happy not to, as it's a POC. It just makes the sysadmin in me twitchy.

@mrocklin (Member, Author) left a comment

Thanks Ben! Some tiny comments.

(Three resolved review comment threads on distributed/deploy/ssh2.py, now outdated.)
@mrocklin (Member, Author) commented Jul 9, 2019

@quasiben defaulting this to False makes me nervous. I would be tempted to default to True and just set it to False in the tests.

Also happy not to, as it's a POC. It just makes the sysadmin in me twitchy.

I think that we can resolve this by just adding something that passes kwargs through to connect. That way we just expose functionality but don't take any opinions.
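
Something along these lines (a sketch with a hypothetical helper and parameter name; `known_hosts=None` is asyncssh's documented way to skip host-key checking, which the tests could opt into explicitly):

    import asyncssh

    async def ssh_connect(host, connect_kwargs=None):
        # Forward user-supplied options straight to asyncssh.connect so
        # the cluster itself takes no stance on host-key policy.
        return await asyncssh.connect(host, **(connect_kwargs or {}))

    # Tests only: skip host key verification for the CI localhost setup.
    # conn = await ssh_connect("127.0.0.1", {"known_hosts": None})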

@quasiben (Member) commented Jul 9, 2019

@mrocklin tests are passing now

    self.scale(0)
    await self._correct_state()
    async with self._lock:
        await self.scheduler_comm.close(close_workers=True)

@jacobtomlinson (Member) left a comment

If the scheduler comm has closed for some reason and resulted in a broken pipe, we will get an exception here.

Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 194, in read
    n_frames = yield stream.read_bytes(8)
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/iostream.py", line 436, in read_bytes
    future = self._start_read()
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/iostream.py", line 797, in _start_read
    self._check_closed()  # Before reading, check that stream is not closed.
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/iostream.py", line 1009, in _check_closed
    raise StreamClosedError(real_error=self.error)
tornado.iostream.StreamClosedError: Stream is closed

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/core.py", line 675, in send_recv_from_rpc
    result = yield send_recv(comm=comm, op=key, **kwargs)
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/core.py", line 535, in send_recv
    response = yield comm.read(deserializers=deserializers)
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/gen.py", line 209, in wrapper
    yielded = next(result)
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 214, in read
    convert_stream_closed_error(self, e)
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/comm/tcp.py", line 139, in convert_stream_closed_error
    raise CommClosedError("in %s: %s: %s" % (obj, exc.__class__.__name__, exc))
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/contextlib.py", line 130, in __exit__
    self.gen.throw(type, value, traceback)
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/utils.py", line 182, in ignoring
    yield
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/deploy/spec.py", line 343, in close_clusters
    cluster.close(timeout=10)
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/deploy/spec.py", line 271, in close
    return self.sync(self._close, callback_timeout=timeout)
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/deploy/cluster.py", line 245, in sync
    return sync(self.loop, func, *args, **kwargs)
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/utils.py", line 332, in sync
    six.reraise(*error[0])
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/six.py", line 693, in reraise
    raise value
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/utils.py", line 317, in f
    result[0] = yield future
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/gen.py", line 735, in run
    value = future.result()
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/deploy/spec.py", line 258, in _close
    await self.scheduler_comm.close(close_workers=True)
  File "/home/nfs/jtomlinson/miniconda3/envs/dask-csp/lib/python3.7/site-packages/tornado/gen.py", line 742, in run
    yielded = self.gen.throw(*exc_info)  # type: ignore
  File "/home/nfs/jtomlinson/Projects/dask/distributed/distributed/core.py", line 678, in send_recv_from_rpc
    "%s: while trying to call remote method %r" % (e, key)
distributed.comm.core.CommClosedError: in <closed TCP>: BrokenPipeError: [Errno 32] Broken pipe: while trying to call remote method 'close'

This results in self.scheduler.close() never being called and failing to clean up.
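
One possible guard (a sketch, not necessarily the fix that was adopted): tolerate the dead comm so local cleanup still runs.

    from distributed.comm.core import CommClosedError

    async def _close(self):
        # Sketch: if the comm already broke (BrokenPipeError above),
        # swallow the error so self.scheduler.close() is still reached.
        try:
            await self.scheduler_comm.close(close_workers=True)
        except (CommClosedError, OSError):
            pass  # scheduler is unreachable anyway; keep cleaning up
        await self.scheduler.close()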

    @@ -216,9 +227,10 @@ def __await__(self):
    async def _wait_for_workers(self):
        # TODO: this function needs to query scheduler and worker state
        # remotely without assuming that they are local

@mrocklin (Member, Author) left a comment

I should probably remove this TODO. I think that now that we're accessing the comm rather than the scheduler, we're handling the remote case.

@jacobtomlinson (Member) left a comment

I still seem to get caught in this loop forever despite it using the scheduler_comm, but only when starting a cluster with workers and awaiting the cluster object. When scaling up it's fine.

@mrocklin (Member, Author) left a comment

Perhaps there is some inconsistency between `self.workers` (the cluster's expected worker set) and the return value of `identity` (what the scheduler sees)? It might be worth printing out what these look like to see if, for example, they are using different naming conventions.
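
For example (a debugging sketch; `identity()` reports workers keyed by address, which may not match the names `SpecCluster` assigned):

    # Compare what the cluster expects against what the scheduler reports.
    info = await cluster.scheduler_comm.identity()
    print("cluster expects:", sorted(cluster.workers))   # SpecCluster's names
    print("scheduler sees:", sorted(info["workers"]))    # worker addresses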

@jacobtomlinson (Member) left a comment

Weirdly, it seems that `await cluster.scheduler_comm.identity()` hangs forever every other time you call it. If I keyboard-interrupt it and run it again, I get the output twice.

@mrocklin (Member, Author) left a comment

That seems odd. Do you have a branch where I can try this out?

@jacobtomlinson (Member) left a comment

This is happening for me on master of dask-cloud.

In [1]: from dask_cloud import ECSCluster

In [2]: cluster = ECSCluster()

In [3]: await cluster.scheduler_comm.identity()  # hangs, press ctrl+c

In [4]: await cluster.scheduler_comm.identity()  # Returns

I'll look at working up a minimal reproducible example if I keep getting stuck on this.

mrocklin merged commit df2addc into dask:master on Jul 18, 2019
mrocklin deleted the ssh-cluster-2 branch on Jul 18, 2019 at 02:59
@mrocklin (Member, Author) commented

This is in. Thanks @quasiben for pushing this over the finish line.

muammar added a commit to muammar/distributed that referenced this pull request Jul 18, 2019