Skip to content

Commit

Permalink
benchmark: add monitor command
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladyslav Moisieienkov committed Nov 19, 2021
1 parent d26efe1 commit ed0a1bc
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 0 deletions.
14 changes: 14 additions & 0 deletions reana/reana_benchmark/cli.py
Expand Up @@ -19,6 +19,7 @@
from reana.reana_benchmark.start import start
from reana.reana_benchmark.submit import submit
from reana.reana_benchmark.utils import logger
from reana.reana_benchmark.monitor import monitor

urllib3.disable_warnings()

Expand Down Expand Up @@ -210,3 +211,16 @@ def collect_command(workflow: str, force: bool) -> NoReturn:
collect(workflow, force)
except Exception as e:
logger.error(f"Something went wrong when collecting results: {e}")


@reana_benchmark.command(name="monitor")
@workflow_option
@click.option(
    "--sleep",
    "-s",
    type=int,
    default=30,
    help="Sleep between querying in seconds",
)
def monitor_command(workflow: str, sleep: int) -> NoReturn:
    """Monitor various metrics and record results."""
    # Thin CLI wrapper: forward both options to ``monitor`` and report any
    # failure through the logger instead of letting it propagate.
    try:
        monitor(workflow, sleep)
    except Exception as error:
        logger.error(f"Something went wrong during monitoring: {error}")
156 changes: 156 additions & 0 deletions reana/reana_benchmark/monitor.py
@@ -0,0 +1,156 @@
# This file is part of REANA.
# Copyright (C) 2021 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.

"""Responsible for monitoring K8s cluster, DB connections."""

import json
import time
from pathlib import Path
from typing import Dict, Any, List
from collections import defaultdict
import subprocess
from abc import abstractmethod, ABC

from reana.reana_benchmark.utils import get_utc_now_timestamp, logger


class BaseMetric(ABC):
    """Abstract interface shared by all monitored metrics.

    Subclasses supply ``name`` and ``_collect``; ``collect`` wraps the
    collected value in a one-entry dictionary keyed by that name.
    """

    @property
    @abstractmethod
    def name(self) -> str:
        """Key under which the metric value is reported."""
        raise NotImplementedError

    @abstractmethod
    def _collect(self) -> Any:
        # Subclass hook that produces the raw value of the metric.
        raise NotImplementedError

    def collect(self) -> Dict[str, Any]:
        """Return ``{name: value}`` where value comes from ``_collect``."""
        # Collect first, then read the name, matching the call order callers
        # have always observed.
        value = self._collect()
        key = self.name
        return {key: value}


class NumberOfDBConnectionsMetric(BaseMetric):
    """Count number of server processes in REANA DB.

    The value is the row count of ``pg_stat_activity``, obtained by running
    ``psql`` inside ``deployment/reana-db`` through ``kubectl exec``.
    """

    name = "db_connections_number"

    def _collect(self) -> Any:
        """Return the row count of ``pg_stat_activity`` as an integer.

        :raises subprocess.CalledProcessError: if the ``kubectl`` command
            exits with a non-zero status.
        :raises ValueError: if the command output is not a single integer.
        """
        cmd = [
            "kubectl",
            "exec",
            "deployment/reana-db",
            "--",
            "psql",
            "-U",
            "reana",
            # Tuples-only, unaligned output: psql prints just the value, so
            # parsing does not depend on the header/separator table layout.
            "-t",
            "-A",
            "-c",
            "SELECT COUNT(*) FROM pg_stat_activity;",
        ]
        output = subprocess.check_output(cmd).decode("ascii")
        return int(output.strip())


class WorkflowPodsMetric(BaseMetric):
    """Report how many batch and job pods are in each pod phase."""

    name = "workflows_pods_status"

    @staticmethod
    def _filter(pods: List[Dict], name_contains: str) -> List[Dict]:
        # Keep only the pods whose metadata name contains the given fragment;
        # pods without a name are treated as having an empty name.
        return [
            pod
            for pod in pods
            if name_contains in pod.get("metadata", {}).get("name", "")
        ]

    @staticmethod
    def _count(pods: List[Dict]) -> Dict[str, int]:
        # Tally pods per ``status.phase``; a missing phase is counted under
        # the ``None`` key.
        tally = {}
        for pod in pods:
            phase = pod.get("status", {}).get("phase")
            tally[phase] = tally.get(phase, 0) + 1
        return tally

    def _collect(self) -> Any:
        # List pods as JSON via kubectl and split them by name fragment.
        raw = subprocess.check_output(("kubectl", "get", "pods", "-o", "json"))
        items = json.loads(raw).get("items", [])

        batch = self._filter(items, "run-batch")
        jobs = self._filter(items, "run-job")

        return {
            "batch_pods": self._count(batch),
            "job_pods": self._count(jobs),
        }


# Metric instances polled on every monitoring iteration; ``_collect_metrics``
# and ``_print_metrics`` iterate over this list in order.
METRICS = [
NumberOfDBConnectionsMetric(),
WorkflowPodsMetric(),
]


def _build_monitored_results_path(workflow: str) -> Path:
return Path(f"{workflow}_monitored_results.json")


def _save_metrics(workflow: str, results: Dict) -> None:
    """Overwrite the workflow's results file with ``results`` as JSON."""
    target = _build_monitored_results_path(workflow)
    with target.open("w") as stream:
        json.dump(results, stream)


def _collect_metrics() -> Dict[str, Any]:
    """Collect every metric in ``METRICS`` and merge the results.

    A metric that raises is logged and left out, so one failing metric does
    not prevent the others from being recorded.
    """
    gathered: Dict[str, Any] = {}
    for metric in METRICS:
        try:
            # The merge stays inside the ``try`` so a failure while combining
            # results is reported the same way as a failure while collecting.
            gathered = dict(gathered, **metric.collect())
        except Exception as error:
            logger.error(
                f"Error during collection of {metric.name} metric. Details: {error}"
            )
    return gathered


def _print_metrics() -> None:
    """Log the name of every metric registered in ``METRICS``."""
    logger.info("Following metrics will be collected:")
    for metric_name in (metric.name for metric in METRICS):
        logger.info(f"- {metric_name}")


def monitor(workflow: str, sleep: int) -> None:
    """Periodically collect the defined metrics and save them to a JSON file.

    This function blocks: it loops until a ``KeyboardInterrupt`` is received.
    The accumulated results are written after every iteration and once more
    on exit.

    :param workflow: name used to build the results file path.
    :param sleep: number of seconds to wait between two collections.
    """
    _print_metrics()
    logger.info("Starting monitoring...")

    history = {}

    try:
        while True:
            # The timestamp is taken before collection starts, so if the
            # metrics take a few seconds to gather it is slightly early.
            timestamp = get_utc_now_timestamp()
            history[timestamp] = _collect_metrics()
            _save_metrics(workflow, history)

            time.sleep(sleep)
    except KeyboardInterrupt:
        logger.info("Stopping monitoring...")
    finally:
        # Final write so the file reflects everything collected so far, even
        # if the loop was interrupted part-way through an iteration.
        _save_metrics(workflow, history)

0 comments on commit ed0a1bc

Please sign in to comment.