Skip to content

Commit

Permalink
Add docs for test_resumed_cancelled_handle_compute
Browse files Browse the repository at this point in the history
  • Loading branch information
fjetter committed Aug 18, 2022
1 parent 3647cfe commit 1032a10
Showing 1 changed file with 33 additions and 3 deletions.
36 changes: 33 additions & 3 deletions distributed/tests/test_cancelled_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,31 @@ async def test_resumed_cancelled_handle_compute(
Given the history of a task
executing -> cancelled -> resumed(fetch)
A handle_compute should properly restore executing.
Setup
-----
A task is running on the threadpool while a client is cancelling the
computations. In this case we cannot cancel the task on the threadpool but
need to transition the worker task to cancelled instead. While the task on
the threadpool is still running, the client resubmits the graph. This is
relatively common in interactive workflows in which a user cancels a
computation and resubmits shortly after.
This resubmission can decide to distribute tasks onto different workers than the initial submission
Parameters
----------
raise_error:
Both successful and erred results should be properly handled.
wait_for_processing:
Is the scheduler properly handling the task-done message of the worker
even if the task is not, yet, transitioned to processing again?
Expectation
-----------
A `handle_compute` should properly restore state `executing` even after the
task has transitioned through `cancelled` and `resumed(fetch)`.
"""
# This test is heavily using set_restrictions to simulate certain scheduler
# decisions of placing keys

lock_compute = Lock()
await lock_compute.acquire()
Expand Down Expand Up @@ -515,6 +536,12 @@ async def release_all_futures():
await release_all_futures()
await wait_for_state(f3.key, "cancelled", b)

# At the second iteration, the scheduler chooses to distribute the tasks to
# different workers
# Particularly for our "stuck" task f3 that is still cancelled(executing) on
# worker B this will transition the task to a resumed(fetch) since it is no
# longer supposed to be computed on B. However, B will still require the
# data of f3 and is supposed to fetch it in case the compute fails
f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(inc, f2, key="f3", workers=[a.address])
Expand All @@ -523,6 +550,9 @@ async def release_all_futures():
await wait_for_state(f3.key, "resumed", b)
await release_all_futures()

# We're again cancelling, forcing f3 to transition back to executing from a
# resumed(fetch) state

f1 = c.submit(inc, 1, key="f1", workers=[a.address])
f2 = c.submit(inc, f1, key="f2", workers=[a.address])
f3 = c.submit(inc, f2, key="f3", workers=[b.address])
Expand Down

0 comments on commit 1032a10

Please sign in to comment.