@reana_benchmark.command(name="monitor")
@workflow_option
@click.option(
    "--sleep",
    "-s",
    help="Sleep between querying in seconds",
    type=int,
    default=30,
)
def monitor_command(workflow: str, sleep: int) -> None:
    """Monitor various metrics and record results.

    Blocks until the underlying loop is interrupted. Delegates the actual
    collection to ``monitor`` and, like the sibling commands in this CLI,
    reports failures through the shared logger instead of letting a
    traceback escape to the user.

    :param workflow: name/prefix of the workflows to monitor.
    :param sleep: seconds to wait between metric collections.
    """
    # NOTE: annotated ``-> None`` (not ``NoReturn``): the command returns
    # normally after the try/except, it is merely long-running.
    try:
        monitor(workflow, sleep)
    except Exception as e:  # top-level CLI boundary: log, do not crash
        logger.error(f"Something went wrong during monitoring: {e}")
"""Responsible for monitoring K8s cluster, DB connections."""
import json
import subprocess
import time
from abc import ABC, abstractmethod
from collections import defaultdict
from pathlib import Path
from typing import Any, Dict, List

from reana.reana_benchmark.utils import get_utc_now_timestamp, logger


class BaseMetric(ABC):
    """Base class for other metrics."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Name of the metric, used as its key in the results JSON."""
        raise NotImplementedError

    @abstractmethod
    def _collect(self) -> Any:
        """Gather and return the raw metric value."""
        raise NotImplementedError

    def collect(self) -> Dict[str, Any]:
        """Collect the metric and wrap it as a ``{name: value}`` mapping."""
        return {self.name: self._collect()}


class NumberOfDBConnectionsMetric(BaseMetric):
    """Number of server processes currently connected to the REANA DB."""

    name = "db_connections_number"

    def _collect(self) -> Any:
        """Count number of server processes in REANA DB.

        Runs ``psql`` inside the ``reana-db`` deployment via ``kubectl``.
        The count sits on the third line of psql's tabular output
        (header, separator, then the value).
        """
        cmd = [
            "kubectl",
            "exec",
            "deployment/reana-db",
            "--",
            "psql",
            "-U",
            "reana",
            "-c",
            "SELECT COUNT(*) FROM pg_stat_activity;",
        ]
        output = subprocess.check_output(cmd).decode("ascii")
        return int(output.splitlines()[2].strip())


class WorkflowPodsMetric(BaseMetric):
    """Per-phase counts of REANA batch and job pods."""

    name = "workflows_pods_status"

    @staticmethod
    def _filter(pods: List[Dict], name_contains: str) -> List[Dict]:
        """Return pods whose metadata name contains ``name_contains``."""
        selected_pods = []
        for pod in pods:
            name = pod.get("metadata", {}).get("name", "")
            if name_contains in name:
                selected_pods.append(pod)
        return selected_pods

    @staticmethod
    def _count(pods: List[Dict]) -> Dict[str, int]:
        """Count pods grouped by their status phase (e.g. Running, Pending)."""
        # defaultdict(int) is the idiomatic zero-initialized counter
        statistics = defaultdict(int)
        for pod in pods:
            phase = pod.get("status", {}).get("phase")
            statistics[phase] += 1
        return dict(statistics)

    def _collect(self) -> Any:
        """Count number of job and batch jobs in different phases."""
        kubectl_cmd = ("kubectl", "get", "pods", "-o", "json")
        output = subprocess.check_output(kubectl_cmd)
        pods = json.loads(output).get("items", [])

        # REANA names workflow pods "...run-batch..." / "...run-job..."
        batch_pods = self._filter(pods, "run-batch")
        job_pods = self._filter(pods, "run-job")

        return {
            "batch_pods": self._count(batch_pods),
            "job_pods": self._count(job_pods),
        }


# Registry of all metrics collected on every monitoring tick.
metrics = [
    NumberOfDBConnectionsMetric(),
    WorkflowPodsMetric(),
]


def _build_monitored_results_path(workflow: str) -> Path:
    """Return the JSON file path where monitored results are saved."""
    return Path(f"{workflow}_monitored_results.json")


def _save_metrics(workflow: str, results: Dict) -> None:
    """Serialize ``results`` to the workflow's monitored-results JSON file."""
    with open(_build_monitored_results_path(workflow), "w") as f:
        json.dump(results, f)


def _collect_metrics() -> Dict[str, Any]:
    """Collect every registered metric into a single flat mapping."""
    collected_metrics: Dict[str, Any] = {}
    for metric in metrics:
        # update() in place instead of rebuilding the dict per metric
        collected_metrics.update(metric.collect())
    return collected_metrics


def _print_metrics() -> None:
    """Log the names of all metrics that will be collected."""
    logger.info("Following metrics will be collected:")
    for m in metrics:
        logger.info(f"- {m.name}")


def monitor(workflow: str, sleep: int) -> None:
    """Start periodically collect defined metrics and save them to JSON file.

    This function is blocking.

    :param workflow: name/prefix of the workflows being monitored; also
        determines the output file name.
    :param sleep: seconds to wait between collection rounds.
    """
    _print_metrics()
    logger.info("Starting monitoring...")

    all_metrics: Dict[str, Any] = {}

    try:
        while True:
            # NOTE: if collecting the metrics takes e.g. a couple of
            # seconds, monitored_date will be slightly less accurate.
            monitored_date = get_utc_now_timestamp()
            all_metrics[monitored_date] = _collect_metrics()
            # Save after every round so results survive an abrupt stop.
            _save_metrics(workflow, all_metrics)

            time.sleep(sleep)
    except KeyboardInterrupt:
        logger.info("Stopping monitoring...")
    finally:
        # Final save is a harmless repeat on the normal path, but captures
        # the latest state if an unexpected exception escapes the loop.
        _save_metrics(workflow, all_metrics)