Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions check_product_counts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import sys
import logging
import json
import requests
import HTTPAuthOptions
# token here -> .token
# https://keycloak.grid.cesnet.cz/token-portal/index.php

# Keycloak authentication data
TOKEN_URL="https://keycloak.grid.cesnet.cz"
REALM = "dhr"
CLIENT_ID="gss"

MISSIONS = ["S1", "S2A", "S2B", "S2C", "S3", "S5P"]
SITE_URL = "https://gss.vm.cesnet.cz/odata/v1"

def get_mission_product_count(site, auth, mission, year, timeout=30):
    """Query the GSS OData endpoint for the number of products of a mission.

    Parameters:
        site: base OData URL (e.g. "https://gss.vm.cesnet.cz/odata/v1").
        auth: requests-compatible auth object for the GET request.
        mission: mission name prefix matched against the product Name.
        year: optional year (string or int); when given, only online products
              with ContentDate/Start before Jan 1 of that year are counted.
        timeout: seconds before the HTTP request is aborted (new, defaulted,
                 so existing callers are unaffected).

    Returns the count as a string (the raw response body), or the sentinel
    string "FAIL" when the request does not return HTTP 200.
    """
    url = f"{site}/Products/$count?$filter=startswith(Name,'{mission}')"
    if year:
        # Restrict to products ingested before Jan 1 of `year` that are online.
        url += f" and ContentDate/Start lt {year}-01-01T00:00:00.000Z and Online eq True"
    # Timeout prevents a stuck endpoint from hanging the metrics job forever.
    r = requests.get(url, auth=auth, timeout=timeout)
    if r.status_code == 200:
        return r.text
    # WARNING (not DEBUG): a failed count should be visible at the default
    # INFO log level configured in __main__.
    logging.warning("Product count query failed (HTTP %s): %s", r.status_code, r.text)
    return "FAIL"

def gss_metrics(site_url, auth=None, year=None):
    """Build Prometheus exposition lines with per-mission product counts.

    Parameters:
        site_url: base OData URL of the GSS instance.
        auth: requests auth object; when omitted a Keycloak token auth is
              constructed from the module-level TOKEN_URL/REALM/CLIENT_ID.
        year: optional year passed through to the count query.

    Returns a list of 'gss_product_count{...} <count>' lines. Missions whose
    count query failed are skipped: emitting the literal "FAIL" as a sample
    value would be invalid exposition format and could make the receiving
    TSDB reject the whole import batch.
    """
    if not auth:
        auth = HTTPAuthOptions.KeycloakTokenAuth(server_url=TOKEN_URL, realm=REALM, client_id=CLIENT_ID)

    metrics = []

    for mission in MISSIONS:
        count = get_mission_product_count(site_url, auth, mission, year)
        logging.debug(f"Site: {site_url} Mission: {mission} Count: {count}")
        if count == "FAIL":
            logging.warning(f"Skipping mission {mission}: product count query failed")
            continue
        metrics.append(f'gss_product_count{{site="{site_url}", mission="{mission}"}} {count}')

    return metrics


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)

if len(sys.argv) > 1:
year = sys.argv[1]
else:
year = None
logging.debug(f"Year lt {year}")

print("\n".join(gss_metrics(SITE_URL, None, year)))
4 changes: 3 additions & 1 deletion latency/products.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def _get_filter_from_product(self, product):
return product['filter']

def get_first_product(self):
    """Return the first product wrapped in a GssProduct, or None when the
    product list is empty (previously this raised IndexError)."""
    # Truthiness check is the idiomatic form of `len(...) > 0`.
    if not self.products:
        return None
    return GssProduct(self.products[0])


class GssProduct:
Expand Down
89 changes: 89 additions & 0 deletions push_metrics_to_vm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#!/usr/bin/env python3
import subprocess
import requests
import json
import logging
import check_product_counts


def get_endpoint_ip(namespace, service):
    """Resolve the first ready address of a Kubernetes Endpoints object.

    Shells out to kubectl, reads the Endpoints resource as JSON and returns
    an "ip:port" string built from the first address and first port of the
    first subset. Raises RuntimeError when nothing is ready.
    """
    raw = subprocess.check_output(
        ["kubectl", "get", "endpoints", service, "-n", namespace, "-o", "json"],
        text=True,
    )
    endpoint = json.loads(raw)

    subsets = endpoint.get("subsets", [])
    if not subsets:
        raise RuntimeError("No endpoints found")

    first = subsets[0]
    addresses = first.get("addresses", [])
    ports = first.get("ports", [])
    if not addresses or not ports:
        raise RuntimeError("No ready addresses or ports in endpoint")

    return f'{addresses[0]["ip"]}:{ports[0]["port"]}'

def parse_kafka(output: str):
    """Convert consumer-group listing output into Prometheus exposition lines.

    Each data row produces three metrics (current offset, log-end offset,
    lag) labelled by group, topic and partition. Blank lines, the column
    header row and rows with fewer than 7 columns are ignored.

    Returns a list of metric lines (no trailing newline).
    """
    metrics = []
    for line in output.splitlines():
        # Skip blank lines and the header row (either case).
        if not line.strip():
            continue
        if line.startswith(("GROUP", "group")):
            continue

        parts = line.split()
        if len(parts) < 7:
            # Malformed / truncated row; require the full column set.
            continue

        group, topic, partition, current_offset, logend_offset, lag = parts[:6]

        # The client id column (parts[6]) is deliberately not used as a
        # label: it is ephemeral and would explode metric cardinality.
        labels = f'group="{group}",topic="{topic}",partition="{partition}"'

        metrics.append(f'kafka_consumer_offset{{{labels}}} {current_offset}')
        metrics.append(f'kafka_logend_offset{{{labels}}} {logend_offset}')
        metrics.append(f'kafka_consumer_lag{{{labels}}} {lag}')

    return metrics


if __name__ == "__main__":

logging.basicConfig(level=logging.INFO)

# ---- Config ----
# http://vms-victoria-metrics-single-server.monitoring.svc.cluster.local.:8428
VM_HOST = get_endpoint_ip('monitoring', 'vms-victoria-metrics-single-server')
VICTORIA_URL = f"http://{VM_HOST}/api/v1/import/prometheus"
COMMAND = ["/root/bin/kafka-get-queue-informations.sh", "relay", "-q"]

GSS_URL = "https://gss.vm.cesnet.cz/odata/v1"

metrics = []

# Run the command and capture output
proc = subprocess.run(COMMAND, capture_output=True, text=True, check=True)
metrics += parse_kafka(proc.stdout)
metrics += check_product_counts.gss_metrics(GSS_URL)

metrics_text = "\n".join(metrics) + "\n"

logging.debug(VICTORIA_URL)
logging.debug(metrics_text)

# Push to VictoriaMetrics
resp = requests.post(VICTORIA_URL, data=metrics_text.encode("utf-8"))
resp.raise_for_status()
logging.debug(f"Pushed metrics successfully: {resp.text}")