/
devices.py
81 lines (64 loc) · 2.59 KB
/
devices.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
"""
Module for handling a vector/array of devices.
More or less an array with devices. Adds some search functionality to find devices.
"""
from typing import Awaitable, Callable
from xknx.telegram import Telegram
from .device import Device
class Devices:
"""Class for handling a vector/array of devices."""
def __init__(self) -> None:
"""Initialize Devices class."""
self.__devices = []
self.device_updated_cbs = []
def register_device_updated_cb(
self, device_updated_cb: Callable[[Device], Awaitable[None]]
) -> None:
"""Register callback for devices beeing updated."""
self.device_updated_cbs.append(device_updated_cb)
def unregister_device_updated_cb(self, device_updated_cb):
"""Unregister callback for devices beeing updated."""
self.device_updated_cbs.remove(device_updated_cb)
def __iter__(self):
"""Iterate registered devices."""
yield from self.__devices
def devices_by_group_address(self, group_address):
"""Return device(s) by group address."""
for device in self.__devices:
if device.has_group_address(group_address):
yield device
def __getitem__(self, key):
"""Return device by name or by index."""
for device in self.__devices:
if device.name == key:
return device
if isinstance(key, int):
return self.__devices[key]
raise KeyError
def __len__(self):
"""Return number of devices within vector."""
return len(self.__devices)
def __contains__(self, key):
"""Return if devices with name 'key' is within devices."""
for device in self.__devices:
if device.name == key:
return True
return False
def add(self, device):
"""Add device to devices vector."""
if not isinstance(device, Device):
raise TypeError()
device.register_device_updated_cb(self.device_updated)
self.__devices.append(device)
async def device_updated(self, device):
"""Call all registered device updated callbacks of device."""
for device_updated_cb in self.device_updated_cbs:
await device_updated_cb(device)
async def process(self, telegram: Telegram) -> None:
"""Process telegram."""
for device in self.devices_by_group_address(telegram.destination_address):
await device.process(telegram)
async def sync(self):
"""Read state of devices from KNX bus."""
for device in self.__devices:
await device.sync()