Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use loop.time in DataUpdateCoordinator #98937

Merged
merged 9 commits into from
Aug 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 4 additions & 1 deletion homeassistant/components/tomorrowio/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,10 @@ async def async_setup_entry(self, entry: ConfigEntry) -> None:
await self.async_refresh()

self.update_interval = async_set_update_interval(self.hass, self._api)
self._schedule_refresh()
self._next_refresh = None
self._async_unsub_refresh()
if self._listeners:
self._schedule_refresh()

async def async_unload_entry(self, entry: ConfigEntry) -> bool | None:
"""Unload a config entry from coordinator.
Expand Down
31 changes: 31 additions & 0 deletions homeassistant/helpers/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -1434,6 +1434,37 @@ def unsub_point_in_time_listener() -> None:
track_point_in_utc_time = threaded_listener_factory(async_track_point_in_utc_time)


@callback
@bind_hass
def async_call_at(
hass: HomeAssistant,
action: HassJob[[datetime], Coroutine[Any, Any, None] | None]
| Callable[[datetime], Coroutine[Any, Any, None] | None],
loop_time: float,
) -> CALLBACK_TYPE:
"""Add a listener that is called at <loop_time>."""

@callback
def run_action(job: HassJob[[datetime], Coroutine[Any, Any, None] | None]) -> None:
"""Call the action."""
hass.async_run_hass_job(job, time_tracker_utcnow())

job = (
action
if isinstance(action, HassJob)
else HassJob(action, f"call_at {loop_time}")
)
cancel_callback = hass.loop.call_at(loop_time, run_action, job)

@callback
def unsub_call_later_listener() -> None:
"""Cancel the call_later."""
assert cancel_callback is not None
cancel_callback.cancel()

return unsub_call_later_listener


@callback
@bind_hass
def async_call_later(
Expand Down
31 changes: 16 additions & 15 deletions homeassistant/helpers/update_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ def __init__(
self._shutdown_requested = False
self.config_entry = config_entries.current_entry.get()
self.always_update = always_update
self._next_refresh: float | None = None

# It's None before the first successful update.
# Components should call async_config_entry_first_refresh
Expand All @@ -89,10 +90,11 @@ def __init__(
# when it was already checked during setup.
self.data: _DataT = None # type: ignore[assignment]

# Pick a random microsecond to stagger the refreshes
# Pick a random microsecond in range 0.05..0.50 to stagger the refreshes
# and avoid a thundering herd.
self._microsecond = randint(
event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX
self._microsecond = (
randint(event.RANDOM_MICROSECOND_MIN, event.RANDOM_MICROSECOND_MAX)
/ 10**6
)

self._listeners: dict[CALLBACK_TYPE, tuple[CALLBACK_TYPE, object | None]] = {}
Expand Down Expand Up @@ -182,6 +184,7 @@ def _unschedule_refresh(self) -> None:
"""Unschedule any pending refresh since there is no longer any listeners."""
self._async_unsub_refresh()
self._debounced_refresh.async_cancel()
self._next_refresh = None

def async_contexts(self) -> Generator[Any, None, None]:
"""Return all registered contexts."""
Expand Down Expand Up @@ -214,20 +217,16 @@ def _schedule_refresh(self) -> None:
# than the debouncer cooldown, this would cause the debounce to never be called
self._async_unsub_refresh()

# We _floor_ utcnow to create a schedule on a rounded second,
# minimizing the time between the point and the real activation.
# That way we obtain a constant update frequency,
# as long as the update process takes less than 500ms
#
# We do not align everything to happen at microsecond 0
# since it increases the risk of a thundering herd
# when multiple coordinators are scheduled to update at the same time.
#
# https://github.com/home-assistant/core/issues/82231
self._unsub_refresh = event.async_track_point_in_utc_time(
# We use event.async_call_at because DataUpdateCoordinator does
# not need an exact update interval.
now = self.hass.loop.time()
if self._next_refresh is None or self._next_refresh <= now:
self._next_refresh = int(now) + self._microsecond
self._next_refresh += self.update_interval.total_seconds()
bdraco marked this conversation as resolved.
Show resolved Hide resolved
self._unsub_refresh = event.async_call_at(
self.hass,
self._job,
utcnow().replace(microsecond=self._microsecond) + self.update_interval,
self._next_refresh,
)

async def _handle_refresh_interval(self, _now: datetime) -> None:
Expand Down Expand Up @@ -266,6 +265,7 @@ async def async_config_entry_first_refresh(self) -> None:

async def async_refresh(self) -> None:
"""Refresh data and log errors."""
self._next_refresh = None
await self._async_refresh(log_failures=True)

async def _async_refresh( # noqa: C901
Expand Down Expand Up @@ -405,6 +405,7 @@ def async_set_updated_data(self, data: _DataT) -> None:
"""Manually update data, notify listeners and reset refresh interval."""
self._async_unsub_refresh()
self._debounced_refresh.async_cancel()
self._next_refresh = None

self.data = data
self.last_update_success = True
Expand Down
9 changes: 3 additions & 6 deletions tests/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -412,12 +412,9 @@ def async_fire_time_changed(
else:
utc_datetime = dt_util.as_utc(datetime_)

if utc_datetime.microsecond < event.RANDOM_MICROSECOND_MAX:
# Allow up to 500000 microseconds to be added to the time
# to handle update_coordinator's and
# async_track_time_interval's
# staggering to avoid thundering herd.
utc_datetime = utc_datetime.replace(microsecond=event.RANDOM_MICROSECOND_MAX)
# Increase the mocked time by 0.5 s to account for up to 0.5 s delay
# added to events scheduled by update_coordinator and async_track_time_interval
utc_datetime += timedelta(microseconds=event.RANDOM_MICROSECOND_MAX)

_async_fire_time_changed(hass, utc_datetime, fire_all)

Expand Down
10 changes: 5 additions & 5 deletions tests/helpers/test_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -4174,27 +4174,27 @@ async def test_periodic_task_entering_dst_2(
)

freezer.move_to(f"{today} 01:59:59.999999+01:00")
async_fire_time_changed(hass)
async_fire_time_changed_exact(hass)
await hass.async_block_till_done()
assert len(specific_runs) == 0

freezer.move_to(f"{today} 03:00:00.999999+02:00")
async_fire_time_changed(hass)
async_fire_time_changed_exact(hass)
await hass.async_block_till_done()
assert len(specific_runs) == 1

freezer.move_to(f"{today} 03:00:01.999999+02:00")
async_fire_time_changed(hass)
async_fire_time_changed_exact(hass)
await hass.async_block_till_done()
assert len(specific_runs) == 2

freezer.move_to(f"{tomorrow} 01:59:59.999999+02:00")
async_fire_time_changed(hass)
async_fire_time_changed_exact(hass)
await hass.async_block_till_done()
assert len(specific_runs) == 3

freezer.move_to(f"{tomorrow} 02:00:00.999999+02:00")
async_fire_time_changed(hass)
async_fire_time_changed_exact(hass)
await hass.async_block_till_done()
assert len(specific_runs) == 4

Expand Down
17 changes: 12 additions & 5 deletions tests/helpers/test_update_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import urllib.error

import aiohttp
from freezegun.api import FrozenDateTimeFactory
import pytest
import requests

Expand Down Expand Up @@ -329,11 +330,14 @@ async def test_refresh_no_update_method(


async def test_update_interval(
hass: HomeAssistant, crd: update_coordinator.DataUpdateCoordinator[int]
hass: HomeAssistant,
freezer: FrozenDateTimeFactory,
crd: update_coordinator.DataUpdateCoordinator[int],
) -> None:
"""Test update interval works."""
# Test we don't update without subscriber
async_fire_time_changed(hass, utcnow() + crd.update_interval)
freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert crd.data is None

Expand All @@ -342,18 +346,21 @@ async def test_update_interval(
unsub = crd.async_add_listener(update_callback)

# Test twice we update with subscriber
async_fire_time_changed(hass, utcnow() + crd.update_interval)
freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert crd.data == 1

async_fire_time_changed(hass, utcnow() + crd.update_interval)
freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done()
assert crd.data == 2

# Test removing listener
unsub()

async_fire_time_changed(hass, utcnow() + crd.update_interval)
freezer.tick(crd.update_interval)
async_fire_time_changed(hass)
await hass.async_block_till_done()

# Test we stop updating after we lose last subscriber
Expand Down