/
__init__.py
143 lines (117 loc) · 5.11 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""Component to embed TP-Link smart home devices."""
from __future__ import annotations
import asyncio
from datetime import timedelta
from typing import Any
from kasa import SmartDevice, SmartDeviceException
from kasa.discover import Discover
from homeassistant import config_entries
from homeassistant.components import network
from homeassistant.config_entries import ConfigEntry
from homeassistant.const import (
CONF_HOST,
CONF_MAC,
CONF_NAME,
EVENT_HOMEASSISTANT_STARTED,
)
from homeassistant.core import HomeAssistant, callback
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.helpers import (
config_validation as cv,
device_registry as dr,
discovery_flow,
)
from homeassistant.helpers.event import async_track_time_interval
from homeassistant.helpers.typing import ConfigType
from .const import DOMAIN, PLATFORMS
from .coordinator import TPLinkDataUpdateCoordinator
from .models import TPLinkData
DISCOVERY_INTERVAL = timedelta(minutes=15)
CONFIG_SCHEMA = cv.config_entry_only_config_schema(DOMAIN)
@callback
def async_trigger_discovery(
hass: HomeAssistant,
discovered_devices: dict[str, SmartDevice],
) -> None:
"""Trigger config flows for discovered devices."""
for formatted_mac, device in discovered_devices.items():
discovery_flow.async_create_flow(
hass,
DOMAIN,
context={"source": config_entries.SOURCE_INTEGRATION_DISCOVERY},
data={
CONF_NAME: device.alias,
CONF_HOST: device.host,
CONF_MAC: formatted_mac,
},
)
async def async_discover_devices(hass: HomeAssistant) -> dict[str, SmartDevice]:
"""Discover TPLink devices on configured network interfaces."""
broadcast_addresses = await network.async_get_ipv4_broadcast_addresses(hass)
tasks = [Discover.discover(target=str(address)) for address in broadcast_addresses]
discovered_devices: dict[str, SmartDevice] = {}
for device_list in await asyncio.gather(*tasks):
for device in device_list.values():
discovered_devices[dr.format_mac(device.mac)] = device
return discovered_devices
async def async_setup(hass: HomeAssistant, config: ConfigType) -> bool:
"""Set up the TP-Link component."""
hass.data[DOMAIN] = {}
if discovered_devices := await async_discover_devices(hass):
async_trigger_discovery(hass, discovered_devices)
async def _async_discovery(*_: Any) -> None:
if discovered := await async_discover_devices(hass):
async_trigger_discovery(hass, discovered)
hass.bus.async_listen_once(EVENT_HOMEASSISTANT_STARTED, _async_discovery)
async_track_time_interval(
hass, _async_discovery, DISCOVERY_INTERVAL, cancel_on_shutdown=True
)
return True
async def async_setup_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Set up TPLink from a config entry."""
host = entry.data[CONF_HOST]
try:
device: SmartDevice = await Discover.discover_single(host, timeout=10)
except SmartDeviceException as ex:
raise ConfigEntryNotReady from ex
found_mac = dr.format_mac(device.mac)
if found_mac != entry.unique_id:
# If the mac address of the device does not match the unique_id
# of the config entry, it likely means the DHCP lease has expired
# and the device has been assigned a new IP address. We need to
# wait for the next discovery to find the device at its new address
# and update the config entry so we do not mix up devices.
raise ConfigEntryNotReady(
f"Unexpected device found at {host}; expected {entry.unique_id}, found {found_mac}"
)
parent_coordinator = TPLinkDataUpdateCoordinator(hass, device, timedelta(seconds=5))
child_coordinators: list[TPLinkDataUpdateCoordinator] = []
if device.is_strip:
child_coordinators = [
# The child coordinators only update energy data so we can
# set a longer update interval to avoid flooding the device
TPLinkDataUpdateCoordinator(hass, child, timedelta(seconds=60))
for child in device.children
]
hass.data[DOMAIN][entry.entry_id] = TPLinkData(
parent_coordinator, child_coordinators
)
await hass.config_entries.async_forward_entry_setups(entry, PLATFORMS)
return True
async def async_unload_entry(hass: HomeAssistant, entry: ConfigEntry) -> bool:
"""Unload a config entry."""
hass_data: dict[str, Any] = hass.data[DOMAIN]
data: TPLinkData = hass_data[entry.entry_id]
device = data.parent_coordinator.device
if unload_ok := await hass.config_entries.async_unload_platforms(entry, PLATFORMS):
hass_data.pop(entry.entry_id)
await device.protocol.close()
return unload_ok
def legacy_device_id(device: SmartDevice) -> str:
"""Convert the device id so it matches what was used in the original version."""
device_id: str = device.device_id
# Plugs are prefixed with the mac in python-kasa but not
# in pyHS100 so we need to strip off the mac
if "_" not in device_id:
return device_id
return device_id.split("_")[1]