From 1f0e76eb515da5070ef8e068c4d44f0f576e0704 Mon Sep 17 00:00:00 2001 From: Erik Date: Thu, 24 Aug 2023 09:25:03 +0200 Subject: [PATCH 1/9] Use loop.time in DataUpdateCoordinator --- homeassistant/helpers/event.py | 31 +++++++++++++++++++++ homeassistant/helpers/update_coordinator.py | 21 ++++++-------- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/homeassistant/helpers/event.py b/homeassistant/helpers/event.py index daad994bbd42dc..11bfe04473affa 100644 --- a/homeassistant/helpers/event.py +++ b/homeassistant/helpers/event.py @@ -1434,6 +1434,37 @@ def unsub_point_in_time_listener() -> None: track_point_in_utc_time = threaded_listener_factory(async_track_point_in_utc_time) +@callback +@bind_hass +def async_call_at( + hass: HomeAssistant, + action: HassJob[[datetime], Coroutine[Any, Any, None] | None] + | Callable[[datetime], Coroutine[Any, Any, None] | None], + loop_time: float, +) -> CALLBACK_TYPE: + """Add a listener that is called at .""" + + @callback + def run_action(job: HassJob[[datetime], Coroutine[Any, Any, None] | None]) -> None: + """Call the action.""" + hass.async_run_hass_job(job, time_tracker_utcnow()) + + job = ( + action + if isinstance(action, HassJob) + else HassJob(action, f"call_at {loop_time}") + ) + cancel_callback = hass.loop.call_at(loop_time, run_action, job) + + @callback + def unsub_call_later_listener() -> None: + """Cancel the call_later.""" + assert cancel_callback is not None + cancel_callback.cancel() + + return unsub_call_later_listener + + @callback @bind_hass def async_call_later( diff --git a/homeassistant/helpers/update_coordinator.py b/homeassistant/helpers/update_coordinator.py index a050c0da9e405f..e053a6dde33686 100644 --- a/homeassistant/helpers/update_coordinator.py +++ b/homeassistant/helpers/update_coordinator.py @@ -81,6 +81,7 @@ def __init__( self._shutdown_requested = False self.config_entry = config_entries.current_entry.get() self.always_update = always_update + self._next_refresh: float | None = None # It's None before the first successful update. # Components should call async_config_entry_first_refresh @@ -214,20 +215,15 @@ def _schedule_refresh(self) -> None: # than the debouncer cooldown, this would cause the debounce to never be called self._async_unsub_refresh() - # We _floor_ utcnow to create a schedule on a rounded second, - # minimizing the time between the point and the real activation. - # That way we obtain a constant update frequency, - # as long as the update process takes less than 500ms - # - # We do not align everything to happen at microsecond 0 - # since it increases the risk of a thundering herd - # when multiple coordinators are scheduled to update at the same time. - # - # https://github.com/home-assistant/core/issues/82231 - self._unsub_refresh = event.async_track_point_in_utc_time( + # We use event.async_call_at because DataUpdateCoordinator does + # not guarantee an exact update interval + if self._next_refresh is None: + self._next_refresh = self.hass.loop.time() + self._next_refresh += self.update_interval.total_seconds() + self._unsub_refresh = event.async_call_at( self.hass, self._job, - utcnow().replace(microsecond=self._microsecond) + self.update_interval, + self._next_refresh, ) async def _handle_refresh_interval(self, _now: datetime) -> None: @@ -266,6 +262,7 @@ async def async_config_entry_first_refresh(self) -> None: async def async_refresh(self) -> None: """Refresh data and log errors.""" + self._next_refresh = None await self._async_refresh(log_failures=True) async def _async_refresh( # noqa: C901 From 6aef232228ecc8d3851a07f00a3f5d8dd6ceb09c Mon Sep 17 00:00:00 2001 From: Erik Date: Thu, 24 Aug 2023 09:38:09 +0200 Subject: [PATCH 2/9] Fix test --- tests/helpers/test_update_coordinator.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/tests/helpers/test_update_coordinator.py b/tests/helpers/test_update_coordinator.py index 4258a508c34714..182ed6c3cb46e2 100644 --- a/tests/helpers/test_update_coordinator.py +++ b/tests/helpers/test_update_coordinator.py @@ -6,6 +6,7 @@ import urllib.error import aiohttp +from freezegun.api import FrozenDateTimeFactory import pytest import requests @@ -329,11 +330,14 @@ async def test_refresh_no_update_method( async def test_update_interval( - hass: HomeAssistant, crd: update_coordinator.DataUpdateCoordinator[int] + hass: HomeAssistant, + freezer: FrozenDateTimeFactory, + crd: update_coordinator.DataUpdateCoordinator[int], ) -> None: """Test update interval works.""" # Test we don't update without subscriber - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() assert crd.data is None @@ -342,18 +346,21 @@ async def test_update_interval( unsub = crd.async_add_listener(update_callback) # Test twice we update with subscriber - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() assert crd.data == 1 - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() assert crd.data == 2 # Test removing listener unsub() - async_fire_time_changed(hass, utcnow() + crd.update_interval) + freezer.tick(crd.update_interval) + async_fire_time_changed(hass) await hass.async_block_till_done() # Test we stop updating after we lose last subscriber From 22aafbc36c5a5faf6394679decbd502477945b60 Mon Sep 17 00:00:00 2001 From: Erik Date: Thu, 24 Aug 2023 09:49:38 +0200 Subject: [PATCH 3/9] Remove unused _microsecond member --- homeassistant/helpers/update_coordinator.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/homeassistant/helpers/update_coordinator.py b/homeassistant/helpers/update_coordinator.py index e053a6dde33686..4a8a96324d5f0f 100644 --- a/homeassistant/helpers/update_coordinator.py +++ b/homeassistant/helpers/update_coordinator.py @@ -6,7 +6,6 @@ from collections.abc import Awaitable, Callable, Coroutine, Generator from datetime import datetime, timedelta import logging -from random import randint from time import monotonic from typing import Any, Generic, Protocol, TypeVar import urllib.error @@ -90,12 +89,6 @@ def __init__( # when it was already checked during setup. self.data: _DataT = None # type: ignore[assignment] - # Pick a random microsecond to stagger the refreshes - # and avoid a thundering herd. - self._microsecond = randint( - event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX - ) - self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {} job_name = "DataUpdateCoordinator" type_name = type(self).__name__ From 2b5175865fe7fabef2692883a75b41c980ab0d88 Mon Sep 17 00:00:00 2001 From: Erik Date: Fri, 25 Aug 2023 17:08:48 +0200 Subject: [PATCH 4/9] Fix logic --- homeassistant/helpers/update_coordinator.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/homeassistant/helpers/update_coordinator.py b/homeassistant/helpers/update_coordinator.py index 4a8a96324d5f0f..55d5ca61b0c776 100644 --- a/homeassistant/helpers/update_coordinator.py +++ b/homeassistant/helpers/update_coordinator.py @@ -176,6 +176,7 @@ def _unschedule_refresh(self) -> None: """Unschedule any pending refresh since there is no longer any listeners.""" self._async_unsub_refresh() self._debounced_refresh.async_cancel() + self._next_refresh = None def async_contexts(self) -> Generator[Any, None, None]: """Return all registered contexts.""" @@ -210,8 +211,9 @@ def _schedule_refresh(self) -> None: # We use event.async_call_at because DataUpdateCoordinator does # not guarantee an exact update interval - if self._next_refresh is None: - self._next_refresh = self.hass.loop.time() + now = self.hass.loop.time() + if self._next_refresh is None or self._next_refresh <= now: + self._next_refresh = now self._next_refresh += self.update_interval.total_seconds() self._unsub_refresh = event.async_call_at( self.hass, @@ -395,6 +397,7 @@ def async_set_updated_data(self, data: _DataT) -> None: """Manually update data, notify listeners and reset refresh interval.""" self._async_unsub_refresh() self._debounced_refresh.async_cancel() + self._next_refresh = None self.data = data self.last_update_success = True From dcbb365ab588363bcf51ad2ce6e10253248739ef Mon Sep 17 00:00:00 2001 From: Erik Date: Fri, 25 Aug 2023 17:09:12 +0200 Subject: [PATCH 5/9] Update tomorrowio coordinator --- homeassistant/components/tomorrowio/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/homeassistant/components/tomorrowio/__init__.py b/homeassistant/components/tomorrowio/__init__.py index 6d1b84ec5d7049..41fa8158624ec8 100644 --- a/homeassistant/components/tomorrowio/__init__.py +++ b/homeassistant/components/tomorrowio/__init__.py @@ -221,7 +221,10 @@ async def async_setup_entry(self, entry: ConfigEntry) -> None: await self.async_refresh() self.update_interval = async_set_update_interval(self.hass, self._api) - self._schedule_refresh() + self._next_refresh = None + self._async_unsub_refresh() + if self._listeners: + self._schedule_refresh() async def async_unload_entry(self, entry: ConfigEntry) -> bool | None: """Unload a config entry from coordinator. From a2c70201c6decce086555eb547bd6ef9c1f76c84 Mon Sep 17 00:00:00 2001 From: Erik Date: Fri, 25 Aug 2023 17:18:54 +0200 Subject: [PATCH 6/9] Add staggering of refreshes back --- homeassistant/helpers/update_coordinator.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/homeassistant/helpers/update_coordinator.py b/homeassistant/helpers/update_coordinator.py index 55d5ca61b0c776..9f252b74efccd1 100644 --- a/homeassistant/helpers/update_coordinator.py +++ b/homeassistant/helpers/update_coordinator.py @@ -6,6 +6,7 @@ from collections.abc import Awaitable, Callable, Coroutine, Generator from datetime import datetime, timedelta import logging +from random import randint from time import monotonic from typing import Any, Generic, Protocol, TypeVar import urllib.error @@ -89,6 +90,13 @@ def __init__( # when it was already checked during setup. self.data: _DataT = None # type: ignore[assignment] + # Pick a random microsecond to stagger the refreshes + # and avoid a thundering herd. + self._microsecond = ( + randint(event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX) + / 10**6 + ) + self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {} job_name = "DataUpdateCoordinator" type_name = type(self).__name__ @@ -213,7 +221,7 @@ def _schedule_refresh(self) -> None: # not guarantee an exact update interval now = self.hass.loop.time() if self._next_refresh is None or self._next_refresh <= now: - self._next_refresh = now + self._next_refresh = now + self._microsecond self._next_refresh += self.update_interval.total_seconds() self._unsub_refresh = event.async_call_at( self.hass, From 3bb314dbfaa6518b383af3f697a7cca1dd3b5bde Mon Sep 17 00:00:00 2001 From: Erik Date: Fri, 25 Aug 2023 17:54:19 +0200 Subject: [PATCH 7/9] Tweak logic for staggering refreshes --- homeassistant/helpers/update_coordinator.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/homeassistant/helpers/update_coordinator.py b/homeassistant/helpers/update_coordinator.py index 9f252b74efccd1..34651fcaf9d1f4 100644 --- a/homeassistant/helpers/update_coordinator.py +++ b/homeassistant/helpers/update_coordinator.py @@ -90,7 +90,7 @@ def __init__( # when it was already checked during setup. self.data: _DataT = None # type: ignore[assignment] - # Pick a random microsecond to stagger the refreshes + # Pick a random microsecond in range 0.05..0.50 to stagger the refreshes # and avoid a thundering herd. self._microsecond = ( randint(event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX) @@ -218,10 +218,10 @@ def _schedule_refresh(self) -> None: self._async_unsub_refresh() # We use event.async_call_at because DataUpdateCoordinator does - # not guarantee an exact update interval + # not need an exact update interval. now = self.hass.loop.time() if self._next_refresh is None or self._next_refresh <= now: - self._next_refresh = now + self._microsecond + self._next_refresh = int(now) + self._microsecond self._next_refresh += self.update_interval.total_seconds() self._unsub_refresh = event.async_call_at( self.hass, From c941cff595302eefa8631dfe9e038c440e90d5ed Mon Sep 17 00:00:00 2001 From: Erik Date: Fri, 25 Aug 2023 19:57:35 +0200 Subject: [PATCH 8/9] Tweak async_fire_time_changed --- tests/common.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/common.py b/tests/common.py index 6ee38b72532d29..eb154a697b3390 100644 --- a/tests/common.py +++ b/tests/common.py @@ -412,12 +412,9 @@ def async_fire_time_changed( else: utc_datetime = dt_util.as_utc(datetime_) - if utc_datetime.microsecond < event.RANDOM_MICROSECOND_MAX: - # Allow up to 500000 microseconds to be added to the time - # to handle update_coordinator's and - # async_track_time_interval's - # staggering to avoid thundering herd. - utc_datetime = utc_datetime.replace(microsecond=event.RANDOM_MICROSECOND_MAX) + # Increase the mocked time by 0.5 s to account for up to 0.5 s delay + # added to events scheduled by update_coordinator and async_track_time_interval + utc_datetime += timedelta(microseconds=event.RANDOM_MICROSECOND_MAX) _async_fire_time_changed(hass, utc_datetime, fire_all) From 26b7ae66eb3480805011b5ba31967282821a69b8 Mon Sep 17 00:00:00 2001 From: Erik Date: Fri, 25 Aug 2023 22:51:52 +0200 Subject: [PATCH 9/9] Fix test --- tests/helpers/test_event.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/helpers/test_event.py b/tests/helpers/test_event.py index 572a0d22e92723..dc06b9d94c84e1 100644 --- a/tests/helpers/test_event.py +++ b/tests/helpers/test_event.py @@ -4174,27 +4174,27 @@ async def test_periodic_task_entering_dst_2( ) freezer.move_to(f"{today} 01:59:59.999999+01:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 0 freezer.move_to(f"{today} 03:00:00.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 1 freezer.move_to(f"{today} 03:00:01.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 2 freezer.move_to(f"{tomorrow} 01:59:59.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 3 freezer.move_to(f"{tomorrow} 02:00:00.999999+02:00") - async_fire_time_changed(hass) + async_fire_time_changed_exact(hass) await hass.async_block_till_done() assert len(specific_runs) == 4