From 9af7fca5cb067fb882f0f94144b2ff5f87186e51 Mon Sep 17 00:00:00 2001 From: fjetter Date: Thu, 18 Aug 2022 11:11:44 +0200 Subject: [PATCH] add minimal example with wsm --- distributed/tests/test_cancelled_state.py | 6 ++++ .../tests/test_worker_state_machine.py | 33 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py index fc9efe9700..78bed9e433 100644 --- a/distributed/tests/test_cancelled_state.py +++ b/distributed/tests/test_cancelled_state.py @@ -496,6 +496,12 @@ async def test_resumed_cancelled_handle_compute( ----------- A `handle_compute` should properly restore state `executing` even after the task has transitioned through `cancelled` and `resumed(fetch)`. + + + See also + -------- + test_worker_state_machine.py::test_executing_cancelled_fetch_executing + for minimal example """ lock_compute = Lock() diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py index ae96835605..34b9036fdd 100644 --- a/distributed/tests/test_worker_state_machine.py +++ b/distributed/tests/test_worker_state_machine.py @@ -47,6 +47,7 @@ SerializedTask, StateMachineEvent, TaskErredMsg, + TaskFinishedMsg, TaskState, TransitionCounterMaxExceeded, UnpauseEvent, @@ -1293,3 +1294,35 @@ def test_gather_dep_failure(ws): ] assert ws.tasks["x"].state == "error" assert ws.tasks["y"].state == "waiting" # Not ready + + +@pytest.mark.parametrize("fail", [True, False]) +def test_executing_cancelled_fetch_executing(ws, fail): + """See also test_cancelled_state.py::test_resumed_cancelled_handle_compute for full example""" + + ws2 = "127.0.0.1:2" + instructions = ws.handle_stimulus( + ComputeTaskEvent.dummy("x", stimulus_id="s1"), + FreeKeysEvent(keys=["x"], stimulus_id="s2"), + ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s3"), + FreeKeysEvent(keys=["y"], stimulus_id="s4"), + ComputeTaskEvent.dummy("x", stimulus_id="s5"), + ) + + assert len(instructions) == 1 + assert instructions[0] == Execute(key="x", stimulus_id="s1") + if fail: + instructions = ws.handle_stimulus( + ExecuteFailureEvent.dummy(key="x", stimulus_id="s6") + ) + assert len(instructions) == 1 + assert isinstance(instructions[0], TaskErredMsg) + assert ws.tasks["x"].state == "error" + + else: + instructions = ws.handle_stimulus( + ExecuteSuccessEvent.dummy(key="x", stimulus_id="s6") + ) + assert len(instructions) == 1 + assert isinstance(instructions[0], TaskFinishedMsg) + assert ws.tasks["x"].state == "memory"