Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 35 additions & 27 deletions dask_jobqueue/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
from collections import OrderedDict
from contextlib import contextmanager

import six

import dask
import docrep
from distributed import LocalCluster
Expand Down Expand Up @@ -326,7 +328,7 @@ def start_workers(self, n=1):
for _ in range(num_jobs):
with self.job_file() as fn:
out = self._submit_job(fn)
job = self._job_id_from_submit_output(out.decode())
job = self._job_id_from_submit_output(out)
if not job:
raise ValueError('Unable to parse jobid from output of %s' % out)
logger.debug("started job: %s", job)
Expand All @@ -337,43 +339,49 @@ def scheduler(self):
""" The scheduler of this cluster """
return self.local_cluster.scheduler

def _calls(self, cmds, **kwargs):
""" Call a command using subprocess.communicate
def _call(self, cmd, **kwargs):
""" Call a command using subprocess.Popen.

This centralizes calls out to the command line, providing consistent outputs, logging, and an opportunity
to go asynchronous in the future
This centralizes calls out to the command line, providing consistent
outputs, logging, and an opportunity to go asynchronous in the future.

Parameters
----------
cmd: List(List(str))
A list of commands, each of which is a list of strings to hand to subprocess.communicate
cmd: List(str))
A command, each of which is a list of strings to hand to
subprocess.Popen

Examples
--------
>>> self._calls([['ls'], ['ls', '/foo']])
>>> self._call(['ls', '/foo'])

Returns
-------
The stdout result as a string
Also logs any stderr information
"""
logger.debug("Submitting the following calls to command line")
procs = []
for cmd in cmds:
logger.debug(' '.join(cmd))
procs.append(subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs))

result = []
for proc in procs:
out, err = proc.communicate()
if err:
raise RuntimeError(err.decode())
result.append(out)
return result
The stdout produced by the command, as string.

def _call(self, cmd, **kwargs):
""" Singular version of _calls """
return self._calls([cmd], **kwargs)[0]
Raises
------
RuntimeError if the command exits with a non-zero exit code
"""
cmd_str = ' '.join(cmd)
logger.debug("Executing the following command to command line\n{}".format(cmd_str))

proc = subprocess.Popen(cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
**kwargs)

out, err = proc.communicate()
if six.PY3:
out, err = out.decode(), err.decode()
if proc.returncode != 0:
raise RuntimeError('Command exited with non-zero exit code.\n'
'Exit code: {}\n'
'Command:\n{}\n'
'stdout:\n{}\n'
'stderr:\n{}\n'.format(proc.returncode,
cmd_str, out, err))
return out

def stop_workers(self, workers):
""" Stop a list of workers"""
Expand Down
26 changes: 25 additions & 1 deletion dask_jobqueue/tests/test_jobqueue_core.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from __future__ import absolute_import, division, print_function

import os
import pytest
import shutil
import socket
import sys
import re

import pytest

from dask_jobqueue import (JobQueueCluster, PBSCluster, MoabCluster,
SLURMCluster, SGECluster, LSFCluster)
Expand Down Expand Up @@ -107,3 +109,25 @@ def test_log_directory(tmpdir):
with PBSCluster(cores=1, memory='1GB',
log_directory=tmpdir.strpath):
assert os.path.exists(tmpdir.strpath)


def test_jobqueue_cluster_call(tmpdir):
cluster = PBSCluster(cores=1, memory='1GB')

path = tmpdir.join('test.py')
path.write('print("this is the stdout")')

out = cluster._call([sys.executable, path.strpath])
assert out == 'this is the stdout\n'

path_with_error = tmpdir.join('non-zero-exit-code.py')
path_with_error.write('print("this is the stdout")\n1/0')

match = ('Command exited with non-zero exit code.+'
'Exit code: 1.+'
'stdout:\nthis is the stdout.+'
'stderr:.+ZeroDivisionError')

match = re.compile(match, re.DOTALL)
with pytest.raises(RuntimeError, match=match):
cluster._call([sys.executable, path_with_error.strpath])