Skip to content

Commit

Permalink
Reorder operations in Worker.close (#8076)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Aug 7, 2023
1 parent 4c4748a commit 9255987
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 10 deletions.
10 changes: 5 additions & 5 deletions distributed/core.py
Expand Up @@ -662,11 +662,6 @@ def stop(self):
if self.__stopped:
return

if self._workdir is not None:
self._workdir.release()

self.monitor.close()

self.__stopped = True
_stops = set()
for listener in self.listeners:
Expand All @@ -687,6 +682,11 @@ async def background_stops():

self._ongoing_background_tasks.call_soon(background_stops)

self.monitor.close()

if self._workdir is not None:
self._workdir.release()

@property
def listener(self):
if self.listeners:
Expand Down
28 changes: 23 additions & 5 deletions distributed/worker.py
Expand Up @@ -6,6 +6,7 @@
import contextlib
import contextvars
import errno
import inspect
import logging
import math
import os
Expand Down Expand Up @@ -1571,8 +1572,6 @@ async def close( # type: ignore
for pc in self.periodic_callbacks.values():
pc.stop()

self.stop()

if self._client:
# If this worker is the last one alive, clean up the worker
# initialized clients
Expand All @@ -1595,7 +1594,27 @@ async def close( # type: ignore
# otherwise
c.close()

await self.scheduler.close_rpc()
# FIXME: Copy-paste from `Server.stop`. See dask/distributed#8077
_stops = set()
for listener in self.listeners:
future = listener.stop()
if inspect.isawaitable(future):
_stops.add(future)
try:
abort_handshaking_comms = listener.abort_handshaking_comms
except AttributeError:
pass
else:
abort_handshaking_comms()

if _stops:

async def background_stops():
await asyncio.gather(*_stops)

# end copy-paste

await self.rpc.close()

# Give some time for a UCX scheduler to complete closing endpoints
# before closing self.batched_stream, otherwise the local endpoint
Expand Down Expand Up @@ -1644,8 +1663,7 @@ def _close(executor, wait):
executor=executor, wait=executor_wait
) # Just run it directly

await self.rpc.close()

self.stop()
self.status = Status.closed
setproctitle("dask worker [closed]")

Expand Down

0 comments on commit 9255987

Please sign in to comment.