Commit

Move job id parsing to JobQueueCluster. (#131)
willirath authored and lesteve committed Aug 28, 2018
1 parent ff4ad95 commit a8d37e6
Showing 8 changed files with 53 additions and 19 deletions.
19 changes: 17 additions & 2 deletions dask_jobqueue/core.py
@@ -2,6 +2,7 @@
 
 import logging
 import math
+import re
 import shlex
 import subprocess
 import sys
@@ -138,6 +139,7 @@ class JobQueueCluster(Cluster):
     cancel_command = None
     scheduler_name = ''
     _adaptive_options = {'worker_key': lambda ws: _job_id_from_worker_name(ws.name)}
+    job_id_regexp = r'(?P<job_id>\d+)'
 
     def __init__(self,
                  name=None,
@@ -410,5 +412,18 @@ def _del_pending_jobs(self):
         return jobs
 
     def _job_id_from_submit_output(self, out):
-        raise NotImplementedError('_job_id_from_submit_output must be implemented when JobQueueCluster is '
-                                  'inherited. It should convert the stdout from submit_command to the job id')
+        match = re.search(self.job_id_regexp, out)
+        if match is None:
+            msg = ('Could not parse job id from submission command '
+                   "output.\nJob id regexp is {!r}\nSubmission command "
+                   'output is:\n{}'.format(self.job_id_regexp, out))
+            raise ValueError(msg)
+
+        job_id = match.groupdict().get('job_id')
+        if job_id is None:
+            msg = ("You need to use a 'job_id' named group in your regexp, "
+                   "e.g. r'(?P<job_id>\d+)'. Your regexp was: "
+                   "{!r}".format(self.job_id_regexp))
+            raise ValueError(msg)
+
+        return job_id
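
With parsing centralized in the base class, a new backend only declares a job_id_regexp instead of reimplementing _job_id_from_submit_output. A minimal sketch, assuming a hypothetical scheduler whose submit command prints output like 'id=654321 queued' (the class name, commands, and output format below are invented for illustration):

from dask_jobqueue.core import JobQueueCluster

class MySchedulerCluster(JobQueueCluster):
    # Hypothetical backend: commands and scheduler name are invented.
    submit_command = 'mysub'
    cancel_command = 'mycancel'
    scheduler_name = 'myscheduler'
    # Only the regexp is overridden; the base class's
    # _job_id_from_submit_output does the actual parsing.
    job_id_regexp = r'id=(?P<job_id>\d+)'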
3 changes: 0 additions & 3 deletions dask_jobqueue/lsf.py
@@ -102,9 +102,6 @@ def __init__(self, queue=None, project=None, ncpus=None, mem=None, walltime=None
 
         logger.debug("Job script: \n %s" % self.job_script())
 
-    def _job_id_from_submit_output(self, out):
-        return out.split('<')[1].split('>')[0].strip()
-
     def _submit_job(self, script_filename):
         piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null']
         return self._call(piped_cmd, shell=True)
3 changes: 0 additions & 3 deletions dask_jobqueue/moab.py
@@ -42,6 +42,3 @@ class MoabCluster(PBSCluster):
     submit_command = 'msub'
     cancel_command = 'canceljob'
     scheduler_name = 'moab'
-
-    def _job_id_from_submit_output(self, out):
-        return out.strip()
3 changes: 0 additions & 3 deletions dask_jobqueue/pbs.py
@@ -101,9 +101,6 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
 
         logger.debug("Job script: \n %s" % self.job_script())
 
-    def _job_id_from_submit_output(self, out):
-        return out.split('.')[0].strip()
-
 
 def pbs_format_bytes_ceil(n):
     """ Format bytes as text.
3 changes: 0 additions & 3 deletions dask_jobqueue/sge.py
@@ -78,6 +78,3 @@ def __init__(self, queue=None, project=None, resource_spec=None, walltime=None,
         self.job_header = header_template % config
 
         logger.debug("Job script: \n %s" % self.job_script())
-
-    def _job_id_from_submit_output(self, out):
-        return out.strip()
3 changes: 0 additions & 3 deletions dask_jobqueue/slurm.py
@@ -102,9 +102,6 @@ def __init__(self, queue=None, project=None, walltime=None, job_cpu=None, job_me
 
         logger.debug("Job script: \n %s" % self.job_script())
 
-    def _job_id_from_submit_output(self, out):
-        return out.split(';')[0].strip()
-
 
 def slurm_format_bytes_ceil(n):
     """ Format bytes as text.
35 changes: 35 additions & 0 deletions dask_jobqueue/tests/test_jobqueue_core.py
@@ -44,3 +44,38 @@ def test_forward_ip():
     with PBSCluster(walltime='00:02:00', processes=4, cores=8, memory='28GB',
                     name='dask-worker') as cluster:
         assert cluster.local_cluster.scheduler.ip == default_ip
+
+
+@pytest.mark.parametrize('Cluster', [PBSCluster, MoabCluster, SLURMCluster,
+                                     SGECluster, LSFCluster])
+@pytest.mark.parametrize(
+    'qsub_return_string',
+    ['{job_id}.admin01',
+     'Request {job_id}.asdf was submitted to queue: standard.',
+     'sbatch: Submitted batch job {job_id}',
+     '{job_id};cluster',
+     'Job <{job_id}> is submitted to default queue <normal>.',
+     '{job_id}'])
+def test_job_id_from_qsub(Cluster, qsub_return_string):
+    original_job_id = '654321'
+    qsub_return_string = qsub_return_string.format(job_id=original_job_id)
+    with Cluster(cores=1, memory='1GB') as cluster:
+        assert (original_job_id
+                == cluster._job_id_from_submit_output(qsub_return_string))
+
+
+@pytest.mark.parametrize('Cluster', [PBSCluster, MoabCluster, SLURMCluster,
+                                     SGECluster, LSFCluster])
+def test_job_id_error_handling(Cluster):
+    # non-matching regexp
+    with Cluster(cores=1, memory='1GB') as cluster:
+        with pytest.raises(ValueError, match="Could not parse job id"):
+            return_string = "there is no number here"
+            cluster._job_id_from_submit_output(return_string)
+
+    # no job_id named group in the regexp
+    with Cluster(cores=1, memory='1GB') as cluster:
+        with pytest.raises(ValueError, match="You need to use a 'job_id' named group"):
+            return_string = 'Job <12345> submitted to <normal>.'
+            cluster.job_id_regexp = r'(\d+)'
+            cluster._job_id_from_submit_output(return_string)
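
The default pattern can be sanity-checked against any of these strings with the standard library alone; for example:

import re

out = 'Job <654321> is submitted to default queue <normal>.'
match = re.search(r'(?P<job_id>\d+)', out)
print(match.groupdict()['job_id'])  # prints: 654321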
3 changes: 1 addition & 2 deletions docs/index.rst
@@ -285,5 +285,4 @@ problems are the following:
 We use the submit command stdout to parse the job_id corresponding to the
 launched group of workers. If the parsing fails, then dask-jobqueue won't work
 as expected and may throw exceptions. You can have a look at the parsing
-function in every ``JobQueueCluster`` implementation, see
-``_job_id_from_submit_output`` function.
+function ``JobQueueCluster._job_id_from_submit_output``.
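
When debugging such a failure, the parsing can also be exercised by hand on the captured submit output; a minimal sketch using PBS, where the output string '654321.admin01' is a made-up example in the style of the tests above:

from dask_jobqueue import PBSCluster

with PBSCluster(cores=1, memory='1GB') as cluster:
    # made-up PBS-style submit output
    print(cluster._job_id_from_submit_output('654321.admin01'))  # 654321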
