Skip to content

Commit

Permalink
Close state machine and add-ins first in Worker.close (#8066)
Browse files Browse the repository at this point in the history
  • Loading branch information
hendrikmakait committed Aug 4, 2023
1 parent eb297b3 commit 8cc2be4
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
2 changes: 1 addition & 1 deletion distributed/tests/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3678,7 +3678,7 @@ async def test_gather_on_worker_bad_recipient(c, s, a, b):
"""The recipient is missing"""
x = await c.scatter("x")
await b.close()
assert s.workers.keys() == {a.address}
await async_poll_for(lambda: s.workers.keys() == {a.address}, timeout=5)
out = await s.gather_on_worker(b.address, {x.key: [a.address]})
assert out == {x.key}

Expand Down
47 changes: 23 additions & 24 deletions distributed/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -1535,43 +1535,43 @@ async def close( # type: ignore
logger.info("Closed worker has not yet started: %s", self.status)
if not executor_wait:
logger.info("Not waiting on executor to close")
self.status = Status.closing

# Stop callbacks before giving up control in any `await`.
# We don't want to heartbeat while closing.
for pc in self.periodic_callbacks.values():
pc.stop()
# This also informs the scheduler about the status update
self.status = Status.closing
setproctitle("dask worker [closing]")

self.stop()
if nanny and self.nanny:
with self.rpc(self.nanny) as r:
await r.close_gracefully(reason=reason)

# Cancel async instructions
await BaseWorker.close(self, timeout=timeout)

for preload in self.preloads:
try:
await preload.teardown()
except Exception:
logger.exception("Failed to tear down preload")
teardowns = [
plugin.teardown(self)
for plugin in self.plugins.values()
if hasattr(plugin, "teardown")
]
await asyncio.gather(*(td for td in teardowns if isawaitable(td)))

for extension in self.extensions.values():
if hasattr(extension, "close"):
result = extension.close()
if isawaitable(result):
await result

if nanny and self.nanny:
with self.rpc(self.nanny) as r:
await r.close_gracefully(reason=reason)
self.stop_services()

setproctitle("dask worker [closing]")
for preload in self.preloads:
try:
await preload.teardown()
except Exception:
logger.exception("Failed to tear down preload")

teardowns = [
plugin.teardown(self)
for plugin in self.plugins.values()
if hasattr(plugin, "teardown")
]
for pc in self.periodic_callbacks.values():
pc.stop()

await asyncio.gather(*(td for td in teardowns if isawaitable(td)))
self.stop()

if self._client:
# If this worker is the last one alive, clean up the worker
Expand All @@ -1597,8 +1597,6 @@ async def close( # type: ignore

await self.scheduler.close_rpc()

self.stop_services()

# Give some time for a UCX scheduler to complete closing endpoints
# before closing self.batched_stream, otherwise the local endpoint
# may be closed too early and errors be raised on the scheduler when
Expand Down Expand Up @@ -1649,10 +1647,11 @@ def _close(executor, wait):
await self.rpc.close()

self.status = Status.closed
setproctitle("dask worker [closed]")

await ServerNode.close(self)

self.__exit_stack.__exit__(None, None, None)
setproctitle("dask worker [closed]")
return "OK"

async def close_gracefully(
Expand Down

0 comments on commit 8cc2be4

Please sign in to comment.