Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions distributed/cli/dask_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from tornado.ioloop import IOLoop
from tornado import gen

from distributed import Scheduler, Worker
from distributed import Scheduler, Nanny, Worker
from distributed.bokeh.worker import BokehWorker
from distributed.cli.utils import check_python_3, uri_from_host_port
from distributed.utils import get_ip_interface
Expand Down Expand Up @@ -33,12 +33,14 @@
@click.option('--scheduler/--no-scheduler', default=True,
help=("Whether or not to include a scheduler. "
"Use --no-scheduler to increase an existing dask cluster"))
@click.option('--nanny/--no-nanny', default=True,
help="Start workers in nanny process for management")
@click.option('--bokeh-port', type=int, default=8787,
help="Bokeh port for visual diagnostics")
@click.option('--bokeh-prefix', type=str, default=None,
help="Prefix for the bokeh app")
def main(scheduler_file, interface, nthreads, local_directory, memory_limit,
scheduler, bokeh_port, bokeh_prefix):
scheduler, bokeh_port, bokeh_prefix, nanny):
if interface:
host = get_ip_interface(interface)
else:
Expand All @@ -63,13 +65,14 @@ def main(scheduler_file, interface, nthreads, local_directory, memory_limit,
finally:
scheduler.stop()
else:
worker = Worker(scheduler_file=scheduler_file,
loop=loop,
name=rank if scheduler else None,
ncores=nthreads,
local_dir=local_directory,
services={'bokeh': BokehWorker},
memory_limit=memory_limit)
W = Nanny if nanny else Worker
worker = W(scheduler_file=scheduler_file,
loop=loop,
name=rank if scheduler else None,
ncores=nthreads,
local_dir=local_directory,
services={'bokeh': BokehWorker},
memory_limit=memory_limit)
addr = uri_from_host_port(host, None, 0)

@gen.coroutine
Expand Down
5 changes: 3 additions & 2 deletions distributed/cli/tests/test_dask_mpi.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@
from distributed.utils_test import loop # flake8: noqa


def test_basic(loop):
@pytest.mark.parametrize('nanny', ['--nanny', '--no-nanny'])
def test_basic(loop, nanny):
with tmpfile() as fn:
with popen(['mpirun', '--np', '4', 'dask-mpi', '--scheduler-file', fn],
with popen(['mpirun', '--np', '4', 'dask-mpi', '--scheduler-file', fn, nanny],
stdin=subprocess.DEVNULL):
with Client(scheduler_file=fn) as c:

Expand Down