Skip to content

Commit

Permalink
Make SSDP tasks background HassJob to avoid delaying startup (#112668)
Browse files Browse the repository at this point in the history
  • Loading branch information
bdraco committed Mar 9, 2024
1 parent b7d9f26 commit 2b0b3c2
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 40 deletions.
53 changes: 36 additions & 17 deletions homeassistant/components/ssdp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
from __future__ import annotations

import asyncio
from collections.abc import Awaitable, Callable, Mapping
from collections.abc import Callable, Coroutine, Mapping
from dataclasses import dataclass, field
from datetime import timedelta
from enum import Enum
from functools import partial
from ipaddress import IPv4Address, IPv6Address
import logging
import socket
Expand Down Expand Up @@ -42,7 +43,7 @@
MATCH_ALL,
__version__ as current_version,
)
from homeassistant.core import Event, HomeAssistant, callback as core_callback
from homeassistant.core import Event, HassJob, HomeAssistant, callback as core_callback
from homeassistant.data_entry_flow import BaseServiceInfo
from homeassistant.helpers import config_validation as cv, discovery_flow
from homeassistant.helpers.aiohttp_client import async_get_clientsession
Expand All @@ -53,6 +54,7 @@
from homeassistant.helpers.typing import ConfigType
from homeassistant.loader import async_get_ssdp, bind_hass
from homeassistant.util.async_ import create_eager_task
from homeassistant.util.logging import catch_log_exception

DOMAIN = "ssdp"
SSDP_SCANNER = "scanner"
Expand Down Expand Up @@ -124,7 +126,9 @@ class SsdpServiceInfo(BaseServiceInfo):


SsdpChange = Enum("SsdpChange", "ALIVE BYEBYE UPDATE")
SsdpCallback = Callable[[SsdpServiceInfo, SsdpChange], Awaitable]
SsdpHassJobCallback = HassJob[
[SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None
]

SSDP_SOURCE_SSDP_CHANGE_MAPPING: Mapping[SsdpSource, SsdpChange] = {
SsdpSource.SEARCH_ALIVE: SsdpChange.ALIVE,
Expand All @@ -135,18 +139,30 @@ class SsdpServiceInfo(BaseServiceInfo):
}


def _format_err(name: str, *args: Any) -> str:
"""Format error message."""
return f"Exception in SSDP callback {name}: {args}"


@bind_hass
async def async_register_callback(
hass: HomeAssistant,
callback: SsdpCallback,
callback: Callable[[SsdpServiceInfo, SsdpChange], Coroutine[Any, Any, None] | None],
match_dict: None | dict[str, str] = None,
) -> Callable[[], None]:
"""Register to receive a callback on ssdp broadcast.
Returns a callback that can be used to cancel the registration.
"""
scanner: Scanner = hass.data[DOMAIN][SSDP_SCANNER]
return await scanner.async_register_callback(callback, match_dict)
job = HassJob(
catch_log_exception(
callback,
partial(_format_err, str(callback)),
),
f"ssdp callback {match_dict}",
)
return await scanner.async_register_callback(job, match_dict)


@bind_hass
Expand Down Expand Up @@ -206,14 +222,18 @@ async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
return True


async def _async_process_callbacks(
callbacks: list[SsdpCallback],
@core_callback
def _async_process_callbacks(
hass: HomeAssistant,
callbacks: list[SsdpHassJobCallback],
discovery_info: SsdpServiceInfo,
ssdp_change: SsdpChange,
) -> None:
for callback in callbacks:
try:
await callback(discovery_info, ssdp_change)
hass.async_run_hass_job(
callback, discovery_info, ssdp_change, eager_start=True, background=True
)
except Exception: # pylint: disable=broad-except
_LOGGER.exception("Failed to callback info: %s", discovery_info)

Expand Down Expand Up @@ -287,7 +307,7 @@ def __init__(
self._cancel_scan: Callable[[], None] | None = None
self._ssdp_listeners: list[SsdpListener] = []
self._device_tracker = SsdpDeviceTracker()
self._callbacks: list[tuple[SsdpCallback, dict[str, str]]] = []
self._callbacks: list[tuple[SsdpHassJobCallback, dict[str, str]]] = []
self._description_cache: DescriptionCache | None = None
self.integration_matchers = integration_matchers

Expand All @@ -297,7 +317,7 @@ def _ssdp_devices(self) -> list[SsdpDevice]:
return list(self._device_tracker.devices.values())

async def async_register_callback(
self, callback: SsdpCallback, match_dict: None | dict[str, str] = None
self, callback: SsdpHassJobCallback, match_dict: None | dict[str, str] = None
) -> Callable[[], None]:
"""Register a callback."""
if match_dict is None:
Expand All @@ -310,7 +330,8 @@ async def async_register_callback(
for ssdp_device in self._ssdp_devices:
for headers in ssdp_device.all_combined_headers.values():
if _async_headers_match(headers, lower_match_dict):
await _async_process_callbacks(
_async_process_callbacks(
self.hass,
[callback],
await self._async_headers_to_discovery_info(
ssdp_device, headers
Expand Down Expand Up @@ -426,7 +447,7 @@ async def _async_start_ssdp_listeners(self) -> None:
def _async_get_matching_callbacks(
self,
combined_headers: CaseInsensitiveDict,
) -> list[SsdpCallback]:
) -> list[SsdpHassJobCallback]:
"""Return a list of callbacks that match."""
return [
callback
Expand All @@ -451,10 +472,11 @@ def _ssdp_listener_callback(
_, info_desc = self._description_cache.peek_description_dict(location)
if info_desc is None:
# Fetch info desc in separate task and process from there.
self.hass.async_create_task(
self.hass.async_create_background_task(
self._ssdp_listener_process_callback_with_lookup(
ssdp_device, dst, source
),
name=f"ssdp_info_desc_lookup_{location}",
eager_start=True,
)
return
Expand Down Expand Up @@ -509,10 +531,7 @@ def _ssdp_listener_process_callback(

if callbacks:
ssdp_change = SSDP_SOURCE_SSDP_CHANGE_MAPPING[source]
self.hass.async_create_task(
_async_process_callbacks(callbacks, discovery_info, ssdp_change),
eager_start=True,
)
_async_process_callbacks(self.hass, callbacks, discovery_info, ssdp_change)

# Config flows should only be created for alive/update messages from alive devices
if source == SsdpSource.ADVERTISEMENT_BYEBYE:
Expand Down
16 changes: 8 additions & 8 deletions tests/components/dlna_dmr/test_media_player.py
Original file line number Diff line number Diff line change
Expand Up @@ -1425,7 +1425,7 @@ async def test_become_available(
domain_data_mock.upnp_factory.async_create_device.reset_mock()

# Send an SSDP notification from the now alive device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -1498,7 +1498,7 @@ async def test_alive_but_gone(
domain_data_mock.upnp_factory.async_create_device.side_effect = UpnpError

# Send an SSDP notification from the still missing device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -1611,7 +1611,7 @@ async def create_device_delayed(_location):
)

# Send two SSDP notifications with the new device URL
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -1651,7 +1651,7 @@ async def test_ssdp_byebye(
) -> None:
"""Test device is disconnected when byebye is received."""
# First byebye will cause a disconnect
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -1703,7 +1703,7 @@ async def test_ssdp_update_seen_bootid(
domain_data_mock.upnp_factory.async_create_device.side_effect = None

# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -1830,7 +1830,7 @@ async def test_ssdp_update_missed_bootid(
domain_data_mock.upnp_factory.async_create_device.side_effect = None

# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -1907,7 +1907,7 @@ async def test_ssdp_bootid(
domain_data_mock.upnp_factory.async_create_device.side_effect = None

# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -2367,7 +2367,7 @@ async def test_connections_restored(
domain_data_mock.upnp_factory.async_create_device.reset_mock()

# Send an SSDP notification from the now alive device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down
14 changes: 7 additions & 7 deletions tests/components/dlna_dms/test_device_availability.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ async def test_become_available(
upnp_factory_mock.async_create_device.reset_mock()

# Send an SSDP notification from the now alive device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -205,7 +205,7 @@ async def test_alive_but_gone(
upnp_factory_mock.async_create_device.side_effect = UpnpError

# Send an SSDP notification from the still missing device
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -308,7 +308,7 @@ async def create_device_delayed(_location):
upnp_factory_mock.async_create_device.side_effect = create_device_delayed

# Send two SSDP notifications with the new device URL
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -343,7 +343,7 @@ async def test_ssdp_byebye(
) -> None:
"""Test device is disconnected when byebye is received."""
# First byebye will cause a disconnect
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -386,7 +386,7 @@ async def test_ssdp_update_seen_bootid(
upnp_factory_mock.async_create_device.side_effect = None

# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -498,7 +498,7 @@ async def test_ssdp_update_missed_bootid(
upnp_factory_mock.async_create_device.side_effect = None

# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down Expand Up @@ -568,7 +568,7 @@ async def test_ssdp_bootid(
upnp_factory_mock.async_create_device.reset_mock()

# Send SSDP alive with boot ID
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down
2 changes: 1 addition & 1 deletion tests/components/dlna_dms/test_dms_device_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async def test_catch_request_error_unavailable(
) -> None:
"""Test the device is checked for availability before trying requests."""
# DmsDevice notifies of disconnect via SSDP
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0]
ssdp_callback = ssdp_scanner_mock.async_register_callback.call_args.args[0].target
await ssdp_callback(
ssdp.SsdpServiceInfo(
ssdp_usn=MOCK_DEVICE_USN,
Expand Down
15 changes: 8 additions & 7 deletions tests/components/ssdp/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ async def test_flow_start_only_alive(
}
)
ssdp_listener._on_search(mock_ssdp_search_response)
await hass.async_block_till_done()
await hass.async_block_till_done(wait_background_tasks=True)

mock_flow_init.assert_awaited_once_with(
"mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY
Expand Down Expand Up @@ -464,6 +464,7 @@ async def test_start_stop_scanner(mock_source_set, hass: HomeAssistant) -> None:


@pytest.mark.usefixtures("mock_get_source_ip")
@pytest.mark.no_fail_on_log_exception
@patch("homeassistant.components.ssdp.async_get_ssdp", return_value={})
async def test_scan_with_registered_callback(
mock_get_ssdp,
Expand Down Expand Up @@ -523,9 +524,9 @@ async def test_scan_with_registered_callback(
async_match_any_callback = AsyncMock()
await ssdp.async_register_callback(hass, async_match_any_callback)

await hass.async_block_till_done()
await hass.async_block_till_done(wait_background_tasks=True)
ssdp_listener._on_search(mock_ssdp_search_response)
await hass.async_block_till_done()
await hass.async_block_till_done(wait_background_tasks=True)

assert async_integration_callback.call_count == 1
assert async_integration_match_all_callback.call_count == 1
Expand All @@ -549,7 +550,7 @@ async def test_scan_with_registered_callback(
ssdp.ATTR_UPNP_DEVICE_TYPE: "Paulus",
ssdp.ATTR_UPNP_UDN: "uuid:TIVRTLSR7ANF-D6E-1557809135086-RETAIL",
}
assert "Failed to callback info" in caplog.text
assert "Exception in SSDP callback" in caplog.text

async_integration_callback_from_cache = AsyncMock()
await ssdp.async_register_callback(
Expand Down Expand Up @@ -835,7 +836,7 @@ async def test_flow_dismiss_on_byebye(
}
)
ssdp_listener._on_search(mock_ssdp_search_response)
await hass.async_block_till_done()
await hass.async_block_till_done(wait_background_tasks=True)

mock_flow_init.assert_awaited_once_with(
"mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY
Expand All @@ -853,7 +854,7 @@ async def test_flow_dismiss_on_byebye(
}
)
ssdp_listener._on_alive(mock_ssdp_advertisement)
await hass.async_block_till_done()
await hass.async_block_till_done(wait_background_tasks=True)
mock_flow_init.assert_awaited_once_with(
"mock-domain", context={"source": config_entries.SOURCE_SSDP}, data=ANY
)
Expand All @@ -868,7 +869,7 @@ async def test_flow_dismiss_on_byebye(
hass.config_entries.flow, "async_abort"
) as mock_async_abort:
ssdp_listener._on_byebye(mock_ssdp_advertisement)
await hass.async_block_till_done()
await hass.async_block_till_done(wait_background_tasks=True)

assert len(mock_async_progress_by_init_data_type.mock_calls) == 1
assert mock_async_abort.mock_calls[0][1][0] == "mock_flow_id"

0 comments on commit 2b0b3c2

Please sign in to comment.