Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@
## Bug Fixes

* Fixes reconnecting after connection loss for streams
* Fixed an issue in the `Dispatcher` class where the client connection was not properly disconnected during cleanup, potentially causing unclosed socket errors.
16 changes: 11 additions & 5 deletions src/frequenz/dispatch/_dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,13 @@ def __init__(
)
self._actor_dispatchers: dict[str, ActorDispatcher] = {}
self._empty_event = Event()
self._empty_event.set()
self._disconnecting_future: asyncio.Future[None] | None = None

@override
def start(self) -> None:
"""Start the local dispatch service."""
self._bg_service.start()
self._empty_event.set()

@property
@override
Expand All @@ -235,19 +236,23 @@ def is_running(self) -> bool:

@override
async def wait(self) -> None:
"""Wait until all actor dispatches are stopped."""
await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
"""Wait until all actor dispatches are stopped and client is disconnected."""
if self._disconnecting_future is not None:
await self._disconnecting_future

await asyncio.gather(self._bg_service.wait(), self._empty_event.wait())
self._actor_dispatchers.clear()

@override
def cancel(self, msg: str | None = None) -> None:
"""Stop the local dispatch service."""
"""Stop the local dispatch service and initiate client disconnection."""
self._bg_service.cancel(msg)

for instance in self._actor_dispatchers.values():
instance.cancel()

# Initiate client disconnection asynchronously
self._disconnecting_future = asyncio.ensure_future(self._client.disconnect())

async def wait_for_initialization(self) -> None:
"""Wait until the background service is initialized."""
await self._bg_service.wait_for_initialization()
Expand Down Expand Up @@ -358,6 +363,7 @@ async def __aenter__(self) -> Self:
This background service.
"""
await super().__aenter__()
await self._client.__aenter__()
await self.wait_for_initialization()
return self

Expand Down
Loading