From 276a2c0572ed45fdbdb2c50f7a016d29bd2afc04 Mon Sep 17 00:00:00 2001 From: crusaderky Date: Mon, 4 Apr 2022 13:03:40 +0100 Subject: [PATCH] Harden test_abort_execution_to_fetch and more --- distributed/tests/test_cancelled_state.py | 46 ++++++++++++++- distributed/tests/test_cluster_dump.py | 18 +++--- distributed/tests/test_steal.py | 11 +++- distributed/tests/test_worker.py | 69 +++++++++++------------ 4 files changed, 92 insertions(+), 52 deletions(-) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index ef53e9e1ccb..1a0df078376 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -2,8 +2,15 @@ from unittest import mock import distributed +from distributed import Event from distributed.core import CommClosedError -from distributed.utils_test import _LockedCommPool, gen_cluster, inc, slowinc +from distributed.utils_test import ( + _LockedCommPool, + assert_worker_story, + gen_cluster, + inc, + slowinc, +) async def wait_for_state(key, state, dask_worker): @@ -51,7 +58,13 @@ async def test_abort_execution_add_as_dependency(c, s, a): @gen_cluster(client=True) async def test_abort_execution_to_fetch(c, s, a, b): - fut = c.submit(slowinc, 1, delay=2, key="f1", workers=[a.worker_address]) + ev = Event() + + def f(ev): + ev.wait() + return 123 + + fut = c.submit(f, ev, key="f1", workers=[a.worker_address]) await wait_for_state(fut.key, "executing", a) fut.release() await wait_for_cancelled(fut.key, a) @@ -62,7 +75,34 @@ async def test_abort_execution_to_fetch(c, s, a, b): # then, a must switch the execute to fetch. Instead of doing so, it will # simply re-use the currently computing result. 
fut = c.submit(inc, fut, workers=[a.worker_address], key="f2") - await fut + await wait_for_state("f2", "waiting", a) + await ev.set() + assert await fut == 124 # It would be 3 if the result was copied from b + del fut + while "f1" in a.tasks: + await asyncio.sleep(0.01) + + assert_worker_story( + a.story("f1"), + [ + ("f1", "compute-task"), + ("f1", "released", "waiting", "waiting", {"f1": "ready"}), + ("f1", "waiting", "ready", "ready", {}), + ("f1", "ready", "executing", "executing", {}), + ("free-keys", ("f1",)), + ("f1", "executing", "released", "cancelled", {}), + ("f1", "ensure-task-exists", "cancelled"), + ("f1", "cancelled", "fetch", "cancelled", {"f1": ("resumed", "fetch")}), + ("f1", "cancelled", "resumed", "resumed", {}), + ("f1", "put-in-memory"), + ("f1", "resumed", "memory", "memory", {"f2": "ready"}), + ("free-keys", ("f1",)), + ("f1", "release-key"), + ("f1", "memory", "released", "released", {}), + ("f1", "released", "forgotten", "forgotten", {}), + ], + strict=True, + ) @gen_cluster(client=True) diff --git a/distributed/tests/test_cluster_dump.py b/distributed/tests/test_cluster_dump.py index 963d7563725..b01cf2611ca 100644 --- a/distributed/tests/test_cluster_dump.py +++ b/distributed/tests/test_cluster_dump.py @@ -115,19 +115,15 @@ async def test_cluster_dump_state(c, s, a, b, tmp_path): @gen_cluster(client=True) async def test_cluster_dump_story(c, s, a, b, tmp_path): filename = tmp_path / "dump" - futs = c.map(inc, range(2)) - fut_keys = {f.key for f in futs} - await c.gather(futs) + f1 = c.submit(inc, 0, key="f1") + f2 = c.submit(inc, 1, key="f2") + await c.gather([f1, f2]) await c.dump_cluster_state(filename, format="msgpack") dump = DumpArtefact.from_url(f"{filename}.msgpack.gz") - task_key = next(iter(fut_keys)) - - def _expected_story(task_key): - return - story = dump.scheduler_story(*fut_keys) - assert len(story) == len(fut_keys) + story = dump.scheduler_story("f1", "f2") + assert story.keys() == {"f1", "f2"} for k, task_story in
story.items(): expected = [ @@ -140,8 +136,8 @@ def _expected_story(task_key): for e1, e2 in zip(event, expected_event): assert e1 == e2 - story = dump.worker_story(*fut_keys) - assert len(story) == len(fut_keys) + story = dump.worker_story("f1", "f2") + assert story.keys() == {"f1", "f2"} for k, task_story in story.items(): assert_worker_story( diff --git a/distributed/tests/test_steal.py b/distributed/tests/test_steal.py index e16269d92f8..96288503098 100644 --- a/distributed/tests/test_steal.py +++ b/distributed/tests/test_steal.py @@ -13,7 +13,7 @@ import dask -from distributed import Lock, Nanny, Worker, wait, worker_client +from distributed import Event, Lock, Nanny, Worker, wait, worker_client from distributed.compatibility import LINUX from distributed.config import config from distributed.core import Status @@ -1148,9 +1148,11 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers): # https://github.com/dask/distributed/issues/5370 steal = s.extensions["stealing"] w0 = workers[0] + ev = Event() futs1 = c.map( - slowinc, + lambda _, ev: ev.wait(), range(10), + ev=ev, key=[f"f1-{ix}" for ix in range(10)], workers=[w0.address], allow_other_workers=True, @@ -1175,6 +1177,9 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers): assert wsB == victim_ts.processing_on # move_task_request is not responsible for respecting worker restrictions steal.move_task_request(victim_ts, wsB, wsC) + + # Let tasks finish + await ev.set() await c.gather(futs1) # If this turns out to be overly flaky, the following may be relaxed or @@ -1183,7 +1188,7 @@ async def test_reschedule_concurrent_requests_deadlock(c, s, *workers): # Either the last request goes through or both have been rejected since the # computation was already done by the time the request comes in. This is - # unfortunately not stable even if we increase the compute time + # unfortunately not stable. 
if victim_ts.who_has != {wsC}: msgs = steal.story(victim_ts) assert len(msgs) == 2 diff --git a/distributed/tests/test_worker.py b/distributed/tests/test_worker.py index d3a0ce8b28f..aa8549b07ba 100644 --- a/distributed/tests/test_worker.py +++ b/distributed/tests/test_worker.py @@ -1677,41 +1677,40 @@ async def test_story_with_deps(c, s, a, b): Assert that the structure of the story does not change unintentionally and expected subfields are actually filled """ - dep = c.submit(inc, 1, workers=[a.address]) - res = c.submit(inc, dep, workers=[b.address]) + dep = c.submit(inc, 1, workers=[a.address], key="dep") + res = c.submit(inc, dep, workers=[b.address], key="res") await res - key = res.key - story = a.story(key) + story = a.story("res") assert story == [] - story = b.story(key) + story = b.story("res") # Story now includes randomized stimulus_ids and timestamps. stimulus_ids = {ev[-2] for ev in story} assert len(stimulus_ids) == 3, stimulus_ids # This is a simple transition log expected = [ - (key, "compute-task"), - (key, "released", "waiting", "waiting", {dep.key: "fetch"}), - (key, "waiting", "ready", "ready", {}), - (key, "ready", "executing", "executing", {}), - (key, "put-in-memory"), - (key, "executing", "memory", "memory", {}), + ("res", "compute-task"), + ("res", "released", "waiting", "waiting", {"dep": "fetch"}), + ("res", "waiting", "ready", "ready", {}), + ("res", "ready", "executing", "executing", {}), + ("res", "put-in-memory"), + ("res", "executing", "memory", "memory", {}), ] assert_worker_story(story, expected, strict=True) - story = b.story(dep.key) + story = b.story("dep") stimulus_ids = {ev[-2] for ev in story} assert len(stimulus_ids) == 2, stimulus_ids expected = [ - (dep.key, "ensure-task-exists", "released"), - (dep.key, "released", "fetch", "fetch", {}), - ("gather-dependencies", a.address, {dep.key}), - (dep.key, "fetch", "flight", "flight", {}), - ("request-dep", a.address, {dep.key}), - ("receive-dep", a.address, {dep.key}), - 
(dep.key, "put-in-memory"), - (dep.key, "flight", "memory", "memory", {res.key: "ready"}), + ("dep", "ensure-task-exists", "released"), + ("dep", "released", "fetch", "fetch", {}), + ("gather-dependencies", a.address, {"dep"}), + ("dep", "fetch", "flight", "flight", {}), + ("request-dep", a.address, {"dep"}), + ("receive-dep", a.address, {"dep"}), + ("dep", "put-in-memory"), + ("dep", "flight", "memory", "memory", {"res": "ready"}), ] assert_worker_story(story, expected, strict=True) @@ -3057,11 +3056,11 @@ async def test_task_flight_compute_oserror(c, s, a, b): await wait(futs) assert a.data assert write_queue.empty() - f1 = c.submit(sum, futs, workers=[b.address]) + f1 = c.submit(sum, futs, workers=[b.address], key="f1") peer, msg = await write_queue.get() assert peer == a.address assert msg["op"] == "get_data" - in_flight_tasks = [ts for ts in b.tasks.values() if ts.key != f1.key] + in_flight_tasks = [ts for ts in b.tasks.values() if ts.key != "f1"] assert all(ts.state == "flight" for ts in in_flight_tasks) await a.close() write_event.set() @@ -3072,28 +3071,28 @@ async def test_task_flight_compute_oserror(c, s, a, b): # asserting a few internals to make sure that if things change this is done # deliberately - sum_story = b.story(f1.key) + sum_story = b.story("f1") expected_sum_story = [ - (f1.key, "compute-task"), + ("f1", "compute-task"), ( - f1.key, + "f1", "released", "waiting", "waiting", {ts.key: "fetch" for ts in in_flight_tasks}, ), # inc is lost and needs to be recomputed. Therefore, sum is released - ("free-keys", (f1.key,)), - (f1.key, "release-key"), - (f1.key, "waiting", "released", "released", {f1.key: "forgotten"}), - (f1.key, "released", "forgotten", "forgotten", {}), + ("free-keys", ("f1",)), + ("f1", "release-key"), + ("f1", "waiting", "released", "released", {"f1": "forgotten"}), + ("f1", "released", "forgotten", "forgotten", {}), # Now, we actually compute the task *once*. 
This must not cycle back - (f1.key, "compute-task"), - (f1.key, "released", "waiting", "waiting", {f1.key: "ready"}), - (f1.key, "waiting", "ready", "ready", {}), - (f1.key, "ready", "executing", "executing", {}), - (f1.key, "put-in-memory"), - (f1.key, "executing", "memory", "memory", {}), + ("f1", "compute-task"), + ("f1", "released", "waiting", "waiting", {"f1": "ready"}), + ("f1", "waiting", "ready", "ready", {}), + ("f1", "ready", "executing", "executing", {}), + ("f1", "put-in-memory"), + ("f1", "executing", "memory", "memory", {}), ] assert_worker_story(sum_story, expected_sum_story, strict=True)