Skip to content

Commit

Permalink
Add LSFCluster (#78)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Bell authored and mrocklin committed Aug 1, 2018
1 parent 4f2701f commit 2319b22
Show file tree
Hide file tree
Showing 8 changed files with 371 additions and 7 deletions.
1 change: 1 addition & 0 deletions dask_jobqueue/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from .pbs import PBSCluster
from .slurm import SLURMCluster
from .sge import SGECluster
from .lsf import LSFCluster

from ._version import get_versions
__version__ = get_versions()['version']
Expand Down
14 changes: 9 additions & 5 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,13 +280,16 @@ def job_file(self):
f.write(self.job_script())
yield fn

def _submit_job(self, script_filename):
return self._call(shlex.split(self.submit_command) + [script_filename])

def start_workers(self, n=1):
""" Start workers and point them to our local scheduler """
logger.debug('starting %s workers' % n)
num_jobs = math.ceil(n / self.worker_processes)
for _ in range(num_jobs):
with self.job_file() as fn:
out = self._call(shlex.split(self.submit_command) + [fn])
out = self._submit_job(fn)
job = self._job_id_from_submit_output(out.decode())
logger.debug("started job: %s" % job)
self.pending_jobs[job] = {}
Expand All @@ -296,7 +299,7 @@ def scheduler(self):
""" The scheduler of this cluster """
return self.local_cluster.scheduler

def _calls(self, cmds):
def _calls(self, cmds, **kwargs):
""" Call a command using subprocess.communicate
This centralzies calls out to the command line, providing consistent
Expand All @@ -323,7 +326,8 @@ def _calls(self, cmds):
logger.debug(' '.join(cmd))
procs.append(subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE))
stderr=subprocess.PIPE,
**kwargs))

result = []
for proc in procs:
Expand All @@ -333,9 +337,9 @@ def _calls(self, cmds):
result.append(out)
return result

def _call(self, cmd):
def _call(self, cmd, **kwargs):
""" Singular version of _calls """
return self._calls([cmd])[0]
return self._calls([cmd], **kwargs)[0]

def stop_workers(self, workers):
""" Stop a list of workers"""
Expand Down
22 changes: 22 additions & 0 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,25 @@ jobqueue:
env-extra: []
resource-spec: null
job-extra: []

lsf:
name: dask-worker

# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
local-directory: null # Location of fast local storage like /scratch or $TMPDIR

# LSF resource manager options
queue: null
project: null
walltime: '00:30'
extra: ""
env-extra: []
ncpus: null
mem: null
job-extra: []
131 changes: 131 additions & 0 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
from __future__ import absolute_import, division, print_function

import logging
import math

import dask

from .core import JobQueueCluster, docstrings

logger = logging.getLogger(__name__)


class LSFCluster(JobQueueCluster):
__doc__ = docstrings.with_indents(""" Launch Dask on a LSF cluster
Parameters
----------
queue : str
Destination queue for each worker job. Passed to `#BSUB -q` option.
project : str
Accounting string associated with each worker job. Passed to
`#BSUB -P` option.
ncpus : int
Number of cpus. Passed to `#BSUB -n` option.
mem : int
Request memory in bytes. Passed to `#BSUB -M` option.
walltime : str
Walltime for each worker job in HH:MM. Passed to `#BSUB -W` option.
job_extra : list
List of other LSF options, for example -u. Each option will be
prepended with the #LSF prefix.
%(JobQueueCluster.parameters)s
Examples
--------
>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFcluster(queue='general', project='DaskonLSF',
... cores=15, memory='25GB')
>>> cluster.start_workers(10) # this may take a few seconds to launch
>>> from dask.distributed import Client
>>> client = Client(cluster)
This also works with adaptive clusters. This automatically launches and
kill workers based on load.
>>> cluster.adapt()
""", 4)

# Override class variables
submit_command = 'bsub <'
cancel_command = 'bkill'
scheduler_name = 'lsf'

def __init__(self, queue=None, project=None, ncpus=None, mem=None,
walltime=None, job_extra=None, **kwargs):
if queue is None:
queue = dask.config.get('jobqueue.%s.queue' % self.scheduler_name)
if project is None:
project = dask.config.get('jobqueue.%s.project' % self.scheduler_name)
if ncpus is None:
ncpus = dask.config.get('jobqueue.%s.ncpus' % self.scheduler_name)
if mem is None:
mem = dask.config.get('jobqueue.%s.mem' % self.scheduler_name)
if walltime is None:
walltime = dask.config.get('jobqueue.%s.walltime' % self.scheduler_name)
if job_extra is None:
job_extra = dask.config.get('jobqueue.%s.job-extra' % self.scheduler_name)

# Instantiate args and parameters from parent abstract class
super(LSFCluster, self).__init__(**kwargs)

header_lines = []
# LSF header build
if self.name is not None:
header_lines.append('#BSUB -J %s' % self.name)
header_lines.append('#BSUB -e %s.err' % self.name)
header_lines.append('#BSUB -o %s.out' % self.name)
if queue is not None:
header_lines.append('#BSUB -q %s' % queue)
if project is not None:
header_lines.append('#BSUB -P %s' % project)
if ncpus is None:
# Compute default cores specifications
ncpus = self.worker_cores
logger.info("ncpus specification for LSF not set, "
"initializing it to %s" % ncpus)
if ncpus is not None:
header_lines.append('#BSUB -n %s' % ncpus)
if mem is None:
# Compute default memory specifications
mem = self.worker_memory
logger.info("mem specification for LSF not set, "
"initializing it to %s" % mem)
if mem is not None:
memory_string = lsf_format_bytes_ceil(mem)
header_lines.append('#BSUB -M %s' % memory_string)
if walltime is not None:
header_lines.append('#BSUB -W %s' % walltime)
header_lines.extend(['#BSUB %s' % arg for arg in job_extra])
header_lines.append('JOB_ID=${LSB_JOBID%.*}')

# Declare class attribute that shall be overriden
self.job_header = '\n'.join(header_lines)

logger.debug("Job script: \n %s" % self.job_script())

def _job_id_from_submit_output(self, out):
return out.split('<')[1].split('>')[0].strip()

def _submit_job(self, script_filename):
piped_cmd = [self.submit_command + ' ' + script_filename + ' 2> /dev/null']
return self._call(piped_cmd, shell=True)


def lsf_format_bytes_ceil(n):
""" Format bytes as text
Convert bytes to megabytes which LSF requires.
Parameters
----------
n: int
Bytes
Examples
--------
>>> lsf_format_bytes_ceil(1234567890)
'1235'
"""
return '%d' % math.ceil(n / (1000**2))

0 comments on commit 2319b22

Please sign in to comment.