-
Notifications
You must be signed in to change notification settings - Fork 605
/
tasks.py
476 lines (407 loc) · 16.7 KB
/
tasks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
"""A collection of tasks."""
import logging
from ..const import AddonState
from ..coresys import CoreSysAttributes
from ..exceptions import (
AddonsError,
AudioError,
CliError,
CoreDNSError,
HomeAssistantError,
MulticastError,
ObserverError,
)
from ..host.const import HostFeature
from ..jobs.decorator import Job, JobCondition
# Module-level logger used by every task in this file.
_LOGGER: logging.Logger = logging.getLogger(__name__)

# Key under which the Home Assistant API watchdog keeps its miss counter in
# the Tasks cache.
HASS_WATCHDOG_API = "HASS_WATCHDOG_API"

# Intervals passed to the scheduler when the tasks are registered.
# NOTE(review): presumably seconds -- confirm against the scheduler.

# Update tasks
RUN_UPDATE_SUPERVISOR = 29_100
RUN_UPDATE_ADDONS = 57_600
RUN_UPDATE_CLI = 28_100
RUN_UPDATE_DNS = 30_100
RUN_UPDATE_AUDIO = 30_200
RUN_UPDATE_MULTICAST = 30_300
RUN_UPDATE_OBSERVER = 30_400

# Reload tasks
RUN_RELOAD_ADDONS = 10_800
RUN_RELOAD_BACKUPS = 72_000
RUN_RELOAD_HOST = 7_600
RUN_RELOAD_UPDATER = 7_200
RUN_RELOAD_INGRESS = 930

# Watchdog tasks
RUN_WATCHDOG_HOMEASSISTANT_DOCKER = 15
RUN_WATCHDOG_HOMEASSISTANT_API = 120

RUN_WATCHDOG_DNS_DOCKER = 30
RUN_WATCHDOG_AUDIO_DOCKER = 60
RUN_WATCHDOG_CLI_DOCKER = 60
RUN_WATCHDOG_OBSERVER_DOCKER = 60
RUN_WATCHDOG_MULTICAST_DOCKER = 60

RUN_WATCHDOG_ADDON_DOCKER = 30
# The spelling "APPLICATON" is kept on purpose: the name is referenced as-is
# where the task is registered.
RUN_WATCHDOG_ADDON_APPLICATON = 120
RUN_WATCHDOG_OBSERVER_APPLICATION = 180

# Refresh and connectivity tasks
RUN_REFRESH_ADDON = 15
RUN_CHECK_CONNECTIVITY = 30
class Tasks(CoreSysAttributes):
    """Handle Tasks inside Supervisor.

    Registers the periodic update, reload, watchdog, refresh and connectivity
    tasks with the scheduler and implements the task bodies defined here.
    """

    def __init__(self, coresys):
        """Initialize Tasks.

        Stores the given coresys object and creates the empty cache that the
        watchdog and connectivity tasks use to keep state between runs.
        """
        self.coresys = coresys
        # Scratch storage shared by the tasks below: miss counters keyed by
        # HASS_WATCHDOG_API or an add-on slug, plus the "connectivity" timer.
        self._cache = {}

    async def load(self) -> None:
        """Add Tasks to scheduler."""
        # Update
        self.sys_scheduler.register_task(self._update_addons, RUN_UPDATE_ADDONS)
        self.sys_scheduler.register_task(self._update_supervisor, RUN_UPDATE_SUPERVISOR)
        self.sys_scheduler.register_task(self._update_cli, RUN_UPDATE_CLI)
        self.sys_scheduler.register_task(self._update_dns, RUN_UPDATE_DNS)
        self.sys_scheduler.register_task(self._update_audio, RUN_UPDATE_AUDIO)
        self.sys_scheduler.register_task(self._update_multicast, RUN_UPDATE_MULTICAST)
        self.sys_scheduler.register_task(self._update_observer, RUN_UPDATE_OBSERVER)

        # Reload
        self.sys_scheduler.register_task(self.sys_store.reload, RUN_RELOAD_ADDONS)
        self.sys_scheduler.register_task(self.sys_updater.reload, RUN_RELOAD_UPDATER)
        self.sys_scheduler.register_task(self.sys_backups.reload, RUN_RELOAD_BACKUPS)
        self.sys_scheduler.register_task(self.sys_host.reload, RUN_RELOAD_HOST)
        self.sys_scheduler.register_task(self.sys_ingress.reload, RUN_RELOAD_INGRESS)

        # Watchdog
        self.sys_scheduler.register_task(
            self._watchdog_homeassistant_docker, RUN_WATCHDOG_HOMEASSISTANT_DOCKER
        )
        self.sys_scheduler.register_task(
            self._watchdog_homeassistant_api, RUN_WATCHDOG_HOMEASSISTANT_API
        )
        self.sys_scheduler.register_task(
            self._watchdog_dns_docker, RUN_WATCHDOG_DNS_DOCKER
        )
        self.sys_scheduler.register_task(
            self._watchdog_audio_docker, RUN_WATCHDOG_AUDIO_DOCKER
        )
        self.sys_scheduler.register_task(
            self._watchdog_cli_docker, RUN_WATCHDOG_CLI_DOCKER
        )
        self.sys_scheduler.register_task(
            self._watchdog_observer_docker, RUN_WATCHDOG_OBSERVER_DOCKER
        )
        self.sys_scheduler.register_task(
            self._watchdog_observer_application, RUN_WATCHDOG_OBSERVER_APPLICATION
        )
        self.sys_scheduler.register_task(
            self._watchdog_multicast_docker, RUN_WATCHDOG_MULTICAST_DOCKER
        )
        self.sys_scheduler.register_task(
            self._watchdog_addon_docker, RUN_WATCHDOG_ADDON_DOCKER
        )
        self.sys_scheduler.register_task(
            self._watchdog_addon_application, RUN_WATCHDOG_ADDON_APPLICATON
        )

        # Refresh
        self.sys_scheduler.register_task(self._refresh_addon, RUN_REFRESH_ADDON)

        # Connectivity
        self.sys_scheduler.register_task(
            self._check_connectivity, RUN_CHECK_CONNECTIVITY
        )

        _LOGGER.info("All core tasks are scheduled")

    @Job(
        conditions=[
            JobCondition.HEALTHY,
            JobCondition.FREE_SPACE,
            JobCondition.INTERNET_HOST,
            JobCondition.RUNNING,
        ]
    )
    async def _update_addons(self) -> None:
        """Check if an update is available for an Add-on and update it.

        Only installed add-ons with auto update enabled are considered. An
        add-on whose update schema test fails is skipped with a warning.
        """
        for addon in self.sys_addons.all:
            if not addon.is_installed or not addon.auto_update:
                continue

            # Evaluate available updates
            if not addon.need_update:
                continue
            if not addon.test_update_schema():
                _LOGGER.warning(
                    "Add-on %s will be ignored, schema tests failed", addon.slug
                )
                continue

            # Run Add-on update sequential
            # avoid issue on slow IO
            _LOGGER.info("Add-on auto update process %s", addon.slug)
            try:
                await addon.update(backup=True)
            except AddonsError:
                # A failed update must not stop the remaining add-ons
                _LOGGER.error("Can't auto update Add-on %s", addon.slug)

    @Job(
        conditions=[
            JobCondition.FREE_SPACE,
            JobCondition.INTERNET_HOST,
            JobCondition.RUNNING,
        ]
    )
    async def _update_supervisor(self) -> None:
        """Check and run update of Supervisor."""
        if not self.sys_supervisor.need_update:
            return

        _LOGGER.info(
            "Found new Supervisor version %s, updating",
            self.sys_supervisor.latest_version,
        )
        await self.sys_supervisor.update()

    async def _watchdog_homeassistant_docker(self) -> None:
        """Check the Home Assistant container and start it if it has failed.

        If starting raises HomeAssistantError, the container is rebuilt.
        """
        if not self.sys_homeassistant.watchdog:
            # Watchdog is not enabled for Home Assistant
            return
        if self.sys_homeassistant.error_state:
            # Home Assistant is in an error state, this is handled by the rollback feature
            return
        if not await self.sys_homeassistant.core.is_failed():
            # The home assistant container is not in a failed state
            return
        if self.sys_homeassistant.core.in_progress:
            # Home Assistant has a task in progress
            return
        if await self.sys_homeassistant.core.is_running():
            # Home Assistant is running
            return

        _LOGGER.warning("Watchdog found a problem with Home Assistant Docker!")
        try:
            await self.sys_homeassistant.core.start()
        except HomeAssistantError as err:
            _LOGGER.error("Home Assistant watchdog reanimation failed!")
            self.sys_capture_exception(err)
        else:
            return

        # Only reached when the start attempt above failed
        _LOGGER.info("Rebuilding the Home Assistant Container")
        await self.sys_homeassistant.core.rebuild()

    async def _watchdog_homeassistant_api(self) -> None:
        """Monitor the running state of the Home Assistant API.

        Try 2 times to call API before we restart Home-Assistant. Maybe we had
        a delay in our system. The miss counter is cleared as soon as the API
        answers again, so only consecutive misses lead to a restart.
        """
        if not self.sys_homeassistant.watchdog:
            # Watchdog is not enabled for Home Assistant
            return
        if self.sys_homeassistant.error_state:
            # Home Assistant is in an error state, this is handled by the rollback feature
            return
        if not await self.sys_homeassistant.core.is_running():
            # The home assistant container is not running
            return
        if self.sys_homeassistant.core.in_progress:
            # Home Assistant has a task in progress
            return
        if await self.sys_homeassistant.api.check_api_state():
            # Home Assistant is running properly; forget any earlier miss so
            # that a later isolated miss does not trigger a restart
            self._cache[HASS_WATCHDOG_API] = 0
            return

        # Look like we run into a problem
        retry_scan = self._cache.get(HASS_WATCHDOG_API, 0) + 1
        if retry_scan == 1:
            self._cache[HASS_WATCHDOG_API] = retry_scan
            _LOGGER.warning("Watchdog miss API response from Home Assistant")
            return

        _LOGGER.error("Watchdog found a problem with Home Assistant API!")
        try:
            await self.sys_homeassistant.core.restart()
        except HomeAssistantError as err:
            _LOGGER.error("Home Assistant watchdog reanimation failed!")
            self.sys_capture_exception(err)
        finally:
            # Start counting from scratch after a restart attempt
            self._cache[HASS_WATCHDOG_API] = 0

    @Job(conditions=[JobCondition.RUNNING])
    async def _update_cli(self) -> None:
        """Check and run update of cli."""
        if not self.sys_plugins.cli.need_update:
            return

        _LOGGER.info(
            "Found new cli version %s, updating", self.sys_plugins.cli.latest_version
        )
        await self.sys_plugins.cli.update()

    @Job(conditions=[JobCondition.RUNNING])
    async def _update_dns(self) -> None:
        """Check and run update of CoreDNS plugin."""
        if not self.sys_plugins.dns.need_update:
            return

        _LOGGER.info(
            "Found new CoreDNS plugin version %s, updating",
            self.sys_plugins.dns.latest_version,
        )
        await self.sys_plugins.dns.update()

    @Job(conditions=[JobCondition.RUNNING])
    async def _update_audio(self) -> None:
        """Check and run update of PulseAudio plugin."""
        if not self.sys_plugins.audio.need_update:
            return

        _LOGGER.info(
            "Found new PulseAudio plugin version %s, updating",
            self.sys_plugins.audio.latest_version,
        )
        await self.sys_plugins.audio.update()

    @Job(conditions=[JobCondition.RUNNING])
    async def _update_observer(self) -> None:
        """Check and run update of Observer plugin."""
        if not self.sys_plugins.observer.need_update:
            return

        _LOGGER.info(
            "Found new Observer plugin version %s, updating",
            self.sys_plugins.observer.latest_version,
        )
        await self.sys_plugins.observer.update()

    @Job(conditions=[JobCondition.RUNNING])
    async def _update_multicast(self) -> None:
        """Check and run update of multicast."""
        if not self.sys_plugins.multicast.need_update:
            return

        _LOGGER.info(
            "Found new Multicast version %s, updating",
            self.sys_plugins.multicast.latest_version,
        )
        await self.sys_plugins.multicast.update()

    async def _watchdog_dns_docker(self) -> None:
        """Check the CoreDNS plugin and start it if it is not running."""
        # if CoreDNS is active
        if await self.sys_plugins.dns.is_running() or self.sys_plugins.dns.in_progress:
            return
        _LOGGER.warning("Watchdog found a problem with CoreDNS plugin!")

        # Detect loop
        await self.sys_plugins.dns.loop_detection()

        try:
            await self.sys_plugins.dns.start()
        except CoreDNSError:
            _LOGGER.error("CoreDNS watchdog reanimation failed!")

    async def _watchdog_audio_docker(self) -> None:
        """Check the PulseAudio plugin and start it if it is not running."""
        # if PulseAudio plugin is active
        if (
            await self.sys_plugins.audio.is_running()
            or self.sys_plugins.audio.in_progress
        ):
            return
        _LOGGER.warning("Watchdog found a problem with PulseAudio plugin!")

        try:
            await self.sys_plugins.audio.start()
        except AudioError:
            _LOGGER.error("PulseAudio watchdog reanimation failed!")

    async def _watchdog_cli_docker(self) -> None:
        """Check the cli plugin and start it if it is not running."""
        # if cli plugin is active
        if await self.sys_plugins.cli.is_running() or self.sys_plugins.cli.in_progress:
            return
        _LOGGER.warning("Watchdog found a problem with cli plugin!")

        try:
            await self.sys_plugins.cli.start()
        except CliError:
            _LOGGER.error("CLI watchdog reanimation failed!")

    async def _watchdog_observer_docker(self) -> None:
        """Check the observer plugin and start it if it is not running."""
        # if observer plugin is active
        if (
            await self.sys_plugins.observer.is_running()
            or self.sys_plugins.observer.in_progress
        ):
            return
        _LOGGER.warning("Watchdog/Docker found a problem with observer plugin!")

        try:
            await self.sys_plugins.observer.start()
        except ObserverError:
            _LOGGER.error("Observer watchdog reanimation failed!")

    async def _watchdog_observer_application(self) -> None:
        """Check the observer application and rebuild it if it does not respond."""
        # if observer plugin is active
        if (
            self.sys_plugins.observer.in_progress
            or await self.sys_plugins.observer.check_system_runtime()
        ):
            return
        _LOGGER.warning("Watchdog/Application found a problem with observer plugin!")

        try:
            await self.sys_plugins.observer.rebuild()
        except ObserverError:
            _LOGGER.error("Observer watchdog reanimation failed!")

    async def _watchdog_multicast_docker(self) -> None:
        """Check the multicast plugin and start it if it is not running."""
        # if multicast plugin is active
        if (
            await self.sys_plugins.multicast.is_running()
            or self.sys_plugins.multicast.in_progress
        ):
            return
        _LOGGER.warning("Watchdog found a problem with Multicast plugin!")

        try:
            await self.sys_plugins.multicast.start()
        except MulticastError:
            _LOGGER.error("Multicast watchdog reanimation failed!")

    async def _watchdog_addon_docker(self) -> None:
        """Check watched add-ons and start those that are not running.

        Only add-ons with the watchdog enabled whose state is STARTED and that
        have no action in progress are started again.
        """
        for addon in self.sys_addons.installed:
            # if watchdog need looking for
            if not addon.watchdog or await addon.is_running():
                continue

            # if Addon have running actions
            if addon.in_progress or addon.state != AddonState.STARTED:
                continue

            _LOGGER.warning("Watchdog found a problem with %s!", addon.slug)
            try:
                await addon.start()
            except AddonsError as err:
                _LOGGER.error("%s watchdog reanimation failed with %s", addon.slug, err)
                self.sys_capture_exception(err)

    async def _watchdog_addon_application(self) -> None:
        """Check watched add-on applications and restart those that hang.

        An add-on is restarted on its second consecutive missed application
        check; the first miss is only logged. Every installed add-on is
        examined on each run, independent of the result for the others.
        """
        for addon in self.sys_addons.installed:
            # if watchdog need looking for
            if not addon.watchdog or addon.state != AddonState.STARTED:
                continue

            # if Addon have running actions
            if addon.in_progress:
                continue

            if await addon.watchdog_application():
                # Application works; forget any earlier miss so that only
                # consecutive misses lead to a restart
                self._cache[addon.slug] = 0
                continue

            # Look like we run into a problem
            retry_scan = self._cache.get(addon.slug, 0) + 1
            if retry_scan == 1:
                self._cache[addon.slug] = retry_scan
                _LOGGER.warning(
                    "Watchdog missing application response from %s", addon.slug
                )
                # Move on to the next add-on instead of ending the whole run
                continue

            _LOGGER.warning("Watchdog found a problem with %s application!", addon.slug)
            try:
                await addon.restart()
            except AddonsError as err:
                _LOGGER.error("%s watchdog reanimation failed with %s", addon.slug, err)
                self.sys_capture_exception(err)
            finally:
                # Start counting from scratch after a restart attempt
                self._cache[addon.slug] = 0

    async def _refresh_addon(self) -> None:
        """Refresh addon state.

        Add-ons without watchdog that are marked STARTED but are neither busy
        nor running are set to STOPPED.
        """
        for addon in self.sys_addons.installed:
            # Add-ons with watchdog are handled by the watchdog tasks
            if addon.watchdog or addon.state != AddonState.STARTED:
                continue

            # if Addon have running actions
            if addon.in_progress or await addon.is_running():
                continue

            # Adjust state
            addon.state = AddonState.STOPPED

    async def _check_connectivity(self) -> None:
        """Check system connectivity.

        While the Supervisor reports connectivity and the host network does not
        report a lack of it, the full check is skipped and only a timer in the
        cache is advanced. The full check runs once that timer reaches 600 or
        as soon as connectivity is reported missing.
        """
        value = self._cache.get("connectivity", 0)

        # Need only full check if not connected or each 10min
        if value < 600 and self.sys_supervisor.connectivity:
            host_connectivity = self.sys_host.network.connectivity
            # None is treated like "connected" here: the skip is taken unless
            # the host explicitly reports a falsy, non-None value
            if host_connectivity is None or host_connectivity:
                self._cache["connectivity"] = value + RUN_CHECK_CONNECTIVITY
                return

        # Check connectivity
        try:
            await self.sys_supervisor.check_connectivity()
            if HostFeature.NETWORK in self.sys_host.features:
                await self.sys_host.network.check_connectivity()
        finally:
            # Restart the timer even when a check raised
            self._cache["connectivity"] = 0