Use a mix of processes and threads when processes is not specified (#375)

This matches the behaviour of `distributed.LocalCluster`.
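For reference, a minimal sketch of the split this gives. The exact rounding rules live in `distributed.deploy.local.nprocesses_nthreads` (imported in the diff below); the helper name and the small-core special case in this sketch are assumptions reconstructed from the tests added in this commit.

import math

def split_cores(cores):
    # Rough sketch of the assumed processes/threads heuristic (not copied from
    # distributed): few cores -> one single-threaded process per core,
    # otherwise roughly sqrt(cores) processes sharing the threads.
    if cores <= 4:
        processes = cores
    else:
        processes = int(math.ceil(math.sqrt(cores)))
    threads = cores // processes
    return processes, threads

print(split_cores(4))   # (4, 1), matches the cores=4 test below
print(split_cores(6))   # (3, 2), matches the cores=6 test below
print(split_cores(24))  # (5, 4)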
lesteve committed Dec 6, 2019
1 parent fea0df7 commit 7343632
Showing 4 changed files with 48 additions and 25 deletions.
36 changes: 21 additions & 15 deletions dask_jobqueue/core.py
@@ -11,9 +11,10 @@

import dask
from dask.utils import ignoring

from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.local import nprocesses_nthreads
from distributed.scheduler import Scheduler

from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface

logger = logging.getLogger(__name__)
@@ -24,8 +25,10 @@
memory: str
Total amount of memory per job
processes : int
Cut the job up into this many processes.
Good for GIL workloads or for nodes with many cores.
Cut the job up into this many processes. Good for GIL workloads or for
nodes with many cores.
By default, ``process ~= sqrt(cores)`` so that the number of processes
and the number of threads per process is roughly the same.
interface : str
Network interface like 'eth0' or 'ib0'.
nanny : bool
@@ -148,14 +151,27 @@ def __init__(
config_name = default_config_name
self.config_name = config_name

if job_name is None:
job_name = dask.config.get("jobqueue.%s.name" % self.config_name)
if cores is None:
cores = dask.config.get("jobqueue.%s.cores" % self.config_name)
if memory is None:
memory = dask.config.get("jobqueue.%s.memory" % self.config_name)

if cores is None or memory is None:
job_class_name = self.__class__.__name__
cluster_class_name = job_class_name.replace("Job", "Cluster")
raise ValueError(
"You must specify how much cores and memory per job you want to use, for example:\n"
"cluster = {}(cores={}, memory={!r})".format(
cluster_class_name, cores or 8, memory or "24GB"
)
)

if job_name is None:
job_name = dask.config.get("jobqueue.%s.name" % self.config_name)
if processes is None:
processes = dask.config.get("jobqueue.%s.processes" % self.config_name)
if processes is None:
processes, _ = nprocesses_nthreads(cores)
if interface is None:
interface = dask.config.get("jobqueue.%s.interface" % self.config_name)
if death_timeout is None:
@@ -181,16 +197,6 @@ def __init__(
if shebang is None:
shebang = dask.config.get("jobqueue.%s.shebang" % self.config_name)

if cores is None or memory is None:
job_class_name = self.__class__.__name__
cluster_class_name = job_class_name.replace("Job", "Cluster")
raise ValueError(
"You must specify how much cores and memory per job you want to use, for example:\n"
"cluster = {}(cores={}, memory={!r})".format(
cluster_class_name, cores or 8, memory or "24GB"
)
)

# This attribute should be set in the derived class
self.job_header = None

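With this change, leaving `processes` unset now yields a mix of processes and threads, while passing it explicitly still overrides the heuristic. A hypothetical session (SLURMCluster is used only as an example; the exact split comes from distributed's heuristic and the printed values for 6 cores match the tests added below):

from dask_jobqueue import SLURMCluster  # any cluster class behaves the same way

# processes not given: distributed picks the split,
# e.g. 6 cores -> 3 processes x 2 threads each.
with SLURMCluster(cores=6, memory="24GB") as cluster:
    print(cluster.job_script())

# processes given explicitly: keep the old single-process, all-threads layout.
with SLURMCluster(cores=6, memory="24GB", processes=1) as cluster:
    print(cluster.job_script())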
16 changes: 8 additions & 8 deletions dask_jobqueue/jobqueue.yaml
@@ -5,7 +5,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
@@ -28,7 +28,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
@@ -51,7 +51,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
@@ -75,7 +75,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
@@ -99,7 +99,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
@@ -122,7 +122,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
@@ -148,7 +148,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
@@ -167,7 +167,7 @@ jobqueue:
# Dask worker options
cores: null # Total number of cores per job
memory: null # Total amount of memory per job
processes: 1 # Number of Python processes per job
processes: null # Number of Python processes per job

interface: null # Network interface to use like eth0 or ib0
death-timeout: 60 # Number of seconds to wait if a worker can not find a scheduler
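Because the shipped default is now null, users who relied on the old one-process-per-job behaviour can pin it back through the dask config system instead of editing jobqueue.yaml. A small illustrative snippet (the `jobqueue.slurm.processes` key name follows the usual layout of this file; the section names themselves are not visible in the hunks above):

import dask
from dask_jobqueue import SLURMCluster

# Key path assumed from the standard jobqueue.yaml layout (jobqueue -> slurm -> processes).
# Restore the pre-#375 default for SLURM jobs, either globally ...
dask.config.set({"jobqueue.slurm.processes": 1})

# ... or only inside a block.
with dask.config.set({"jobqueue.slurm.processes": 1}):
    cluster = SLURMCluster(cores=8, memory="24GB")  # 1 process, 8 threads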
16 changes: 15 additions & 1 deletion dask_jobqueue/tests/test_jobqueue_core.py
@@ -36,7 +36,7 @@ def test_command_template():
"%s -m distributed.cli.dask_worker" % (sys.executable)
in cluster._dummy_job._command_template
)
assert " --nthreads 2" in cluster._dummy_job._command_template
assert " --nthreads 1" in cluster._dummy_job._command_template
assert " --memory-limit " in cluster._dummy_job._command_template
assert " --name " in cluster._dummy_job._command_template

@@ -254,3 +254,17 @@ def __init__(self, *args, **kwargs):

with pytest.raises(ValueError, match="job_cls.+MyCluster"):
MyCluster(cores=1, memory="1GB")


@pytest.mark.parametrize(
"Cluster",
[PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
)
def test_default_number_of_worker_processes(Cluster):
with Cluster(cores=4, memory="4GB") as cluster:
assert " --nprocs 4" in cluster.job_script()
assert " --nthreads 1" in cluster.job_script()

with Cluster(cores=6, memory="4GB") as cluster:
assert " --nprocs 3" in cluster.job_script()
assert " --nthreads 2" in cluster.job_script()
5 changes: 4 additions & 1 deletion docs/source/changelog.rst
@@ -9,7 +9,10 @@ Development version
(:pr:`360`).
- ``HTCondorCluster``: support older ``HTCondor`` versions without ``-file``
argument (:pr:`351`).

- all cluster classes: ``processes`` parameter default has changed. By default,
``processes ~= sqrt(cores)`` so that the number of processes and the number
of threads per process is roughly the same. The old default was to use one
process and only threads, i.e. ``processes=1``, ``threads_per_process=cores``.

0.7.0 / 2019-10-09
------------------
