/
coordinator.py
197 lines (167 loc) · 7.23 KB
/
coordinator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""Helpers to help coordinate updates."""
from datetime import timedelta
import json
import logging
from typing import Any, Dict, List, Optional, Union
from aiohttp import ServerDisconnectedError
from homeassistant.core import HomeAssistant
from homeassistant.helpers import device_registry
from homeassistant.helpers.update_coordinator import DataUpdateCoordinator, UpdateFailed
from pyoverkiz.client import OverkizClient
from pyoverkiz.enums import EventName, ExecutionState
from pyoverkiz.exceptions import (
BadCredentialsException,
MaintenanceException,
NotAuthenticatedException,
TooManyRequestsException,
)
from pyoverkiz.models import DataType, Device, Place, State
from .const import DOMAIN, UPDATE_INTERVAL
TYPES = {
DataType.NONE: None,
DataType.INTEGER: int,
DataType.DATE: int,
DataType.STRING: str,
DataType.FLOAT: float,
DataType.BOOLEAN: bool,
DataType.JSON_ARRAY: json.loads,
DataType.JSON_OBJECT: json.loads,
}
_LOGGER = logging.getLogger(__name__)
class OverkizDataUpdateCoordinator(DataUpdateCoordinator):
"""Class to manage fetching data from Overkiz platform."""
def __init__(
self,
hass: HomeAssistant,
logger: logging.Logger,
*,
name: str,
client: OverkizClient,
devices: List[Device],
places: Place,
update_interval: Optional[timedelta] = None,
config_entry_id: str,
):
"""Initialize global data updater."""
super().__init__(
hass,
logger,
name=name,
update_interval=update_interval,
)
self.data = {}
self.client = client
self.devices: Dict[str, Device] = {d.device_url: d for d in devices}
self.is_stateless = all(
device.device_url.startswith("rts://")
or device.device_url.startswith("internal://")
for device in devices
)
self.executions: Dict[str, Dict[str, str]] = {}
self.areas = self.places_to_area(places)
self._config_entry_id = config_entry_id
async def _async_update_data(self) -> Dict[str, Device]:
"""Fetch Overkiz data via event listener."""
try:
events = await self.client.fetch_events()
except BadCredentialsException as exception:
# Keep retrying until Somfy fixes their servers (https://github.com/iMicknl/ha-tahoma/issues/599)
raise UpdateFailed("Invalid authentication.") from exception
# raise ConfigEntryAuthFailed() from exception
except TooManyRequestsException as exception:
raise UpdateFailed("Too many requests, try again later.") from exception
except MaintenanceException as exception:
raise UpdateFailed("Server is down for maintenance.") from exception
except TimeoutError as exception:
raise UpdateFailed("Failed to connect.") from exception
except (ServerDisconnectedError, NotAuthenticatedException):
self.executions = {}
# During the relogin, similar exceptions can be thrown.
try:
await self.client.login()
self.devices = await self._get_devices()
except BadCredentialsException as exception:
# Keep retrying until Somfy fixes their servers (https://github.com/iMicknl/ha-tahoma/issues/599)
raise UpdateFailed("Invalid authentication.") from exception
# raise ConfigEntryAuthFailed() from exception
except TooManyRequestsException as exception:
raise UpdateFailed("Too many requests, try again later.") from exception
return self.devices
except Exception as exception:
_LOGGER.debug(exception)
raise UpdateFailed(exception) from exception
for event in events:
_LOGGER.debug(event)
if event.name == EventName.DEVICE_AVAILABLE:
self.devices[event.device_url].available = True
elif event.name in [
EventName.DEVICE_UNAVAILABLE,
EventName.DEVICE_DISABLED,
]:
self.devices[event.device_url].available = False
elif event.name in [
EventName.DEVICE_CREATED,
EventName.DEVICE_UPDATED,
]:
self.hass.async_create_task(
self.hass.config_entries.async_reload(self._config_entry_id)
)
return None
elif event.name == EventName.DEVICE_REMOVED:
base_device_url, *_ = event.device_url.split("#")
registry = await device_registry.async_get_registry(self.hass)
if device := registry.async_get_device({(DOMAIN, base_device_url)}):
registry.async_remove_device(device.id)
del self.devices[event.device_url]
elif event.name == EventName.DEVICE_STATE_CHANGED:
for state in event.device_states:
device = self.devices[event.device_url]
if state.name not in device.states:
device.states[state.name] = state
device.states[state.name].value = self._get_state(state)
elif event.name == EventName.EXECUTION_REGISTERED:
if event.exec_id not in self.executions:
self.executions[event.exec_id] = {}
if not self.is_stateless:
self.update_interval = timedelta(seconds=1)
elif (
event.name == EventName.EXECUTION_STATE_CHANGED
and event.exec_id in self.executions
and event.new_state in [ExecutionState.COMPLETED, ExecutionState.FAILED]
):
del self.executions[event.exec_id]
# Log errors via `overkiz_event`
if event.failure_type_code:
self.hass.bus.fire(
"overkiz.event",
{
"event_name": event.name.value,
"failure_type_code": event.failure_type_code.value,
"failure_type": event.failure_type,
},
)
if not self.executions:
self.update_interval = UPDATE_INTERVAL
return self.devices
async def _get_devices(self) -> Dict[str, Device]:
"""Fetch devices."""
_LOGGER.debug("Fetching all devices and state via /setup/devices")
return {d.device_url: d for d in await self.client.get_devices(refresh=True)}
@staticmethod
def _get_state(
state: State,
) -> Union[Dict[Any, Any], List[Any], float, int, bool, str, None]:
"""Cast string value to the right type."""
if state.type != DataType.NONE:
caster = TYPES.get(DataType(state.type))
return caster(state.value)
return state.value
def places_to_area(self, place):
"""Convert places with sub_places to a flat dictionary."""
areas = {}
if isinstance(place, Place):
areas[place.oid] = place.label
if isinstance(place.sub_places, list):
for sub_place in place.sub_places:
areas.update(self.places_to_area(sub_place))
return areas