forked from reanahub/reana
-
Notifications
You must be signed in to change notification settings - Fork 0
/
monitor.py
147 lines (114 loc) · 4.06 KB
/
monitor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
# This file is part of REANA.
# Copyright (C) 2021 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
"""Responsible for monitoring K8s cluster, DB connections."""
import json
import time
from pathlib import Path
from typing import Dict, Any, List
from collections import defaultdict
import subprocess
from abc import abstractmethod, ABC
from reana.reana_benchmark.utils import get_utc_now_timestamp, logger
class BaseMetric(ABC):
    """Common interface shared by every concrete metric."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Unique identifier of the metric, used as the key in results."""
        raise NotImplementedError

    @abstractmethod
    def _collect(self) -> Any:
        """Gather and return the raw metric value (subclass responsibility)."""
        raise NotImplementedError

    def collect(self) -> Dict[str, Any]:
        """Collect the metric and wrap its value in a single-entry mapping."""
        return {self.name: self._collect()}
class NumberOfDBConnectionsMetric(BaseMetric):
    """Metric reporting the number of open connections in the REANA DB."""

    name = "db_connections_number"

    def _collect(self) -> Any:
        """Count number of server processes in REANA DB."""
        query = "SELECT COUNT(*) FROM pg_stat_activity;"
        cmd = [
            "kubectl",
            "exec",
            "deployment/reana-db",
            "--",
            "psql",
            "-U",
            "reana",
            "-c",
            query,
        ]
        raw_output = subprocess.check_output(cmd).decode("ascii")
        # psql prints a header row and a separator first; the actual count
        # sits on the third line of the output.
        count_line = raw_output.splitlines()[2]
        return int(count_line.strip())
class WorkflowPodsMetric(BaseMetric):
    """Metric summarizing workflow pod phases as reported by kubectl."""

    name = "workflows_pods_status"

    @staticmethod
    def _filter(pods: List[Dict], name_contains: str) -> List[Dict]:
        """Return the pods whose ``metadata.name`` contains the substring."""
        return [
            pod
            for pod in pods
            if name_contains in pod.get("metadata", {}).get("name", "")
        ]

    @staticmethod
    def _count(pods: List[Dict]) -> Dict[str, int]:
        """Count pods grouped by ``status.phase``.

        Pods without a phase are counted under the ``None`` key.
        """
        statistics: Dict[str, int] = defaultdict(int)
        for pod in pods:
            statistics[pod.get("status", {}).get("phase")] += 1
        return dict(statistics)

    def _collect(self) -> Any:
        """Count number of job and batch jobs in different phases."""
        kubectl_cmd = ("kubectl", "get", "pods", "-o", "json")
        output = subprocess.check_output(kubectl_cmd)
        pods = json.loads(output).get("items", [])
        return {
            "batch_pods": self._count(self._filter(pods, "run-batch")),
            "job_pods": self._count(self._filter(pods, "run-job")),
        }
# Registry of all metrics collected on every monitoring iteration.
metrics = [
    NumberOfDBConnectionsMetric(),
    WorkflowPodsMetric(),
]
def _build_monitored_results_path(workflow: str) -> Path:
return Path(f"{workflow}_monitored_results.json")
def _save_metrics(workflow: str, results: Dict) -> None:
    """Serialize collected results as JSON into the workflow's results file."""
    results_path = _build_monitored_results_path(workflow)
    with open(results_path, "w") as output_file:
        json.dump(results, output_file)
def _collect_metrics() -> Dict[str, Any]:
    """Collect every registered metric once and merge the results.

    Returns a single dict mapping each metric name to its collected value.
    """
    collected_metrics: Dict[str, Any] = {}
    for metric in metrics:
        # update() merges in place instead of rebuilding the dict on every
        # iteration (the previous dict(d, **result) form copied it each time).
        collected_metrics.update(metric.collect())
    return collected_metrics
def _print_metrics() -> None:
    """Log the name of every metric that will be collected."""
    logger.info("Following metrics will be collected:")
    for metric in metrics:
        logger.info(f"- {metric.name}")
def monitor(workflow: str, sleep: int) -> None:
    """Start periodically collect defined metrics and save them to JSON file.

    This function is blocking; it loops until interrupted (Ctrl-C), sampling
    every ``sleep`` seconds and persisting results after each sample.
    """
    _print_metrics()
    logger.info("Starting monitoring...")
    all_metrics: Dict[str, Any] = {}
    try:
        while True:
            # Timestamp is taken before collection; if collecting the metrics
            # takes a few seconds, the recorded date is correspondingly less
            # accurate.
            monitored_date = get_utc_now_timestamp()
            all_metrics[monitored_date] = _collect_metrics()
            # Persist after every sample so an abrupt kill loses little data.
            _save_metrics(workflow, all_metrics)
            time.sleep(sleep)
    except KeyboardInterrupt:
        logger.info("Stopping monitoring...")
    finally:
        _save_metrics(workflow, all_metrics)