-
Notifications
You must be signed in to change notification settings - Fork 28
/
healthcheck.py
151 lines (107 loc) · 4.42 KB
/
healthcheck.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from celery.app import default_app as celery_app
from django.conf import settings
from health_check.backends import BaseHealthCheckBackend
from health_check.exceptions import ServiceReturnedUnexpectedResult, ServiceUnavailable
from pydash import get
from core.common.utils import flower_get, es_get
class BaseHealthCheck(BaseHealthCheckBackend):
    """Shared base for the health checks in this module.

    Accepts an optional ``critical_service`` keyword argument that overrides
    the class-level value for this instance; any other keyword arguments are
    ignored.
    """

    def __init__(self, **kwargs):
        try:
            self.critical_service = kwargs['critical_service']
        except KeyError:
            # Not supplied: leave critical_service as defined on the class.
            pass
        super().__init__()

    def check_status(self):
        """Perform the check; concrete subclasses must override this."""
        raise NotImplementedError
class FlowerHealthCheck(BaseHealthCheck):
    """Health check for the Flower monitoring service (non-critical by default)."""

    critical_service = False

    def check_status(self):
        """Request Flower's ``metrics`` endpoint and fail unless it answers OK.

        Raises:
            ServiceUnavailable: if the request fails (the message is the
                underlying error text) or the response is not OK.
        """
        try:
            response = flower_get('metrics', timeout=2)
            ok = response.ok
        except Exception as ex:
            # Pass a readable string as the message, not the ``ex.args`` tuple.
            raise ServiceUnavailable(str(ex) or type(ex).__name__) from ex
        # Raised outside the ``try`` so the blanket handler above cannot catch
        # and re-wrap it (which previously turned the message into a tuple).
        if not ok:
            raise ServiceUnavailable('Flower Unavailable')

    def identifier(self):
        """Return the display name for this check."""
        return "Flower"
class ESHealthCheck(BaseHealthCheck):
    """Health check for the Elasticsearch cluster (non-critical by default)."""

    critical_service = False

    def check_status(self):
        """Query ``_cluster/health`` and fail unless the status is ``green``.

        Raises:
            ServiceReturnedUnexpectedResult: if the request or decoding of the
                JSON body fails, or the reported status is not ``green``
                (``yellow`` therefore also fails).
        """
        try:
            response = es_get('_cluster/health', timeout=2)
            status = get(response.json(), 'status')
        except Exception as ex:
            # Pass a readable string as the message, not the ``ex.args`` tuple.
            raise ServiceReturnedUnexpectedResult(str(ex) or type(ex).__name__) from ex
        # Checked outside the ``try`` so this error is not caught and re-wrapped
        # by the handler above (which previously turned the message into a tuple).
        if status != 'green':
            raise ServiceReturnedUnexpectedResult(f"Status {status}")

    def identifier(self):
        """Return the display name for this check."""
        return 'ElasticSearch'
class CeleryQueueHealthCheck(BaseHealthCheck):
    """Check that Celery workers answer a ping and consume the expected queues.

    Subclasses set ``queues`` to the queue names that must each have at least
    one consumer among the workers that answered the ping. Problems are
    recorded with ``add_error`` rather than raised.
    """

    critical_service = False
    # Exact reply a healthy worker must send back to ``control.ping``.
    CORRECT_PING_RESPONSE = {"ok": "pong"}
    # Queue names that must be consumed. Defined (empty) here so the class
    # works without a subclass override; previously ``self.queues`` raised
    # AttributeError in _check_active_queues() and identifier().
    queues = frozenset()

    def check_status(self):
        """Ping all workers, then verify queue coverage for those that answered."""
        timeout = getattr(settings, "HEALTHCHECK_CELERY_PING_TIMEOUT", 2)
        try:
            ping_result = celery_app.control.ping(timeout=timeout)
        except IOError as ex:
            self.add_error(ServiceUnavailable("IOError"), ex)
        except NotImplementedError as exc:
            self.add_error(
                ServiceUnavailable(
                    "NotImplementedError: Make sure CELERY_RESULT_BACKEND is set"
                ),
                exc,
            )
        except BaseException as exc:
            # NOTE(review): BaseException also swallows KeyboardInterrupt and
            # SystemExit; consider narrowing to Exception.
            self.add_error(ServiceUnavailable("Unknown error"), exc)
        else:
            if not ping_result:
                self.add_error(
                    ServiceUnavailable("Celery workers unavailable"),
                )
            else:
                self._check_ping_result(ping_result)

    def _check_ping_result(self, ping_result):
        """Flag workers with a wrong ping reply; check queues if none were flagged.

        Each element of ``ping_result`` is expected to be a ``{worker: reply}``
        mapping; only its first item is used.
        """
        active_workers = []
        for result in ping_result:
            worker, response = next(iter(result.items()))
            if response != self.CORRECT_PING_RESPONSE:
                self.add_error(
                    ServiceUnavailable(
                        f"Celery worker {worker} response was incorrect"
                    ),
                )
                continue
            active_workers.append(worker)
        # Queue coverage is only inspected when no errors have been recorded.
        if not self.errors:
            self._check_active_queues(active_workers)

    def _check_active_queues(self, active_workers):
        """Record an error for every queue in ``queues`` that no worker consumes."""
        defined_queues = set(self.queues or ())
        if not defined_queues:
            return
        # Celery's ``active_queues()`` returns None when no worker replies in
        # time; treat that as "nothing consumed" instead of crashing on
        # ``None.values()`` outside any error handler.
        replies = celery_app.control.inspect(active_workers).active_queues() or {}
        active_queues = {
            queue.get("name")
            for worker_queues in replies.values()
            for queue in worker_queues
        }
        # Sorted so errors are reported in a stable order.
        for queue in sorted(defined_queues - active_queues):
            self.add_error(
                ServiceUnavailable(f"No worker for Celery task queue {queue}"),
            )

    def identifier(self):  # Display name on the endpoint.
        if self.queues:
            # sorted() keeps the name stable across processes: set iteration
            # order of strings varies with per-process hash randomization.
            return "celery@{}".format(sorted(self.queues)[0])
        return self.__class__.__name__
class CeleryDefaultQueueHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``default`` queue."""

    queues = {"default"}
class CeleryConcurrentThreadsHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``concurrent`` queue."""

    queues = {"concurrent"}
class CeleryIndexingQueueHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``indexing`` queue."""

    queues = {"indexing"}
class CeleryBulkImport0QueueHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``bulk_import_0`` queue."""

    queues = {"bulk_import_0"}
class CeleryBulkImport1QueueHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``bulk_import_1`` queue."""

    queues = {"bulk_import_1"}
class CeleryBulkImport2QueueHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``bulk_import_2`` queue."""

    queues = {"bulk_import_2"}
class CeleryBulkImport3QueueHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``bulk_import_3`` queue."""

    queues = {"bulk_import_3"}
class CeleryBulkImportRootQueueHealthCheck(CeleryQueueHealthCheck):
    """Celery queue health check for the ``bulk_import_root`` queue."""

    queues = {"bulk_import_root"}