diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py
index 9080c1d77f..78bed9e433 100644
--- a/distributed/tests/test_cancelled_state.py
+++ b/distributed/tests/test_cancelled_state.py
@@ -472,10 +472,37 @@ async def test_resumed_cancelled_handle_compute(
     Given the history of a task
     executing -> cancelled -> resumed(fetch)

-    A handle_compute should properly restore executing.
+    Setup
+    -----
+    A task is running on the threadpool while a client is cancelling the
+    computations. In this case, we cannot cancel the task on the threadpool but
+    need to transition the worker task to cancelled instead. While the task on
+    the threadpool is still running, the client resubmits the graph. This is
+    relatively common in interactive workflows in which a user cancels a
+    computation and resubmits shortly after.
+
+    This resubmission can decide to distribute tasks onto different workers than the initial submission.
+
+    Parameters
+    ----------
+    raise_error:
+        Both successful and erred results should be properly handled.
+
+    wait_for_processing:
+        Is the scheduler properly handling the task-done message of the worker
+        even if the task has not yet transitioned to processing again?
+
+    Expectation
+    -----------
+    A `handle_compute` should properly restore state `executing` even after the
+    task has transitioned through `cancelled` and `resumed(fetch)`.
+
+
+    See also
+    --------
+    test_worker_state_machine.py::test_executing_cancelled_fetch_executing
+    for a minimal example
     """
-    # This test is heavily using set_restrictions to simulate certain scheduler
-    # decisions of placing keys

     lock_compute = Lock()
     await lock_compute.acquire()
@@ -515,6 +542,12 @@ async def release_all_futures():
     await release_all_futures()
     await wait_for_state(f3.key, "cancelled", b)

+    # At the second iteration, the scheduler chooses to distribute the tasks to
+    # different workers.
+    # Particularly for our "stuck" task f3 that is still cancelled(executing) on
+    # worker B, this will transition the task to a resumed(fetch) since it is no
+    # longer supposed to be computed on B. However, B will still require the
+    # data of f3 and is supposed to fetch it in case the compute fails.
     f1 = c.submit(inc, 1, key="f1", workers=[a.address])
     f2 = c.submit(inc, f1, key="f2", workers=[a.address])
     f3 = c.submit(inc, f2, key="f3", workers=[a.address])
@@ -523,6 +556,9 @@ async def release_all_futures():
     await wait_for_state(f3.key, "resumed", b)
     await release_all_futures()

+    # We're again cancelling, forcing f3 to transition back to executing from a
+    # resumed(fetch) state.
+
     f1 = c.submit(inc, 1, key="f1", workers=[a.address])
     f2 = c.submit(inc, f1, key="f2", workers=[a.address])
     f3 = c.submit(inc, f2, key="f3", workers=[b.address])
diff --git a/distributed/tests/test_worker_state_machine.py b/distributed/tests/test_worker_state_machine.py
index ae96835605..34b9036fdd 100644
--- a/distributed/tests/test_worker_state_machine.py
+++ b/distributed/tests/test_worker_state_machine.py
@@ -47,6 +47,7 @@
     SerializedTask,
     StateMachineEvent,
     TaskErredMsg,
+    TaskFinishedMsg,
     TaskState,
     TransitionCounterMaxExceeded,
     UnpauseEvent,
@@ -1293,3 +1294,35 @@ def test_gather_dep_failure(ws):
     ]
     assert ws.tasks["x"].state == "error"
     assert ws.tasks["y"].state == "waiting"  # Not ready
+
+
+@pytest.mark.parametrize("fail", [True, False])
+def test_executing_cancelled_fetch_executing(ws, fail):
+    """See also test_cancelled_state.py::test_resumed_cancelled_handle_compute for a full example"""
+
+    ws2 = "127.0.0.1:2"
+    instructions = ws.handle_stimulus(
+        ComputeTaskEvent.dummy("x", stimulus_id="s1"),
+        FreeKeysEvent(keys=["x"], stimulus_id="s2"),
+        ComputeTaskEvent.dummy("y", who_has={"x": [ws2]}, stimulus_id="s3"),
+        FreeKeysEvent(keys=["y"], stimulus_id="s4"),
+        ComputeTaskEvent.dummy("x", stimulus_id="s5"),
+    )
+
+    assert len(instructions) == 1
+    assert instructions[0] == Execute(key="x", stimulus_id="s1")
+    if fail:
+        instructions = ws.handle_stimulus(
+            ExecuteFailureEvent.dummy(key="x", stimulus_id="s6")
+        )
+        assert len(instructions) == 1
+        assert isinstance(instructions[0], TaskErredMsg)
+        assert ws.tasks["x"].state == "error"
+
+    else:
+        instructions = ws.handle_stimulus(
+            ExecuteSuccessEvent.dummy(key="x", stimulus_id="s6")
+        )
+        assert len(instructions) == 1
+        assert isinstance(instructions[0], TaskFinishedMsg)
+        assert ws.tasks["x"].state == "memory"