-
-
Notifications
You must be signed in to change notification settings - Fork 28.6k
/
helpers.py
743 lines (602 loc) · 23.7 KB
/
helpers.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
"""Helper classes for Google Assistant integration."""
from __future__ import annotations
from abc import ABC, abstractmethod
from asyncio import gather
from collections.abc import Callable, Mapping
from datetime import datetime, timedelta
from http import HTTPStatus
import logging
import pprint
from aiohttp.web import json_response
from awesomeversion import AwesomeVersion
from yarl import URL
from homeassistant.components import webhook
from homeassistant.const import (
ATTR_DEVICE_CLASS,
ATTR_SUPPORTED_FEATURES,
CLOUD_NEVER_EXPOSED_ENTITIES,
CONF_NAME,
STATE_UNAVAILABLE,
)
from homeassistant.core import CALLBACK_TYPE, Context, HomeAssistant, State, callback
from homeassistant.helpers import (
area_registry as ar,
device_registry as dr,
entity_registry as er,
start,
)
from homeassistant.helpers.event import async_call_later
from homeassistant.helpers.network import get_url
from homeassistant.helpers.storage import Store
from homeassistant.util.dt import utcnow
from . import trait
from .const import (
CONF_ALIASES,
CONF_ROOM_HINT,
DEVICE_CLASS_TO_GOOGLE_TYPES,
DOMAIN,
DOMAIN_TO_GOOGLE_TYPES,
ERR_FUNCTION_NOT_SUPPORTED,
NOT_EXPOSE_LOCAL,
SOURCE_LOCAL,
STORE_AGENT_USER_IDS,
STORE_GOOGLE_LOCAL_WEBHOOK_ID,
)
from .error import SmartHomeError
SYNC_DELAY = 15
_LOGGER = logging.getLogger(__name__)
LOCAL_SDK_VERSION_HEADER = "HA-Cloud-Version"
LOCAL_SDK_MIN_VERSION = AwesomeVersion("2.1.5")
@callback
def _get_registry_entries(
hass: HomeAssistant, entity_id: str
) -> tuple[er.RegistryEntry | None, dr.DeviceEntry | None, ar.AreaEntry | None,]:
"""Get registry entries."""
ent_reg = er.async_get(hass)
dev_reg = dr.async_get(hass)
area_reg = ar.async_get(hass)
if (entity_entry := ent_reg.async_get(entity_id)) and entity_entry.device_id:
device_entry = dev_reg.devices.get(entity_entry.device_id)
else:
device_entry = None
if entity_entry and entity_entry.area_id:
area_id = entity_entry.area_id
elif device_entry and device_entry.area_id:
area_id = device_entry.area_id
else:
area_id = None
if area_id is not None:
area_entry = area_reg.async_get_area(area_id)
else:
area_entry = None
return entity_entry, device_entry, area_entry
class AbstractConfig(ABC):
"""Hold the configuration for Google Assistant."""
_store: GoogleConfigStore
_unsub_report_state: Callable[[], None] | None = None
def __init__(self, hass: HomeAssistant) -> None:
"""Initialize abstract config."""
self.hass = hass
self._google_sync_unsub: dict[str, CALLBACK_TYPE] = {}
self._local_sdk_active = False
self._local_last_active: datetime | None = None
self._local_sdk_version_warn = False
self.is_supported_cache: dict[str, tuple[int | None, bool]] = {}
async def async_initialize(self) -> None:
"""Perform async initialization of config."""
self._store = GoogleConfigStore(self.hass)
await self._store.async_initialize()
if not self.enabled:
return
async def sync_google(_):
"""Sync entities to Google."""
await self.async_sync_entities_all()
start.async_at_start(self.hass, sync_google)
@property
def enabled(self):
"""Return if Google is enabled."""
return False
@property
def entity_config(self):
"""Return entity config."""
return {}
@property
def secure_devices_pin(self):
"""Return entity config."""
return None
@property
def is_reporting_state(self):
"""Return if we're actively reporting states."""
return self._unsub_report_state is not None
@property
def is_local_sdk_active(self):
"""Return if we're actively accepting local messages."""
return self._local_sdk_active
@property
def should_report_state(self):
"""Return if states should be proactively reported."""
return False
@property
def is_local_connected(self) -> bool:
"""Return if local is connected."""
return (
self._local_last_active is not None
# We get a reachable devices intent every minute.
and self._local_last_active > utcnow() - timedelta(seconds=70)
)
def get_local_agent_user_id(self, webhook_id):
"""Return the user ID to be used for actions received via the local SDK.
Return None is no agent user id is found.
"""
found_agent_user_id = None
for agent_user_id, agent_user_data in self._store.agent_user_ids.items():
if agent_user_data[STORE_GOOGLE_LOCAL_WEBHOOK_ID] == webhook_id:
found_agent_user_id = agent_user_id
break
return found_agent_user_id
def get_local_webhook_id(self, agent_user_id):
"""Return the webhook ID to be used for actions for a given agent user id via the local SDK."""
if data := self._store.agent_user_ids.get(agent_user_id):
return data[STORE_GOOGLE_LOCAL_WEBHOOK_ID]
return None
@abstractmethod
def get_agent_user_id(self, context):
"""Get agent user ID from context."""
@abstractmethod
def should_expose(self, state) -> bool:
"""Return if entity should be exposed."""
def should_2fa(self, state):
"""If an entity should have 2FA checked."""
return True
async def async_report_state(self, message, agent_user_id: str):
"""Send a state report to Google."""
raise NotImplementedError
async def async_report_state_all(self, message):
"""Send a state report to Google for all previously synced users."""
jobs = [
self.async_report_state(message, agent_user_id)
for agent_user_id in self._store.agent_user_ids
]
await gather(*jobs)
@callback
def async_enable_report_state(self) -> None:
"""Enable proactive mode."""
# Circular dep
# pylint: disable-next=import-outside-toplevel
from .report_state import async_enable_report_state
if self._unsub_report_state is None:
self._unsub_report_state = async_enable_report_state(self.hass, self)
@callback
def async_disable_report_state(self) -> None:
"""Disable report state."""
if self._unsub_report_state is not None:
self._unsub_report_state()
self._unsub_report_state = None
async def async_sync_entities(self, agent_user_id: str):
"""Sync all entities to Google."""
# Remove any pending sync
self._google_sync_unsub.pop(agent_user_id, lambda: None)()
status = await self._async_request_sync_devices(agent_user_id)
if status == HTTPStatus.NOT_FOUND:
await self.async_disconnect_agent_user(agent_user_id)
return status
async def async_sync_entities_all(self) -> int:
"""Sync all entities to Google for all registered agents."""
if not self._store.agent_user_ids:
return 204
res = await gather(
*(
self.async_sync_entities(agent_user_id)
for agent_user_id in self._store.agent_user_ids
)
)
return max(res, default=204)
@callback
def async_schedule_google_sync(self, agent_user_id: str):
"""Schedule a sync."""
async def _schedule_callback(_now):
"""Handle a scheduled sync callback."""
self._google_sync_unsub.pop(agent_user_id, None)
await self.async_sync_entities(agent_user_id)
self._google_sync_unsub.pop(agent_user_id, lambda: None)()
self._google_sync_unsub[agent_user_id] = async_call_later(
self.hass, SYNC_DELAY, _schedule_callback
)
@callback
def async_schedule_google_sync_all(self) -> None:
"""Schedule a sync for all registered agents."""
for agent_user_id in self._store.agent_user_ids:
self.async_schedule_google_sync(agent_user_id)
async def _async_request_sync_devices(self, agent_user_id: str) -> int:
"""Trigger a sync with Google.
Return value is the HTTP status code of the sync request.
"""
raise NotImplementedError
async def async_connect_agent_user(self, agent_user_id: str):
"""Add a synced and known agent_user_id.
Called before sending a sync response to Google.
"""
self._store.add_agent_user_id(agent_user_id)
async def async_disconnect_agent_user(self, agent_user_id: str):
"""Turn off report state and disable further state reporting.
Called when:
- The user disconnects their account from Google.
- When the cloud configuration is initialized
- When sync entities fails with 404
"""
self._store.pop_agent_user_id(agent_user_id)
@callback
def async_enable_local_sdk(self) -> None:
"""Enable the local SDK."""
setup_successful = True
setup_webhook_ids = []
# Don't enable local SDK if ssl is enabled
if self.hass.config.api and self.hass.config.api.use_ssl:
self._local_sdk_active = False
return
for user_agent_id, _ in self._store.agent_user_ids.items():
if (webhook_id := self.get_local_webhook_id(user_agent_id)) is None:
setup_successful = False
break
try:
webhook.async_register(
self.hass,
DOMAIN,
"Local Support for " + user_agent_id,
webhook_id,
self._handle_local_webhook,
local_only=True,
)
setup_webhook_ids.append(webhook_id)
except ValueError:
_LOGGER.warning(
"Webhook handler %s for agent user id %s is already defined!",
webhook_id,
user_agent_id,
)
setup_successful = False
break
if not setup_successful:
_LOGGER.warning(
"Local fulfillment failed to setup, falling back to cloud fulfillment"
)
for setup_webhook_id in setup_webhook_ids:
webhook.async_unregister(self.hass, setup_webhook_id)
self._local_sdk_active = setup_successful
@callback
def async_disable_local_sdk(self) -> None:
"""Disable the local SDK."""
if not self._local_sdk_active:
return
for agent_user_id in self._store.agent_user_ids:
webhook.async_unregister(
self.hass, self.get_local_webhook_id(agent_user_id)
)
self._local_sdk_active = False
async def _handle_local_webhook(self, hass, webhook_id, request):
"""Handle an incoming local SDK message."""
# Circular dep
# pylint: disable-next=import-outside-toplevel
from . import smart_home
self._local_last_active = utcnow()
# Check version local SDK.
version = request.headers.get("HA-Cloud-Version")
if not self._local_sdk_version_warn and (
not version or AwesomeVersion(version) < LOCAL_SDK_MIN_VERSION
):
_LOGGER.warning(
(
"Local SDK version is too old (%s), check documentation on how to"
" update to the latest version"
),
version,
)
self._local_sdk_version_warn = True
payload = await request.json()
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug(
"Received local message from %s (JS %s):\n%s\n",
request.remote,
request.headers.get("HA-Cloud-Version", "unknown"),
pprint.pformat(payload),
)
if (agent_user_id := self.get_local_agent_user_id(webhook_id)) is None:
# No agent user linked to this webhook, means that the user has somehow unregistered
# removing webhook and stopping processing of this request.
_LOGGER.error(
(
"Cannot process request for webhook %s as no linked agent user is"
" found:\n%s\n"
),
webhook_id,
pprint.pformat(payload),
)
webhook.async_unregister(self.hass, webhook_id)
return None
if not self.enabled:
return json_response(
smart_home.api_disabled_response(payload, agent_user_id)
)
result = await smart_home.async_handle_message(
self.hass,
self,
agent_user_id,
payload,
SOURCE_LOCAL,
)
if _LOGGER.isEnabledFor(logging.DEBUG):
_LOGGER.debug("Responding to local message:\n%s\n", pprint.pformat(result))
return json_response(result)
class GoogleConfigStore:
"""A configuration store for google assistant."""
_STORAGE_VERSION = 1
_STORAGE_KEY = DOMAIN
def __init__(self, hass):
"""Initialize a configuration store."""
self._hass = hass
self._store = Store(hass, self._STORAGE_VERSION, self._STORAGE_KEY)
self._data = None
async def async_initialize(self):
"""Finish initializing the ConfigStore."""
should_save_data = False
if (data := await self._store.async_load()) is None:
# if the store is not found create an empty one
# Note that the first request is always a cloud request,
# and that will store the correct agent user id to be used for local requests
data = {
STORE_AGENT_USER_IDS: {},
}
should_save_data = True
for agent_user_id, agent_user_data in data[STORE_AGENT_USER_IDS].items():
if STORE_GOOGLE_LOCAL_WEBHOOK_ID not in agent_user_data:
data[STORE_AGENT_USER_IDS][agent_user_id] = {
**agent_user_data,
STORE_GOOGLE_LOCAL_WEBHOOK_ID: webhook.async_generate_id(),
}
should_save_data = True
if should_save_data:
await self._store.async_save(data)
self._data = data
@property
def agent_user_ids(self):
"""Return a list of connected agent user_ids."""
return self._data[STORE_AGENT_USER_IDS]
@callback
def add_agent_user_id(self, agent_user_id):
"""Add an agent user id to store."""
if agent_user_id not in self._data[STORE_AGENT_USER_IDS]:
self._data[STORE_AGENT_USER_IDS][agent_user_id] = {
STORE_GOOGLE_LOCAL_WEBHOOK_ID: webhook.async_generate_id(),
}
self._store.async_delay_save(lambda: self._data, 1.0)
@callback
def pop_agent_user_id(self, agent_user_id):
"""Remove agent user id from store."""
if agent_user_id in self._data[STORE_AGENT_USER_IDS]:
self._data[STORE_AGENT_USER_IDS].pop(agent_user_id, None)
self._store.async_delay_save(lambda: self._data, 1.0)
class RequestData:
"""Hold data associated with a particular request."""
def __init__(
self,
config: AbstractConfig,
user_id: str,
source: str,
request_id: str,
devices: list[dict] | None,
) -> None:
"""Initialize the request data."""
self.config = config
self.source = source
self.request_id = request_id
self.context = Context(user_id=user_id)
self.devices = devices
@property
def is_local_request(self):
"""Return if this is a local request."""
return self.source == SOURCE_LOCAL
def get_google_type(domain, device_class):
"""Google type based on domain and device class."""
typ = DEVICE_CLASS_TO_GOOGLE_TYPES.get((domain, device_class))
return typ if typ is not None else DOMAIN_TO_GOOGLE_TYPES[domain]
class GoogleEntity:
"""Adaptation of Entity expressed in Google's terms."""
def __init__(
self, hass: HomeAssistant, config: AbstractConfig, state: State
) -> None:
"""Initialize a Google entity."""
self.hass = hass
self.config = config
self.state = state
self._traits: list[trait._Trait] | None = None
@property
def entity_id(self):
"""Return entity ID."""
return self.state.entity_id
@callback
def traits(self) -> list[trait._Trait]:
"""Return traits for entity."""
if self._traits is not None:
return self._traits
state = self.state
domain = state.domain
attributes = state.attributes
features = attributes.get(ATTR_SUPPORTED_FEATURES, 0)
if not isinstance(features, int):
_LOGGER.warning(
"Entity %s contains invalid supported_features value %s",
self.entity_id,
features,
)
return []
device_class = state.attributes.get(ATTR_DEVICE_CLASS)
self._traits = [
Trait(self.hass, state, self.config)
for Trait in trait.TRAITS
if Trait.supported(domain, features, device_class, attributes)
]
return self._traits
@callback
def should_expose(self):
"""If entity should be exposed."""
return self.config.should_expose(self.state)
@callback
def should_expose_local(self) -> bool:
"""Return if the entity should be exposed locally."""
return (
self.should_expose()
and get_google_type(
self.state.domain, self.state.attributes.get(ATTR_DEVICE_CLASS)
)
not in NOT_EXPOSE_LOCAL
and not self.might_2fa()
)
@callback
def is_supported(self) -> bool:
"""Return if the entity is supported by Google."""
features: int | None = self.state.attributes.get(ATTR_SUPPORTED_FEATURES)
result = self.config.is_supported_cache.get(self.entity_id)
if result is None or result[0] != features:
result = self.config.is_supported_cache[self.entity_id] = (
features,
bool(self.traits()),
)
return result[1]
@callback
def might_2fa(self) -> bool:
"""Return if the entity might encounter 2FA."""
if not self.config.should_2fa(self.state):
return False
return self.might_2fa_traits()
@callback
def might_2fa_traits(self) -> bool:
"""Return if the entity might encounter 2FA based on just traits."""
state = self.state
domain = state.domain
features = state.attributes.get(ATTR_SUPPORTED_FEATURES, 0)
device_class = state.attributes.get(ATTR_DEVICE_CLASS)
return any(
trait.might_2fa(domain, features, device_class) for trait in self.traits()
)
def sync_serialize(self, agent_user_id, instance_uuid):
"""Serialize entity for a SYNC response.
https://developers.google.com/actions/smarthome/create-app#actiondevicessync
"""
state = self.state
traits = self.traits()
entity_config = self.config.entity_config.get(state.entity_id, {})
name = (entity_config.get(CONF_NAME) or state.name).strip()
# Find entity/device/area registry entries
entity_entry, device_entry, area_entry = _get_registry_entries(
self.hass, self.entity_id
)
# Build the device info
device = {
"id": state.entity_id,
"name": {"name": name},
"attributes": {},
"traits": [trait.name for trait in traits],
"willReportState": self.config.should_report_state,
"type": get_google_type(
state.domain, state.attributes.get(ATTR_DEVICE_CLASS)
),
}
# Add aliases
if (config_aliases := entity_config.get(CONF_ALIASES, [])) or (
entity_entry and entity_entry.aliases
):
device["name"]["nicknames"] = [name] + config_aliases
if entity_entry:
device["name"]["nicknames"].extend(entity_entry.aliases)
# Add local SDK info if enabled
if self.config.is_local_sdk_active and self.should_expose_local():
device["otherDeviceIds"] = [{"deviceId": self.entity_id}]
device["customData"] = {
"webhookId": self.config.get_local_webhook_id(agent_user_id),
"httpPort": URL(get_url(self.hass, allow_external=False)).port,
"uuid": instance_uuid,
}
# Add trait sync attributes
for trt in traits:
device["attributes"].update(trt.sync_attributes())
# Add roomhint
if room := entity_config.get(CONF_ROOM_HINT):
device["roomHint"] = room
elif area_entry and area_entry.name:
device["roomHint"] = area_entry.name
# Add deviceInfo
if not device_entry:
return device
device_info = {}
if device_entry.manufacturer:
device_info["manufacturer"] = device_entry.manufacturer
if device_entry.model:
device_info["model"] = device_entry.model
if device_entry.sw_version:
device_info["swVersion"] = device_entry.sw_version
if device_info:
device["deviceInfo"] = device_info
return device
@callback
def query_serialize(self):
"""Serialize entity for a QUERY response.
https://developers.google.com/actions/smarthome/create-app#actiondevicesquery
"""
state = self.state
if state.state == STATE_UNAVAILABLE:
return {"online": False}
attrs = {"online": True}
for trt in self.traits():
deep_update(attrs, trt.query_attributes())
return attrs
@callback
def reachable_device_serialize(self):
"""Serialize entity for a REACHABLE_DEVICE response."""
return {"verificationId": self.entity_id}
async def execute(self, data, command_payload):
"""Execute a command.
https://developers.google.com/actions/smarthome/create-app#actiondevicesexecute
"""
command = command_payload["command"]
params = command_payload.get("params", {})
challenge = command_payload.get("challenge", {})
executed = False
for trt in self.traits():
if trt.can_execute(command, params):
await trt.execute(command, data, params, challenge)
executed = True
break
if not executed:
raise SmartHomeError(
ERR_FUNCTION_NOT_SUPPORTED,
f"Unable to execute {command} for {self.state.entity_id}",
)
@callback
def async_update(self):
"""Update the entity with latest info from Home Assistant."""
self.state = self.hass.states.get(self.entity_id)
if self._traits is None:
return
for trt in self._traits:
trt.state = self.state
def deep_update(target, source):
"""Update a nested dictionary with another nested dictionary."""
for key, value in source.items():
if isinstance(value, Mapping):
target[key] = deep_update(target.get(key, {}), value)
else:
target[key] = value
return target
@callback
def async_get_entities(
hass: HomeAssistant, config: AbstractConfig
) -> list[GoogleEntity]:
"""Return all entities that are supported by Google."""
entities = []
for state in hass.states.async_all():
if state.entity_id in CLOUD_NEVER_EXPOSED_ENTITIES:
continue
entity = GoogleEntity(hass, config, state)
if entity.is_supported():
entities.append(entity)
return entities