-
-
Notifications
You must be signed in to change notification settings - Fork 28.6k
/
data.py
126 lines (99 loc) · 4.9 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
"""Data used by this integration."""
from __future__ import annotations
import asyncio
from collections import defaultdict
from typing import NamedTuple, cast
from async_upnp_client.aiohttp import AiohttpNotifyServer, AiohttpSessionRequester
from async_upnp_client.client import UpnpRequester
from async_upnp_client.client_factory import UpnpFactory
from async_upnp_client.event_handler import UpnpEventHandler
from homeassistant.const import EVENT_HOMEASSISTANT_STOP
from homeassistant.core import CALLBACK_TYPE, Event, HomeAssistant
from homeassistant.helpers import aiohttp_client
from .const import DOMAIN, LOGGER
class EventListenAddr(NamedTuple):
"""Unique identifier for an event listener."""
host: str | None # Specific local IP(v6) address for listening on
port: int # Listening port, 0 means use an ephemeral port
callback_url: str | None
class DlnaDmrData:
"""Storage class for domain global data."""
lock: asyncio.Lock
requester: UpnpRequester
upnp_factory: UpnpFactory
event_notifiers: dict[EventListenAddr, AiohttpNotifyServer]
event_notifier_refs: defaultdict[EventListenAddr, int]
stop_listener_remove: CALLBACK_TYPE | None = None
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize global data."""
self.lock = asyncio.Lock()
session = aiohttp_client.async_get_clientsession(hass, verify_ssl=False)
self.requester = AiohttpSessionRequester(session, with_sleep=True)
self.upnp_factory = UpnpFactory(self.requester, non_strict=True)
self.event_notifiers = {}
self.event_notifier_refs = defaultdict(int)
async def async_cleanup_event_notifiers(self, event: Event) -> None:
"""Clean up resources when Home Assistant is stopped."""
LOGGER.debug("Cleaning resources in DlnaDmrData")
async with self.lock:
tasks = (
server.async_stop_server() for server in self.event_notifiers.values()
)
asyncio.gather(*tasks)
self.event_notifiers = {}
self.event_notifier_refs = defaultdict(int)
async def async_get_event_notifier(
self, listen_addr: EventListenAddr, hass: HomeAssistant
) -> UpnpEventHandler:
"""Return existing event notifier for the listen_addr, or create one.
Only one event notify server is kept for each listen_addr. Must call
async_release_event_notifier when done to cleanup resources.
"""
LOGGER.debug("Getting event handler for %s", listen_addr)
async with self.lock:
# Stop all servers when HA shuts down, to release resources on devices
if not self.stop_listener_remove:
self.stop_listener_remove = hass.bus.async_listen_once(
EVENT_HOMEASSISTANT_STOP, self.async_cleanup_event_notifiers
)
# Always increment the reference counter, for existing or new event handlers
self.event_notifier_refs[listen_addr] += 1
# Return an existing event handler if we can
if listen_addr in self.event_notifiers:
return self.event_notifiers[listen_addr].event_handler
# Start event handler
source = (listen_addr.host or "0.0.0.0", listen_addr.port)
server = AiohttpNotifyServer(
requester=self.requester,
source=source,
callback_url=listen_addr.callback_url,
loop=hass.loop,
)
await server.async_start_server()
LOGGER.debug("Started event handler at %s", server.callback_url)
self.event_notifiers[listen_addr] = server
return server.event_handler
async def async_release_event_notifier(self, listen_addr: EventListenAddr) -> None:
"""Indicate that the event notifier for listen_addr is not used anymore.
This is called once by each caller of async_get_event_notifier, and will
stop the listening server when all users are done.
"""
async with self.lock:
assert self.event_notifier_refs[listen_addr] > 0
self.event_notifier_refs[listen_addr] -= 1
# Shutdown the server when it has no more users
if self.event_notifier_refs[listen_addr] == 0:
server = self.event_notifiers.pop(listen_addr)
await server.async_stop_server()
# Remove the cleanup listener when there's nothing left to cleanup
if not self.event_notifiers:
assert self.stop_listener_remove is not None
self.stop_listener_remove()
self.stop_listener_remove = None
def get_domain_data(hass: HomeAssistant) -> DlnaDmrData:
"""Obtain this integration's domain data, creating it if needed."""
if DOMAIN in hass.data:
return cast(DlnaDmrData, hass.data[DOMAIN])
data = DlnaDmrData(hass)
hass.data[DOMAIN] = data
return data