
Rewrite dask-jobqueue with SpecCluster #307

Merged
merged 113 commits into dask:master from mrocklin:spec-rewrite on Sep 25, 2019

Conversation


mrocklin commented Aug 5, 2019

This is the first step towards rewriting with SpecCluster. I mostly copied the implementations from the Cluster classes, but then removed the cluster bits.

For people reviewing, I recommend looking at the tests first.

mrocklin and others added 2 commits Aug 6, 2019
Fix flake8 as well.

lesteve commented Aug 6, 2019

For some reason, I could not get the PBS test to pass locally (the worker never shows up, and I was unable to debug it). I managed to get a very similar test passing with SGE, though, and pushed it to your PR branch; I hope you don't mind.


mrocklin commented Aug 6, 2019

Thanks @lesteve! I'm very glad to see this, both because it's always nice to have help (you're very welcome to push to this branch or any other I have) and because it's nice to see that this approach is accessible.


mrocklin commented Aug 6, 2019

I'll make PBSCluster/SGECluster functions next to get your feedback on how they feel.


mrocklin commented Aug 6, 2019

OK, I've added a rough draft using SpecCluster to build out a generic JobQueueCluster implementation. It should compose well with the Job classes.

It's currently tricky to pass around the keyword arguments correctly, but it's certainly less work than before. If anyone has an opportunity to try this out on a real cluster and provide feedback, that would be welcome.
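
To make the composition concrete, here is a rough sketch of the idea (names and signatures are simplified for illustration and are not the exact dask-jobqueue code at this point in the PR): the cluster is essentially SpecCluster fed with a worker spec whose class is the batch-system-specific Job and whose options are the job keyword arguments.

# Illustrative sketch only; the real implementation differs in detail.
from dask.distributed import Scheduler, SpecCluster

def JobQueueCluster(n_workers=0, Job=None, **job_kwargs):
    # Each worker slot is described by a spec: the Job class plus the
    # keyword arguments used to render and submit the batch script.
    worker_spec = {"cls": Job, "options": job_kwargs}
    scheduler_spec = {"cls": Scheduler, "options": {}}
    return SpecCluster(
        scheduler=scheduler_spec,
        worker=worker_spec,
        workers={i: worker_spec for i in range(n_workers)},
    )

SpecCluster then takes care of starting the scheduler and instantiating one Job per entry in workers, which is where the keyword-argument plumbing mentioned above gets tricky.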


mrocklin commented Aug 7, 2019

OK, this use of job-name seems to work, at least for PBS. I'm not sure what's up with SGE.

This change is because we are now using the name= attribute with SpecCluster, so it's a bit harder to also use it for dask-jobqueue.
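
To illustrate the collision (a hypothetical sketch; the class and attribute names are made up and are not the actual dask-jobqueue code): SpecCluster uses name as the per-worker identifier of each spec (typically "0", "1", ...), while the batch system wants a human-readable job name such as "dask-worker" in the submit script header, so the two now have to be kept separate.

class PBSJob:
    def __init__(self, scheduler=None, name=None, job_name="dask-worker"):
        self.scheduler = scheduler
        self.name = name            # SpecCluster's per-worker identifier, e.g. "0"
        self.job_name = job_name    # human-readable name for the batch system

    def job_header(self):
        # The submit script header uses the batch job name,
        # not SpecCluster's worker name.
        return "#PBS -N %s" % self.job_name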


lesteve commented Aug 7, 2019

I have looked at this quickly: at the moment in JobQueueCluster, the created Job instances do not know about the scheduler address (I am guessing this is set later as part of SpecCluster, but I have not managed to figure out where).

On my cluster if I try something like this:

from dask_jobqueue import SGEJob
from dask_jobqueue.job import JobQueueCluster
from dask.distributed import Client

env_extra = ['source /sequoia/data1/lesteve/miniconda3/etc/profile.d/conda.sh',
             'conda activate dask-dev',
            ]
resource_spec = 'h_vmem=10G,mem_req=500M'
queue = 'all.q'

cluster = JobQueueCluster(1, Job=SGEJob, queue=queue, env_extra=env_extra,
                          cores=1, processes=1,
                          memory='16GB',
                          resource_spec=resource_spec,
                          interface='ib0')

I see a job with qstat but it dies quickly.

Log from the worker:

distributed.nanny - INFO -         Start Nanny at: 'tcp://10.149.0.3:54180'
distributed.diskutils - INFO - Found stale lock file and directory '/home/lesteve/dev/dask-jobqueue/worker-y509nqij', purging
distributed.diskutils - INFO - Found stale lock file and directory '/home/lesteve/dev/dask-jobqueue/worker-0ksc2mwf', purging
distributed.worker - INFO -       Start worker at:     tcp://10.149.0.3:60429
distributed.worker - INFO -          Listening to:     tcp://10.149.0.3:60429
distributed.worker - INFO -          dashboard at:           10.149.0.3:43263
distributed.worker - INFO - Waiting to connect to:                 tcp://None
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO -               Threads:                          1
distributed.worker - INFO -                Memory:                   16.00 GB
distributed.worker - INFO -       Local Directory: /home/lesteve/dev/dask-jobqueue/worker-6to6xw20
distributed.worker - INFO - -------------------------------------------------
distributed.nanny - ERROR - Failed to start worker
Traceback (most recent call last):
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/nanny.py", line 674, in run
    await worker
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/worker.py", line 983, in start
    await self._register_with_scheduler()
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/worker.py", line 767, in _register_with_scheduler
    self.scheduler.address, connection_args=self.connection_args
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
    quiet_exceptions=EnvironmentError,
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/tcp.py", line 347, in connect
    ip, port = parse_host_port(address)
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/addressing.py", line 91, in parse_host_port
    port = _default()
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/addressing.py", line 70, in _default
    raise ValueError("missing port number in address %r" % (address,))
ValueError: missing port number in address 'None'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <tornado.platform.asyncio.AsyncIOMainLoop object at 0x2aaab3b39e10>>, <Task finished coro=<Nanny._on_exit() done, defined at /home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/nanny.py:369> exception=ValueError("missing port number in address 'None'")>)
Traceback (most recent call last):
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/nanny.py", line 372, in _on_exit
    await self.scheduler.unregister(address=self.worker_address)
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/core.py", line 747, in send_recv_from_rpc
    comm = await self.pool.connect(self.addr)
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/core.py", line 874, in connect
    connection_args=self.connection_args,
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/core.py", line 215, in connect
    quiet_exceptions=EnvironmentError,
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/tcp.py", line 347, in connect
    ip, port = parse_host_port(address)
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/addressing.py", line 91, in parse_host_port
    port = _default()
  File "/home/lesteve/miniconda3/envs/dask-dev/lib/python3.7/site-packages/distributed/comm/addressing.py", line 70, in _default
    raise ValueError("missing port number in address %r" % (address,))
ValueError: missing port number in address 'None'
distributed.nanny - INFO - Closing Nanny at 'tcp://10.149.0.3:54180'
distributed.dask_worker - INFO - End worker

It looks like None is passed as the scheduler address (Waiting to connect to: tcp://None) and is never updated later. I have no clue why the same issue would not happen for PBS ...


mrocklin commented Aug 7, 2019

SpecCluster starts the scheduler and then passes its address to each worker as the first argument when the worker starts.

I think the problem is that we're not passing through *args.
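
A minimal sketch of what that means (hypothetical classes, not the real dask-jobqueue code): SpecCluster instantiates each worker roughly as cls(scheduler_address, **options), so a Job subclass that swallows *args without forwarding them to its parent never learns where the scheduler is.

class Job:
    def __init__(self, scheduler=None, name=None):
        # SpecCluster passes the scheduler address as the first
        # positional argument when it creates each worker.
        self.scheduler = scheduler
        self.name = name

class BrokenSGEJob(Job):
    def __init__(self, *args, queue=None, **kwargs):
        super().__init__(**kwargs)          # bug: *args is dropped
        self.queue = queue

class FixedSGEJob(Job):
    def __init__(self, *args, queue=None, **kwargs):
        super().__init__(*args, **kwargs)   # fix: forward the address
        self.queue = queue

bad = BrokenSGEJob("tcp://scheduler:8786", queue="all.q")
assert bad.scheduler is None                # address lost -> tcp://None in the worker log

good = FixedSGEJob("tcp://scheduler:8786", queue="all.q")
assert good.scheduler == "tcp://scheduler:8786"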

mrocklin added 2 commits Aug 7, 2019

mrocklin commented Aug 7, 2019

Tests are now passing. Thanks for pointing out that we weren't getting the scheduler address, @lesteve; that was the issue.


mrocklin commented Aug 7, 2019

Some next steps:

  1. I need to unbreak the tests :)
  2. We should try this out on a real cluster to make sure that all of the UI elements of SpecCluster function well
  3. Increase testing
  4. Move the scheduler out to a Job

jhamman commented Sep 25, 2019

> Was there anything else behind the "mostly" adverb above?

Not really. I dusted off a fairly old use case and noticed that using startup_cost in adapt ended up throwing a bunch of exceptions later on (details below). I wasn't sure whether this ever worked correctly, though, so I didn't think much of it.

distributed.core - ERROR - workers_to_close() got an unexpected keyword argument 'startup_cost'
Traceback (most recent call last):
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 399, in handle_comm
    result = handler(comm, **msg)
TypeError: workers_to_close() got an unexpected keyword argument 'startup_cost'
tornado.application - ERROR - Exception in callback functools.partial(<bound method IOLoop._discard_future_result of <zmq.eventloop.ioloop.ZMQIOLoop object at 0x2b2e33748dd8>>, <Task finished coro=<AdaptiveCore.adapt() done, defined at /glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/deploy/adaptive_core.py:170> exception=TypeError("workers_to_close() got an unexpected keyword argument 'startup_cost'",)>)
Traceback (most recent call last):
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/ioloop.py", line 743, in _run_callback
    ret = callback()
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/tornado/ioloop.py", line 767, in _discard_future_result
    future.result()
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/deploy/adaptive_core.py", line 182, in adapt
    recommendations = await self.recommendations(target)
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/deploy/adaptive_core.py", line 152, in recommendations
    L = await self.workers_to_close(target=target)
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/deploy/adaptive.py", line 138, in workers_to_close
    **self._workers_to_close_kwargs
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 673, in send_recv_from_rpc
    result = await send_recv(comm=comm, op=key, **kwargs)
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 547, in send_recv
    raise exc.with_traceback(tb)
  File "/glade/u/home/jhamman/miniconda3/envs/pangeo/lib/python3.6/site-packages/distributed/core.py", line 399, in handle_comm
    result = handler(comm, **msg)

I was glad to see that my existing configurations worked straight out of the box, and except for the change in jobs vs. cores behavior (I see you've been working on that today), things behaved as they did before.

This has been removed upstream

lesteve commented Sep 25, 2019

> I've just pushed changes that do this

Nice, thanks a lot for this!

> I hope this means that you will be enjoying life :)

That's the plan :).

> Do you mean these points from earlier?

Yes.


lesteve left a comment

A few comments

Review threads (since resolved) on: dask_jobqueue/htcondor.py, dask_jobqueue/lsf.py, dask_jobqueue/pbs.py, docs/source/howitworks.rst, and docs/source/index.rst (two threads).

mrocklin commented Sep 25, 2019

OK, I think that I've addressed all comments. @lesteve, I also removed the xfail on the adaptive test (things work upstream now). Tests pass. From my perspective this is good to go.


lesteve commented Sep 25, 2019

I think this looks good; in particular, the code to convert from n to jobs in both scale and adapt seems fine (I only read the code, though, and have not tried it).

I am going to merge this PR. I feel it has already been under review long enough, and the only contentious point left was whether scale should count "worker processes" or "grouped workers", which has been resolved in recent commits in favour of "worker processes".

It would be great if people involved in this PR could test master on their favourite cluster, in particular whether scale(jobs=...) or adapt(max_jobs=...) is working as they expect, and report back if they see any problems!
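
For anyone testing, a rough sketch of the intended usage (the cluster options below are placeholders for site-specific configuration, and the exact adapt keyword spelling should be checked against the released API): a request for n worker processes is rounded up to ceil(n / processes) jobs.

from dask_jobqueue import PBSCluster

# Placeholder resources; adjust queue, memory, etc. for your site.
cluster = PBSCluster(cores=4, processes=4, memory="16GB", queue="regular")

# Scale in worker processes (the behaviour settled on in this PR):
cluster.scale(10)       # rounded up to 3 jobs of 4 worker processes each

# Or scale directly in units of jobs:
cluster.scale(jobs=3)

# Adaptive scaling can also be bounded in jobs (keyword spelling may differ
# between this PR and later releases; verify against the released API):
cluster.adapt(maximum_jobs=10)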

Thanks a lot to everyone involved in this PR and in particular @mrocklin for doing the bulk of the work!

Another thing to tidy up in another PR is to stop using distributed from master in CI; my expectation is that 0.24 should be good enough.

lesteve merged commit 52fac42 into dask:master on Sep 25, 2019

1 check passed: continuous-integration/travis-ci/pr (The Travis CI build passed)

jhamman commented Sep 25, 2019

Thanks all! Glad to see this brought to completion.

mrocklin deleted the mrocklin:spec-rewrite branch on Sep 25, 2019

mrocklin commented Sep 25, 2019

Woot!


jacobtomlinson commented Sep 25, 2019

🚀


andersy005 commented Sep 25, 2019

🎉


mrocklin commented Sep 25, 2019

Next question: what should happen before we release?

It would be nice if people could try out master on their own systems and report back.

I think that I've covered all of the concerns that @lesteve had (except for the cd issue in the docker testing infrastructure). Are there any other known issues?


lesteve commented Sep 25, 2019

I had this mostly in mind before the release:

> It would be great if people involved in this PR could test master on their favourite cluster, in particular whether scale(jobs=...) or adapt(max_jobs=...) is working as they expect, and report back if they see any problems!

Since we are introducing breaking changes, it would have been great to also think about introducing a few more, e.g. #323 and #205, but I don't think that is critical.


guillaumeeb commented Sep 25, 2019

🎆 👍 🥇 🚀

Thanks @mrocklin and @lesteve for all the work! I'll try to test this in the next few days.
