From 600d6967c5744136e9f2754b18a79a5130ad61c9 Mon Sep 17 00:00:00 2001 From: lastephey Date: Fri, 13 Dec 2019 16:07:14 -0800 Subject: [PATCH 1/8] add scheduler bcast logic from core into cli --- dask_mpi/cli.py | 29 ++++++++++++++++++----------- 1 file changed, 18 insertions(+), 11 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index ae8b048..e6daa46 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -2,7 +2,9 @@ import asyncio from mpi4py import MPI -from dask.distributed import Scheduler, Worker, Nanny + +import dask +from dask.distributed import Client, Scheduler, Worker, Nanny from distributed.cli.utils import check_python_3 comm = MPI.COMM_WORLD @@ -72,24 +74,29 @@ def main( protocol, ): - if rank == 0 and scheduler: + if rank == 0: - async def run(): + async def run_scheduler(): async with Scheduler( interface=interface, protocol=protocol, - scheduler_file=scheduler_file, dashboard_address=dashboard_address, - port=scheduler_port, - ) as s: - await s.finished() + ) as scheduler: + comm.bcast(scheduler.address, root=0) + comm.Barrier() + await scheduler.finished() + + asyncio.get_event_loop().run_until_complete(run_scheduler()) + sys.exit() else: + scheduler_address = comm.bcast(None, root=0) + dask.config.set(scheduler_address=scheduler_address) + comm.Barrier() - async def run(): + async def run_worker(): WorkerType = Nanny if nanny else Worker async with WorkerType( - scheduler_file=scheduler_file, interface=interface, protocol=protocol, nthreads=nthreads, @@ -99,8 +106,8 @@ async def run(): ) as worker: await worker.finished() - asyncio.get_event_loop().run_until_complete(run()) - + asyncio.get_event_loop().run_until_complete(run_worker()) + sys.exit() def go(): check_python_3() From 4111d7772a9adb909fa5216d8f5e18b5aa107208 Mon Sep 17 00:00:00 2001 From: lastephey Date: Fri, 13 Dec 2019 16:18:02 -0800 Subject: [PATCH 2/8] add scheduler option --- dask_mpi/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index e6daa46..ca2cee4 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -74,7 +74,7 @@ def main( protocol, ): - if rank == 0: + if rank == 0 and scheduler: async def run_scheduler(): async with Scheduler( From ae4bbe9ed9e7aced5dd89270f3ad17a2c142a4a1 Mon Sep 17 00:00:00 2001 From: lastephey Date: Sat, 14 Dec 2019 15:20:49 -0800 Subject: [PATCH 3/8] forgot import sys, move mpi comm assign inside main --- dask_mpi/cli.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index ca2cee4..7ec4d84 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -1,15 +1,13 @@ import click import asyncio -from mpi4py import MPI +import sys import dask from dask.distributed import Client, Scheduler, Worker, Nanny from distributed.cli.utils import check_python_3 -comm = MPI.COMM_WORLD -rank = comm.Get_rank() - +from mpi4py import MPI @click.command() @click.option( @@ -74,6 +72,9 @@ def main( protocol, ): + comm = MPI.COMM_WORLD + rank = comm.Get_rank() + if rank == 0 and scheduler: async def run_scheduler(): From 0d95cba2afbe31f3e5f04049a2e9671d8512a2dd Mon Sep 17 00:00:00 2001 From: lastephey Date: Sun, 15 Dec 2019 22:28:24 -0800 Subject: [PATCH 4/8] try to get dask-mpi to run in background to prevent unit test hangs --- dask_mpi/tests/test_cli.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/dask_mpi/tests/test_cli.py b/dask_mpi/tests/test_cli.py index e05de19..ddf044e 100644 --- a/dask_mpi/tests/test_cli.py +++ b/dask_mpi/tests/test_cli.py @@ -25,7 +25,7 @@ def test_basic(loop, nanny, mpirun): with tmpfile(extension="json") as fn: - cmd = mpirun + ["-np", "4", "dask-mpi", "--scheduler-file", fn, nanny] + cmd = mpirun + ["-np", "4", "dask-mpi", "--scheduler-file", fn, nanny, "&"] with popen(cmd): with Client(scheduler_file=fn) as c: @@ -40,7 +40,7 @@ def test_basic(loop, nanny, mpirun): def test_no_scheduler(loop, mpirun): with tmpfile(extension="json") as fn: - cmd = mpirun + ["-np", "2", "dask-mpi", "--scheduler-file", fn] + cmd = mpirun + ["-np", "2", "dask-mpi", "--scheduler-file", fn, "&"] with popen(cmd, stdin=FNULL): with Client(scheduler_file=fn) as c: @@ -59,6 +59,7 @@ def test_no_scheduler(loop, mpirun): "--scheduler-file", fn, "--no-scheduler", + "&", ] with popen(cmd): @@ -86,6 +87,7 @@ def test_non_default_ports(loop, nanny, mpirun): "58464", "--nanny-port", "50164", + "&", ] with popen(cmd): @@ -132,6 +134,7 @@ def test_dashboard(loop, mpirun): fn, "--dashboard-address", ":59583", + "&", ] with popen(cmd, stdin=FNULL): @@ -153,6 +156,7 @@ def test_bokeh_worker(loop, mpirun): fn, "--bokeh-worker-port", "59584", + "&", ] with popen(cmd, stdin=FNULL): From 9381daee400f25456c6be596b71be1e096b128d5 Mon Sep 17 00:00:00 2001 From: lastephey Date: Wed, 18 Dec 2019 19:58:40 -0800 Subject: [PATCH 5/8] general cleanup suggested by kmpaul --- dask_mpi/cli.py | 13 ++++++------- dask_mpi/tests/test_cli.py | 8 ++------ 2 files changed, 8 insertions(+), 13 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 7ec4d84..811d64a 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -1,7 +1,6 @@ import click import asyncio -import sys import dask from dask.distributed import Client, Scheduler, Worker, Nanny @@ -82,14 +81,14 @@ async def run_scheduler(): interface=interface, protocol=protocol, dashboard_address=dashboard_address, - ) as scheduler: - comm.bcast(scheduler.address, root=0) + scheduler_file=scheduler_file, + ) as s: + comm.bcast(s.address, root=0) comm.Barrier() - await scheduler.finished() + await s.finished() asyncio.get_event_loop().run_until_complete(run_scheduler()) - sys.exit() - + else: scheduler_address = comm.bcast(None, root=0) dask.config.set(scheduler_address=scheduler_address) @@ -104,11 +103,11 @@ async def run_worker(): memory_limit=memory_limit, local_directory=local_directory, name=rank, + scheduler_file=scheduler_file, ) as worker: await worker.finished() asyncio.get_event_loop().run_until_complete(run_worker()) - sys.exit() def go(): check_python_3() diff --git a/dask_mpi/tests/test_cli.py b/dask_mpi/tests/test_cli.py index ddf044e..e05de19 100644 --- a/dask_mpi/tests/test_cli.py +++ b/dask_mpi/tests/test_cli.py @@ -25,7 +25,7 @@ def test_basic(loop, nanny, mpirun): with tmpfile(extension="json") as fn: - cmd = mpirun + ["-np", "4", "dask-mpi", "--scheduler-file", fn, nanny, "&"] + cmd = mpirun + ["-np", "4", "dask-mpi", "--scheduler-file", fn, nanny] with popen(cmd): with Client(scheduler_file=fn) as c: @@ -40,7 +40,7 @@ def test_basic(loop, nanny, mpirun): def test_no_scheduler(loop, mpirun): with tmpfile(extension="json") as fn: - cmd = mpirun + ["-np", "2", "dask-mpi", "--scheduler-file", fn, "&"] + cmd = mpirun + ["-np", "2", "dask-mpi", "--scheduler-file", fn] with popen(cmd, stdin=FNULL): with Client(scheduler_file=fn) as c: @@ -59,7 +59,6 @@ def test_no_scheduler(loop, mpirun): "--scheduler-file", fn, "--no-scheduler", - "&", ] with popen(cmd): @@ -87,7 +86,6 @@ def test_non_default_ports(loop, nanny, mpirun): "58464", "--nanny-port", "50164", - "&", ] with popen(cmd): @@ -134,7 +132,6 @@ def test_dashboard(loop, mpirun): fn, "--dashboard-address", ":59583", - "&", ] with popen(cmd, stdin=FNULL): @@ -156,7 +153,6 @@ def test_bokeh_worker(loop, mpirun): fn, "--bokeh-worker-port", "59584", - "&", ] with popen(cmd, stdin=FNULL): From 205f32e0987c2cf4f1a86ac4e1bcbf76beac1706 Mon Sep 17 00:00:00 2001 From: lastephey Date: Wed, 18 Dec 2019 20:18:23 -0800 Subject: [PATCH 6/8] changes from black and flake8 --- dask_mpi/cli.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 811d64a..c818fbc 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -3,11 +3,12 @@ import asyncio import dask -from dask.distributed import Client, Scheduler, Worker, Nanny +from dask.distributed import Scheduler, Worker, Nanny from distributed.cli.utils import check_python_3 from mpi4py import MPI + @click.command() @click.option( "--scheduler-file", @@ -88,7 +89,7 @@ async def run_scheduler(): await s.finished() asyncio.get_event_loop().run_until_complete(run_scheduler()) - + else: scheduler_address = comm.bcast(None, root=0) dask.config.set(scheduler_address=scheduler_address) @@ -109,6 +110,7 @@ async def run_worker(): asyncio.get_event_loop().run_until_complete(run_worker()) + def go(): check_python_3() main() From c9ca1d5648487502ed77cf0b3aff5b598d2af3d5 Mon Sep 17 00:00:00 2001 From: lastephey Date: Thu, 19 Dec 2019 11:45:04 -0800 Subject: [PATCH 7/8] solve this issue with barriers instead of bcast --- dask_mpi/cli.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index c818fbc..7f1671b 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -84,15 +84,13 @@ async def run_scheduler(): dashboard_address=dashboard_address, scheduler_file=scheduler_file, ) as s: - comm.bcast(s.address, root=0) comm.Barrier() await s.finished() asyncio.get_event_loop().run_until_complete(run_scheduler()) + else: - scheduler_address = comm.bcast(None, root=0) - dask.config.set(scheduler_address=scheduler_address) comm.Barrier() async def run_worker(): From da75a6d38998d85784d0c348d69897c6e5f44a03 Mon Sep 17 00:00:00 2001 From: lastephey Date: Thu, 19 Dec 2019 11:58:10 -0800 Subject: [PATCH 8/8] changes from black and flake8 --- dask_mpi/cli.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/dask_mpi/cli.py b/dask_mpi/cli.py index 7f1671b..ae0cd84 100644 --- a/dask_mpi/cli.py +++ b/dask_mpi/cli.py @@ -2,7 +2,6 @@ import asyncio -import dask from dask.distributed import Scheduler, Worker, Nanny from distributed.cli.utils import check_python_3 @@ -89,7 +88,6 @@ async def run_scheduler(): asyncio.get_event_loop().run_until_complete(run_scheduler()) - else: comm.Barrier()