From 558acafe827db34a8ab0d6b6ab02b93ef1c44b5a Mon Sep 17 00:00:00 2001 From: "Mathias L. Baumann" Date: Thu, 12 Jun 2025 20:36:47 +0200 Subject: [PATCH] Properly start/stop timer in DispatchScheduler Signed-off-by: Mathias L. Baumann --- src/frequenz/dispatch/_actor_dispatcher.py | 3 +- src/frequenz/dispatch/_bg_service.py | 112 ++++++++++++--------- tests/test_frequenz_dispatch.py | 3 +- tests/test_managing_actor.py | 2 +- 4 files changed, 66 insertions(+), 54 deletions(-) diff --git a/src/frequenz/dispatch/_actor_dispatcher.py b/src/frequenz/dispatch/_actor_dispatcher.py index 177d398..b260b4b 100644 --- a/src/frequenz/dispatch/_actor_dispatcher.py +++ b/src/frequenz/dispatch/_actor_dispatcher.py @@ -12,8 +12,7 @@ from frequenz.channels import Broadcast, Receiver, Sender, select from frequenz.channels.timer import SkipMissedAndDrift, Timer -from frequenz.client.common.microgrid.components import ComponentCategory -from frequenz.client.microgrid import ComponentId +from frequenz.client.common.microgrid.components import ComponentCategory, ComponentId from frequenz.sdk.actor import Actor, BackgroundService from ._dispatch import Dispatch diff --git a/src/frequenz/dispatch/_bg_service.py b/src/frequenz/dispatch/_bg_service.py index 8be9dcf..5b5b00c 100644 --- a/src/frequenz/dispatch/_bg_service.py +++ b/src/frequenz/dispatch/_bg_service.py @@ -10,6 +10,7 @@ import logging from abc import ABC, abstractmethod from collections.abc import Mapping +from contextlib import closing from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from heapq import heappop, heappush @@ -113,13 +114,6 @@ def __init__( ) self._running_state_status_tx = self._running_state_status_channel.new_sender() - self._next_event_timer = Timer( - timedelta(seconds=100), SkipMissedAndResync(), auto_start=False - ) - """The timer to schedule the next event. - - Interval is chosen arbitrarily, as it will be reset on the first event. - """ self._scheduled_events: list["DispatchScheduler.QueueItem"] = [] """The scheduled events, sorted by time. @@ -188,7 +182,7 @@ async def new_running_state_event_receiver( Raises: RuntimeError: If the dispatch service is not running. """ - if not self._tasks: + if not self.is_running: raise RuntimeError("Dispatch service not started") # Find all matching dispatches based on the type and collect them @@ -230,44 +224,59 @@ async def _run(self) -> None: self._microgrid_id, ) - # Initial fetch - await self._fetch() - - stream = self._client.stream(microgrid_id=self._microgrid_id) - # Streaming updates - async for selected in select(self._next_event_timer, stream): - if selected_from(selected, self._next_event_timer): - if not self._scheduled_events: - continue - await self._execute_scheduled_event( - heappop(self._scheduled_events).dispatch - ) - elif selected_from(selected, stream): - _logger.debug("Received dispatch event: %s", selected.message) - dispatch = Dispatch(selected.message.dispatch) - match selected.message.event: - case Event.CREATED: - self._dispatches[dispatch.id] = dispatch - await self._update_dispatch_schedule_and_notify(dispatch, None) - await self._lifecycle_events_tx.send(Created(dispatch=dispatch)) - case Event.UPDATED: - await self._update_dispatch_schedule_and_notify( - dispatch, self._dispatches[dispatch.id] - ) - self._dispatches[dispatch.id] = dispatch - await self._lifecycle_events_tx.send(Updated(dispatch=dispatch)) - case Event.DELETED: - self._dispatches.pop(dispatch.id) - await self._update_dispatch_schedule_and_notify(None, dispatch) - - await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) - - async def _execute_scheduled_event(self, dispatch: Dispatch) -> None: + with closing( + Timer(timedelta(seconds=100), SkipMissedAndResync(), auto_start=False) + ) as next_event_timer: + # Initial fetch + await self._fetch(next_event_timer) + stream = self._client.stream(microgrid_id=self._microgrid_id) + + async for selected in select(next_event_timer, stream): + if selected_from(selected, next_event_timer): + if not self._scheduled_events: + continue + await self._execute_scheduled_event( + heappop(self._scheduled_events).dispatch, next_event_timer + ) + elif selected_from(selected, stream): + _logger.debug("Received dispatch event: %s", selected.message) + dispatch = Dispatch(selected.message.dispatch) + match selected.message.event: + case Event.CREATED: + self._dispatches[dispatch.id] = dispatch + await self._update_dispatch_schedule_and_notify( + dispatch, None, next_event_timer + ) + await self._lifecycle_events_tx.send( + Created(dispatch=dispatch) + ) + case Event.UPDATED: + await self._update_dispatch_schedule_and_notify( + dispatch, + self._dispatches[dispatch.id], + next_event_timer, + ) + self._dispatches[dispatch.id] = dispatch + await self._lifecycle_events_tx.send( + Updated(dispatch=dispatch) + ) + case Event.DELETED: + self._dispatches.pop(dispatch.id) + await self._update_dispatch_schedule_and_notify( + None, dispatch, next_event_timer + ) + + await self._lifecycle_events_tx.send( + Deleted(dispatch=dispatch) + ) + + async def _execute_scheduled_event(self, dispatch: Dispatch, timer: Timer) -> None: """Execute a scheduled event. Args: dispatch: The dispatch to execute. + timer: The timer to use for scheduling the next event. """ _logger.debug("Executing scheduled event: %s (%s)", dispatch, dispatch.started) await self._send_running_state_change(dispatch) @@ -282,9 +291,9 @@ async def _execute_scheduled_event(self, dispatch: Dispatch) -> None: else: self._schedule_start(dispatch) - self._update_timer() + self._update_timer(timer) - async def _fetch(self) -> None: + async def _fetch(self, timer: Timer) -> None: """Fetch all relevant dispatches using list. This is used for the initial fetch and for re-fetching all dispatches @@ -305,12 +314,14 @@ async def _fetch(self) -> None: old_dispatch = old_dispatches.pop(dispatch.id, None) if not old_dispatch: _logger.debug("New dispatch: %s", dispatch) - await self._update_dispatch_schedule_and_notify(dispatch, None) + await self._update_dispatch_schedule_and_notify( + dispatch, None, timer + ) await self._lifecycle_events_tx.send(Created(dispatch=dispatch)) elif dispatch.update_time != old_dispatch.update_time: _logger.debug("Updated dispatch: %s", dispatch) await self._update_dispatch_schedule_and_notify( - dispatch, old_dispatch + dispatch, old_dispatch, timer ) await self._lifecycle_events_tx.send(Updated(dispatch=dispatch)) @@ -324,7 +335,7 @@ async def _fetch(self) -> None: for dispatch in old_dispatches.values(): _logger.debug("Deleted dispatch: %s", dispatch) await self._lifecycle_events_tx.send(Deleted(dispatch=dispatch)) - await self._update_dispatch_schedule_and_notify(None, dispatch) + await self._update_dispatch_schedule_and_notify(None, dispatch, timer) # Set deleted only here as it influences the result of dispatch.started # which is used in above in _running_state_change @@ -334,7 +345,7 @@ async def _fetch(self) -> None: self._initial_fetch_event.set() async def _update_dispatch_schedule_and_notify( - self, dispatch: Dispatch | None, old_dispatch: Dispatch | None + self, dispatch: Dispatch | None, old_dispatch: Dispatch | None, timer: Timer ) -> None: """Update the schedule for a dispatch. @@ -350,6 +361,7 @@ async def _update_dispatch_schedule_and_notify( Args: dispatch: The dispatch to update the schedule for. old_dispatch: The old dispatch, if available. + timer: The timer to use for scheduling the next event. """ # If dispatch is None, the dispatch was deleted # and we need to cancel any existing event for it @@ -392,13 +404,13 @@ async def _update_dispatch_schedule_and_notify( self._schedule_start(dispatch) # We modified the schedule, so we need to reset the timer - self._update_timer() + self._update_timer(timer) - def _update_timer(self) -> None: + def _update_timer(self, timer: Timer) -> None: """Update the timer to the next event.""" if self._scheduled_events: due_at: datetime = self._scheduled_events[0].time - self._next_event_timer.reset(interval=due_at - datetime.now(timezone.utc)) + timer.reset(interval=due_at - datetime.now(timezone.utc)) _logger.debug("Next event scheduled at %s", self._scheduled_events[0].time) def _remove_scheduled(self, dispatch: Dispatch) -> bool: diff --git a/tests/test_frequenz_dispatch.py b/tests/test_frequenz_dispatch.py index d030a7d..c96b584 100644 --- a/tests/test_frequenz_dispatch.py +++ b/tests/test_frequenz_dispatch.py @@ -461,6 +461,7 @@ async def test_dispatch_new_but_finished( test_env.client.set_dispatches(test_env.microgrid_id, [finished_dispatch]) await test_env.service.stop() test_env.service.start() + test_env = replace( test_env, lifecycle_events=test_env.service.new_lifecycle_events_receiver("TEST_TYPE"), @@ -470,8 +471,8 @@ async def test_dispatch_new_but_finished( ) ), ) - fake_time.shift(timedelta(seconds=1)) + # Process the lifecycle event caused by the old dispatch at startup await test_env.lifecycle_events.receive() diff --git a/tests/test_managing_actor.py b/tests/test_managing_actor.py index 96c8a8e..baca83c 100644 --- a/tests/test_managing_actor.py +++ b/tests/test_managing_actor.py @@ -14,11 +14,11 @@ import pytest import time_machine from frequenz.channels import Broadcast, Receiver, Sender +from frequenz.client.common.microgrid.components import ComponentId from frequenz.client.dispatch import recurrence from frequenz.client.dispatch.recurrence import Frequency, RecurrenceRule from frequenz.client.dispatch.test.client import FakeClient from frequenz.client.dispatch.test.generator import DispatchGenerator -from frequenz.client.microgrid import ComponentId from frequenz.sdk.actor import Actor from pytest import fixture