Fixes for Adaptive #63
Conversation
dask_jobqueue/core.py
Outdated
names = {v['name'] for v in workers.values()}
# This will close down the full group of workers
job_ids = {name.split('-')[-2] for name in names}
self.stop_workers(job_ids)
I'm thinking there is a better way to do this. The current scale-down behavior removes the entire job from the system, so if `Adaptive` tells us to remove 1 worker (say we have 10 workers per job), we're going to remove all 10.
@mrocklin - Would it make sense to add logic to `Adaptive` so it knows how to bundle groups of workers? Otherwise, we could bundle here and check whether we're being asked to scale down an entire group.
The `key=` parameter to `workers_to_close` (passed through from `retire_workers`) seems relevant here. I believe it was made for this purpose.
https://github.com/dask/distributed/blob/master/distributed/scheduler.py#L2525-L2548
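For illustration, a minimal sketch of such a grouping key (hedged: it assumes, as this PR proposes, that each worker name embeds the job id as its second-to-last dash-separated field):

```python
def job_key(ws):
    # ws is a scheduler WorkerState; group workers by the job id embedded in
    # their name, e.g. "dask-worker-9595055.chadmin1-2" -> "9595055.chadmin1"
    return ws.name.split('-')[-2]

# e.g. scheduler.workers_to_close(n=1, key=job_key) would then only propose
# closing workers in whole job-sized groups
```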
Glad to see that grouped workers are handled in adaptive!
Another comment, not linked to this PR: I find the `job_ids` variable name misleading. It should be something like `worker_ids`.
dask_jobqueue/core.py
Outdated
@@ -161,7 +167,8 @@ def job_file(self):
     def start_workers(self, n=1):
         """ Start workers and point them to our local scheduler """
         workers = []
-        for _ in range(n):
+        num_jobs = min(1, math.ceil(n / self.worker_processes))
+        for _ in range(num_jobs):
This is a breaking change I want to make sure everyone is aware of. The current behavior for a hypothetical setup with 10 workers per job would be to call `cluster.start_workers(1)` and get 1 job and 10 workers.
I'd like to change this so that `start_workers(n)` gives us `n` workers and as many jobs as needed to make that happen.
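To make the proposed behavior concrete, a rough worked example (assuming a hypothetical 10 worker processes per job):

```python
import math

worker_processes = 10  # hypothetical workers (processes) per job

for n in (1, 10, 25):
    num_jobs = int(math.ceil(n / worker_processes))
    print(n, '->', num_jobs, 'job(s),', num_jobs * worker_processes, 'workers')
# 1 -> 1 job(s), 10 workers
# 10 -> 1 job(s), 10 workers
# 25 -> 3 job(s), 30 workers
```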
Historically `start_workers` was a semi-convention between a few projects. This has decayed, so I have no strong thoughts here. I do think that we need to be consistent on `scale`, though, which seems a bit more standard today.
Will this really help adaptive? Wouldn't there still be a problem with starting the workers in a grouped manner?
With your example, calling `cluster.start_workers(1)` will still lead to 1 job and 10 workers!
But adaptive may handle this well, I don't know. In that case, maybe this breaking change isn't needed?
This allows adaptive clusters to intelligently close down groups of workers based on some logical association. See dask/dask-jobqueue#63 for motivation
The title of the PR does not highlight the work on adaptive clusters here 🙂.
I don't have a strong opinion on the job ids in worker names, so this part is OK for me (I did not test it).
I am more concerned about the breaking change in `start_workers()`. Is this really needed if adaptive handles grouped workers? In that case, could we use an alternative method or add some parameter to this method for adaptive handling?
dask_jobqueue/core.py
Outdated
@@ -161,7 +168,8 @@ def job_file(self):
     def start_workers(self, n=1):
         """ Start workers and point them to our local scheduler """
         workers = []
-        for _ in range(n):
+        num_jobs = min(1, math.ceil(n / self.worker_processes))
Why use `min` here? This would always lead to only one job being started, if I'm not mistaken.
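For illustration, assuming a hypothetical 10 processes per job and a request for 25 workers:

```python
import math

min(1, math.ceil(25 / 10))  # == min(1, 3) == 1, so never more than one job per call
max(1, math.ceil(25 / 10))  # == 3, which is presumably what was intended
```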
good point. I've removed this.
dask_jobqueue/core.py
Outdated
@@ -117,7 +122,6 @@ def __init__(self,
         self.worker_threads = threads
         self.name = name

-        self.jobs = dict()
`self.jobs` was a mapping from n --> job_id. However, we were not really using it, and it often was not cleaned up when a job ended (so I've removed it).
I believe we can get rid of `self.n` and thus `self.jobs` as it is right now. However, for adaptive to work correctly, I believe we should keep track of all submitted jobs and their statuses. If not, don't we risk continuously submitting new jobs that just sit in the scheduler queue?
I'm in favour of keeping a dict mapping job_id --> job status, e.g. as @mrocklin proposed in #11: 'pending', 'running', 'finished' or equivalent. This way, the scale_up method can take that into account.
A maybe simpler solution is to only keep track of the number of workers that are pending or running, and at least use this number in scale_up:
return self.start_workers(n - number_of_pending_or_running_workers)
But it seems difficult to deal with finished, running and pending workers this way.
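A rough, self-contained sketch of that bookkeeping idea (all names below are hypothetical, not what this PR implements):

```python
from collections import OrderedDict

pending_jobs = OrderedDict()   # job_id -> info: submitted, no worker connected yet
running_jobs = OrderedDict()   # job_id -> info: at least one worker connected
worker_processes = 10          # hypothetical workers per job


def start_workers(n):
    print('would submit enough jobs to cover %d more workers' % n)


def scale_up(n):
    # count the workers we already expect from submitted jobs
    expected = worker_processes * (len(pending_jobs) + len(running_jobs))
    if n > expected:
        start_workers(n - expected)


pending_jobs['9595055.chadmin1'] = {'status': 'pending'}
scale_up(25)   # 10 workers already expected, so only 15 more are requested
```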
@guillaumeeb - I think we're in agreement here.
we should keep track of all submitted jobs and their statuses.
This would be nice but it might be somewhat difficult to do. We have three/four states that a job might be in:
- Pending - we may be able to combine some form of `qstat job_identifier` with a dictionary of submitted jobs (`self.jobs` above)
- Running - it is straightforward to determine which workers are attached to the scheduler
- Finished - jobs can exit normally or be killed (e.g. exceeded wall time). When JobQueue culls a worker, it's easy to remove that worker from the list of jobs. However, when the queuing system kills a worker, we would need a way to remove that job from the list of running jobs.
Generally, I think any use of `qstat` is going to be a bit ugly, just because repeated queries of the queueing system tend to be quite expensive. For example:

$ time qstat 9595055
Job id            Name    User      Time Use  S  Queue
----------------  ------  --------  --------  -  -------
9595055.chadmin1  dask    jhamman   00:07:22  R  premium

real    0m1.030s
user    0m0.012s
sys     0m0.000s
So we probably don't want to do this very often. Do others have ideas as to how one would track the status of these jobs in a tractable way?
We could add a plugin to the scheduler that watched for when workers started and compared them against a set of known pending workers: http://distributed.readthedocs.io/en/latest/plugins.html
Something like the following:
from distributed.diagnostics.plugin import SchedulerPlugin

class JobQueuePlugin(SchedulerPlugin):
    def add_worker(self, scheduler, worker=None, name=None, **kwargs):
        # parse() and pending_jobs are placeholders here: pull the job id
        # out of the worker name and mark that job as no longer pending
        job_id = parse(name)
        pending_jobs.remove(job_id)

scheduler.add_plugin(JobQueuePlugin())
This sounds like what we need. I can implement this as part of this PR.
Sounds good! We don't seem to need a watcher on stopped workers this way; we know about started workers through the scheduler.
See 92eaf4e for an initial (untested) implementation using the scheduler plugin approach.
I really like where we're heading here! A few comments and things to fix.
Thanks @jhamman
dask_jobqueue/core.py
Outdated
self.finished_jobs[job_id] = self.running_jobs.pop(job_id)
self.finished_jobs[job_id].update(status='finished')
if self.finished_jobs[job_id].workers:
    self.finished_jobs[job_id].workers = []
If I understand correctly, you are assuming the workers all end at once, as soon as the first worker corresponding to this job_id is removed. Perhaps we could remove workers one by one, just to be sure? This may be overkill.
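A hedged sketch of what removing workers one at a time could look like (following the attribute names in the diff above; the name-parsing helper is hypothetical):

```python
def remove_worker(self, scheduler=None, worker=None, **kwargs):
    ''' Run when a worker leaves the cluster '''
    # hypothetical helper: recover the job id from the worker's name
    job_id = self._job_id_from_worker(worker)

    # drop just this one worker from its job...
    del self.running_jobs[job_id].workers[worker]

    # ...and only mark the job finished once its last worker is gone
    if not self.running_jobs[job_id].workers:
        self.finished_jobs[job_id] = self.running_jobs.pop(job_id)
        self.finished_jobs[job_id].update(status='finished')
```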
Good idea. This was a pretty easy fix so I have included it in 4776892.
dask_jobqueue/core.py
Outdated
-        self.n += 1
-        template = self._command_template % {'n': self.n}
+        self._n += 1
+        template = self._command_template % {'n': self._n}
"-%(n)d" has been removed from self._command_template (l.137), so we don't need this line if I'm not mistaken.
dask_jobqueue/core.py
Outdated
     def scale_up(self, n, **kwargs):
         """ Brings total worker count up to ``n`` """
-        return self.start_workers(n - len(self.jobs))
+        pending_workers = self.worker_processes * len(self.pending_jobs)
+        active_and_pending = len(self.scheduler.workers) + pending_workers
A probably rare case, but you may miss starting workers here when a job has just begun to start and is moving from pending to running: some worker processes for a given job_id may be started, but not all of them.
Maybe it is safer to just rely on self.pending_jobs and self.running_jobs, but I'm not sure; we could also miss ending jobs 🙂 ...
dask_jobqueue/core.py
Outdated
     def __enter__(self):
         return self

     def __exit__(self, type, value, traceback):
-        self.stop_workers(self.jobs)
+        self.stop_workers(self.scheduler.workers)
Here, don't we need to also cancel pending jobs?
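For example, something along these lines might be needed (a hedged sketch; `stop_jobs` is a hypothetical helper that cancels queued jobs by job id, e.g. by wrapping `qdel`):

```python
def __exit__(self, type, value, traceback):
    # cancel jobs still waiting in the queue (hypothetical helper)
    self.stop_jobs(list(self.pending_jobs))
    # and stop the workers of jobs that are already running
    self.stop_workers(self.scheduler.workers)
```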
dask_jobqueue/tests/test_pbs.py
Outdated
-    with PBSCluster(walltime='00:02:00', processes=1, threads=2, memory='2GB', local_directory='/tmp',
-                    job_extra=['-V'], loop=loop) as cluster:
+    with PBSCluster(walltime='00:02:00', processes=1, threads=2, memory='2GB',
+                    local_directory='/tmp', ob_extra=['-V'],
typo in job_extra
dask_jobqueue/tests/test_pbs.py
Outdated
@pytest.mark.env("pbs")  # noqa: F811
def test_adaptive_grouped(loop):
    with PBSCluster(walltime='00:02:00', processes=2, threads=1, memory='2GB',
                    local_directory='/tmp', ob_extra=['-V'],
typo here too.
This is ready for another round of reviews. Note, the tests will be failing here until dask/distributed#1992 is merged.
This allows adaptive clusters to intelligently close down groups of workers based on some logical association. See dask/dask-jobqueue#63 for motivation
I suppose you've tried all that on Cheyenne?
This looks very good to me, thanks for all the work you've done here @jhamman. I will try to test it next week to give you more feedback!
dask_jobqueue/core.py
Outdated
self.finished_jobs = self._scheduler_plugin.finished_jobs

# counter to keep track of how many jobs have been submitted
self._n = 0
Do we still need to keep this counter?
I feel like we've got all the information we want in pending, running and finished jobs. We could even add some detailed status method with all that, maybe in another PR!
Agreed, it can go now. I'll remove it.
Some small comments
dask_jobqueue/core.py
Outdated
# if this is the first worker for this job, move job to running
if job_id not in self.running_jobs:
    self.running_jobs[job_id] = self.pending_jobs.pop(job_id)
    self.running_jobs[job_id].update(status='running')
Are there any implications of doing this part-way through a set of workers starting?
The status flag is really just for internal tracking. The pop from pending to running is the real state change here.
dask_jobqueue/core.py
Outdated
                del self.running_jobs[job_id].workers[worker]
                break
        else:
            raise ValueError('did not find a job that owned this worker')
This will also run if a worker just restarts, or is temporarily killed by a nanny. We might not want to remove the job entirely here.
Ah, I see that we just remove one of the workers from the job. I guess we add the worker back in when it starts back up in the `add_worker` function?
Yes, as long as `add_worker` is called each time a worker comes online, the worker will be added back to its "host job".
dask_jobqueue/core.py
Outdated
self.cluster.scheduler.add_plugin(self._scheduler_plugin)
self.pending_jobs = self._scheduler_plugin.pending_jobs
self.running_jobs = self._scheduler_plugin.running_jobs
self.finished_jobs = self._scheduler_plugin.finished_jobs
Perhaps these should be properties? I'm not sure exactly why I'm suggesting this, but it seems like a more common pattern.
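For example (a sketch of that suggestion, not what the PR currently does):

```python
@property
def pending_jobs(self):
    """ Jobs that have been submitted but have no connected workers yet """
    return self._scheduler_plugin.pending_jobs
```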
dask_jobqueue/core.py
Outdated
self._command_template += "-%(n)d"  # Keep %(n) to be replaced later
# worker names follow this template: {NAME}-{JOB_ID}
self._command_template += " --name %s" % name  # e.g. "dask-worker"
self._command_template += "-${JOB_ID}"
'"Some preference to put this on one line
" --name %s-${JOB_ID}" % name"
dask_jobqueue/core.py
Outdated
workers = []
for w in workers:
    try:
        # Get the actual "Worker"
I recommend removing the quotes here, and instead using the class name `WorkerState`.
dask_jobqueue/core.py
Outdated
@@ -212,14 +276,12 @@ def job_file(self):

     def start_workers(self, n=1):
         """ Start workers and point them to our local scheduler """
         workers = []
-        for _ in range(n):
+        num_jobs = math.ceil(n / self.worker_processes)
Probably need `from __future__ import division` at the top of this file.
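For context, a quick illustration of why that import matters (Python 2 semantics):

```python
import math

# Without `from __future__ import division`, Python 2 evaluates 1 / 10 as
# integer division, so math.ceil(1 / 10) == 0 and no job would be started.
# With the import (or on Python 3), 1 / 10 == 0.1 and math.ceil gives 1.
print(math.ceil(1 / 10))
```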
Nice! I'll try to have a closer look today.
Won't be able to test for another two weeks on my part. Thanks for the hard work here. A lot of activity from SciPy apparently; I hope I'll be able to come in the next few years.
+1 on @guillaumeeb coming to SciPy. It would be great to meet you!
Seems fine to me
ci/pbs.sh
Outdated
 }

 function jobqueue_script {
-    docker exec -it -u pbsuser pbs_master /bin/bash -c "cd /dask-jobqueue; py.test dask_jobqueue --verbose -E pbs"
+    docker exec -it -u pbsuser pbs_master /bin/bash -c "cd /dask-jobqueue; py.test dask_jobqueue --verbose -E pbs -s"
We should consider removing the -s
dask_jobqueue/core.py
Outdated
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
We should remove this line entirely. It's nicer to let users define logging priorities.
I plan to merge this later today if there are no further comments.
Great to see that merged!
Looks like I had a pending review and I did not submit it ...
All the comments can be tackled in further PRs.
@@ -0,0 +1,2 @@

QUEUE_WAIT = 60  # seconds
It's great to have a constant that is used consistently in the tests!
Is there a good reason to leave this at 60s? If not, a smaller number like 15s (I think that was the number before) would be good.
# Keep information on process, cores, and memory, for use in subclasses
self.worker_memory = parse_bytes(memory)

# Keep information on process, threads and memory, for use in
You probably want to revert the change in this comment
def __init__(self):
    self.pending_jobs = OrderedDict()
    self.running_jobs = OrderedDict()
    self.finished_jobs = OrderedDict()
I find `finished_jobs` is not such a great name, because those are jobs that have been `qdel`ed. In my mind, `finished_jobs` means the job has finished normally (i.e. was not `qdel`ed). I don't have a very good suggestion for a better name though; maybe `stopped_jobs` or `canceled_jobs`.
@@ -201,3 +201,12 @@ When the cluster object goes away, either because you delete it or because you
close your Python program, it will send a signal to the workers to shut down.
If for some reason this signal does not get through then workers will kill
themselves after 60 seconds of waiting for a non-existent scheduler.

Workers vs Jobs
Very nice to have something like this!
def add_worker(self, scheduler, worker=None, name=None, **kwargs):
    ''' Run when a new worker enters the cluster'''
    logger.debug("adding worker %s" % worker)
Nitpick: generally with logging, you should do:
logger.debug("adding worker %s", worker)
I think this avoids doing some unnecessary formatting work when the message is not actually logged.
# if this is the first worker for this job, move job to running
if job_id not in self.running_jobs:
    logger.debug("this is a new job")
Maybe add the job_id in this log output? I guess the job_id is in the previous logging statement, but sometimes I find it more convenient to have a single logging statement that is as stand-alone as possible, rather than having to go up a few lines to figure out the information you need.
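For instance, a hypothetical variant of the line in the diff above:

```python
logger.debug("%s is a new job; moving it from pending to running", job_id)
```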
more adaptive scaling fixes from #63
I'm just starting to work on a clearer naming convention for individual `Worker`s coming from jobqueue. The current behavior is to pass the following string to the `--name` argument of each `dask-worker` call, and the worker number for each job is appended, so we actually end up with:

- `JOB_NAME` is the name argument given to the Cluster (e.g. 'dask-worker')
- `JOB_NUM` is an integer count of the jobs submitted from each cluster (starting at 1)
- `WORKER_NUM` is an integer assigned by distributed when using grouped workers

I'm proposing we add the `JOB_ID` and consider dropping the `JOB_NUM`. We could also drop the `JOB_NAME`, but maybe that is a step too far.

Edit 6/14/2018: We ended up with the following name: the worker num is optional, but the job prefix and job id are required in all cases.

This PR includes some small changes to add the `JOB_ID` to the naming convention. Ultimately, this will allow us to create logical mappings from job ids to workers, and that will hopefully help alleviate some of the problems like #26 and #30.
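For illustration, with a hypothetical worker name following that convention, the job id can be recovered the same way the code in this PR does it:

```python
name = "dask-worker-9595055.chadmin1-3"   # hypothetical {JOB_NAME}-{JOB_ID}-{WORKER_NUM}
job_id = name.split('-')[-2]              # -> "9595055.chadmin1"
```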