diff --git a/distributed/tests/test_cancelled_state.py b/distributed/tests/test_cancelled_state.py
index 9080c1d77f..fc9efe9700 100644
--- a/distributed/tests/test_cancelled_state.py
+++ b/distributed/tests/test_cancelled_state.py
@@ -472,10 +472,31 @@ async def test_resumed_cancelled_handle_compute(
     Given the history of a task
     executing -> cancelled -> resumed(fetch)
 
-    A handle_compute should properly restore executing.
+    Setup
+    -----
+    A task is running on the threadpool while a client is cancelling the
+    computations. In this case, we cannot cancel the task on the threadpool but
+    need to transition the worker task to cancelled instead. While the task on
+    the threadpool is still running, the client resubmits the graph. This is
+    relatively common in interactive workflows in which a user cancels a
+    computation and resubmits shortly after.
+
+    On resubmission, the scheduler may distribute the tasks onto different workers.
+
+    Parameters
+    ----------
+    raise_error:
+        Both successful and erred results should be properly handled.
+
+    wait_for_processing:
+        Does the scheduler properly handle the worker's task-done message even
+        if the task has not yet transitioned back to processing?
+
+    Expectation
+    -----------
+    A `handle_compute` should properly restore state `executing` even after the
+    task has transitioned through `cancelled` and `resumed(fetch)`.
     """
-    # This test is heavily using set_restrictions to simulate certain scheduler
-    # decisions of placing keys
 
     lock_compute = Lock()
     await lock_compute.acquire()
@@ -515,6 +536,12 @@ async def release_all_futures():
     await release_all_futures()
     await wait_for_state(f3.key, "cancelled", b)
 
+    # At the second iteration, the scheduler chooses to distribute the tasks to
+    # different workers.
+    # Particularly for our "stuck" task f3, which is still cancelled(executing)
+    # on worker B, this will transition the task to resumed(fetch) since it is
+    # no longer supposed to be computed on B. However, B will still require the
+    # data of f3 and is supposed to fetch it in case the compute fails.
     f1 = c.submit(inc, 1, key="f1", workers=[a.address])
     f2 = c.submit(inc, f1, key="f2", workers=[a.address])
     f3 = c.submit(inc, f2, key="f3", workers=[a.address])
@@ -523,6 +550,9 @@ async def release_all_futures():
     await wait_for_state(f3.key, "resumed", b)
     await release_all_futures()
 
+    # We're again cancelling, forcing f3 to transition back to executing from a
+    # resumed(fetch) state.
+
     f1 = c.submit(inc, 1, key="f1", workers=[a.address])
     f2 = c.submit(inc, f1, key="f2", workers=[a.address])
     f3 = c.submit(inc, f2, key="f3", workers=[b.address])
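
For context, a minimal self-contained sketch (not part of the patch) of the interactive cancel-and-resubmit pattern the new docstring describes: submit a small chain, release the futures while work may still be in flight, then resubmit the same keys with different worker placement. The throwaway LocalCluster, the worker-address lookup, and the placement choices are illustrative assumptions; only the submit/release/resubmit pattern and the `key=`/`workers=` keywords mirror the test.

# Sketch only: illustrates the client-side pattern exercised by the test,
# not the worker state machine itself.
from dask.distributed import Client, LocalCluster


def inc(x):
    return x + 1


if __name__ == "__main__":
    with LocalCluster(n_workers=2, threads_per_worker=1) as cluster, Client(cluster) as client:
        # Two worker addresses stand in for the test's workers `a` and `b`.
        w_a, w_b = list(client.scheduler_info()["workers"])

        # First submission: pin the whole chain to worker A.
        f1 = client.submit(inc, 1, key="f1", workers=[w_a])
        f2 = client.submit(inc, f1, key="f2", workers=[w_a])
        f3 = client.submit(inc, f2, key="f3", workers=[w_a])

        # "Cancel": releasing the futures tells the scheduler the client no
        # longer needs these keys, even if a task is still running on a worker.
        for fut in (f1, f2, f3):
            fut.release()

        # Resubmission shortly after, this time allowing a different placement
        # for f3 -- the redistribution the test drives into resumed(fetch).
        f1 = client.submit(inc, 1, key="f1", workers=[w_a])
        f2 = client.submit(inc, f1, key="f2", workers=[w_a])
        f3 = client.submit(inc, f2, key="f3", workers=[w_b])

        assert f3.result() == 4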