-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_loop.py
599 lines (522 loc) · 21.2 KB
/
test_loop.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
"""Test for loop.py"""
import asyncio
import logging
import re
from collections import defaultdict
from datetime import datetime
from decimal import Decimal
from pathlib import Path
import pytest
import yaml
from prometheus_exporter.metric import MetricsRegistry
from src import loop
from src.config import DataBaseConfig, load_config
from src.db import DataBase
class re_match:
    """Equality helper that matches strings against a regular expression.

    An instance compares equal to any string that matches the pattern from
    its start (``re.match`` semantics), so it can stand in for a value whose
    exact text is not known inside an expected list or dict.
    """

    def __init__(self, pattern, flags=0):
        """Compile ``pattern`` with the optional ``re`` ``flags``."""
        self._re = re.compile(pattern, flags)

    def __eq__(self, string):
        """Return whether ``string`` matches the pattern.

        Values the pattern cannot be applied to (e.g. ``None`` or numbers)
        compare unequal instead of raising ``TypeError``.
        """
        try:
            return bool(self._re.match(string))
        except TypeError:
            # let Python fall back to its default (identity) comparison
            return NotImplemented

    def __repr__(self):
        """Show the raw pattern, so assertion diffs stay readable."""
        return self._re.pattern  # pragma: nocover
@pytest.fixture
def config_data_fixture():
    """Provide a minimal configuration: one database, one gauge and one query."""
    # NOTE(review): tests in this module request "config_data", not
    # "config_data_fixture" - confirm the fixture is registered under that name.
    query = {
        "interval": 10,
        "databases": ["db"],
        "metrics": ["m"],
        "sql": "SELECT 100.0 AS m",
    }
    yield {
        "databases": {"db": {"dsn": "sqlite://"}},
        "metrics": {"m": {"type": "gauge"}},
        "queries": {"q": query},
    }
@pytest.fixture
def registry_fixture():
    """Provide a fresh, empty metrics registry."""
    # NOTE(review): tests in this module request "registry", not
    # "registry_fixture" - confirm the fixture is registered under that name.
    metrics_registry = MetricsRegistry()
    yield metrics_registry
@pytest.fixture
async def make_query_loop_fixture(tmp_path, config_data, registry):
    """Provide a factory building QueryLoop instances from the config data.

    The factory dumps ``config_data`` to a YAML file under ``tmp_path``, loads
    it back, creates the configured metrics in ``registry`` and returns a new
    query loop. Every loop built through the factory is stopped at teardown.
    """
    # NOTE(review): tests in this module request "make_query_loop", not
    # "make_query_loop_fixture" - confirm the fixture is registered under that name.
    created = []

    def make_loop():
        path = tmp_path / "config.yaml"
        path.write_text(yaml.dump(config_data), "utf-8")
        with path.open() as stream:
            config = load_config(stream, logging.getLogger())
        registry.create_metrics(config.metrics.values())
        instance = loop.QueryLoop(config, registry, logging)
        created.append(instance)
        return instance

    yield make_loop

    # teardown: stop all loops, collecting errors instead of raising them
    stoppers = [instance.stop() for instance in created]
    await asyncio.gather(*stoppers, return_exceptions=True)
@pytest.fixture
async def query_loop_fixture(make_query_loop):
    """Provide a single query loop built with the default configuration."""
    # NOTE(review): tests in this module request "query_loop", not
    # "query_loop_fixture" - confirm the fixture is registered under that name.
    instance = make_query_loop()
    yield instance
@pytest.fixture
def metrics_expiration_fixture():
    """Provide a mapping from metric names to their expiration (or None)."""
    # NOTE(review): tests in this module request "metrics_expiration", not
    # "metrics_expiration_fixture" - confirm the registered fixture name.
    expirations = {}
    expirations["m1"] = 50
    expirations["m2"] = 100
    expirations["m3"] = None
    yield expirations
def metric_values(metric, by_labels=()):
    """Return the sample values of a gauge or counter metric.

    Only samples carrying the value suffix for the metric type are considered
    ("" for gauges, "_total" for counters); other samples are ignored.

    Args:
        metric: object exposing ``_type`` and ``_samples()``, where each
            sample is a sequence starting with (suffix, labels, value).
        by_labels: label names to group values by. When empty, a flat list of
            values is returned.

    Returns:
        A list of values when ``by_labels`` is empty, otherwise a mapping from
        tuples of label values (in ``by_labels`` order) to the sample value.

    Raises:
        ValueError: if the metric type is neither "gauge" nor "counter".
    """
    suffixes = {"gauge": "", "counter": "_total"}
    try:
        suffix = suffixes[metric._type]
    except KeyError:
        # fail with a clear message rather than an unbound local variable
        raise ValueError(f"unsupported metric type: {metric._type!r}") from None

    values = defaultdict(list)
    for sample_suffix, labels, value, *_ in metric._samples():
        if sample_suffix != suffix:
            continue
        if by_labels:
            label_values = tuple(labels[label] for label in by_labels)
            # one value per label combination, the last sample wins
            values[label_values] = value
        else:
            values[suffix].append(value)
    return values if by_labels else values[suffix]
async def run_queries(db_file: Path, *queries: str):
    """Execute the given SQL statements, in order, on a SQLite database file."""
    db_config = DataBaseConfig(name="db", dsn=f"sqlite:///{db_file}")
    async with DataBase(db_config) as database:
        for statement in queries:
            await database.execute_sql(statement)
class TestMetricsLastSeen:
    """Tests for the MetricsLastSeen class."""

    def test_update(self, metrics_expiration):
        """Updates are recorded per series; the unknown metric is not tracked."""
        tracker = loop.MetricsLastSeen(metrics_expiration)
        updates = (
            ("m1", {"l1": "v1", "l2": "v2"}, 100),
            ("m1", {"l1": "v3", "l2": "v4"}, 200),
            ("other", {"l3": "v100"}, 300),
        )
        for name, labels, timestamp in updates:
            tracker.update(name, labels, timestamp)
        expected = {
            "m1": {
                ("v1", "v2"): 100,
                ("v3", "v4"): 200,
            }
        }
        assert tracker._last_seen == expected

    def test_update_label_values_sorted_by_name(self, metrics_expiration):
        """Label values are stored in label-name order, not insertion order."""
        tracker = loop.MetricsLastSeen(metrics_expiration)
        tracker.update("m1", {"l2": "v2", "l1": "v1"}, 100)
        expected = {"m1": {("v1", "v2"): 100}}
        assert tracker._last_seen == expected

    def test_expire_series(self, metrics_expiration):
        """Expired series are returned and dropped, the others are kept."""
        tracker = loop.MetricsLastSeen(metrics_expiration)
        tracker.update("m1", {"l1": "v1", "l2": "v2"}, 10)
        tracker.update("m1", {"l1": "v3", "l2": "v4"}, 100)
        tracker.update("m2", {"l3": "v100"}, 100)
        expired = tracker.expire_series(120)
        expected_expired = {"m1": [("v1", "v2")], "m2": []}
        expected_remaining = {
            "m1": {("v3", "v4"): 100},
            "m2": {("v100",): 100},
        }
        assert expired == expected_expired
        assert tracker._last_seen == expected_remaining
@pytest.mark.asyncio
class TestQueryLoop:
    """Tests for the QueryLoop class."""

    async def test_start(self, query_loop):
        """The start method starts timed calls for queries."""
        await query_loop.start()
        timed_call = query_loop._timed_calls["q"]
        assert timed_call.running

    async def test_stop(self, query_loop):
        """The stop method stops timed calls for queries."""
        await query_loop.start()
        timed_call = query_loop._timed_calls["q"]
        await query_loop.stop()
        assert not timed_call.running

    async def test_run_query(self, query_tracker, query_loop, registry):
        """Queries are run and update metrics."""
        await query_loop.start()
        await query_tracker.wait_results()
        # the metric is updated
        metric = registry.get_metric("m")
        assert metric_values(metric) == [100.0]
        # the number of queries is updated
        queries_metric = registry.get_metric("queries")
        assert metric_values(queries_metric, by_labels=("status",)) == {
            ("success",): 1.0
        }

    async def test_run_scheduled_query(
        self,
        mocker,
        event_loop,
        advance_time,
        query_tracker,
        registry,
        config_data,
        make_query_loop,
    ):
        """A query with a schedule creates a croniter once when the loop starts."""

        def croniter(*args):
            while True:
                # sync croniter time with the loop one
                yield event_loop.time() + 60

        mock_croniter = mocker.patch.object(loop, "croniter")
        mock_croniter.side_effect = croniter
        # ensure that both clocks advance in sync
        mocker.patch.object(loop.time, "time", lambda: event_loop.time())
        del config_data["queries"]["q"]["interval"]
        config_data["queries"]["q"]["schedule"] = "*/2 * * * *"
        query_loop = make_query_loop()
        await query_loop.start()
        mock_croniter.assert_called_once()

    async def test_run_query_with_parameters(
        self, query_tracker, registry, config_data, make_query_loop
    ):
        """Queries are run with declared parameters."""
        config_data["metrics"]["m"]["type"] = "counter"
        config_data["metrics"]["m"]["labels"] = ["l"]
        config_data["queries"]["q"]["sql"] = "SELECT :param AS m, :label as l"
        config_data["queries"]["q"]["parameters"] = [
            {"param": 10.0, "label": "l1"},
            {"param": 20.0, "label": "l2"},
        ]
        query_loop = make_query_loop()
        await query_loop.start()
        await query_tracker.wait_results()
        # the metric is updated
        metric = registry.get_metric("m")
        assert metric_values(metric, by_labels=("l",)) == {
            ("l1",): 10,
            ("l2",): 20,
        }
        # the number of queries is updated
        queries_metric = registry.get_metric("queries")
        assert metric_values(queries_metric, by_labels=("status",)) == {
            ("success",): 2.0
        }

    async def test_run_query_null_value(
        self, query_tracker, registry, config_data, make_query_loop
    ):
        """A null value in query results is treated like a zero."""
        config_data["queries"]["q"]["sql"] = "SELECT NULL AS m"
        query_loop = make_query_loop()
        await query_loop.start()
        await query_tracker.wait_results()
        metric = registry.get_metric("m")
        assert metric_values(metric) == [0]

    async def test_run_query_counter_no_increment(
        self, query_tracker, registry, config_data, make_query_loop
    ):
        """If increment is set to False, counter is set to the new value."""
        config_data["metrics"]["m"]["type"] = "counter"
        config_data["metrics"]["m"]["increment"] = False
        config_data["queries"]["q"]["sql"] = "SELECT :param AS m"
        config_data["queries"]["q"]["parameters"] = [
            {"param": 10.0},
            {"param": 20.0},
        ]
        query_loop = make_query_loop()
        await query_loop.start()
        await query_tracker.wait_results()
        # the metric is updated
        metric = registry.get_metric("m")
        assert metric_values(metric) == [20.0]

    async def test_run_query_metrics_with_database_labels(
        self, query_tracker, registry, config_data, make_query_loop
    ):
        """If databases have extra labels, they're set for metrics."""
        config_data["databases"] = {
            "db1": {"dsn": "sqlite://", "labels": {"l1": "v1", "l2": "v2"}},
            "db2": {"dsn": "sqlite://", "labels": {"l1": "v3", "l2": "v4"}},
        }
        config_data["queries"]["q"]["databases"] = ["db1", "db2"]
        query_loop = make_query_loop()
        await query_loop.start()
        await query_tracker.wait_results()
        metric = registry.get_metric("m")
        assert metric_values(metric, by_labels=("database", "l1", "l2")) == {
            ("db1", "v1", "v2"): 100.0,
            ("db2", "v3", "v4"): 100.0,
        }

    async def test_update_metric_decimal_value(self, registry, make_query_loop):
        """A Decimal value in query results is converted to float."""
        db = DataBase(DataBaseConfig(name="db", dsn="sqlite://"))
        query_loop = make_query_loop()
        query_loop._update_metric(db, "m", Decimal("100.123"))
        metric = registry.get_metric("m")
        [value] = metric_values(metric)
        assert value == 100.123
        assert isinstance(value, float)

    async def test_run_query_log(self, caplog, query_tracker, query_loop):
        """Debug messages are logged on query execution."""
        caplog.set_level(logging.DEBUG)
        await query_loop.start()
        await query_tracker.wait_queries()
        assert caplog.messages == [
            'connected to database "db"',
            'running query "q" on database "db"',
            'updating metric "m" set 100.0 {database="db"}',
            re_match(
                r'updating metric "query_latency" observe .* \{database="db",query="q"\}'
            ),
            'updating metric "queries" inc 1 {database="db",query="q",status="success"}',
        ]

    async def test_run_query_log_labels(
        self, caplog, query_tracker, config_data, make_query_loop
    ):
        """Debug messages include metric labels."""
        config_data["metrics"]["m"]["labels"] = ["l"]
        config_data["queries"]["q"]["sql"] = 'SELECT 100.0 AS m, "foo" AS l'
        query_loop = make_query_loop()
        caplog.set_level(logging.DEBUG)
        await query_loop.start()
        await query_tracker.wait_queries()
        assert caplog.messages == [
            'connected to database "db"',
            'running query "q" on database "db"',
            'updating metric "m" set 100.0 {database="db",l="foo"}',
            re_match(
                r'updating metric "query_latency" observe .* \{database="db",query="q"\}'
            ),
            'updating metric "queries" inc 1 {database="db",query="q",status="success"}',
        ]

    async def test_run_query_increase_db_error_count(
        self, query_tracker, config_data, make_query_loop, registry
    ):
        """Count of database errors is incremented when the DSN is unusable."""
        config_data["databases"]["db"]["dsn"] = "sqlite:////invalid"
        query_loop = make_query_loop()
        await query_loop.start()
        await query_tracker.wait_failures()
        errors_metric = registry.get_metric("database_errors")
        assert metric_values(errors_metric) == [1.0]

    async def test_run_query_increase_database_error_count(
        self, mocker, query_tracker, config_data, make_query_loop, registry
    ):
        """Count of database errors is incremented on failed connection."""
        query_loop = make_query_loop()
        db = query_loop._databases["db"]
        mock_connect = mocker.patch.object(db._engine, "connect")
        mock_connect.side_effect = Exception("connection failed")
        await query_loop.start()
        await query_tracker.wait_failures()
        errors_metric = registry.get_metric("database_errors")
        assert metric_values(errors_metric) == [1.0]

    async def test_run_query_increase_query_error_count(
        self, query_tracker, config_data, make_query_loop, registry
    ):
        """Count of errored queries is incremented on error."""
        config_data["queries"]["q"]["sql"] = "SELECT 100.0 AS a, 200.0 AS b"
        query_loop = make_query_loop()
        await query_loop.start()
        await query_tracker.wait_failures()
        queries_metric = registry.get_metric("queries")
        assert metric_values(queries_metric, by_labels=("status",)) == {("error",): 1.0}

    async def test_run_query_increase_timeout_count(
        self, query_tracker, config_data, make_query_loop, registry
    ):
        """Count of timed out queries is incremented on timeout."""
        config_data["queries"]["q"]["timeout"] = 0.1
        query_loop = make_query_loop()
        await query_loop.start()
        db = query_loop._databases["db"]
        await db.connect()

        async def execute(sql, parameters):
            await asyncio.sleep(1)  # longer than timeout

        db._conn.execute = execute
        await query_tracker.wait_failures()
        queries_metric = registry.get_metric("queries")
        assert metric_values(queries_metric, by_labels=("status",)) == {
            ("timeout",): 1.0
        }

    async def test_run_query_at_interval(self, advance_time, query_tracker, query_loop):
        """Queries are run at the specified time interval."""
        await query_loop.start()
        await advance_time(0)  # kick the first run
        # the query has been run once
        assert len(query_tracker.queries) == 1
        await advance_time(5)
        # no more runs yet
        assert len(query_tracker.queries) == 1
        # now the query runs again
        await advance_time(5)
        assert len(query_tracker.queries) == 2

    async def test_run_timed_queries_invalid_result_count(
        self, query_tracker, config_data, make_query_loop, advance_time
    ):
        """Timed queries returning invalid elements count are removed."""
        config_data["queries"]["q"]["sql"] = "SELECT 100.0 AS a, 200.0 AS b"
        query_loop = make_query_loop()
        await query_loop.start()
        await advance_time(0)  # kick the first run
        assert len(query_tracker.queries) == 1
        assert len(query_tracker.results) == 0
        # the query is not run again
        await advance_time(5)
        assert len(query_tracker.results) == 0
        await advance_time(5)
        assert len(query_tracker.results) == 0

    async def test_run_timed_queries_invalid_result_count_stop_task(
        self, query_tracker, config_data, make_query_loop
    ):
        """Timed queries returning invalid result counts are stopped."""
        config_data["queries"]["q"]["sql"] = "SELECT 100.0 AS a, 200.0 AS b"
        config_data["queries"]["q"]["interval"] = 1.0
        query_loop = make_query_loop()
        await query_loop.start()
        timed_call = query_loop._timed_calls["q"]
        await asyncio.sleep(1.1)
        await query_tracker.wait_failures()
        # the query has been stopped and removed
        assert not timed_call.running
        assert query_loop._timed_calls == {}

    async def test_run_timed_queries_not_removed_if_not_failing_on_all_dbs(
        self, tmp_path, query_tracker, config_data, make_query_loop
    ):
        """Timed queries are kept if they don't fail on all databases."""
        db1 = tmp_path / "db1.sqlite"
        db2 = tmp_path / "db2.sqlite"
        config_data["databases"] = {
            "db1": {"dsn": f"sqlite:///{db1}"},
            "db2": {"dsn": f"sqlite:///{db2}"},
        }
        config_data["queries"]["q"].update(
            {
                "databases": ["db1", "db2"],
                "sql": "SELECT * FROM test",
                "interval": 1.0,
            }
        )
        await run_queries(
            db1,
            "CREATE TABLE test (m INTEGER)",
            "INSERT INTO test VALUES (10)",
        )
        # the query on the second database returns more columns
        await run_queries(
            db2,
            "CREATE TABLE test (m INTEGER, other INTERGER)",
            "INSERT INTO test VALUES (10, 20)",
        )
        query_loop = make_query_loop()
        await query_loop.start()
        await asyncio.sleep(0.1)
        await query_tracker.wait_failures()
        assert len(query_tracker.queries) == 2
        assert len(query_tracker.results) == 1
        assert len(query_tracker.failures) == 1
        await asyncio.sleep(1.1)
        # succeeding query is run again, failing one is not
        assert len(query_tracker.results) == 2
        assert len(query_tracker.failures) == 1

    async def test_run_aperiodic_queries(
        self, query_tracker, config_data, make_query_loop
    ):
        """Queries with null interval can be run explicitly."""
        del config_data["queries"]["q"]["interval"]
        query_loop = make_query_loop()
        await query_loop.run_aperiodic_queries()
        assert len(query_tracker.queries) == 1
        await query_loop.run_aperiodic_queries()
        assert len(query_tracker.queries) == 2

    async def test_run_aperiodic_queries_invalid_result_count(
        self, query_tracker, config_data, make_query_loop
    ):
        """Aperiodic queries returning invalid elements count are removed."""
        config_data["queries"]["q"]["sql"] = "SELECT 100.0 AS a, 200.0 AS b"
        del config_data["queries"]["q"]["interval"]
        query_loop = make_query_loop()
        await query_loop.run_aperiodic_queries()
        assert len(query_tracker.queries) == 1
        # the query is not run again
        await query_loop.run_aperiodic_queries()
        await query_tracker.wait_failures()
        assert len(query_tracker.queries) == 1

    async def test_run_aperiodic_queries_not_removed_if_not_failing_on_all_dbs(
        self, tmp_path, query_tracker, config_data, make_query_loop
    ):
        """Aperiodic queries are kept if they don't fail on all databases."""
        db1 = tmp_path / "db1.sqlite"
        db2 = tmp_path / "db2.sqlite"
        config_data["databases"] = {
            "db1": {"dsn": f"sqlite:///{db1}"},
            "db2": {"dsn": f"sqlite:///{db2}"},
        }
        config_data["queries"]["q"].update(
            {
                "databases": ["db1", "db2"],
                "sql": "SELECT * FROM test",
                "interval": None,
            }
        )
        await run_queries(
            db1,
            "CREATE TABLE test (m INTEGER)",
            "INSERT INTO test VALUES (10)",
        )
        # the query on the second database returns more columns
        await run_queries(
            db2,
            "CREATE TABLE test (m INTEGER, other INTERGER)",
            "INSERT INTO test VALUES (10, 20)",
        )
        query_loop = make_query_loop()
        await query_loop.run_aperiodic_queries()
        await query_tracker.wait_failures()
        assert len(query_tracker.queries) == 2
        assert len(query_tracker.results) == 1
        assert len(query_tracker.failures) == 1
        await query_loop.run_aperiodic_queries()
        # succeeding query is run again, failing one is not
        assert len(query_tracker.results) == 2
        assert len(query_tracker.failures) == 1

    async def test_clear_expired_series(
        self,
        mocker,
        tmp_path,
        query_tracker,
        config_data,
        make_query_loop,
        registry,
    ):
        """The query loop clears out series last seen earlier than the specified interval."""
        db = tmp_path / "db.sqlite"
        config_data["databases"]["db"]["dsn"] = f"sqlite:///{db}"
        config_data["metrics"]["m"].update(
            {
                "labels": ["l"],
                "expiration": 10,
            }
        )
        # call metric collection directly
        config_data["queries"]["q"]["sql"] = "SELECT * FROM test"
        del config_data["queries"]["q"]["interval"]
        await run_queries(
            db,
            "CREATE TABLE test (m INTEGER, l TEXT)",
            'INSERT INTO test VALUES (10, "foo")',
            'INSERT INTO test VALUES (20, "bar")',
        )
        query_loop = make_query_loop()
        mock_timestamp = mocker.patch.object(query_loop, "_timestamp")
        mock_timestamp.return_value = datetime.now().timestamp()
        await query_loop.run_aperiodic_queries()
        await query_tracker.wait_results()
        metric = registry.get_metric("m")
        # compare with "==": "assert value, expected" would only use the
        # expected dict as the failure message and never check it
        assert metric_values(metric, by_labels=("l",)) == {
            ("foo",): 10.0,
            ("bar",): 20.0,
        }
        await run_queries(
            db,
            "DELETE FROM test WHERE m = 10",
        )
        # mock that more time has passed than expiration
        mock_timestamp.return_value += 20
        await query_loop.run_aperiodic_queries()
        await query_tracker.wait_results()
        query_loop.clear_expired_series()
        assert metric_values(metric, by_labels=("l",)) == {
            ("bar",): 20.0,
        }