-
-
Notifications
You must be signed in to change notification settings - Fork 96
/
sensor.py
88 lines (73 loc) · 2.85 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
"""
Module for managing a sensor via KNX.
It provides functionality for
* reading the current state from KNX bus.
* watching for state updates from KNX bus.
"""
from .device import Device
from .remote_value_scaling import RemoteValueScaling
from .remote_value_sensor import RemoteValueSensor
class Sensor(Device):
    """Class for managing a sensor.

    The sensor's value lives in ``self.sensor_value``, a remote-value helper:
    ``RemoteValueScaling`` (range 0 to 100) when ``value_type`` is
    ``"percent"``, otherwise ``RemoteValueSensor`` built with the given
    ``value_type``.
    """

    def __init__(self,
                 xknx,
                 name,
                 group_address=None,
                 value_type=None,
                 device_updated_cb=None):
        """Initialize Sensor class.

        Args:
            xknx: Passed on to ``Device.__init__`` and to the remote value.
            name: Device name, passed on to ``Device.__init__``.
            group_address: Used as ``group_address_state`` of the remote value.
            value_type: ``"percent"`` selects ``RemoteValueScaling``; any
                other value is forwarded to ``RemoteValueSensor``.
            device_updated_cb: Passed on to ``Device.__init__``.
        """
        # pylint: disable=too-many-arguments
        super().__init__(xknx, name, device_updated_cb)

        if value_type == "percent":
            self.sensor_value = RemoteValueScaling(
                xknx,
                group_address_state=group_address,
                device_name=self.name,
                after_update_cb=self.after_update,
                range_from=0,
                range_to=100)
        else:
            self.sensor_value = RemoteValueSensor(
                xknx,
                group_address_state=group_address,
                device_name=self.name,
                after_update_cb=self.after_update,
                value_type=value_type)

    @classmethod
    def from_config(cls, xknx, name, config):
        """Initialize object from configuration structure.

        Reads the optional keys ``'group_address'`` and ``'value_type'`` from
        ``config``; a missing key results in ``None`` being passed on.
        """
        group_address = config.get('group_address')
        value_type = config.get('value_type')

        return cls(xknx,
                   name,
                   group_address=group_address,
                   value_type=value_type)

    def has_group_address(self, group_address):
        """Test if device has given group address."""
        return self.sensor_value.has_group_address(group_address)

    def state_addresses(self):
        """Return group addresses which should be requested to sync state."""
        return self.sensor_value.state_addresses()

    async def process_group_write(self, telegram):
        """Process incoming GROUP WRITE telegram.

        The telegram is handed to the underlying remote value.
        """
        await self.sensor_value.process(telegram)

    def unit_of_measurement(self):
        """Return the unit of measurement reported by the remote value."""
        return self.sensor_value.unit_of_measurement

    def resolve_state(self):
        """Return the current value held by the underlying remote value."""
        return self.sensor_value.value

    def __str__(self):
        """Return object as readable string."""
        return '<Sensor name="{0}" ' \
            'sensor="{1}" value="{2}" unit="{3}"/>' \
            .format(self.name,
                    self.sensor_value.group_addr_str(),
                    self.resolve_state(),
                    self.unit_of_measurement())

    def __eq__(self, other):
        """Equal operator.

        Two sensors are equal when all of their instance attributes are
        equal. For operands that are not ``Sensor`` instances this returns
        ``NotImplemented`` so Python can try the reflected comparison and
        otherwise fall back to identity, instead of raising
        ``AttributeError`` on objects without ``__dict__``.
        """
        # NOTE(review): defining __eq__ without __hash__ leaves instances
        # unhashable; this is unchanged from the previous behaviour.
        if not isinstance(other, Sensor):
            return NotImplemented
        return self.__dict__ == other.__dict__