Skip to content

Commit

Permalink
Fix test_process_executor_kills_process flakyness (#5183)
Browse files Browse the repository at this point in the history
* Fix test_process_executor_kills_process flakiness

* Increase test grace time / add flaky

* fix regression on Windows

* tweak

* More informative failure
  • Loading branch information
crusaderky committed Aug 11, 2021
1 parent cbe97d1 commit 2801f11
Show file tree
Hide file tree
Showing 5 changed files with 29 additions and 24 deletions.
1 change: 1 addition & 0 deletions distributed/cli/tests/test_dask_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def test_interface(loop):
assert all("127.0.0.1" == d["host"] for d in info["workers"].values())


@pytest.mark.flaky(reruns=10, reruns_delay=5)
def test_pid_file(loop):
def check_pidfile(proc, pidfile):
start = time()
Expand Down
6 changes: 3 additions & 3 deletions distributed/deploy/tests/test_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
Worker,
wait,
)
from distributed.compatibility import WINDOWS
from distributed.compatibility import LINUX, MACOS, WINDOWS
from distributed.metrics import time
from distributed.utils_test import async_wait_for, clean, gen_test, slowinc

Expand Down Expand Up @@ -301,8 +301,8 @@ def test_basic_no_loop(loop):
loop.add_callback(loop.stop)


@pytest.mark.flaky(condition=not WINDOWS, reruns=10, reruns_delay=5)
@pytest.mark.xfail(condition=WINDOWS, reason="extremely flaky")
@pytest.mark.flaky(condition=LINUX, reruns=10, reruns_delay=5)
@pytest.mark.xfail(condition=MACOS or WINDOWS, reason="extremely flaky")
@gen_test()
async def test_target_duration():
with dask.config.set(
Expand Down
5 changes: 3 additions & 2 deletions distributed/deploy/tests/test_slow_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,21 @@ async def test_startup(cleanup):
scheduler=scheduler,
workers={
0: {"cls": Worker, "options": {}},
1: {"cls": SlowWorker, "options": {"delay": 5}},
1: {"cls": SlowWorker, "options": {"delay": 120}},
2: {"cls": SlowWorker, "options": {"delay": 0}},
},
asynchronous=True,
) as cluster:
assert len(cluster.workers) == len(cluster.worker_spec) == 3
assert time() < start + 5
assert time() < start + 60
assert 0 <= len(cluster.scheduler_info["workers"]) <= 2

async with Client(cluster, asynchronous=True) as client:
await client.wait_for_workers(n_workers=2)


@pytest.mark.asyncio
@pytest.mark.flaky(reruns=10, reruns_delay=5)
async def test_scale_up_down(cleanup):
start = time()
async with SpecCluster(
Expand Down
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2229,7 +2229,7 @@ def _transition(self, key, finish: str, *args, **kwargs):
self._transition_counter += 1
recommendations, client_msgs, worker_msgs = a
elif "released" not in start_finish:
assert not args and not kwargs
assert not args and not kwargs, (args, kwargs)
a_recs: dict
a_cmsgs: dict
a_wmsgs: dict
Expand Down
39 changes: 21 additions & 18 deletions distributed/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -2013,33 +2013,36 @@ def kill_process():
import os
import signal

os.kill(os.getpid(), signal.SIGTERM)
if WINDOWS:
# There's no SIGKILL on Windows
sig = signal.SIGTERM
else:
# With SIGTERM there may be several seconds worth of delay before the worker
# actually shuts down - particularly on slow CI. Use SIGKILL for instant
# termination.
sig = signal.SIGKILL

os.kill(os.getpid(), sig)
sleep(60) # Cope with non-instantaneous termination

@gen_cluster(client=True)
async def test_process_executor_kills_process(c, s, a, b):

@gen_cluster(nthreads=[("127.0.0.1", 1)], client=True)
async def test_process_executor_kills_process(c, s, a):
with ProcessPoolExecutor() as e:
a.executors["processes"] = e
b.executors["processes"] = e
with dask.annotate(executor="processes", retries=1):
future = c.submit(kill_process)

with pytest.raises(
BrokenProcessPool,
match="A child process terminated abruptly, the process pool is not usable anymore",
):
msg = "A child process terminated abruptly, the process pool is not usable anymore"
with pytest.raises(BrokenProcessPool, match=msg):
await future

with dask.annotate(executor="processes", retries=1):
future = c.submit(inc, 1)

# FIXME: The processpool is now unusable and the worker is effectively
# dead
with pytest.raises(
BrokenProcessPool,
match="A child process terminated abruptly, the process pool is not usable anymore",
):
assert await future == 2
# The process pool is now unusable and the worker is effectively dead
with pytest.raises(BrokenProcessPool, match=msg):
await future


def raise_exc():
Expand Down Expand Up @@ -2388,11 +2391,11 @@ async def test_hold_on_to_replicas(c, s, *workers):

@gen_cluster(client=True)
async def test_worker_reconnects_mid_compute(c, s, a, b):
"""
This test ensure that if a worker disconnects while computing a result, the scheduler will still accept the result.
"""Ensure that, if a worker disconnects while computing a result, the scheduler will
still accept the result.
There is also an edge case tested which ensures that the reconnect is
successful if a task is currently executing, see
successful if a task is currently executing; see
https://github.com/dask/distributed/issues/5078
See also distributed.tests.test_scheduler.py::test_gather_allow_worker_reconnect
Expand Down

0 comments on commit 2801f11

Please sign in to comment.