diff --git a/dask_mpi/tests/test_cli.py b/dask_mpi/tests/test_cli.py index e05de19..6437595 100644 --- a/dask_mpi/tests/test_cli.py +++ b/dask_mpi/tests/test_cli.py @@ -10,6 +10,12 @@ import requests +import json + +import tempfile + +import subprocess + from distributed import Client from distributed.comm.addressing import get_address_host_port from distributed.metrics import time @@ -157,3 +163,61 @@ def test_bokeh_worker(loop, mpirun): with popen(cmd, stdin=FNULL): check_port_okay(59584) + + +def tmpfile_static(extension="", dir=None): + """ + utility function for test_stale_sched test + """ + + extension = "." + extension.lstrip(".") + handle, filename = tempfile.mkstemp(extension, dir=dir) + return handle, filename + + +@pytest.mark.parametrize("nanny", ["--nanny", "--no-nanny"]) +def test_stale_sched(loop, nanny, mpirun): + """ + the purpose of this unit test is to simulate the situation in which + an old scheduler file has been left behind from a non-clean dask exit. + in this situation the scheduler should wake up and overwrite the stale + file before the workers start. + """ + + fhandle, fn = tmpfile_static(extension="json") + + stale_json = { + "type": "Scheduler", + "id": "Scheduler-edb63f9c-9e83-4021-8563-44bcffc451cc", + "address": "tcp://10.128.0.32:45373", + "services": {"dashboard": 8787}, + "workers": {}, + } + + with open(fn, "w") as f: + json.dump(stale_json, f) + + cmd = mpirun + [ + "-np", + "4", + "dask-mpi", + "--scheduler-file", + fn, + "--dashboard-address", + "0", + nanny, + ] + + p = subprocess.Popen(cmd) + + sleep(5) + + p.kill() + + with open(fn) as f: + new_json = json.load(f) + + os.close(fhandle) + os.remove(fn) + + assert new_json != stale_json