Skip to content

Commit

Permalink
Fix deadlock (#8703)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Jun 19, 2024
1 parent ac3854e commit 03035da
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
2 changes: 1 addition & 1 deletion distributed/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2540,7 +2540,7 @@ def _transition_memory_released(self, key: Key, stimulus_id: str) -> RecsMsgs:
recommendations[key] = "waiting"

for dts in ts.waiters or ():
if dts.state in ("no-worker", "processing"):
if dts.state in ("no-worker", "processing", "queued"):
recommendations[dts.key] = "waiting"
elif dts.state == "waiting":
if not dts.waiting_on:
Expand Down
37 changes: 37 additions & 0 deletions distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4650,6 +4650,43 @@ def assert_rootish():
await c.gather(fut3)


@gen_cluster(client=True, nthreads=[("", 1)])
async def test_deadlock_dependency_of_queued_released(c, s, a):
@delayed
def inc(input):
return input + 1

@delayed
def block_on_event(input, block, executing):
executing.set()
block.wait()
return input

block = Event()
executing = Event()

dep = inc(0)
futs = [
block_on_event(dep, block, executing, dask_key_name=("rootish", i))
for i in range(s.total_nthreads * 2 + 1)
]
del dep
futs = c.compute(futs)
await executing.wait()
assert s.queued
await s.remove_worker(address=a.address, stimulus_id="test")

s.validate_state()

await block.set()
await executing.clear()

async with Worker(s.address) as b:
s.validate_state()
await c.gather(*futs)
s.validate_state()


@gen_cluster(client=True)
async def test_submit_dependency_of_erred_task(c, s, a, b):
x = c.submit(lambda: 1 / 0, key="x")
Expand Down

0 comments on commit 03035da

Please sign in to comment.