Better logging for worker removal (#8517)
crusaderky committed Feb 28, 2024
1 parent f86a031 commit 1602d74
Showing 5 changed files with 226 additions and 46 deletions.
23 changes: 5 additions & 18 deletions distributed/client.py
@@ -1497,8 +1497,7 @@ async def __aexit__(self, exc_type, exc_value, traceback):
await self._close(
# if we're handling an exception, we assume that it's more
# important to deliver that exception than shutdown gracefully.
fast=exc_type
is not None
fast=(exc_type is not None)
)

def __exit__(self, exc_type, exc_value, traceback):
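For context, the fast= flag controls whether _close() waits briefly for the background _handle_report_task or cancels it outright. A minimal sketch of the behaviour this hunk touches, assuming a local in-process cluster; the lambda task and the deliberate error are purely illustrative:

    import asyncio

    from distributed import Client

    async def main():
        # Leaving the async context manager calls __aexit__, which closes the client.
        async with Client(processes=False, asynchronous=True) as client:
            print(await client.submit(lambda x: x + 1, 41))
            # If an exception escapes this block, __aexit__ passes
            # fast=(exc_type is not None), i.e. it cancels the background
            # _handle_report_task rather than waiting up to 2 seconds for it.
            raise RuntimeError("boom")

    asyncio.run(main())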
@@ -1669,16 +1668,15 @@ async def _wait_for_handle_report_task(self, fast=False):
await wait_for(handle_report_task, 0 if fast else 2)

@log_errors
async def _close(self, fast=False):
"""
Send close signal and wait until scheduler completes
async def _close(self, fast: bool = False) -> None:
"""Send close signal and wait until scheduler completes
If fast is True, the client will close forcefully, by cancelling tasks
the background _handle_report_task.
"""
# TODO: aclose more forcefully by aborting the RPC and cancelling all
# TODO: close more forcefully by aborting the RPC and cancelling all
# background tasks.
# see https://trio.readthedocs.io/en/stable/reference-io.html#trio.aclose_forcefully
# See https://trio.readthedocs.io/en/stable/reference-io.html#trio.aclose_forcefully
if self.status == "closed":
return

@@ -1773,18 +1771,7 @@ def close(self, timeout=no_default):
coro = wait_for(coro, timeout)
return coro

if self._start_arg is None:
with suppress(AttributeError):
f = self.cluster.close()
if asyncio.iscoroutine(f):

async def _():
await f

self.sync(_)

sync(self.loop, self._close, fast=True, callback_timeout=timeout)

assert self.status == "closed"

if not is_python_shutting_down():
92 changes: 68 additions & 24 deletions distributed/scheduler.py
@@ -5182,14 +5182,6 @@ async def remove_worker(

ws = self.workers[address]

event_msg = {
"action": "remove-worker",
"processing-tasks": {ts.key for ts in ws.processing},
}
self.log_event(address, event_msg.copy())
event_msg["worker"] = address
self.log_event("all", event_msg)

logger.info(f"Remove worker {ws} ({stimulus_id=})")
if close:
with suppress(AttributeError, CommClosedError):
@@ -5220,6 +5212,7 @@

recommendations: Recs = {}

processing_keys = {ts.key for ts in ws.processing}
for ts in list(ws.processing):
k = ts.key
recommendations[k] = "released"
@@ -5244,21 +5237,49 @@
worker=address,
)
recommendations.update(r)
logger.info(
logger.error(
"Task %s marked as failed because %d workers died"
" while trying to run it",
ts.key,
ts.suspicious,
)

recompute_keys = set()
lost_keys = set()

for ts in list(ws.has_what):
self.remove_replica(ts, ws)
if not ts.who_has:
if ts.run_spec:
recompute_keys.add(ts.key)
recommendations[ts.key] = "released"
else: # pure data
lost_keys.add(ts.key)
recommendations[ts.key] = "forgotten"

if recompute_keys:
logger.warning(
f"Removing worker {ws.address!r} caused the cluster to lose "
"already computed task(s), which will be recomputed elsewhere: "
f"{recompute_keys} ({stimulus_id=})"
)
if lost_keys:
logger.error(
f"Removing worker {ws.address!r} caused the cluster to lose scattered "
f"data, which can't be recovered: {lost_keys} ({stimulus_id=})"
)

event_msg = {
"action": "remove-worker",
"processing-tasks": processing_keys,
"lost-computed-tasks": recompute_keys,
"lost-scattered-tasks": lost_keys,
"stimulus_id": stimulus_id,
}
self.log_event(address, event_msg.copy())
event_msg["worker"] = address
self.log_event("all", event_msg)

self.transitions(recommendations, stimulus_id=stimulus_id)

awaitables = []
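The enriched "remove-worker" event above can be inspected from the client. A hedged sketch, assuming a connected client and using Client.get_events; the field names mirror the event_msg dict in this hunk:

    from distributed import Client

    client = Client()  # assumes a running (or freshly started local) cluster
    for timestamp, msg in client.get_events("all"):
        if msg.get("action") == "remove-worker":
            print(
                msg["worker"],
                msg["processing-tasks"],
                msg["lost-computed-tasks"],
                msg["lost-scattered-tasks"],
                msg["stimulus_id"],
            )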
@@ -5827,6 +5848,7 @@ def handle_worker_status_change(
"action": "worker-status-change",
"prev-status": prev_status.name,
"status": ws.status.name,
"stimulus_id": stimulus_id,
},
)
logger.debug(f"Worker status {prev_status.name} -> {status} - {ws}")
@@ -7207,7 +7229,7 @@ async def retire_workers(
try:
coros = []
for ws in wss:
logger.info("Retiring worker %s", ws.address)
logger.info(f"Retiring worker {ws.address!r} ({stimulus_id=!r})")

policy = RetireWorker(ws.address)
amm.add_policy(policy)
@@ -7244,19 +7266,37 @@
# time (depending on interval settings)
amm.run_once()

workers_info = {
addr: info
for addr, info in await asyncio.gather(*coros)
if addr is not None
}
workers_info_ok = {}
workers_info_abort = {}
for addr, result, info in await asyncio.gather(*coros):
if result == "OK":
workers_info_ok[addr] = info
else:
workers_info_abort[addr] = info

finally:
if stop_amm:
amm.stop()

self.log_event("all", {"action": "retire-workers", "workers": workers_info})
self.log_event(list(workers_info), {"action": "retired"})
self.log_event(
"all",
{
"action": "retire-workers",
"retired": workers_info_ok,
"could-not-retire": workers_info_abort,
"stimulus_id": stimulus_id,
},
)
self.log_event(
list(workers_info_ok),
{"action": "retired", "stimulus_id": stimulus_id},
)
self.log_event(
list(workers_info_abort),
{"action": "could-not-retire", "stimulus_id": stimulus_id},
)

return workers_info
return workers_info_ok
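Callers that previously assumed every requested worker shows up in the return value may need adjusting, since only successfully retired workers are returned now. A hedged usage sketch; addresses is a placeholder list of worker addresses, and the event lookup relies on the "could-not-retire" field introduced above:

    from distributed import Client

    client = Client()  # placeholder: an already connected client
    addresses = ["tcp://127.0.0.1:40001"]  # placeholder worker addresses

    retired = client.retire_workers(addresses)
    missing = set(addresses) - set(retired)
    if missing:
        # Workers whose unique data could not be moved elsewhere are reported
        # through the scheduler event log rather than the return value.
        for _, msg in client.get_events("all"):
            if msg.get("action") == "retire-workers" and msg.get("could-not-retire"):
                print("Not retired:", list(msg["could-not-retire"]), msg["stimulus_id"])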

async def _track_retire_worker(
self,
@@ -7266,7 +7306,7 @@
close: bool,
remove: bool,
stimulus_id: str,
) -> tuple[str | None, dict]:
) -> tuple[str, Literal["OK", "no-recipients"], dict]:
while not policy.done():
# Sleep 0.01s when there are 4 tasks or less
# Sleep 0.5s when there are 200 or more
@@ -7284,10 +7324,14 @@
"stimulus_id": stimulus_id,
}
)
return None, {}
logger.warning(
f"Could not retire worker {ws.address!r}: unique data could not be "
f"moved to any other worker ({stimulus_id=!r})"
)
return ws.address, "no-recipients", ws.identity()

logger.debug(
"All unique keys on worker %s have been replicated elsewhere", ws.address
f"All unique keys on worker {ws.address!r} have been replicated elsewhere"
)

if remove:
@@ -7297,8 +7341,8 @@
elif close:
self.close_worker(ws.address)

logger.info("Retired worker %s", ws.address)
return ws.address, ws.identity()
logger.info(f"Retired worker {ws.address!r} ({stimulus_id=!r})")
return ws.address, "OK", ws.identity()

def add_keys(
self, worker: str, keys: Collection[Key] = (), stimulus_id: str | None = None
@@ -7434,7 +7478,7 @@ async def feed(
def log_worker_event(
self, worker: str, topic: str | Collection[str], msg: Any
) -> None:
if isinstance(msg, dict):
if isinstance(msg, dict) and worker != topic:
msg["worker"] = worker
self.log_event(topic, msg)
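The added worker != topic guard avoids stamping a redundant "worker" field when a worker logs an event to its own address topic. A sketch of how such events typically originate, assuming a task body running on a worker; the topic name and task are illustrative:

    from distributed import Client, get_worker

    def task_that_logs():
        # Dict messages logged to a generic topic get a "worker" field added by
        # the scheduler; messages under the worker's own address topic do not.
        get_worker().log_event("my-topic", {"custom": "payload"})
        return 1

    client = Client()  # assumes a running (or freshly started local) cluster
    client.submit(task_that_logs).result()
    print(client.get_events("my-topic"))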

5 changes: 4 additions & 1 deletion distributed/tests/test_client.py
@@ -5196,7 +5196,10 @@ def test_quiet_client_close(loop):
threads_per_worker=4,
) as c:
futures = c.map(slowinc, range(1000), delay=0.01)
sleep(0.200) # stop part-way
# Stop part-way
s = c.cluster.scheduler
while sum(ts.state == "memory" for ts in s.tasks.values()) < 20:
sleep(0.01)
sleep(0.1) # let things settle

out = logger.getvalue()
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
@@ -3058,7 +3058,7 @@ async def connect(self, *args, **kwargs):


@gen_cluster(client=True)
async def test_gather_failing_cnn_recover(c, s, a, b):
async def test_gather_failing_can_recover(c, s, a, b):
x = await c.scatter({"x": 1}, workers=a.address)
rpc = await FlakyConnectionPool(failing_connections=1)
with mock.patch.object(s, "rpc", rpc), dask.config.set(