From c9d6c962584ce1d44c537fdd0f8077300e97ddb0 Mon Sep 17 00:00:00 2001 From: Matthew Rocklin Date: Thu, 2 Nov 2017 11:34:28 -0400 Subject: [PATCH] add nanny flag to dask-mpi --- distributed/cli/dask_mpi.py | 21 ++++++++++++--------- distributed/cli/tests/test_dask_mpi.py | 5 +++-- 2 files changed, 15 insertions(+), 11 deletions(-) diff --git a/distributed/cli/dask_mpi.py b/distributed/cli/dask_mpi.py index 7d3174b5a56..7e40c7d3c74 100644 --- a/distributed/cli/dask_mpi.py +++ b/distributed/cli/dask_mpi.py @@ -5,7 +5,7 @@ from tornado.ioloop import IOLoop from tornado import gen -from distributed import Scheduler, Worker +from distributed import Scheduler, Nanny, Worker from distributed.bokeh.worker import BokehWorker from distributed.cli.utils import check_python_3, uri_from_host_port from distributed.utils import get_ip_interface @@ -33,12 +33,14 @@ @click.option('--scheduler/--no-scheduler', default=True, help=("Whether or not to include a scheduler. " "Use --no-scheduler to increase an existing dask cluster")) +@click.option('--nanny/--no-nanny', default=True, + help="Start workers in nanny process for management") @click.option('--bokeh-port', type=int, default=8787, help="Bokeh port for visual diagnostics") @click.option('--bokeh-prefix', type=str, default=None, help="Prefix for the bokeh app") def main(scheduler_file, interface, nthreads, local_directory, memory_limit, - scheduler, bokeh_port, bokeh_prefix): + scheduler, bokeh_port, bokeh_prefix, nanny): if interface: host = get_ip_interface(interface) else: @@ -63,13 +65,14 @@ def main(scheduler_file, interface, nthreads, local_directory, memory_limit, finally: scheduler.stop() else: - worker = Worker(scheduler_file=scheduler_file, - loop=loop, - name=rank if scheduler else None, - ncores=nthreads, - local_dir=local_directory, - services={'bokeh': BokehWorker}, - memory_limit=memory_limit) + W = Nanny if nanny else Worker + worker = W(scheduler_file=scheduler_file, + loop=loop, + name=rank if scheduler else None, + ncores=nthreads, + local_dir=local_directory, + services={'bokeh': BokehWorker}, + memory_limit=memory_limit) addr = uri_from_host_port(host, None, 0) @gen.coroutine diff --git a/distributed/cli/tests/test_dask_mpi.py b/distributed/cli/tests/test_dask_mpi.py index 63b1e4c4afb..37e5a3b6e82 100644 --- a/distributed/cli/tests/test_dask_mpi.py +++ b/distributed/cli/tests/test_dask_mpi.py @@ -15,9 +15,10 @@ from distributed.utils_test import loop # flake8: noqa -def test_basic(loop): +@pytest.mark.parametrize('nanny', ['--nanny', '--no-nanny']) +def test_basic(loop, nanny): with tmpfile() as fn: - with popen(['mpirun', '--np', '4', 'dask-mpi', '--scheduler-file', fn], + with popen(['mpirun', '--np', '4', 'dask-mpi', '--scheduler-file', fn, nanny], stdin=subprocess.DEVNULL): with Client(scheduler_file=fn) as c: