import sys
import logging
import json
import requests
import HTTPAuthOptions
# token here -> .token
# https://keycloak.grid.cesnet.cz/token-portal/index.php

# Keycloak authentication data
TOKEN_URL = "https://keycloak.grid.cesnet.cz"
REALM = "dhr"
CLIENT_ID = "gss"

MISSIONS = ["S1", "S2A", "S2B", "S2C", "S3", "S5P"]
SITE_URL = "https://gss.vm.cesnet.cz/odata/v1"

# Timeout (seconds) for every OData request, so a stuck server cannot hang
# the monitoring probe forever.
REQUEST_TIMEOUT = 60


def get_mission_product_count(site, auth, mission, year):
    """Return the product count for *mission* at *site* as a string.

    When *year* is given, only Online products with ContentDate/Start before
    Jan 1st of that year are counted.  Returns the literal string "FAIL"
    when the OData request does not succeed (HTTP status != 200).
    """
    url = f"{site}/Products/$count?$filter=startswith(Name,'{mission}')"
    if year:
        # NOTE(review): OData v4 spells boolean literals lowercase ("true");
        # "True" apparently works against this GSS instance -- confirm before
        # reusing this filter elsewhere.
        url += f" and ContentDate/Start lt {year}-01-01T00:00:00.000Z and Online eq True"
    r = requests.get(url, auth=auth, timeout=REQUEST_TIMEOUT)
    if r.status_code == 200:
        return r.text
    # Was logged at DEBUG only, which is invisible at the default INFO level
    # configured in __main__ -- a failed probe should be loud.
    logging.error("count query failed (%s): %s", url, r.text)
    return "FAIL"


def gss_metrics(site_url, auth=None, year=None):
    """Collect per-mission product counts as Prometheus exposition lines.

    :param site_url: OData base URL of the GSS instance.
    :param auth: requests-compatible auth object; a Keycloak token auth is
        created from the module constants when None.
    :param year: optional year string; see get_mission_product_count().
    :return: list of 'gss_product_count{...} <count>' lines.
    """
    if not auth:
        auth = HTTPAuthOptions.KeycloakTokenAuth(
            server_url=TOKEN_URL, realm=REALM, client_id=CLIENT_ID)

    metrics = []
    for mission in MISSIONS:
        count = get_mission_product_count(site_url, auth, mission, year)
        logging.debug("Site: %s Mission: %s Count: %s", site_url, mission, count)
        if count == "FAIL":
            # A non-numeric sample value would make the whole Prometheus
            # import payload invalid, so skip missions whose query failed
            # (the failure is already logged above).
            continue
        metrics.append(
            f'gss_product_count{{site="{site_url}", mission="{mission}"}} {count}')

    return metrics


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Optional CLI argument: count only products older than this year.
    year = sys.argv[1] if len(sys.argv) > 1 else None
    logging.debug("Year lt %s", year)

    print("\n".join(gss_metrics(SITE_URL, None, year)))
def get_endpoint_ip(namespace, service):
    """Return "ip:port" of the first ready address of a k8s Endpoints object.

    Shells out to ``kubectl get endpoints <service> -n <namespace> -o json``
    and reads the first subset's first address/port.

    :raises RuntimeError: when the endpoint has no subsets, or no ready
        addresses/ports in its first subset.
    :raises subprocess.CalledProcessError: when kubectl itself fails.
    """
    cmd = ["kubectl", "get", "endpoints", service, "-n", namespace, "-o", "json"]
    eps = json.loads(subprocess.check_output(cmd, text=True))

    subsets = eps.get("subsets", [])
    if not subsets:
        raise RuntimeError("No endpoints found")

    addresses = subsets[0].get("addresses", [])
    ports = subsets[0].get("ports", [])
    if not addresses or not ports:
        raise RuntimeError("No ready addresses or ports in endpoint")

    return f'{addresses[0]["ip"]}:{ports[0]["port"]}'


def parse_kafka(output: str):
    """Convert `kafka-consumer-groups --describe`-style output to Prometheus lines.

    Expected columns per data row:
    GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID [...]

    Header lines, blank lines and rows with fewer than 7 whitespace-separated
    fields are skipped.  Emits three samples per row: kafka_consumer_offset,
    kafka_logend_offset and kafka_consumer_lag, labelled by group/topic/partition.
    """
    metrics = []
    for row in output.splitlines():
        if not row.strip():
            continue
        if row.startswith(("GROUP", "group")):  # column header
            continue

        fields = row.split()
        # Require the CONSUMER-ID column too (>= 7 fields) so truncated
        # rows are dropped rather than mis-parsed.
        if len(fields) < 7:
            continue

        group, topic, partition, current_offset, logend_offset, lag = fields[:6]
        # Consumer/client id is intentionally left out of the label set to
        # keep the metric cardinality low.
        labels = f'group="{group}",topic="{topic}",partition="{partition}"'

        metrics.append(f'kafka_consumer_offset{{{labels}}} {current_offset}')
        metrics.append(f'kafka_logend_offset{{{labels}}} {logend_offset}')
        metrics.append(f'kafka_consumer_lag{{{labels}}} {lag}')

    return metrics
if __name__ == "__main__":

    logging.basicConfig(level=logging.INFO)

    # ---- Config ----
    # The cluster-internal DNS name would be
    # http://vms-victoria-metrics-single-server.monitoring.svc.cluster.local.:8428
    # -- presumably cluster DNS is not resolvable from where this runs, so the
    # endpoint IP is looked up via kubectl instead (TODO confirm).
    VM_HOST = get_endpoint_ip('monitoring', 'vms-victoria-metrics-single-server')
    VICTORIA_URL = f"http://{VM_HOST}/api/v1/import/prometheus"
    COMMAND = ["/root/bin/kafka-get-queue-informations.sh", "relay", "-q"]

    GSS_URL = "https://gss.vm.cesnet.cz/odata/v1"

    metrics = []

    # Kafka consumer-group lag metrics; check=True makes a failing helper
    # script abort the push instead of sending partial data.
    proc = subprocess.run(COMMAND, capture_output=True, text=True, check=True)
    metrics += parse_kafka(proc.stdout)
    # GSS per-mission product counts.
    metrics += check_product_counts.gss_metrics(GSS_URL)

    # Prometheus text exposition payload must be newline-terminated.
    metrics_text = "\n".join(metrics) + "\n"

    logging.debug(VICTORIA_URL)
    logging.debug(metrics_text)

    # Push to VictoriaMetrics.  A bounded timeout keeps a hung endpoint from
    # blocking the (presumably cron-driven) job forever.
    resp = requests.post(VICTORIA_URL, data=metrics_text.encode("utf-8"), timeout=60)
    resp.raise_for_status()
    logging.debug("Pushed metrics successfully: %s", resp.text)