
Commit

Create dask worker through "python -m" rather than with dask-worker (#77)
ocaisa authored and lesteve committed Jun 26, 2018
1 parent 99d1444 commit 13016ae
Showing 4 changed files with 9 additions and 13 deletions.
8 changes: 3 additions & 5 deletions dask_jobqueue/core.py
@@ -1,5 +1,4 @@
 import logging
-import os
 import shlex
 import socket
 import subprocess
@@ -12,8 +11,6 @@
 from distributed.deploy import Cluster
 from distributed.utils import get_ip_interface, ignoring, parse_bytes, tmpfile

-dirname = os.path.dirname(sys.executable)
-
 logger = logging.getLogger(__name__)
 docstrings = docrep.DocstringProcessor()

@@ -124,8 +121,9 @@ def __init__(self,
         self._env_header = '\n'.join(env_extra)

         # dask-worker command line build
-        self._command_template = os.path.join(
-            dirname, 'dask-worker %s' % self.scheduler.address)
+        dask_worker_command = (
+            '%(python)s -m distributed.cli.dask_worker' % dict(python=sys.executable))
+        self._command_template = ' '.join([dask_worker_command, self.scheduler.address])
         if threads is not None:
             self._command_template += " --nthreads %d" % threads
         if processes is not None:
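For illustration, a minimal sketch of what the patched command construction produces. The scheduler address, thread count, and process count below are assumed example values, not taken from the commit:

import sys

# Build the worker invocation the way the patched core.py does: run the
# distributed.cli.dask_worker module with the current interpreter instead
# of relying on a dask-worker script sitting next to sys.executable.
scheduler_address = 'tcp://127.0.0.1:8786'   # assumed example address
dask_worker_command = (
    '%(python)s -m distributed.cli.dask_worker' % dict(python=sys.executable))
command_template = ' '.join([dask_worker_command, scheduler_address])
command_template += ' --nthreads %d' % 2     # example worker options
command_template += ' --nprocs %d' % 4

print(command_template)
# e.g. /usr/bin/python3 -m distributed.cli.dask_worker tcp://127.0.0.1:8786 --nthreads 2 --nprocs 4

The practical difference is that the module invocation does not depend on a dask-worker console script existing alongside the interpreter on the compute node.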
4 changes: 0 additions & 4 deletions dask_jobqueue/slurm.py
@@ -2,17 +2,13 @@

 import logging
 import math
-import os
-import sys

 import dask

 from .core import JobQueueCluster, docstrings

 logger = logging.getLogger(__name__)

-dirname = os.path.dirname(sys.executable)
-

 class SLURMCluster(JobQueueCluster):
     __doc__ = docstrings.with_indents(""" Launch Dask on a SLURM cluster
5 changes: 3 additions & 2 deletions dask_jobqueue/tests/test_pbs.py
@@ -1,6 +1,7 @@
 from time import sleep, time

 import pytest
+import sys
 from distributed import Client
 from distributed.utils_test import loop  # noqa: F401

@@ -57,7 +58,7 @@ def test_job_script():
     assert '#PBS -q' not in job_script
     assert '#PBS -A' not in job_script

-    assert '/dask-worker tcp://' in job_script
+    assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
     assert '--nthreads 2 --nprocs 4 --memory-limit 7GB' in job_script

     with PBSCluster(queue='regular', project='DaskOnPBS', processes=4, threads=2, memory='7GB',
@@ -71,7 +72,7 @@ def test_job_script():
     assert '#PBS -l walltime=' in job_script
     assert '#PBS -A DaskOnPBS' in job_script

-    assert '/dask-worker tcp://' in job_script
+    assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
     assert '--nthreads 2 --nprocs 4 --memory-limit 7GB' in job_script


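As a quick check of what the updated assertions match, the snippet below renders the expected substring with the local interpreter; the printed path is whatever Python runs the tests and is only an example:

import sys

# The tests now look for the interpreter-prefixed module invocation in the
# generated job script instead of a bare '/dask-worker' path.
expected = '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable)
print(expected)
# e.g. /opt/conda/bin/python -m distributed.cli.dask_worker tcp://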
5 changes: 3 additions & 2 deletions dask_jobqueue/tests/test_slurm.py
@@ -1,6 +1,7 @@
 from time import sleep, time

 import pytest
+import sys
 from distributed import Client
 from distributed.utils_test import loop  # noqa: F401

@@ -56,7 +57,7 @@ def test_job_script():

     assert 'export ' not in job_script

-    assert '/dask-worker tcp://' in job_script
+    assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
     assert '--nthreads 2 --nprocs 4 --memory-limit 7GB' in job_script

     with SLURMCluster(walltime='00:02:00', processes=4, threads=2, memory='7GB',
@@ -77,7 +78,7 @@ def test_job_script():
     assert 'export LANGUAGE="en_US.utf8"' in job_script
     assert 'export LC_ALL="en_US.utf8"' in job_script

-    assert '/dask-worker tcp://' in job_script
+    assert '{} -m distributed.cli.dask_worker tcp://'.format(sys.executable) in job_script
     assert '--nthreads 2 --nprocs 4 --memory-limit 7GB' in job_script


