-
-
Notifications
You must be signed in to change notification settings - Fork 28.6k
/
sensor.py
141 lines (107 loc) · 4.24 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
"""Support for Streamlabs Water Monitor Usage."""
from __future__ import annotations
from datetime import timedelta
from homeassistant.components.sensor import SensorDeviceClass, SensorEntity
from homeassistant.const import VOLUME_GALLONS
from homeassistant.core import HomeAssistant
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.typing import ConfigType, DiscoveryInfoType
from homeassistant.util import Throttle
from . import DOMAIN as STREAMLABSWATER_DOMAIN
DEPENDENCIES = ["streamlabswater"]
WATER_ICON = "mdi:water"
MIN_TIME_BETWEEN_USAGE_UPDATES = timedelta(seconds=60)
NAME_DAILY_USAGE = "Daily Water"
NAME_MONTHLY_USAGE = "Monthly Water"
NAME_YEARLY_USAGE = "Yearly Water"
def setup_platform(
hass: HomeAssistant,
config: ConfigType,
add_devices: AddEntitiesCallback,
discovery_info: DiscoveryInfoType | None = None,
) -> None:
"""Set up water usage sensors."""
client = hass.data[STREAMLABSWATER_DOMAIN]["client"]
location_id = hass.data[STREAMLABSWATER_DOMAIN]["location_id"]
location_name = hass.data[STREAMLABSWATER_DOMAIN]["location_name"]
streamlabs_usage_data = StreamlabsUsageData(location_id, client)
streamlabs_usage_data.update()
add_devices(
[
StreamLabsDailyUsage(location_name, streamlabs_usage_data),
StreamLabsMonthlyUsage(location_name, streamlabs_usage_data),
StreamLabsYearlyUsage(location_name, streamlabs_usage_data),
]
)
class StreamlabsUsageData:
"""Track and query usage data."""
def __init__(self, location_id, client):
"""Initialize the usage data."""
self._location_id = location_id
self._client = client
self._today = None
self._this_month = None
self._this_year = None
@Throttle(MIN_TIME_BETWEEN_USAGE_UPDATES)
def update(self):
"""Query and store usage data."""
water_usage = self._client.get_water_usage_summary(self._location_id)
self._today = round(water_usage["today"], 1)
self._this_month = round(water_usage["thisMonth"], 1)
self._this_year = round(water_usage["thisYear"], 1)
def get_daily_usage(self):
"""Return the day's usage."""
return self._today
def get_monthly_usage(self):
"""Return the month's usage."""
return self._this_month
def get_yearly_usage(self):
"""Return the year's usage."""
return self._this_year
class StreamLabsDailyUsage(SensorEntity):
"""Monitors the daily water usage."""
_attr_device_class = SensorDeviceClass.VOLUME
def __init__(self, location_name, streamlabs_usage_data):
"""Initialize the daily water usage device."""
self._location_name = location_name
self._streamlabs_usage_data = streamlabs_usage_data
self._state = None
@property
def name(self):
"""Return the name for daily usage."""
return f"{self._location_name} {NAME_DAILY_USAGE}"
@property
def icon(self):
"""Return the daily usage icon."""
return WATER_ICON
@property
def native_value(self):
"""Return the current daily usage."""
return self._streamlabs_usage_data.get_daily_usage()
@property
def native_unit_of_measurement(self):
"""Return gallons as the unit measurement for water."""
return VOLUME_GALLONS
def update(self) -> None:
"""Retrieve the latest daily usage."""
self._streamlabs_usage_data.update()
class StreamLabsMonthlyUsage(StreamLabsDailyUsage):
"""Monitors the monthly water usage."""
@property
def name(self):
"""Return the name for monthly usage."""
return f"{self._location_name} {NAME_MONTHLY_USAGE}"
@property
def native_value(self):
"""Return the current monthly usage."""
return self._streamlabs_usage_data.get_monthly_usage()
class StreamLabsYearlyUsage(StreamLabsDailyUsage):
"""Monitors the yearly water usage."""
@property
def name(self):
"""Return the name for yearly usage."""
return f"{self._location_name} {NAME_YEARLY_USAGE}"
@property
def native_value(self):
"""Return the current yearly usage."""
return self._streamlabs_usage_data.get_yearly_usage()