Skip to content

Commit

Permalink
lsf: Allow use_stdin to be passed in via the LSFCluster constructor.
Browse files Browse the repository at this point in the history
  • Loading branch information
stuarteberg committed Oct 25, 2019
1 parent b4f94c0 commit b524b3e
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 17 deletions.
11 changes: 11 additions & 0 deletions conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

import pytest

import dask_jobqueue.lsf


def pytest_addoption(parser):
parser.addoption(
Expand All @@ -27,3 +29,12 @@ def pytest_runtest_setup(item):
if envnames:
if item.config.getoption("-E") not in envnames:
pytest.skip("test requires env in %r" % envnames)


@pytest.fixture(autouse=True)
def mock_lsf_version(monkeypatch):
    """Substitute a fake ``lsf_version()`` when the real one cannot run.

    The real ``dask_jobqueue.lsf.lsf_version`` is called once as a probe.
    If that call raises ``OSError``, the function is replaced for the
    duration of the test by a stub returning ``"10"``. Otherwise the real
    implementation is left untouched.
    """
    lsf_module = dask_jobqueue.lsf
    try:
        lsf_module.lsf_version()
    except OSError:
        # The probe failed: install a stand-in so tests can still run.
        monkeypatch.setattr(lsf_module, "lsf_version", lambda: "10")
2 changes: 1 addition & 1 deletion dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobqueue:
job-extra: []
log-directory: null
lsf-units: null
use-stdin: null
use-stdin: null # (bool) How jobs are launched: false -> 'bsub jobscript.sh', true -> 'bsub < jobscript.sh'; null -> chosen automatically from the detected LSF version

htcondor:
name: dask-worker
Expand Down
35 changes: 26 additions & 9 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ def __init__(
job_extra=None,
lsf_units=None,
config_name="lsf",
use_stdin=None,
**kwargs
):
if queue is None:
Expand All @@ -46,6 +47,12 @@ def __init__(
if lsf_units is None:
lsf_units = dask.config.get("jobqueue.%s.lsf-units" % config_name)

if use_stdin is None:
use_stdin = dask.config.get("jobqueue.%s.use-stdin" % config_name)
if use_stdin is None:
use_stdin = lsf_version() < "10"
self.use_stdin = use_stdin

# Instantiate args and parameters from parent abstract class
super().__init__(*args, config_name=config_name, **kwargs)

Expand Down Expand Up @@ -97,7 +104,7 @@ def __init__(
logger.debug("Job script: \n %s" % self.job_script())

async def _submit_job(self, script_filename):
if use_stdin():
if self.use_stdin:
piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"]
return self._call(piped_cmd, shell=True)
else:
Expand Down Expand Up @@ -189,12 +196,29 @@ class LSFCluster(JobQueueCluster):
lsf_units : str
Unit system for large units in resource usage set by the
LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster.
use_stdin : bool
LSF's ``bsub`` command allows us to launch a job by passing it as an
argument (``bsub /tmp/jobscript.sh``) or feeding it to stdin
(``bsub < /tmp/jobscript.sh``). Depending on your cluster's configuration
and/or shared filesystem setup, one of those methods may not work,
forcing you to use the other one. This option controls which method
``dask-jobqueue`` will use to submit jobs via ``bsub``.
In particular, if your cluster fails to launch and the LSF log contains
an error message similar to the following:
.. code-block::
/home/someuser/.lsbatch/1571869562.66512066: line 8: /tmp/tmpva_yau8m.sh: No such file or directory
...then try passing ``use_stdin=True`` here or setting ``use-stdin: true``
in your ``jobqueue.lsf`` config section.
Examples
--------
>>> from dask_jobqueue import LSFCluster
>>> cluster = LSFCluster(queue='general', project='DaskonLSF',
... cores=15, memory='25GB')
... cores=15, memory='25GB', use_stdin=True)
>>> cluster.scale(jobs=10) # ask for 10 jobs
>>> from dask.distributed import Client
Expand All @@ -211,13 +235,6 @@ class LSFCluster(JobQueueCluster):
config_name = "lsf"


def use_stdin():
    """Decide whether job scripts should be fed to ``bsub`` on stdin.

    Returns
    -------
    The value of the ``jobqueue.lsf.use-stdin`` config key when it is set
    (i.e. not ``None``); otherwise ``True`` when the major LSF version
    reported by ``lsf_version()`` is below 10, else ``False``.
    """
    configured = dask.config.get("jobqueue.lsf.use-stdin")
    if configured is not None:
        return configured

    version = lsf_version()
    # Compare the major version numerically: as plain strings,
    # "9.1" < "10" is False because "9" sorts after "1".
    # NOTE(review): assumes lsf_version() returns a dotted version string
    # such as "9.1.3" -- confirm against its implementation.
    try:
        return int(version.split(".", 1)[0]) < 10
    except ValueError:
        # Unexpected format: fall back to the historical string comparison
        # rather than failing job submission outright.
        return version < "10"


@toolz.memoize
def lsf_version():
out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate()
Expand Down
15 changes: 8 additions & 7 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,11 +217,17 @@ def test_adaptive_grouped(loop):

def test_config(loop):
with dask.config.set(
{"jobqueue.lsf.walltime": "00:02", "jobqueue.lsf.local-directory": "/foo"}
{
"jobqueue.lsf.walltime": "00:02",
"jobqueue.lsf.local-directory": "/foo",
"jobqueue.lsf.use-stdin": True,
}
):
with LSFCluster(loop=loop, cores=1, memory="2GB") as cluster:
assert "00:02" in cluster.job_script()
assert "--local-directory /foo" in cluster.job_script()
lsf_job = cluster.new_spec["cls"](cores=8, memory="24GB")
assert lsf_job.use_stdin


def test_config_name_lsf_takes_custom_config():
Expand All @@ -244,6 +250,7 @@ def test_config_name_lsf_takes_custom_config():
"env-extra": [],
"log-directory": None,
"shebang": "#!/usr/bin/env bash",
"use-stdin": None,
}

with dask.config.set({"jobqueue.lsf-config-name": conf}):
Expand Down Expand Up @@ -306,9 +313,3 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit):

def test_lsf_unit_detection_without_file():
    """Run the unit-detection helper with ``"kb"`` and ``conf_text=None``.

    NOTE(review): presumably ``conf_text=None`` stands for "no lsf.conf
    file available" -- confirm against ``lsf_unit_detection_helper``.
    """
    unit = "kb"
    lsf_unit_detection_helper(unit, conf_text=None)


@pytest.mark.parametrize("stdin", [True, False])
def test_stdin(stdin):
    """An explicit ``jobqueue.lsf.use-stdin`` setting is returned as-is.

    With the config key overridden to ``stdin``, ``lsf.use_stdin()`` must
    return that very object.
    """
    override = {"jobqueue.lsf.use-stdin": stdin}
    with dask.config.set(override):
        result = lsf.use_stdin()
    assert result is stdin

0 comments on commit b524b3e

Please sign in to comment.