Add scheduler_options to pass scheduler-specific parameters (#384)
* allow a different network interface on the workers and on the scheduler
* passing host and dashboard_address is now an error; they should be passed through scheduler_options
* improve the dask-jobqueue SLURM setup to have a different interface on the worker and scheduler containers, and add a test
* some cleanup as well
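
A minimal sketch of how the scheduler options end up being resolved after this change, assuming the merge logic added to dask_jobqueue/core.py in this commit. The function name resolve_scheduler_options and the interface and host values ("ib0", "eth0", "login-node") are hypothetical and only for illustration; in the commit the same steps are written inline in the cluster's __init__.

```python
def resolve_scheduler_options(
    interface=None, protocol="tcp://", security=None, scheduler_options=None
):
    """Hypothetical standalone helper mirroring the merge logic of this commit."""
    if scheduler_options is None:
        scheduler_options = {}

    # Defaults shared with the workers, plus the default dashboard port.
    default_scheduler_options = {
        "protocol": protocol,
        "dashboard_address": ":8787",
        "security": security,
    }
    # Anything given in scheduler_options overrides the defaults.
    merged = dict(default_scheduler_options, **scheduler_options)

    # Fall back to the worker interface only if neither 'host' nor
    # 'interface' was set explicitly for the scheduler.
    if "host" not in merged and "interface" not in merged:
        merged["interface"] = interface
    return merged


# Same interface for scheduler and workers.
same = resolve_scheduler_options(interface="ib0")
# Different interface for the scheduler.
split = resolve_scheduler_options(
    interface="ib0", scheduler_options={"interface": "eth0"}
)
# Explicit host: the worker interface is not forwarded to the scheduler.
by_host = resolve_scheduler_options(
    interface="ib0", scheduler_options={"host": "login-node"}
)
```

The design choice worth noting is that an explicit 'host' suppresses the interface fallback, so the two settings are never forwarded to the scheduler together unless the caller asks for both.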
lesteve committed Mar 23, 2020
1 parent d9c4cec commit a85b64f
Showing 9 changed files with 202 additions and 22 deletions.
10 changes: 10 additions & 0 deletions ci/slurm.sh
@@ -11,6 +11,16 @@ function jobqueue_before_install {

     docker ps -a
     docker images
+    show_network_interfaces
+}
+
+function show_network_interfaces {
+    for c in slurmctld c1 c2; do
+        echo '------------------------------------------------------------'
+        echo docker container: $c
+        docker exec -it $c python -c 'import psutil; print(psutil.net_if_addrs().keys())'
+        echo '------------------------------------------------------------'
+    done
 }

 function jobqueue_install {
2 changes: 2 additions & 0 deletions ci/slurm/Dockerfile
@@ -1,5 +1,7 @@
 FROM giovtorres/slurm-docker-cluster

+RUN yum install -y iproute
+
 RUN curl -o miniconda.sh https://repo.continuum.io/miniconda/Miniconda3-latest-Linux-x86_64.sh && \
     bash miniconda.sh -f -b -p /opt/anaconda && \
     /opt/anaconda/bin/conda clean -tipy && \
27 changes: 27 additions & 0 deletions ci/slurm/docker-compose.yml
@@ -12,6 +12,8 @@ services:
       MYSQL_PASSWORD: password
     volumes:
       - var_lib_mysql:/var/lib/mysql
+    networks:
+      common-network:

   slurmdbd:
     build: .
@@ -26,6 +28,8 @@ services:
       - "6819"
     depends_on:
       - mysql
+    networks:
+      common-network:

   slurmctld:
     build: .
@@ -42,6 +46,11 @@ services:
       - "6817"
     depends_on:
       - "slurmdbd"
+    networks:
+      common-network:
+        ipv4_address: 10.1.1.10
+    cap_add:
+      - NET_ADMIN

   c1:
     build: .
@@ -57,6 +66,11 @@ services:
       - "6818"
     depends_on:
       - "slurmctld"
+    networks:
+      common-network:
+        ipv4_address: 10.1.1.11
+    cap_add:
+      - NET_ADMIN

   c2:
     build: .
@@ -72,10 +86,23 @@ services:
       - "6818"
     depends_on:
       - "slurmctld"
+    networks:
+      common-network:
+        ipv4_address: 10.1.1.12
+    cap_add:
+      - NET_ADMIN

 volumes:
   etc_munge:
   etc_slurm:
   slurm_jobdir:
   var_lib_mysql:
   var_log_slurm:
+
+networks:
+  common-network:
+    driver: bridge
+    ipam:
+      driver: default
+      config:
+        - subnet: 10.1.1.0/24
8 changes: 8 additions & 0 deletions ci/slurm/start-slurm.sh
@@ -1,9 +1,17 @@
 #!/bin/bash

 docker-compose up --build -d
+
 while [ `./register_cluster.sh 2>&1 | grep "sacctmgr: error" | wc -l` -ne 0 ]
 do
     echo "Waiting for SLURM cluster to become ready";
     sleep 2
 done
 echo "SLURM properly configured"
+
+# On some clusters the login node does not have the same interface as the
+# compute nodes. The next three lines make it possible to test this edge case
+# by adding separate interfaces on the worker and on the scheduler nodes.
+docker exec slurmctld ip addr add 10.1.1.20/24 dev eth0 label eth0:scheduler
+docker exec c1 ip addr add 10.1.1.21/24 dev eth0 label eth0:worker
+docker exec c2 ip addr add 10.1.1.22/24 dev eth0 label eth0:worker
68 changes: 50 additions & 18 deletions dask_jobqueue/core.py
@@ -15,7 +15,7 @@
 from distributed.deploy.spec import ProcessInterface, SpecCluster
 from distributed.deploy.local import nprocesses_nthreads
 from distributed.scheduler import Scheduler
-from distributed.utils import format_bytes, parse_bytes, tmpfile, get_ip_interface
+from distributed.utils import format_bytes, parse_bytes, tmpfile

 logger = logging.getLogger(__name__)

@@ -30,7 +30,11 @@
         By default, ``process ~= sqrt(cores)`` so that the number of processes
         and the number of threads per process is roughly the same.
     interface : str
-        Network interface like 'eth0' or 'ib0'.
+        Network interface like 'eth0' or 'ib0'. This will be used both for the
+        Dask scheduler and the Dask workers interface. If you need a different
+        interface for the Dask scheduler you can pass it through
+        the ``scheduler_options`` argument:
+        ``interface=your_worker_interface, scheduler_options={'interface': your_scheduler_interface}``.
     nanny : bool
         Whether or not to start a nanny process
     local_directory : str
@@ -69,8 +73,12 @@
         Whether or not to run this cluster object with the async/await syntax
     security : Security
         A dask.distributed security object if you're using TLS/SSL
-    dashboard_address : str or int
-        An address like ":8787" on which to host the Scheduler's dashboard
+    scheduler_options : dict
+        Used to pass additional arguments to Dask Scheduler. For example use
+        ``scheduler_options={'dashboard_address': ':12435'}`` to specify which
+        port the web dashboard should use or ``scheduler_options={'host': 'your-host'}``
+        to specify the host the Dask scheduler should run on. See
+        :class:`distributed.Scheduler` for more details.
 """.strip()


@@ -202,9 +210,6 @@ def __init__(

         if interface:
             extra = extra + ["--interface", interface]
-            kwargs.setdefault("host", get_ip_interface(interface))
-        else:
-            kwargs.setdefault("host", "")

         # Keep information on process, cores, and memory, for use in subclasses
         self.worker_memory = parse_bytes(memory) if memory is not None else None
@@ -420,13 +425,15 @@ def __init__(
         silence_logs="error",
         name=None,
         asynchronous=False,
-        # Scheduler keywords
-        interface=None,
+        # Scheduler-only keywords
+        dashboard_address=None,
         host=None,
+        scheduler_options=None,
+        # Options for both scheduler and workers
+        interface=None,
         protocol="tcp://",
-        dashboard_address=":8787",
-        config_name=None,
         # Job keywords
+        config_name=None,
         **kwargs
     ):
         self.status = "created"
@@ -447,22 +454,47 @@ def __init__(
)
)

+        if dashboard_address is not None:
+            raise ValueError(
+                "Please pass 'dashboard_address' through 'scheduler_options': use\n"
+                'cluster = {0}(..., scheduler_options={{"dashboard_address": ":12345"}}) rather than\n'
+                'cluster = {0}(..., dashboard_address=":12345")'.format(
+                    self.__class__.__name__
+                )
+            )
+
+        if host is not None:
+            raise ValueError(
+                "Please pass 'host' through 'scheduler_options': use\n"
+                'cluster = {0}(..., scheduler_options={{"host": "your-host"}}) rather than\n'
+                'cluster = {0}(..., host="your-host")'.format(self.__class__.__name__)
+            )
+
         default_config_name = self.job_cls.default_config_name()
         if config_name is None:
             config_name = default_config_name

         if interface is None:
             interface = dask.config.get("jobqueue.%s.interface" % config_name)
+        if scheduler_options is None:
+            scheduler_options = {}
+
+        default_scheduler_options = {
+            "protocol": protocol,
+            "dashboard_address": ":8787",
+            "security": security,
+        }
+        # scheduler_options overrides parameters common to both workers and scheduler
+        scheduler_options = dict(default_scheduler_options, **scheduler_options)
+
+        # Use the same network interface as the workers if scheduler ip has not
+        # been set through scheduler_options via 'host' or 'interface'
+        if "host" not in scheduler_options and "interface" not in scheduler_options:
+            scheduler_options["interface"] = interface

         scheduler = {
             "cls": Scheduler,  # Use local scheduler for now
-            "options": {
-                "protocol": protocol,
-                "interface": interface,
-                "host": host,
-                "dashboard_address": dashboard_address,
-                "security": security,
-            },
+            "options": scheduler_options,
         }

         kwargs["config_name"] = config_name
77 changes: 76 additions & 1 deletion dask_jobqueue/tests/test_jobqueue_core.py
@@ -84,7 +84,7 @@ def test_forward_ip():
         cores=8,
         memory="28GB",
         name="dask-worker",
-        host=ip,
+        scheduler_options={"host": ip},
     ) as cluster:
         assert cluster.scheduler.ip == ip

@@ -268,3 +268,78 @@ def test_default_number_of_worker_processes(Cluster):
     with Cluster(cores=6, memory="4GB") as cluster:
         assert " --nprocs 3" in cluster.job_script()
         assert " --nthreads 2" in cluster.job_script()
+
+
+@pytest.mark.parametrize(
+    "Cluster",
+    [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
+)
+def test_scheduler_options(Cluster):
+    net_if_addrs = psutil.net_if_addrs()
+    interface = list(net_if_addrs.keys())[0]
+    port = 8804
+
+    with Cluster(
+        cores=1, memory="1GB", scheduler_options={"interface": interface, "port": port}
+    ) as cluster:
+        scheduler_options = cluster.scheduler_spec["options"]
+        assert scheduler_options["interface"] == interface
+        assert scheduler_options["port"] == port
+
+
+@pytest.mark.parametrize(
+    "Cluster",
+    [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
+)
+def test_scheduler_options_interface(Cluster):
+    net_if_addrs = psutil.net_if_addrs()
+    scheduler_interface = list(net_if_addrs.keys())[0]
+    worker_interface = "worker-interface"
+    scheduler_host = socket.gethostname()
+
+    with Cluster(cores=1, memory="1GB", interface=scheduler_interface) as cluster:
+        scheduler_options = cluster.scheduler_spec["options"]
+        worker_options = cluster.new_spec["options"]
+        assert scheduler_options["interface"] == scheduler_interface
+        assert worker_options["interface"] == scheduler_interface
+
+    with Cluster(
+        cores=1,
+        memory="1GB",
+        interface=worker_interface,
+        scheduler_options={"interface": scheduler_interface},
+    ) as cluster:
+        scheduler_options = cluster.scheduler_spec["options"]
+        worker_options = cluster.new_spec["options"]
+        assert scheduler_options["interface"] == scheduler_interface
+        assert worker_options["interface"] == worker_interface
+
+    with Cluster(
+        cores=1,
+        memory="1GB",
+        interface=worker_interface,
+        scheduler_options={"host": scheduler_host},
+    ) as cluster:
+        scheduler_options = cluster.scheduler_spec["options"]
+        assert scheduler_options.get("interface") is None
+        assert scheduler_options["host"] == scheduler_host
+        assert worker_options["interface"] == worker_interface
+
+
+@pytest.mark.parametrize(
+    "Cluster",
+    [PBSCluster, MoabCluster, SLURMCluster, SGECluster, LSFCluster, OARCluster],
+)
+def test_cluster_error_scheduler_arguments_should_use_scheduler_options(Cluster):
+    scheduler_host = socket.gethostname()
+    message_template = "pass {!r} through 'scheduler_options'"
+
+    message = message_template.format("host")
+    with pytest.raises(ValueError, match=message):
+        with Cluster(cores=1, memory="1GB", host=scheduler_host):
+            pass
+
+    message = message_template.format("dashboard_address")
+    with pytest.raises(ValueError, match=message):
+        with Cluster(cores=1, memory="1GB", dashboard_address=":8787"):
+            pass
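
The error-path test above works because ``pytest.raises(..., match=...)`` runs a regular-expression search against the exception text, and the ``{!r}`` in the template adds the quotes that appear in the error message. A minimal sketch of that matching with only the standard library; "SLURMCluster" stands in for ``self.__class__.__name__`` and is an assumption for illustration:

```python
import re

# Error text in the same format as the ValueError raised for 'host' in this
# commit; "SLURMCluster" is a stand-in for the actual class name.
error_message = (
    "Please pass 'host' through 'scheduler_options': use\n"
    'cluster = {0}(..., scheduler_options={{"host": "your-host"}}) rather than\n'
    'cluster = {0}(..., host="your-host")'.format("SLURMCluster")
)

# Same template as in the test; {!r} formats the name with its quotes.
message_template = "pass {!r} through 'scheduler_options'"
host_pattern = message_template.format("host")
dashboard_pattern = message_template.format("dashboard_address")

# re.search only needs the pattern to occur somewhere in the message.
host_matches = re.search(host_pattern, error_message) is not None
dashboard_matches = re.search(dashboard_pattern, error_message) is not None
```

Here the 'host' pattern is found in the 'host' error text, while the 'dashboard_address' pattern is not, which is why the test checks each argument with its own message.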
19 changes: 19 additions & 0 deletions dask_jobqueue/tests/test_slurm.py
@@ -193,3 +193,22 @@ def test_config_name_slurm_takes_custom_config():
     with dask.config.set({"jobqueue.slurm-config-name": conf}):
         with SLURMCluster(config_name="slurm-config-name") as cluster:
             assert cluster.job_name == "myname"
+
+
+@pytest.mark.env("slurm")
+def test_different_interfaces_on_scheduler_and_workers(loop):
+    with SLURMCluster(
+        walltime="00:02:00",
+        cores=1,
+        memory="2GB",
+        interface="eth0:worker",
+        scheduler_options={"interface": "eth0:scheduler"},
+        loop=loop,
+    ) as cluster:
+        cluster.scale(jobs=1)
+        with Client(cluster) as client:
+            future = client.submit(lambda x: x + 1, 10)
+
+            client.wait_for_workers(1)
+
+            assert future.result(QUEUE_WAIT) == 11
5 changes: 5 additions & 0 deletions docs/source/changelog.rst
@@ -4,6 +4,11 @@ Changelog
 Development version
 -------------------

+- all cluster classes: add ``scheduler_options``, which allows passing parameters
+  to the Dask scheduler. For example ``scheduler_options={'interface': 'eth0',
+  'dashboard_address': ':12435'}`` (:pr:`384`). Breaking change: using the ``host``
+  or ``dashboard_address`` arguments raises an error. They have to be passed
+  through ``scheduler_options``.
 - all cluster classes: ``processes`` parameter default has changed. By default,
   ``processes ~= sqrt(cores)`` so that the number of processes and the number
   of threads per process is roughly the same. Old default was to use one
8 changes: 5 additions & 3 deletions docs/source/interactive.rst
@@ -129,10 +129,12 @@ dashboard is valuable to help you understand the state of your computation and
 cluster.

 Typically, the dashboard is served on a separate port from Jupyter, and so can
-be used whether you choose to use Jupyter or not. If you want to open up a
+be used whether you choose to use Jupyter or not. If you want to open up a
 connection to see the dashboard you can do so with SSH Tunneling as described
-above. The dashboard's default port is at ``8787``, and is configurable with
-the ``dashboard_address=`` keyword to the Dask Jobqueue cluster objects.
+above. The dashboard's default port is at ``8787``, and is configurable by
+using the ``scheduler_options`` parameter in the Dask Jobqueue cluster object.
+For example ``scheduler_options={'dashboard_address': ':12435'}`` would use
+12435 for the web dashboard port.

 However, Jupyter is also able to proxy the dashboard connection through the
 Jupyter server, allowing you to access the dashboard at