Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure an error during ShuffleRun.close cannot block worker shutdown #8184

Merged
merged 2 commits into from Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
23 changes: 10 additions & 13 deletions distributed/shuffle/_worker_plugin.py
Expand Up @@ -104,10 +104,13 @@
await shuffle.inputs_done()

async def _close_shuffle_run(self, shuffle: ShuffleRun) -> None:
await shuffle.close()
async with self._runs_cleanup_condition:
self._runs.remove(shuffle)
self._runs_cleanup_condition.notify_all()
with log_errors():
try:
await shuffle.close()
finally:
async with self._runs_cleanup_condition:
self._runs.remove(shuffle)
self._runs_cleanup_condition.notify_all()

def shuffle_fail(self, shuffle_id: ShuffleId, run_id: int, message: str) -> None:
"""Fails the shuffle run with the message as exception and triggers cleanup.
Expand Down Expand Up @@ -277,15 +280,9 @@
RuntimeError("{existing!r} stale, expected run_id=={run_id}")
)

async def _(
extension: ShuffleWorkerPlugin, shuffle: ShuffleRun
) -> None:
await shuffle.close()
async with extension._runs_cleanup_condition:
extension._runs.remove(shuffle)
extension._runs_cleanup_condition.notify_all()

self.worker._ongoing_background_tasks.call_soon(_, self, existing)
self.worker._ongoing_background_tasks.call_soon(

Check warning on line 283 in distributed/shuffle/_worker_plugin.py

View check run for this annotation

Codecov / codecov/patch

distributed/shuffle/_worker_plugin.py#L283

Added line #L283 was not covered by tests
ShuffleWorkerPlugin._close_shuffle_run, self, existing
)
shuffle: ShuffleRun = result.spec.create_run_on_worker(
result.run_id, result.worker_for, self
)
Expand Down
30 changes: 30 additions & 0 deletions distributed/shuffle/tests/test_shuffle.py
Expand Up @@ -3,6 +3,7 @@
import asyncio
import io
import itertools
import logging
import os
import random
import shutil
Expand Down Expand Up @@ -610,6 +611,35 @@ async def test_closed_bystanding_worker_during_shuffle(c, s, w1, w2, w3):
await check_scheduler_cleanup(s)


class RaiseOnCloseShuffleRun(DataFrameShuffleRun):
    """Test double for ``DataFrameShuffleRun`` whose ``close`` always fails.

    Used to verify that an exception raised during shuffle-run teardown is
    logged and does not block worker shutdown or cleanup.
    """

    async def close(self, *args, **kwargs):
        # Deliberately fail teardown; the message is matched in the test logs.
        error = RuntimeError("test-exception-on-close")
        raise error


@mock.patch(
    "distributed.shuffle._shuffle.DataFrameShuffleRun",
    RaiseOnCloseShuffleRun,
)
@gen_cluster(client=True, nthreads=[])
async def test_exception_on_close_cleans_up(c, s, caplog):
    # Ensure that everything is cleaned up and does not lock up if an exception
    # is raised during shuffle close: the error must be logged (not swallowed)
    # and the worker must still shut down and clean up its shuffle state.
    with caplog.at_level(logging.ERROR):
        async with Worker(s.address) as w:
            df = dask.datasets.timeseries(
                start="2000-01-01",
                end="2000-01-10",
                dtypes={"x": float, "y": float},
                freq="10 s",
            )
            shuffled = dd.shuffle.shuffle(df, "x", shuffle="p2p")
            await c.compute([shuffled, df], sync=True)

    # The RuntimeError raised by RaiseOnCloseShuffleRun.close must surface in
    # the error logs instead of being lost.
    assert any("test-exception-on-close" in record.message for record in caplog.records)
    await check_worker_cleanup(w, closed=True)

class BlockedInputsDoneShuffle(DataFrameShuffleRun):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
Expand Down