OARCluster implementation (#137)
lesteve committed Aug 29, 2018
1 parent a8d37e6 commit 8ae5fff
Showing 4 changed files with 195 additions and 0 deletions.
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
@@ -6,6 +6,7 @@
from .slurm import SLURMCluster
from .sge import SGECluster
from .lsf import LSFCluster
from .oar import OARCluster

from ._version import get_versions
__version__ = get_versions()['version']
21 changes: 21 additions & 0 deletions dask_jobqueue/jobqueue.yaml
@@ -1,4 +1,25 @@
jobqueue:
  oar:
    name: dask-worker

    # Dask worker options
    cores: null             # Total number of cores per job
    memory: null            # Total amount of memory per job
    processes: 1            # Number of Python processes per job

    interface: null         # Network interface to use, like eth0 or ib0
    death-timeout: 60       # Number of seconds to wait if a worker cannot find a scheduler
    local-directory: null   # Location of fast local storage, like /scratch or $TMPDIR

    # OAR resource manager options
    queue: null
    project: null
    walltime: '00:30:00'
    extra: ""
    env-extra: []
    resource-spec: null
    job-extra: []

  pbs:
    name: dask-worker

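For context on how these defaults are consumed: OARCluster (added in dask_jobqueue/oar.py below) falls back to the jobqueue.oar.* configuration values for any keyword argument left as None. A minimal sketch, assuming you override the shipped defaults with dask.config.set; the queue name and walltime used here are illustrative, not part of this commit:

import dask
from dask_jobqueue import OARCluster

# Override the packaged jobqueue.yaml defaults for this process only.
with dask.config.set({'jobqueue.oar.queue': 'default',
                      'jobqueue.oar.walltime': '01:00:00'}):
    # cores and memory default to null and must be supplied explicitly.
    cluster = OARCluster(cores=8, memory='28GB')
    # The generated header now contains '#OAR -q default' and a
    # '#OAR -l /nodes=1/core=8,walltime=01:00:00' resource line.
    print(cluster.job_header)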
112 changes: 112 additions & 0 deletions dask_jobqueue/oar.py
@@ -0,0 +1,112 @@
from __future__ import absolute_import, division, print_function

import logging
import shlex

import dask

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class OARCluster(JobQueueCluster):
    __doc__ = docstrings.with_indents(""" Launch Dask on an OAR cluster

    Parameters
    ----------
    queue : str
        Destination queue for each worker job. Passed to the `#OAR -q` option.
    project : str
        Accounting string associated with each worker job. Passed to the
        `#OAR --project` option.
    resource_spec : str
        Request resources and specify job placement. Passed to the `#OAR -l`
        option.
    walltime : str
        Walltime for each worker job.
    job_extra : list
        List of other OAR options, for example `-t besteffort`. Each option
        will be prepended with the #OAR prefix.
    %(JobQueueCluster.parameters)s

    Examples
    --------
    >>> from dask_jobqueue import OARCluster
    >>> cluster = OARCluster(queue='regular')
    >>> cluster.scale(10)  # this may take a few seconds to launch
    >>> from dask.distributed import Client
    >>> client = Client(cluster)

    This also works with adaptive clusters. This automatically launches and
    kills workers based on load.

    >>> cluster.adapt()
    """, 4)

    # Override class variables
    submit_command = 'oarsub'
    cancel_command = 'oardel'
    scheduler_name = 'oar'
    job_id_regexp = r'OAR_JOB_ID=(?P<job_id>\d+)'

    def __init__(self, queue=None, project=None, resource_spec=None, walltime=None, job_extra=None, **kwargs):
        if queue is None:
            queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
        if project is None:
            project = dask.config.get('jobqueue.%s.project' % self.scheduler_name)
        if resource_spec is None:
            resource_spec = dask.config.get('jobqueue.%s.resource-spec' % self.scheduler_name)
        if walltime is None:
            walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
        if job_extra is None:
            job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name)

        super(OARCluster, self).__init__(**kwargs)

        header_lines = ['#!/usr/bin/env bash']
        if self.name is not None:
            header_lines.append('#OAR -n %s' % self.name)
        if queue is not None:
            header_lines.append('#OAR -q %s' % queue)
        if project is not None:
            header_lines.append('#OAR --project %s' % project)

        # OAR needs the resources on a single line, otherwise the job is
        # considered a "moldable job" (i.e. the scheduler can choose between
        # multiple sets of resource constraints)
        resource_spec_list = []
        if resource_spec is None:
            # Default resource_spec if not specified. It is crucial to specify
            # nodes=1 to make sure the allocated cores are on the same node.
            resource_spec = '/nodes=1/core=%d' % self.worker_cores
        resource_spec_list.append(resource_spec)
        if walltime is not None:
            resource_spec_list.append('walltime=%s' % walltime)

        full_resource_spec = ','.join(resource_spec_list)
        header_lines.append('#OAR -l %s' % full_resource_spec)
        header_lines.extend(['#OAR %s' % arg for arg in job_extra])
        header_lines.append('JOB_ID=${OAR_JOB_ID}')

        self.job_header = '\n'.join(header_lines)

        logger.debug("Job script: \n %s" % self.job_script())

    def _submit_job(self, fn):
        # OAR specificity: the submission script needs to exist on the worker
        # when the job starts on the worker. This is different from other
        # schedulers, which only need the script on the submission node at
        # submission time. That means we cannot use the same strategy as in
        # JobQueueCluster: create a temporary submission file, submit the
        # script, delete the submission file. In order to reuse the code in
        # the base JobQueueCluster class, we read from the temporary file and
        # reconstruct the command line where the script is passed in as a
        # string (inline script in OAR jargon) rather than as a filename.
        with open(fn) as f:
            content_lines = f.readlines()

        oar_lines = [line for line in content_lines if line.startswith('#OAR ')]
        oarsub_options = [line.replace('#OAR ', '').strip() for line in oar_lines]
        inline_script_lines = [line for line in content_lines if not line.startswith('#')]
        inline_script = ''.join(inline_script_lines)
        oarsub_command = ' '.join([self.submit_command] + oarsub_options)
        oarsub_command_split = shlex.split(oarsub_command) + [inline_script]
        return self._call(oarsub_command_split)
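To make the inline-script workaround above concrete, here is a small standalone sketch of the same transformation applied to a made-up job script; the script content and scheduler address are illustrative only:

import shlex

# A made-up job script resembling what the base class writes to a temporary file.
job_script = """#!/usr/bin/env bash
#OAR -n dask-worker
#OAR -q regular
#OAR -l /nodes=1/core=8,walltime=00:02:00
JOB_ID=${OAR_JOB_ID}
/usr/bin/python -m distributed.cli.dask_worker tcp://scheduler-host:8786
"""

content_lines = job_script.splitlines(True)
# '#OAR ' directives become oarsub command-line options...
oarsub_options = [line.replace('#OAR ', '').strip()
                  for line in content_lines if line.startswith('#OAR ')]
# ...and every non-comment line is passed as a single inline-script argument.
inline_script = ''.join(line for line in content_lines if not line.startswith('#'))
command = shlex.split(' '.join(['oarsub'] + oarsub_options)) + [inline_script]
print(command)
# ['oarsub', '-n', 'dask-worker', '-q', 'regular', '-l',
#  '/nodes=1/core=8,walltime=00:02:00',
#  'JOB_ID=${OAR_JOB_ID}\n/usr/bin/python -m distributed.cli.dask_worker ...\n']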
61 changes: 61 additions & 0 deletions dask_jobqueue/tests/test_oar.py
@@ -0,0 +1,61 @@
from __future__ import absolute_import, division, print_function

import sys

from dask_jobqueue import OARCluster


def test_header():
    with OARCluster(walltime='00:02:00', processes=4, cores=8, memory='28GB') as cluster:
        assert '#OAR -n dask-worker' in cluster.job_header
        assert '#OAR -l /nodes=1/core=8,walltime=00:02:00' in cluster.job_header
        assert '#OAR --project' not in cluster.job_header
        assert '#OAR -q' not in cluster.job_header

    with OARCluster(queue='regular', project='DaskOnOar', processes=4, cores=8, memory='28GB',
                    job_cpu=16, job_mem='100G', job_extra=['-t besteffort']) as cluster:
        assert 'walltime=' in cluster.job_header
        assert '#OAR --project DaskOnOar' in cluster.job_header
        assert '#OAR -q regular' in cluster.job_header
        assert '#OAR -t besteffort' in cluster.job_header

    with OARCluster(cores=4, memory='8GB') as cluster:
        assert '#OAR -n dask-worker' in cluster.job_header
        assert 'walltime=' in cluster.job_header
        assert '#OAR --project' not in cluster.job_header
        assert '#OAR -q' not in cluster.job_header


def test_job_script():
    with OARCluster(walltime='00:02:00', processes=4, cores=8, memory='28GB') as cluster:
        job_script = cluster.job_script()
        assert '#OAR' in job_script
        assert '#OAR -n dask-worker' in job_script
        assert '--memory-limit 7.00GB ' in job_script
        assert '#OAR -l /nodes=1/core=8,walltime=00:02:00' in job_script
        assert '#OAR --project' not in job_script
        assert '#OAR -q' not in job_script

        assert 'export ' not in job_script

        assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
        assert '--nthreads 2 --nprocs 4 --memory-limit 7.00GB' in job_script

    with OARCluster(walltime='00:02:00', processes=4, cores=8, memory='28GB',
                    env_extra=['export LANG="en_US.utf8"',
                               'export LANGUAGE="en_US.utf8"',
                               'export LC_ALL="en_US.utf8"']) as cluster:
        job_script = cluster.job_script()
        assert '#OAR' in job_script
        assert '#OAR -n dask-worker' in job_script
        assert '--memory-limit 7.00GB ' in job_script
        assert '#OAR -l /nodes=1/core=8,walltime=00:02:00' in job_script
        assert '#OAR --project' not in job_script
        assert '#OAR -q' not in job_script

        assert 'export LANG="en_US.utf8"' in job_script
        assert 'export LANGUAGE="en_US.utf8"' in job_script
        assert 'export LC_ALL="en_US.utf8"' in job_script

        assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
        assert '--nthreads 2 --nprocs 4 --memory-limit 7.00GB' in job_script
