Skip to content

Commit

Permalink
Support stdin with LSF bsub (#347)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrocklin committed Oct 7, 2019
1 parent fb6e29e commit 8d836a3
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
1 change: 1 addition & 0 deletions dask_jobqueue/jobqueue.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ jobqueue:
job-extra: []
log-directory: null
lsf-units: null
use-stdin: null

htcondor:
name: dask-worker
Expand Down
27 changes: 27 additions & 0 deletions dask_jobqueue/lsf.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from distutils.version import LooseVersion

import logging
import math
import os
import re
import subprocess
import toolz

import dask

Expand Down Expand Up @@ -91,6 +96,14 @@ def __init__(

logger.debug("Job script: \n %s" % self.job_script())

async def _submit_job(self, script_filename):
if use_stdin():
piped_cmd = [self.submit_command + "< " + script_filename + " 2> /dev/null"]
return self._call(piped_cmd, shell=True)
else:
result = await super()._submit_job(script_filename)
return result


def lsf_format_bytes_ceil(n, lsf_units="mb"):
""" Format bytes as text
Expand Down Expand Up @@ -196,3 +209,17 @@ class LSFCluster(JobQueueCluster):
)
job_cls = LSFJob
config_name = "lsf"


def use_stdin():
if dask.config.get("jobqueue.lsf.use-stdin") is not None:
return dask.config.get("jobqueue.lsf.use-stdin")

return lsf_version() < "10"


@toolz.memoize
def lsf_version():
out, _ = subprocess.Popen("lsid", stdout=subprocess.PIPE).communicate()
version = re.search(r"(\d+\.)+\d+", out.decode()).group()
return LooseVersion(version)
6 changes: 6 additions & 0 deletions dask_jobqueue/tests/test_lsf.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,3 +306,9 @@ def test_lsf_unit_detection(lsf_units_string, expected_unit):

def test_lsf_unit_detection_without_file():
lsf_unit_detection_helper("kb", conf_text=None)


@pytest.mark.parametrize("stdin", [True, False])
def test_stdin(stdin):
with dask.config.set({"jobqueue.lsf.use-stdin": stdin}):
assert lsf.use_stdin() is stdin

0 comments on commit 8d836a3

Please sign in to comment.