-
Notifications
You must be signed in to change notification settings - Fork 17
/
_battery_status_tracker.py
492 lines (407 loc) · 17.6 KB
/
_battery_status_tracker.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
# License: MIT
# Copyright © 2022 Frequenz Energy-as-a-Service GmbH
"""Background service that tracks the status of a battery.
A battery is consider to be WORKING if both the battery and the adjacent inverter are
sending data that shows that they are working.
If either of them stops sending data, or if the data shows that they are not working,
then the battery is considered to be NOT_WORKING.
If a battery and its adjacent inverter are WORKING, but the last request to the battery
failed, then the battery's status is considered to be UNCERTAIN. In this case, the
battery is blocked for a short time, and it is not recommended to use it unless it is
necessary.
"""
import asyncio
import logging
import math
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
# pylint: disable=no-name-in-module
from frequenz.api.microgrid.battery_pb2 import ComponentState as BatteryComponentState
from frequenz.api.microgrid.battery_pb2 import RelayState as BatteryRelayState
from frequenz.api.microgrid.common_pb2 import ErrorLevel
from frequenz.api.microgrid.inverter_pb2 import ComponentState as InverterComponentState
# pylint: enable=no-name-in-module
from frequenz.channels import Receiver, Sender, select, selected_from
from frequenz.channels.timer import SkipMissedAndDrift, Timer
from frequenz.client.microgrid import (
BatteryData,
ComponentCategory,
ComponentData,
InverterData,
)
from typing_extensions import override
from ....microgrid import connection_manager
from ..._background_service import BackgroundService
from ._blocking_status import BlockingStatus
from ._component_status import (
ComponentStatus,
ComponentStatusEnum,
ComponentStatusTracker,
SetPowerResult,
)
_logger = logging.getLogger(__name__)
@dataclass
class _ComponentStreamStatus:
component_id: int
"""Component id."""
data_recv_timer: Timer
"""Timer that is set when no component data has been received for some time."""
last_msg_timestamp: datetime = datetime.now(tz=timezone.utc)
"""Timestamp of the last message from the component."""
last_msg_correct: bool = False
"""Flag whether last message was correct or not."""
class BatteryStatusTracker(ComponentStatusTracker, BackgroundService):
"""Class for tracking if battery is working.
Status updates are sent out only when there is a status change.
"""
_battery_valid_relay: set[BatteryRelayState.ValueType] = {
BatteryRelayState.RELAY_STATE_CLOSED
}
"""The list of valid relay states of a battery.
A working battery in any other battery relay state will be reported as failing.
"""
_battery_valid_state: set[BatteryComponentState.ValueType] = {
BatteryComponentState.COMPONENT_STATE_IDLE,
BatteryComponentState.COMPONENT_STATE_CHARGING,
BatteryComponentState.COMPONENT_STATE_DISCHARGING,
}
"""The list of valid states of a battery.
A working battery in any other battery state will be reported as failing.
"""
_inverter_valid_state: set[InverterComponentState.ValueType] = {
InverterComponentState.COMPONENT_STATE_STANDBY,
InverterComponentState.COMPONENT_STATE_IDLE,
InverterComponentState.COMPONENT_STATE_CHARGING,
InverterComponentState.COMPONENT_STATE_DISCHARGING,
}
"""The list of valid states of an inverter.
A working inverter in any other inverter state will be reported as failing.
"""
@override
def __init__( # pylint: disable=too-many-arguments
self,
component_id: int,
max_data_age: timedelta,
max_blocking_duration: timedelta,
status_sender: Sender[ComponentStatus],
set_power_result_receiver: Receiver[SetPowerResult],
) -> None:
"""Create class instance.
Args:
component_id: Id of this battery
max_data_age: If component stopped sending data, then this is the maximum
time when its last message should be considered as valid. After that
time, component won't be used until it starts sending data.
max_blocking_duration: This value tell what should be the maximum
timeout used for blocking failing component.
status_sender: Channel to send status updates.
set_power_result_receiver: Channel to receive results of the requests to the
components.
Raises:
RuntimeError: If battery has no adjacent inverter.
"""
BackgroundService.__init__(self, name=f"BatteryStatusTracker({component_id})")
self._max_data_age = max_data_age
self._status_sender = status_sender
self._set_power_result_receiver = set_power_result_receiver
# First battery is considered as not working.
# Change status after first messages are received.
self._last_status: ComponentStatusEnum = ComponentStatusEnum.NOT_WORKING
self._blocking_status: BlockingStatus = BlockingStatus(
min_duration=timedelta(seconds=1.0), max_duration=max_blocking_duration
)
self._timedelta_zero = timedelta(seconds=0.0)
inverter_id = self._find_adjacent_inverter_id(component_id)
if inverter_id is None:
raise RuntimeError(
f"Can't find inverter adjacent to battery: {component_id}"
)
self._battery: _ComponentStreamStatus = _ComponentStreamStatus(
component_id,
data_recv_timer=Timer(max_data_age, SkipMissedAndDrift()),
)
self._inverter: _ComponentStreamStatus = _ComponentStreamStatus(
inverter_id,
data_recv_timer=Timer(max_data_age, SkipMissedAndDrift()),
)
# Select needs receivers that can be get in async way only.
@override
def start(self) -> None:
"""Start the BatteryStatusTracker instance."""
self._tasks.add(
asyncio.create_task(
self._run(self._status_sender, self._set_power_result_receiver)
)
)
@property
def battery_id(self) -> int:
"""Get battery id.
Returns:
Battery id
"""
return self._battery.component_id
def _handle_status_battery(self, bat_data: BatteryData) -> None:
self._battery.last_msg_correct = (
self._is_message_reliable(bat_data)
and self._is_battery_state_correct(bat_data)
and self._no_critical_error(bat_data)
and self._is_capacity_present(bat_data)
)
self._battery.last_msg_timestamp = bat_data.timestamp
self._battery.data_recv_timer.reset()
def _handle_status_inverter(self, inv_data: InverterData) -> None:
self._inverter.last_msg_correct = (
self._is_message_reliable(inv_data)
and self._is_inverter_state_correct(inv_data)
and self._no_critical_error(inv_data)
)
self._inverter.last_msg_timestamp = inv_data.timestamp
self._inverter.data_recv_timer.reset()
def _handle_status_set_power_result(self, result: SetPowerResult) -> None:
if self.battery_id in result.succeeded:
self._blocking_status.unblock()
elif (
self.battery_id in result.failed
and self._last_status != ComponentStatusEnum.NOT_WORKING
):
duration = self._blocking_status.block()
if duration > self._timedelta_zero:
_logger.warning(
"battery %d failed last response. block it for %s",
self.battery_id,
duration,
)
def _handle_status_battery_timer(self) -> None:
if self._battery.last_msg_correct:
self._battery.last_msg_correct = False
_logger.warning(
"Battery %d stopped sending data, last timestamp: %s",
self._battery.component_id,
self._battery.last_msg_timestamp,
)
def _handle_status_inverter_timer(self) -> None:
if self._inverter.last_msg_correct:
self._inverter.last_msg_correct = False
_logger.warning(
"Inverter %d stopped sending data, last timestamp: %s",
self._inverter.component_id,
self._inverter.last_msg_timestamp,
)
def _get_new_status_if_changed(self) -> ComponentStatusEnum | None:
current_status = self._get_current_status()
if self._last_status != current_status:
self._last_status = current_status
_logger.info(
"battery %d changed status %s",
self.battery_id,
str(self._last_status),
)
return current_status
return None
async def _run(
self,
status_sender: Sender[ComponentStatus],
set_power_result_receiver: Receiver[SetPowerResult],
) -> None:
"""Process data from the components and set_power_result_receiver.
New status is send only when it change.
Args:
status_sender: Channel to send status updates.
set_power_result_receiver: Channel to receive results of the requests to the
components.
"""
api_client = connection_manager.get().api_client
battery_receiver = await api_client.battery_data(self._battery.component_id)
inverter_receiver = await api_client.inverter_data(self._inverter.component_id)
battery = battery_receiver
battery_timer = self._battery.data_recv_timer
inverter_timer = self._inverter.data_recv_timer
inverter = inverter_receiver
set_power_result = set_power_result_receiver
while True:
try:
async for selected in select(
battery,
battery_timer,
inverter_timer,
inverter,
set_power_result,
):
new_status = None
if selected_from(selected, battery):
self._handle_status_battery(selected.message)
elif selected_from(selected, inverter):
self._handle_status_inverter(selected.message)
elif selected_from(selected, set_power_result):
self._handle_status_set_power_result(selected.message)
elif selected_from(selected, battery_timer):
if (
datetime.now(tz=timezone.utc)
- self._battery.last_msg_timestamp
) < self._max_data_age:
# This means that we have received data from the battery
# since the timer triggered, but the timer event arrived
# late, so we can ignore it.
continue
self._handle_status_battery_timer()
elif selected_from(selected, inverter_timer):
if (
datetime.now(tz=timezone.utc)
- self._inverter.last_msg_timestamp
) < self._max_data_age:
# This means that we have received data from the inverter
# since the timer triggered, but the timer event arrived
# late, so we can ignore it.
continue
self._handle_status_inverter_timer()
else:
_logger.error("Unknown message returned from select")
new_status = self._get_new_status_if_changed()
if new_status is not None:
await status_sender.send(
ComponentStatus(self.battery_id, new_status)
)
except Exception as err: # pylint: disable=broad-except
_logger.exception("BatteryStatusTracker crashed with error: %s", err)
def _get_current_status(self) -> ComponentStatusEnum:
"""Get current battery status.
Returns:
Battery status.
"""
is_msg_correct = (
self._battery.last_msg_correct and self._inverter.last_msg_correct
)
if not is_msg_correct:
return ComponentStatusEnum.NOT_WORKING
if self._last_status == ComponentStatusEnum.NOT_WORKING:
# If message just become correct, then try to use it
self._blocking_status.unblock()
return ComponentStatusEnum.WORKING
if self._blocking_status.is_blocked():
return ComponentStatusEnum.UNCERTAIN
return ComponentStatusEnum.WORKING
def _is_capacity_present(self, msg: BatteryData) -> bool:
"""Check whether the battery capacity is NaN or not.
If battery capacity is missing, then we can't work with it.
Args:
msg: battery message
Returns:
True if battery capacity is present, false otherwise.
"""
if math.isnan(msg.capacity):
if self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
"Battery %d capacity is NaN",
msg.component_id,
)
return False
return True
def _no_critical_error(self, msg: BatteryData | InverterData) -> bool:
"""Check if battery or inverter message has any critical error.
Args:
msg: message.
Returns:
True if message has no critical error, False otherwise.
"""
critical = ErrorLevel.ERROR_LEVEL_CRITICAL
# pylint: disable=protected-access
critical_err = next((err for err in msg._errors if err.level == critical), None)
if critical_err is not None:
if self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
"Component %d has critical error: %s",
msg.component_id,
str(critical_err),
)
return False
return True
def _is_inverter_state_correct(self, msg: InverterData) -> bool:
"""Check if inverter is in correct state from message.
Args:
msg: message
Returns:
True if inverter is in correct state. False otherwise.
"""
# Component state is not exposed to the user.
# pylint: disable=protected-access
state = msg._component_state
if state not in BatteryStatusTracker._inverter_valid_state:
if self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
"Inverter %d has invalid state: %s",
msg.component_id,
InverterComponentState.Name(state),
)
return False
return True
def _is_battery_state_correct(self, msg: BatteryData) -> bool:
"""Check if battery is in correct state from message.
Args:
msg: message
Returns:
True if battery is in correct state. False otherwise.
"""
# Component state is not exposed to the user.
# pylint: disable=protected-access
state = msg._component_state
if state not in BatteryStatusTracker._battery_valid_state:
if self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
"Battery %d has invalid state: %s",
self.battery_id,
BatteryComponentState.Name(state),
)
return False
# Component state is not exposed to the user.
# pylint: disable=protected-access
relay_state = msg._relay_state
if relay_state not in BatteryStatusTracker._battery_valid_relay:
if self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
"Battery %d has invalid relay state: %s",
self.battery_id,
BatteryRelayState.Name(relay_state),
)
return False
return True
def _is_timestamp_outdated(self, timestamp: datetime) -> bool:
"""Return if timestamp is to old.
Args:
timestamp: timestamp
Returns:
_True if timestamp is to old, False otherwise
"""
now = datetime.now(tz=timezone.utc)
diff = now - timestamp
return diff > self._max_data_age
def _is_message_reliable(self, message: ComponentData) -> bool:
"""Check if message is too old to be considered as reliable.
Args:
message: message to check
Returns:
True if message is reliable, False otherwise.
"""
is_outdated = self._is_timestamp_outdated(message.timestamp)
if is_outdated and self._last_status == ComponentStatusEnum.WORKING:
_logger.warning(
"Component %d stopped sending data. Last timestamp: %s.",
message.component_id,
str(message.timestamp),
)
return not is_outdated
def _find_adjacent_inverter_id(self, battery_id: int) -> int | None:
"""Find inverter adjacent to this battery.
Args:
battery_id: battery id adjacent to the wanted inverter
Returns:
Id of the inverter. If battery hasn't adjacent inverter, then return None.
"""
graph = connection_manager.get().component_graph
return next(
(
comp.component_id
for comp in graph.predecessors(battery_id)
if comp.category == ComponentCategory.INVERTER
),
None,
)