-
Notifications
You must be signed in to change notification settings - Fork 100
/
sensor.py
518 lines (443 loc) · 16.4 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
import logging
import math
from operator import itemgetter
from statistics import mean, median
import homeassistant.helpers.config_validation as cv
import voluptuous as vol
from homeassistant.components.sensor import PLATFORM_SCHEMA
from homeassistant.const import CONF_REGION
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.template import Template, attach
from homeassistant.util import dt as dt_utils
# Import sensor entity and classes.
from homeassistant.components.sensor import SensorDeviceClass, SensorStateClass, SensorEntity
from jinja2 import pass_context
from . import (
DOMAIN,
EVENT_NEW_DAY,
EVENT_NEW_PRICE,
EVENT_NEW_HOUR,
SENTINEL,
RANDOM_MINUTE,
RANDOM_SECOND,
)
from .misc import start_of, stock, round_decimal
_LOGGER = logging.getLogger(__name__)
_CENT_MULTIPLIER = 100
_PRICE_IN = {"kWh": 1000, "MWh": 1, "Wh": 1000 * 1000}
_REGIONS = {
"DK1": ["DKK", "Denmark", 0.25],
"DK2": ["DKK", "Denmark", 0.25],
"FI": ["EUR", "Finland", 0.24],
"EE": ["EUR", "Estonia", 0.22],
"LT": ["EUR", "Lithuania", 0.21],
"LV": ["EUR", "Latvia", 0.21],
"Oslo": ["NOK", "Norway", 0.25],
"Kr.sand": ["NOK", "Norway", 0.25],
"Bergen": ["NOK", "Norway", 0.25],
"Molde": ["NOK", "Norway", 0.25],
"Tr.heim": ["NOK", "Norway", 0.25],
"Tromsø": ["NOK", "Norway", 0.25],
"SE1": ["SEK", "Sweden", 0.25],
"SE2": ["SEK", "Sweden", 0.25],
"SE3": ["SEK", "Sweden", 0.25],
"SE4": ["SEK", "Sweden", 0.25],
# What zone is this?
"SYS": ["EUR", "System zone", 0.25],
"FR": ["EUR", "France", 0.055],
"NL": ["EUR", "Netherlands", 0.21],
"BE": ["EUR", "Belgium", 0.06],
"AT": ["EUR", "Austria", 0.20],
# Tax is disabled for now, i need to split the areas
# to handle the tax.
"DE-LU": ["EUR", "Germany and Luxembourg", 0],
}
# Needed incase a user wants the prices in non local currency
_CURRENCY_TO_LOCAL = {"DKK": "Kr", "NOK": "Kr", "SEK": "Kr", "EUR": "€"}
_CURRENTY_TO_CENTS = {"DKK": "Øre", "NOK": "Øre", "SEK": "Öre", "EUR": "c"}
DEFAULT_CURRENCY = "NOK"
DEFAULT_REGION = "Kr.sand"
DEFAULT_NAME = "Elspot"
DEFAULT_TEMPLATE = "{{0.0|float}}"
PLATFORM_SCHEMA = PLATFORM_SCHEMA.extend(
{
vol.Optional(CONF_REGION, default=DEFAULT_REGION): vol.In(
list(_REGIONS.keys())
),
vol.Optional("friendly_name", default=""): cv.string,
# This is only needed if you want the some area but want the prices in a non local currency
vol.Optional("currency", default=""): cv.string,
vol.Optional("VAT", default=True): cv.boolean,
vol.Optional("precision", default=3): cv.positive_int,
vol.Optional("low_price_cutoff", default=1.0): cv.small_float,
vol.Optional("price_type", default="kWh"): vol.In(list(_PRICE_IN.keys())),
vol.Optional("price_in_cents", default=False): cv.boolean,
vol.Optional("additional_costs", default=DEFAULT_TEMPLATE): cv.template,
}
)
def _dry_setup(hass, config, add_devices, discovery_info=None):
"""Setup the damn platform using yaml."""
_LOGGER.debug("Dumping config %r", config)
_LOGGER.debug("timezone set in ha %r", hass.config.time_zone)
region = config.get(CONF_REGION)
friendly_name = config.get("friendly_name", "")
price_type = config.get("price_type")
precision = config.get("precision")
low_price_cutoff = config.get("low_price_cutoff")
currency = config.get("currency")
vat = config.get("VAT")
use_cents = config.get("price_in_cents")
ad_template = config.get("additional_costs")
api = hass.data[DOMAIN]
sensor = NordpoolSensor(
friendly_name,
region,
price_type,
precision,
low_price_cutoff,
currency,
vat,
use_cents,
api,
ad_template,
hass,
)
add_devices([sensor])
async def async_setup_platform(hass, config, add_devices, discovery_info=None) -> None:
_dry_setup(hass, config, add_devices)
return True
async def async_setup_entry(hass, config_entry, async_add_devices):
"""Setup sensor platform for the ui"""
config = config_entry.data
_dry_setup(hass, config, async_add_devices)
return True
class NordpoolSensor(SensorEntity):
"Sensors data"
_attr_device_class = SensorDeviceClass.MONETARY
_attr_suggested_display_precision = None
_attr_state_class = SensorStateClass.TOTAL
def __init__(
self,
friendly_name,
area,
price_type,
precision,
low_price_cutoff,
currency,
vat,
use_cents,
api,
ad_template,
hass,
) -> None:
self._area = area
self._currency = currency or _REGIONS[area][0]
self._price_type = price_type
# Should be depricated in a future version
self._precision = precision
self._attr_suggested_display_precision = precision
self._low_price_cutoff = low_price_cutoff
self._use_cents = use_cents
self._api = api
self._ad_template = ad_template
self._hass = hass
self._attr_force_update = True
if vat is True:
self._vat = _REGIONS[area][2]
else:
self._vat = 0
# Price by current hour.
self._current_price = None
# Holds the data for today and morrow.
self._data_today = SENTINEL
self._data_tomorrow = SENTINEL
# Values for the day
self._average = None
self._max = None
self._min = None
self._mean = None
self._off_peak_1 = None
self._off_peak_2 = None
self._peak = None
self._additional_costs_value = None
_LOGGER.debug("Template %s", str(ad_template))
# Check incase the sensor was setup using config flow.
# This blow up if the template isnt valid.
if not isinstance(self._ad_template, Template):
if self._ad_template in (None, ""):
self._ad_template = DEFAULT_TEMPLATE
self._ad_template = cv.template(self._ad_template)
# check for yaml setup.
else:
if self._ad_template.template in ("", None):
self._ad_template = cv.template(DEFAULT_TEMPLATE)
attach(self._hass, self._ad_template)
# To control the updates.
self._last_tick = None
@property
def name(self) -> str:
return self.unique_id
@property
def should_poll(self):
"""No need to poll. Coordinator notifies entity of updates."""
return False
@property
def icon(self) -> str:
return "mdi:flash"
@property
def unit(self) -> str:
"""Unit"""
return self._price_type
@property
def unit_of_measurement(self) -> str: # FIXME
"""Return the unit of measurement this sensor expresses itself in."""
_currency = self._currency
if self._use_cents is True:
# Convert unit of measurement to cents based on chosen currency
_currency = _CURRENTY_TO_CENTS[_currency]
return "%s/%s" % (_currency, self._price_type)
@property
def unique_id(self):
name = "nordpool_%s_%s_%s_%s_%s_%s" % (
self._price_type,
self._area,
self._currency,
self._precision,
self._low_price_cutoff,
self._vat,
)
name = name.lower().replace(".", "")
return name
@property
def device_info(self):
return {
"identifiers": {(DOMAIN, self.unique_id)},
"name": self.name,
"manufacturer": DOMAIN,
}
@property
def additional_costs(self):
"""Additional costs."""
return self._additional_costs_value
@property
def low_price(self) -> bool:
"""Check if the price is lower then avg depending on settings"""
return (
self.current_price < self._average * self._low_price_cutoff
if isinstance(self.current_price, (int, float))
and isinstance(self._average, (float, int))
else None
)
@property
def price_percent_to_average(self) -> float:
"""Price in percent to average price"""
return (
self.current_price / self._average
if isinstance(self.current_price, (int, float))
and isinstance(self._average, (float, int))
else None
)
def _calc_price(self, value=None, fake_dt=None) -> float:
"""Calculate price based on the users settings."""
if value is None:
value = self._current_price
if value is None or math.isinf(value):
# _LOGGER.debug("api returned junk infinty %s", value)
return None
def faker():
def inner(*_, **__):
return fake_dt or dt_utils.now()
return pass_context(inner)
price = value / _PRICE_IN[self._price_type] * (float(1 + self._vat))
template_value = self._ad_template.async_render(
now=faker(), current_price=price
)
# Seems like the template is rendered as a string if the number is complex
# Just force it to be a float.
if not isinstance(template_value, (int, float)):
try:
template_value = float(template_value)
except (TypeError, ValueError):
_LOGGER.exception(
"Failed to convert %s %s to float",
template_value,
type(template_value),
)
raise
self._additional_costs_value = template_value
try:
price += template_value
except Exception:
_LOGGER.debug(
"price %s template value %s type %s dt %s current_price %s ",
price,
template_value,
type(template_value),
fake_dt,
self._current_price,
)
raise
# Convert price to cents if specified by the user.
if self._use_cents:
price = price * _CENT_MULTIPLIER
return round(price, self._precision)
def _update(self):
"""Set attrs"""
today = self.today
if not today:
_LOGGER.debug("No data for today, unable to set attrs")
return
self._average = mean(today)
self._min = min(today)
self._max = max(today)
self._off_peak_1 = mean(today[0:8])
self._off_peak_2 = mean(today[20:])
self._peak = mean(today[8:20])
self._mean = median(today)
@property
def current_price(self) -> float:
"""This the current price for the hour we are in at any given time."""
res = self._calc_price()
# _LOGGER.debug("Current hours price for %s is %s", self.name, res)
return res
def _someday(self, data) -> list:
"""The data is already sorted in the xml,
but I don't trust that to continue forever. That's why we sort it ourselves."""
if data is None or data is SENTINEL:
return []
local_times = []
for item in data.get("values", []):
i = {
"start": dt_utils.as_local(item["start"]),
"end": dt_utils.as_local(item["end"]),
"value": item["value"],
}
local_times.append(i)
data["values"] = local_times
return sorted(data.get("values", []), key=itemgetter("start"))
@property
def today(self) -> list:
"""Get todays prices
Returns:
list: sorted list where today[0] is the price of hour 00.00 - 01.00
"""
return [
self._calc_price(i["value"], fake_dt=i["start"])
for i in self._someday(self._data_today)
if i
]
@property
def tomorrow(self) -> list:
"""Get tomorrows prices
Returns:
list: sorted where tomorrow[0] is the price of hour 00.00 - 01.00 etc.
"""
return [
self._calc_price(i["value"], fake_dt=i["start"])
for i in self._someday(self._data_tomorrow)
if i
]
@property
def extra_state_attributes(self) -> dict:
return {
"average": self._average,
"off_peak_1": self._off_peak_1,
"off_peak_2": self._off_peak_2,
"peak": self._peak,
"min": self._min,
"max": self._max,
"mean": self._mean,
"unit": self.unit,
"currency": self._currency,
"country": _REGIONS[self._area][1],
"region": self._area,
"low_price": self.low_price,
"price_percent_to_average": self.price_percent_to_average,
"today": self.today,
"tomorrow": self.tomorrow,
"tomorrow_valid": self.tomorrow_valid,
"raw_today": self.raw_today,
"raw_tomorrow": self.raw_tomorrow,
"current_price": self.current_price,
"additional_costs_current_hour": self.additional_costs,
"price_in_cents": self._use_cents,
}
def _add_raw(self, data) -> list:
"""Helper"""
result = []
for res in self._someday(data):
item = {
"start": res["start"],
"end": res["end"],
"value": self._calc_price(res["value"], fake_dt=res["start"]),
}
result.append(item)
return result
@property
def raw_today(self) -> list:
"""Raw today"""
return self._add_raw(self._data_today)
@property
def raw_tomorrow(self) -> list:
"""Raw tomorrow"""
return self._add_raw(self._data_tomorrow)
@property
def tomorrow_valid(self) -> bool:
"""Verify that we have the values for tomorrow."""
# this should be checked a better way
return len([i for i in self.tomorrow if i not in (None, float("inf"))]) >= 23
async def _update_current_price(self) -> None:
"""update the current price (price this hour)"""
local_now = dt_utils.now()
data = await self._api.today(self._area, self._currency)
if data:
for item in self._someday(data):
if item["start"] == start_of(local_now, "hour"):
self._current_price = item["value"]
_LOGGER.debug(
"Updated %s _current_price %s", self.name, item["value"]
)
else:
_LOGGER.debug("Cant update _update_current_price because it was no data")
async def handle_new_day(self):
"""Update attrs for the new day"""
_LOGGER.debug("handle_new_day")
self._data_tomorrow = None
# update attrs for the new day
await self.handle_new_hr()
async def handle_new_hr(self):
"""Update attrs for the new hour"""
_LOGGER.debug("handle_new_hr")
today = await self._api.today(self._area, self._currency)
if today:
self._data_today = today
now = dt_utils.now()
if self._data_tomorrow is SENTINEL and stock(now) >= stock(now).replace(
hour=13, minute=RANDOM_MINUTE, second=RANDOM_SECOND
):
tomorrow = await self._api.tomorrow(self._area, self._currency)
if tomorrow:
self._data_tomorrow = tomorrow
self._update()
# Updates the current for this hour.
await self._update_current_price()
# This is not to make sure the correct template costs are set. Issue 258
self._attr_native_value = self.current_price
self.async_write_ha_state()
async def handle_new_price(self):
"""Update atts because of the new prices"""
_LOGGER.debug("handle_new_price")
tomorrow = await self._api.tomorrow(self._area, self._currency)
if tomorrow:
self._data_tomorrow = tomorrow
await self.handle_new_hr()
async def async_added_to_hass(self):
"""Connect to dispatcher listening for entity data notifications."""
await super().async_added_to_hass()
_LOGGER.debug("called async_added_to_hass %s", self.name)
async_dispatcher_connect(self._api._hass, EVENT_NEW_DAY, self.handle_new_day)
async_dispatcher_connect(
self._api._hass, EVENT_NEW_PRICE, self.handle_new_price
)
async_dispatcher_connect(self._api._hass, EVENT_NEW_HOUR, self.handle_new_hr)
await self.handle_new_hr()