diff --git a/.isort.cfg b/.isort.cfg
index 829deab..3f01d0d 100644
--- a/.isort.cfg
+++ b/.isort.cfg
@@ -1,2 +1,2 @@
 [settings]
-known_third_party = click,dask,distributed,mpi4py,pytest,requests,setuptools,tornado,yaml
+known_third_party = click,dask,distributed,mpi4py,pytest,requests,setuptools,yaml
diff --git a/dask_mpi/__init__.py b/dask_mpi/__init__.py
index ad7560e..ae8408e 100644
--- a/dask_mpi/__init__.py
+++ b/dask_mpi/__init__.py
@@ -1,5 +1,5 @@
 from ._version import get_versions
-from .core import initialize, send_close_signal
+from .core import finalize, initialize
 
 __version__ = get_versions()["version"]
 del get_versions
diff --git a/dask_mpi/core.py b/dask_mpi/core.py
index 51bb8bb..a241c88 100644
--- a/dask_mpi/core.py
+++ b/dask_mpi/core.py
@@ -5,8 +5,6 @@
 import dask
 from distributed import Client, Nanny, Scheduler
 from distributed.utils import import_term
-from tornado import gen
-from tornado.ioloop import IOLoop
 
 
 def initialize(
@@ -77,7 +75,6 @@ def initialize(
     comm = MPI.COMM_WORLD
     rank = comm.Get_rank()
 
-    loop = IOLoop.current()
 
     if not worker_options:
         worker_options = {}
@@ -108,7 +105,7 @@ async def run_scheduler():
 
     if rank == 1:
         if exit:
-            atexit.register(send_close_signal)
+            atexit.register(finalize)
         return True
     else:
 
@@ -138,7 +135,7 @@ async def run_worker():
             return False
 
 
-def send_close_signal():
+def finalize():
     """
     The client can call this function to explicitly stop
     the event loop.
@@ -150,11 +147,5 @@ def send_close_signal():
     in initialize.
     """
 
-    async def stop(dask_scheduler):
-        await dask_scheduler.close()
-        await gen.sleep(0.1)
-        local_loop = dask_scheduler.loop
-        local_loop.add_callback(local_loop.stop)
-
     with Client() as c:
-        c.run_on_scheduler(stop, wait=False)
+        c.shutdown()
diff --git a/dask_mpi/tests/core_no_exit.py b/dask_mpi/tests/core_no_exit.py
index fde7baf..9d45889 100644
--- a/dask_mpi/tests/core_no_exit.py
+++ b/dask_mpi/tests/core_no_exit.py
@@ -1,7 +1,7 @@
 from distributed import Client
 from mpi4py.MPI import COMM_WORLD as world
 
-from dask_mpi import initialize, send_close_signal
+from dask_mpi import finalize, initialize
 
 # Split our MPI world into two pieces, one consisting just of
 # the old rank 3 process and the other with everything else
@@ -16,7 +16,7 @@ with Client() as c:
     c.submit(lambda x: x + 1, 10).result() == 11
     c.submit(lambda x: x + 1, 20).result() == 21
 
-    send_close_signal()
+    finalize()
 
 # check that our original comm is intact
 world.Barrier()
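
Usage sketch (not part of the patch): a minimal driver script for the renamed API, assuming this change is applied and the script is launched under MPI, e.g. `mpirun -np 4 python script.py`. The `exit=False` / `finalize()` pairing mirrors `core_no_exit.py` above; the `is_client` guard uses the True/False return values visible in the `core.py` hunks. Script name and the submitted lambda are illustrative only.

from distributed import Client

from dask_mpi import finalize, initialize

# Rank 0 becomes the scheduler and ranks 2+ become workers; those ranks
# block inside initialize() until the cluster is shut down. Only the
# client rank (rank 1) returns True here. exit=False skips the atexit
# registration of finalize, so teardown must be requested explicitly.
is_client = initialize(exit=False)

if is_client:
    with Client() as c:  # connects to the scheduler started above
        assert c.submit(lambda x: x + 1, 41).result() == 42

    # finalize() (previously send_close_signal) opens a short-lived
    # Client and calls Client.shutdown(), closing scheduler and workers
    # so the blocked ranks can return and the MPI job can exit cleanly.
    finalize()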