-
Notifications
You must be signed in to change notification settings - Fork 68
/
observability.py
512 lines (451 loc) · 18.1 KB
/
observability.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
# Copyright 2017 The Wallaroo Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
from copy import deepcopy
import json
import logging
import time
from types import FunctionType
from functools import partial
import re
import traceback
import sys
import time
from watchdog.observers import Observer
from watchdog.events import (DirDeletedEvent,
FileCreatedEvent,
FileSystemEventHandler)
# Silence watchdog's chatty DEBUG/INFO file-system event logging; only
# warnings and above from the watchdog package are emitted.
watchdog_logger = logging.getLogger("watchdog")
watchdog_logger.setLevel(logging.WARNING)
from .errors import (DuplicateKeyError,
TimeoutError)
from .external import run_shell_cmd
from .logger import INFO2
from .stoppable_thread import StoppableThread
# Make string instance checking py2 and py3 compatible below.
# Under Python 3 the name `basestring` does not exist, so alias it to `str`.
# Catch NameError specifically: the original bare `except:` would also have
# swallowed KeyboardInterrupt/SystemExit.
try:
    basestring
except NameError:
    basestring = str
class ObservabilityTimeoutError(TimeoutError):
    """Raised when observability tests are still failing after the timeout."""
    pass
class ObservabilityQueryError(Exception):
    """Raised when an observability query keeps erroring until the timeout."""
    pass
class ObservabilityResponseError(Exception):
    """Raised/logged when an observability response cannot be deserialized."""
    pass
# Maps the friendly query name used by this module's helper functions to the
# `--type` value understood by the external_sender command-line tool.
QUERY_TYPES = {
    'partition-query': 'partition-query',
    'partition-counts': 'partition-count-query',
    'cluster-status': 'cluster-status-query',
    'state-entity-query': 'state-entity-query'}
def external_sender_query(addr, query_type):
    """
    Use external_sender to query the cluster for observability data.

    - `addr`: external address of the worker (e.g. "127.0.0.1:5050")
    - `query_type`: one of the keys of QUERY_TYPES

    Returns the raw stdout of the external_sender invocation.
    Raises KeyError for an unknown `query_type`, and AssertionError when the
    shell command reports failure.
    """
    t = QUERY_TYPES[query_type]
    cmd = ('external_sender --external {} --type {} --json'
           .format(addr, t))
    res = run_shell_cmd(cmd)
    # Raise explicitly instead of via `assert`, which is stripped when Python
    # runs with -O. AssertionError is kept so existing callers that catch it
    # keep working.
    if not res.success:
        raise AssertionError("Failed to query cluster for '{}' with the "
                             "following error:\n{}".format(t, res.output))
    return res.output
def partitions_query(addr):
    """
    Ask the worker at `addr` for its partition routing information and
    return the parsed JSON payload.
    """
    raw = external_sender_query(addr, 'partition-query')
    try:
        parsed = json.loads(raw)
    except Exception:
        # Log a descriptive error but re-raise the original JSON error.
        failure = ObservabilityResponseError(
            "Failed to deserialize observability"
            " response:\n{!r}".format(raw))
        logging.error(failure)
        raise
    return parsed
def multi_states_query(addresses):
    """
    Run a state-entity query against each (name, address) pair and return a
    dict mapping worker name to its parsed JSON response.
    """
    parsed_by_worker = {}
    for worker_name, worker_addr in addresses:
        # Collect the raw response for this worker first.
        try:
            raw = external_sender_query(worker_addr, 'state-entity-query')
        except Exception as err:
            logging.error(err)
            raise err
        # Then attempt to parse it, logging a descriptive error on failure.
        try:
            parsed_by_worker[worker_name] = json.loads(raw)
        except Exception:
            failure = ObservabilityResponseError(
                "Failed to deserialize observability response from {} ({}):\n{!r}"
                .format(worker_name, worker_addr, raw))
            logging.error(failure)
            raise
    return parsed_by_worker
def cluster_status_query(addr):
    """
    Ask the worker at `addr` for its cluster status and return the parsed
    JSON payload.
    """
    raw = external_sender_query(addr, 'cluster-status')
    try:
        parsed = json.loads(raw)
    except Exception:
        # Log a descriptive error but re-raise the original JSON error.
        failure = ObservabilityResponseError(
            "Failed to deserialize observability"
            " response from {!r}:\n{!r}"
            .format(addr, raw))
        logging.error(failure)
        raise
    return parsed
def partition_counts_query(addr):
    """
    Ask the worker at `addr` for its partition counts and return the parsed
    JSON payload.
    """
    raw = external_sender_query(addr, 'partition-counts')
    try:
        parsed = json.loads(raw)
    except Exception:
        # Log a descriptive error but re-raise the original JSON error.
        failure = ObservabilityResponseError(
            "Failed to deserialize observability"
            " response:\n{!r}".format(raw))
        logging.error(failure)
        raise
    return parsed
def state_entity_query(addr):
    """
    Ask the worker at `addr` for its state entities and return the parsed
    JSON payload.
    """
    raw = external_sender_query(addr, 'state-entity-query')
    try:
        parsed = json.loads(raw)
    except Exception:
        # Log a descriptive error but re-raise the original JSON error.
        failure = ObservabilityResponseError(
            "Failed to deserialize observability"
            " response:\n{!r}".format(raw))
        logging.error(failure)
        raise
    return parsed
def get_func_name(f):
    """
    Return the name of the function underlying `f`.

    `functools.partial` wrappers are unwrapped recursively; a plain `def`
    function yields its `__name__`. Anything else raises ValueError.
    """
    if isinstance(f, partial):
        # Peel one layer of partial and try again.
        return get_func_name(f.func)
    if isinstance(f, FunctionType):
        return f.__name__
    raise ValueError("Can't get func_name of provided function {}".format(f))
class ObservabilityNotifier(StoppableThread):
    """
    A notifier based status test.
    A list of tests is applied to an observability query result set. If all
    tests pass, the notifier thread exits with a successful status. If any of
    the tests fail, the notifier thread sleeps for 1 second and retries. If
    the timeout period elapses before all tests pass, the thread exits with an
    error status.
    """
    __base_name__ = 'ObservabilityNotifier'

    def __init__(self, query_func, query_args, tests, timeout=90, period=2):
        """
        - `query_func` is an argument-free function to query an observability
          status. You can use `functools.partial` to create it.
        - `query_args` is a list of arguments to pass the query function.
          Pass an empty list if no arguments are required.
        - `tests` is either a single function or a list of functions, each of
          which will be executed on a copy of the result set. A test should
          fail by raising an error, and pass by returning True.
        - `timeout` is the period in seconds the notifier should wait before
          exiting with an error status if any of the tests is still failing.
          The default is 90 seconds.
        - `period` is the time in seconds to wait between queries. The default
          is 2 seconds.
        """
        super(ObservabilityNotifier, self).__init__()
        self.name = self.__base_name__
        self.timeout = timeout
        # Holds the terminal error (query failure or test timeout), if any.
        self.error = None
        if isinstance(tests, (list, tuple)):
            # make sure it's not a (test, args, kwargs) tuple
            if len(tests) in (2,3):
                if isinstance(tests[1], (list, tuple, dict)):
                    # Looks like a single (test, args[, kwargs]) spec.
                    self.tests = [tests]
                else:
                    self.tests = tests
            # NOTE(review): a tests list whose length is not 2 or 3 falls
            # through without assigning self.tests — confirm that case is
            # never exercised (the normalize line below would then raise
            # AttributeError).
        else:
            # A single bare test function.
            self.tests = [tests]
        # Normalize each entry to a hashable (name, func, args, kwargs) tuple.
        self.tests = [self._normalize_test_tuple(t) for t in self.tests]
        self.query_func = query_func
        # A single string/bytes address is treated as a one-argument list.
        if isinstance(query_args, (bytes, basestring)):
            self.query_args = [query_args]
        else:
            self.query_args = query_args
        self.query_func_name = get_func_name(query_func)
        self.period = period

    def _normalize_test_tuple(self, t):
        # Accepts `test`, `(test,)`, `(test, args)`, `(test, kwargs)` or
        # `(test, args, kwargs)` and returns (name, test, args, kwargs) with
        # kwargs stored as a hashable tuple of items so the normalized test
        # can serve as a dict key in `run`.
        if isinstance(t, (tuple,list)):
            name = get_func_name(t[0])
            test = t[0]
            args = tuple()
            kwargs = tuple()
            if len(t) == 1:
                pass
            elif len(t) == 2:
                # Second element is either a kwargs dict or positional args.
                if isinstance(t[1], dict):
                    kwargs = tuple(t[1].items())
                else:
                    args = tuple(t[1])
            elif len(t) == 3:
                args = tuple(t[1])
                kwargs = tuple(t[2].items())
            else:
                raise ValueError("Test tuple must be "
                                 "(test[, args[, kwargs]])")
            return (name, test, args, kwargs)
        else:
            # Bare function: empty args; frozenset() is an empty, hashable
            # stand-in for kwargs (dict(frozenset()) == {}).
            return (get_func_name(t), t, tuple(), frozenset())

    def run(self):
        logging.log(1, "Starting observabilitynotifier loop")
        started = time.time()
        # Query/test loop: repeat until all tests pass, the timeout elapses,
        # or the thread is stopped externally.
        while not self.stopped():
            try:
                logging.log(1, "Try query")
                query_result = self.query_func(*self.query_args)
            except Exception as err:
                logging.debug("Query failed: {}".format(err))
                # sleep and retry but only if timeout hasn't elapsed
                if (time.time() - started) <= self.timeout:
                    time.sleep(self.period)
                    continue
                else: # Timeout has elapsed, return this error!
                    logging.log(1, "ObservabilityNotifier timed out")
                    self.error = ObservabilityQueryError(
                        "Query function '{}' has experienced an error:\n{}({})"
                        .format(self.query_func_name, type(err).__name__,
                                str(err)))
                    self.stop()
                    break
            # Run tests, collect results
            errors = {}
            for t in self.tests:
                try:
                    # Each test gets its own deep copy of the query result so
                    # one test cannot mutate what another test sees.
                    t_res = t[1](deepcopy(query_result), *t[2], **dict(t[3]))
                except Exception as err:
                    tb = traceback.format_exc()
                    errors[t] = (err, tb)
            # If all results are error free, break
            if not errors:
                self.stop()
                break
            # Else if any result is not True, either wait for next cycle
            # or timeout.
            if time.time() - started > self.timeout:
                # Build a multi-line report of every still-failing test with
                # its exception type, message, and indented traceback.
                self.error = ObservabilityTimeoutError(
                    "Observability test timed out after {} seconds with the"
                    " following tests"
                    " still failing:\n{}".format(self.timeout, "\n".join((
                        " - {}: {}({})\n{}".format(
                            t[0], type(err).__name__,
                            str(err),
                            '\n'.join((
                                " -> {}".format(s) for s in tb.splitlines())))
                        for t, (err, tb),in errors.items()))))
                # Pack the query result into the error object
                self.error.query_result = query_result
                self.stop()
                break
            time.sleep(self.period)
class RunnerReadyChecker(StoppableThread):
    """
    Poll the first runner's log file until the application reports that it
    has initialized, or record a TimeoutError on `self.error` after
    `timeout` seconds.
    """
    __base_name__ = 'RunnerReadyChecker'
    # Line the application logs once startup has completed.
    pattern = re.compile('Application has successfully initialized')

    def __init__(self, runners, timeout=90):
        # `runners`: sequence of runner objects; only the first runner's log
        # file (runners[0].file.name) is watched.
        super(RunnerReadyChecker, self).__init__()
        self.runners = runners
        self.name = self.__base_name__
        self._path = self.runners[0].file.name
        self.timeout = timeout
        # Set to a TimeoutError if readiness was not observed in time.
        self.error = None

    def run(self):
        with open(self._path, 'r') as r:
            started = time.time()
            while not self.stopped():
                # Re-read the whole file each cycle; fine for small test logs.
                r.seek(0)
                stdout = r.read()
                if not stdout:
                    time.sleep(0.1)
                    continue
                else:
                    if self.pattern.search(stdout):
                        logging.debug('Application reports it is ready.')
                        self.stop()
                        break
                if time.time() - started > self.timeout:
                    # NOTE(review): `runners_output_format` is not among this
                    # module's visible imports — confirm it is in scope, or
                    # this timeout path raises NameError instead of reporting.
                    outputs = runners_output_format(self.runners)
                    self.error = TimeoutError(
                        'Application did not report as ready after {} '
                        'seconds. It had the following outputs:\n===\n{}'
                        .format(self.timeout, outputs))
                    self.stop()
                    break
class RunnerChecker(StoppableThread):
    """
    Poll a runner's log file until every regex in `patterns` has been found
    (in order), or record a TimeoutError on `self.error` after `timeout`
    seconds.
    """
    __base_name__ = 'RunnerChecker'

    def __init__(self, runner, patterns, timeout=90, start_from=0):
        """
        - `runner`: object exposing `name` and `file.name` (its log path)
        - `patterns`: one regex string, or a list/tuple of regex strings that
          must each appear in the log, in order
        - `timeout`: seconds to wait before giving up (default 90)
        - `start_from`: byte offset in the log file to start reading from
        """
        super(RunnerChecker, self).__init__()
        self.name = self.__base_name__
        self.runner = runner
        self.runner_name = runner.name
        self.start_from = start_from
        self._path = runner.file.name
        self.timeout = timeout
        # Set to a TimeoutError if the patterns were not all found in time.
        self.error = None
        if isinstance(patterns, (list, tuple)):
            self.patterns = patterns
        else:
            # BUGFIX: was `[pattern]`, a NameError whenever a single pattern
            # string was passed.
            self.patterns = [patterns]
        # BUGFIX: compile the normalized list. Compiling the raw argument
        # iterated over the *characters* of a single pattern string.
        self.compiled = [re.compile(p) for p in self.patterns]

    def run(self):
        with open(self._path, 'r') as r:
            last_match = self.start_from
            started = time.time()
            while not self.stopped():
                r.seek(last_match)
                stdout = r.read()
                if not stdout:
                    time.sleep(0.1)
                    continue
                else:
                    # Only ever search for the first remaining pattern, so
                    # the patterns must appear in the given order.
                    match = self.compiled[0].search(stdout)
                    if match:
                        logging.debug('Pattern %r found in runner STDOUT.'
                                      % match.re.pattern)
                        self.compiled.pop(0)
                        # NOTE(review): this resets the read offset to
                        # start_from, so every remaining pattern is searched
                        # from the same starting point — presumably
                        # intentional; confirm before changing.
                        last_match = self.start_from
                        if self.compiled:
                            continue
                        self.stop()
                        break
                if time.time() - started > self.timeout:
                    r.seek(self.start_from)
                    stdout = r.read()
                    self.error = TimeoutError(
                        'Runner {!r} did not have patterns {!r}'
                        ' after {} seconds.'
                        .format(self.runner_name,
                                [rx.pattern for rx in self.compiled],
                                self.timeout))
                    self.stop()
                    break
####################################
# Query response parsing functions #
####################################
def coalesce_partition_query_responses(responses):
    """
    Coalesce partition query responses from multiple workers into a single
    partition map of {step: {partition: worker}}.
    Raise error on duplicate partitions.
    """
    steps = {}
    for worker, worker_steps in responses.items():
        for step, parts in worker_steps.items():
            step_map = steps.setdefault(step, {})
            for part in parts:
                if part in step_map:
                    # The same partition was claimed by two workers.
                    dup0 = worker
                    dup1 = step_map[part]
                    raise DuplicateKeyError("Found duplicate keys! Step: {}, "
                                            "Key: {}, Loc1: {}, Loc2: {}"
                                            .format(step, part, dup0, dup1))
                step_map[part] = worker
    return steps
####################################
# Log File Observability Functions #
####################################
class LogRotationEventHandler(FileSystemEventHandler):
    """
    Watchdog event handler that filters file-system events for a watched log
    directory and forwards the relevant ones to its notifier.
    """
    def __init__(self, notifier, path, log_suffix):
        # BUGFIX: the format string had a single placeholder for three
        # values, so `path` and `log_suffix` were silently dropped from the
        # debug line.
        logging.debug("Created LogRotationEventHandler: {}, {}, {}".format(
            notifier, path, log_suffix))
        super(LogRotationEventHandler, self).__init__()
        self.notifier = notifier
        self.path = path
        self.log_suffix = log_suffix

    def dispatch(self, event):
        # If the watched directory itself is deleted, stop the notifier.
        if isinstance(event, DirDeletedEvent):
            if event.src_path == self.path:
                logging.info("Watch directory deleted.")
                self.notifier.stop()
        else:
            # only process events we care about
            if event.src_path.endswith(self.log_suffix):
                super(LogRotationEventHandler, self).dispatch(event)
            else:
                logging.log(1, "Event path doesn't end with log_suffix: {}. "
                            "Discarding event: {}".format(self.log_suffix, event))

    def on_created(self, event):
        # Forward only file creations (not directory creations).
        if isinstance(event, FileCreatedEvent):
            self.notifier.file_created(event.src_path)

    def on_deleted(self, event):
        # Distinguish directory deletions from file deletions.
        if isinstance(event, DirDeletedEvent):
            self.notifier.dir_deleted(event.src_path)
        else:
            self.notifier.file_deleted(event.src_path)
class EvLogFileNotifier(StoppableThread):
    """
    Watch the resilience directory and keep track of log files.
    Call a handler when a log file rotates.
    """
    __base_name__ = 'EvLogFileNotifier'

    def __init__(self, handler, path, log_suffix='.evlog'):
        """
        - `handler`: object receiving `file_created`, `file_deleted`,
          `dir_deleted`, and `evlognotifier_stopped` callbacks
        - `path`: directory to watch
        - `log_suffix`: suffix identifying event-log files (default '.evlog')
        """
        logging.log(1, "{}({}, {}, {})".format(self.__base_name__,
                                               handler, path, log_suffix))
        super(EvLogFileNotifier, self).__init__()
        self.handler = handler
        self.path = path
        self.log_suffix = log_suffix
        self.observer = Observer()
        self._event_handler = LogRotationEventHandler(self, self.path,
                                                      self.log_suffix)
        # State management: {base_name: {'chunks': [...], 'active': chunk}}
        self.log_files = {}

    def stop(self, error=None):
        # Stop the watchdog observer before stopping the thread itself.
        self.observer.stop()
        super(EvLogFileNotifier, self).stop(error)
        self.handler.evlognotifier_stopped(self)

    def run(self):
        logging.debug("Scheduling watchdog observer")
        self.observer.schedule(self._event_handler, self.path)
        self.observer.start()
        while not self.stopped():
            time.sleep(0.1)
        self.observer.join()

    def parse_log_file_name(self, filename):
        """Split '<base>-<chunk><suffix>' into (base_name, chunk)."""
        # BUGFIX: use the configured suffix instead of a hard-coded
        # '.evlog', so non-default suffixes parse correctly.
        base_name, chunk = filename.split(self.log_suffix)[0].rsplit('-', 1)
        return (base_name, chunk)

    def file_created(self, filename):
        base_name, chunk = self.parse_log_file_name(filename)
        fn_logs = self.log_files.setdefault(base_name, {})
        fn_logs.setdefault('chunks', []).append(chunk)
        # Record the first chunk seen as the active one.
        fn_logs.setdefault('active', chunk)
        new_chunk = chunk
        # The previous chunk, or an all-zero sentinel for the first file.
        old_chunk = (fn_logs['chunks'][-2]
                     if len(fn_logs['chunks']) > 1
                     else '0000000000000000')
        self.handler.file_created(base_name, new_chunk, old_chunk)

    def file_deleted(self, filename):
        base_name, chunk = self.parse_log_file_name(filename)
        fn_logs = self.log_files.setdefault(base_name, {})
        # BUGFIX: `file_created` records the live chunk under 'active', so
        # check that key (was 'current', which raised KeyError); use .get so
        # a delete for an untracked file is a no-op.
        if fn_logs.get('active') == chunk:
            fn_logs['active'] = None
        # BUGFIX: was `basename` (NameError); the local is `base_name`.
        self.handler.file_deleted(base_name, chunk)