In [12]:
# Gym
import os
import random
from typing import Any

# helpers
import numpy as np
from gymnasium import Env
from gymnasium.spaces import Box, Dict, Discrete, MultiBinary, MultiDiscrete, Tuple

# Stable-baseline
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from environment import KubernetesEnv

# Custom
from typing import Optional
from database.influxdb import InfluxDB
import logging
from kubernetes import client
from utils import setup_logger
from prometheus_api_client import PrometheusConnect
import time

In [13]:
logger = setup_logger(
    "kubernetes_agent", log_level=os.getenv("LOG_LEVEL", "INFO"), log_to_file=True
)

In [None]:
def wait_for_pods_ready(
    prometheus: PrometheusConnect,
    namespace: str,
    deployment_name: str,
    timeout: int,
):
    start_time = time.time()
    ready_replicas = 0

    scope_ready = f"""
        (kube_pod_status_ready{{namespace="{namespace}", condition="true"}} == 1)
        and on(pod)
        (
          label_replace(
            kube_pod_owner{{namespace="{namespace}", owner_kind="ReplicaSet"}},
            "replicaset", "$1", "owner_name", "(.*)"
          )
          * on(namespace, replicaset) group_left(owner_name)
            kube_replicaset_owner{{
              namespace="{namespace}", owner_kind="Deployment", owner_name="{deployment_name}"
            }}
        )
    """  # noqa: E501
    q_desired = f"""
    scalar(
      sum(
        kube_deployment_spec_replicas{{namespace="{namespace}",
        deployment="{deployment_name}"}}
      )
    )
    """
    q_ready = f"""
      scalar(sum({scope_ready}))
    """

    while time.time() - start_time < timeout:
        desired_result = prometheus.custom_query(q_desired)
        desired = desired_result[1]
        ready_result = prometheus.custom_query(query=q_ready)
        ready = ready_result[1]
        if ready == desired:
            return True, desired, ready
        logger.info(
            f"Waiting for pods to be ready: {ready}/{desired}"
        )
        time.sleep(1)
    return False, desired, ready

In [3]:
def _metrics_query(
    namespace: str,
    deployment_name: str,
    interval: int = 15,
    desired_replicas: int | None = None,
    quantile: float = 0.90,
    endpoints_method: list[tuple[str, str]] = (("/", "GET"), ("/docs", "GET")),
) -> tuple[str, str, str, str, str]:
    """
    Build pod-scoped queries and cap to the youngest desired pods.

    We use topk on pod start time to keep only the newest N pods (desired replicas),
    so older pods that are still Ready after a scale-down do not contribute.
    """
    # Default to a reasonable cap if desired_replicas is None
    pod_window = max(1, desired_replicas or 50)

    pod_filter = f"""
        topk({pod_window},
          kube_pod_start_time{{
            namespace="{namespace}",
            pod=~"{deployment_name}-.*"
          }}
          * on(pod) group_left()
            (kube_pod_status_ready{{
                namespace="{namespace}",
                pod=~"{deployment_name}-.*",
                condition="true"
            }} == 1)
        )
    """

    cpu_query = f"""
        sum by (pod) (
            rate(container_cpu_usage_seconds_total{{
                namespace="{namespace}",
                pod=~"{deployment_name}-.*",
                container!="",
                container!="POD"
            }}[{interval}s])
        )
        * on(pod) group_left() {pod_filter}
        """

    memory_query = f"""
        sum by (pod) (
            container_memory_working_set_bytes{{
                namespace="{namespace}",
                pod=~"{deployment_name}-.*",
                container!="",
                container!="POD"
            }}
        )
        * on(pod) group_left() {pod_filter}
        """

    cpu_limits_query = f"""
        sum by (pod) (
            kube_pod_container_resource_limits{{
                namespace="{namespace}",
                pod=~"{deployment_name}-.*",
                resource="cpu",
                unit="core"
            }}
        )
        * on(pod) group_left() {pod_filter}
        """

    # Query for memory limits
    memory_limits_query = f"""
        sum by (pod) (
            kube_pod_container_resource_limits{{
                namespace="{namespace}",
                pod=~"{deployment_name}-.*",
                resource="memory",
                unit="byte"
            }}
        )
        * on(pod) group_left() {pod_filter}
        """

    response_time_query = []
    for endpoint, method in endpoints_method:
        response_time_query.append(f"""
                1000 *
                histogram_quantile(
                {quantile},
                sum by (le) (
                    rate(http_request_duration_seconds_bucket{{
                    namespace="{namespace}",
                    pod=~"{deployment_name}-.*",
                    method="{method}",
                    path="{endpoint}"
                    }}[{interval}s])
                )
                )
            """
        )
    return (
        cpu_query,
        memory_query,
        cpu_limits_query,
        memory_limits_query,
        response_time_query,
    )

In [6]:
def process_metrics(cpu_usage, memory_usage, cpu_limits, memory_limits, response_times, max_response_time):
    cpu_percentages = []
    memory_percentages = []

    cpu_limits_by_pod = {}
    memory_limits_by_pod = {}
    for item in cpu_limits:
        pod = item["metric"]["pod"]
        limit = float(item["value"][1])
        cpu_limits_by_pod[pod] = limit

    for item in memory_limits:
        pod = item["metric"]["pod"]
        limit = float(item["value"][1])
        memory_limits_by_pod[pod] = limit

    for result in cpu_usage:
        pod_name = result["metric"].get("pod")
        limit = float(cpu_limits_by_pod.get(pod_name))
        rate_cores = float(result["value"][1])
        cpu_percentage = (rate_cores / limit) * 100
        cpu_percentages.append(cpu_percentage)

    for result in memory_usage:
        pod_name = result["metric"].get("pod")
        limit = float(memory_limits_by_pod.get(pod_name))
        usage_bytes = float(result["value"][1])
        memory_percentage = (usage_bytes / limit) * 100
        memory_percentages.append(memory_percentage)

    response_time = np.mean(response_times)
    response_time_percentage = (response_time / max_response_time) * 100.0
    response_time_percentage = min(response_time_percentage, 1000.0)

    return cpu_percentages, memory_percentages, response_time_percentage

In [None]:
def get_metrics(
    prometheus: PrometheusConnect,
    namespace: str,
    deployment_name: str,
    interval: int,
    replica: int,
    max_response_time: float,
):
    (
        cpu_query,
        memory_query,
        cpu_limits_query,
        memory_limits_query,
        response_time_query,
    )= _metrics_query(
        namespace,
        deployment_name,
        interval=interval,
        desired_replicas=replica,
    )
    cpu_usage_results = prometheus.custom_query(cpu_query)
    memory_usage_results = prometheus.custom_query(memory_query)
    cpu_limits_results = prometheus.custom_query(cpu_limits_query)
    memory_limits_results = prometheus.custom_query(memory_limits_query)

    response_time_results = []
    for query in response_time_query:
        response = prometheus.custom_query(query)
        response_time_results.append(float(response[0]["value"][1]))

    cpu_percentages, memory_percentages, response_time_percentage = process_metrics(
        cpu_usage_results,
        memory_usage_results,
        cpu_limits_results,
        memory_limits_results,
        response_time_results,
        max_response_time,
    )

    return cpu_percentages, memory_percentages, response_time_percentage

In [None]:
class KubernetesEnv(Env):
    def __init__(
        self,
        min_replicas: int = 1,
        max_replicas: int = 50,
        iteration: int = 50,
        namespace: str = "default",
        deployment_name: str = "default",
        min_cpu: float = 20,
        min_memory: float = 20,
        max_cpu: float = 90,
        max_memory: float = 90,
        max_response_time: float = 100.0,
        timeout: int = 120,
        wait_time: int = 30,
        verbose: bool = False,
        logger: Optional[logging.Logger] = None,
        influxdb: Optional[InfluxDB] = None,
        prometheus_url: str = "http://localhost:1234/prom",
        metrics_endpoints_method: list[tuple[str, str]] = (
            ("/", "GET"),
            ("/docs", "GET"),
        ),
        metrics_interval: int = 15,
        metrics_quantile: float = 0.90,
        max_scaling_retries: int = 1000,
    ):
        self.api = client.AppsV1Api()
        self.namespace = namespace
        self.deployment_name = deployment_name
        self.prometheus = PrometheusConnect(
            url=prometheus_url,
            disable_ssl=True,
        )
        self.timeout = timeout
        self.wait_time = wait_time
        self.metrics_interval = metrics_interval
        self.metrics_quantile = metrics_quantile
        self.metrics_endpoints_method = metrics_endpoints_method

        self.logger = logger

        self.action_space = Discrete(100)
        self.observation_space = Box(low=0.0, high=100.0, shape=(17,), dtype=np.float32)
        self.state = 0
        self.iteration = 50
        self.min_replicas: int = min_replicas
        self.max_replicas: int = max_replicas
        self.range_replicas: int = max(1, self.max_replicas - self.min_replicas)
        self.max_response_time: float = max_response_time
        self.min_cpu: float = min_cpu
        self.min_memory: float = min_memory
        self.max_cpu: float = max_cpu
        self.max_memory: float = max_memory

    def step(self, action: int):
        replica = action * self.range_replicas // 100 + self.min_replicas
        logger.info(f"Set state to {self.state} replicas")

        cpu, memory, response_time = self.scale_and_get_metrics(replica)

        cpu, memory = self.calculate_distance(cpu, memory)

        reward = self.calculate_reward(action, cpu, memory, response_time)

        observation = self.observation(action, response_time, cpu, memory)

        return observation, reward, False, False, {}

    def scale_and_get_metrics(self, replica: int) -> None:
        self.api.patch_namespaced_deployment_scale(
            name=self.deployment_name,
            namespace=self.namespace,
            body={"spec": {"replicas": replica}},
        )
        logger.info(f"Scaled to {replica} replicas")
        _, _, _ = wait_for_pods_ready(
            prometheus=self.prometheus,
            deployment_name=self.deployment_name,
            desired_replicas=replica,
            namespace=self.namespace,
            timeout=self.timeout,
            logger=self.logger,
        )
        cpu, memory, response_time = get_metrics(
            prometheus=self.prometheus,
            namespace=self.namespace,
            deployment_name=self.deployment_name,
            interval=self.metrics_interval,
            replica=replica,
            max_response_time=self.max_response_time,
        )
        return cpu, memory, response_time

    def calculate_reward(self, action: int, cpu: float, memory: float, response_time: float) -> float:

        cpu_penalty = min(cpu * cpu, 1.0)
        memory_penalty = min(memory * memory, 1.0)

        RESPONSE_TIME_HIGH_THRESHOLD = 80.0
        RESPONSE_TIME_VIOLATION_THRESHOLD = 100.0
        MAX_RESPONSE_PENALTY = 2.0
        if response_time <= RESPONSE_TIME_HIGH_THRESHOLD:
            response_time_penalty = 0.0
        elif response_time <= RESPONSE_TIME_VIOLATION_THRESHOLD:
            response_time_penalty = (response_time - RESPONSE_TIME_HIGH_THRESHOLD) / (
                RESPONSE_TIME_VIOLATION_THRESHOLD - RESPONSE_TIME_HIGH_THRESHOLD
            )
        else:
            over = (
                response_time - RESPONSE_TIME_VIOLATION_THRESHOLD
            ) / RESPONSE_TIME_VIOLATION_THRESHOLD
            response_time_penalty = 1.0 + over

        response_time_penalty = max(0.0, min(response_time_penalty, MAX_RESPONSE_PENALTY))

        cost_pen = action / 100.0
        reward = 1.0 - 2.0 * (
            cpu_penalty + memory_penalty + response_time_penalty + cost_pen
        )
        return max(reward, -1.0)

    def calculate_distance(self,cpu: float, memory: float) -> tuple[float, float]:
        cpu_distance = (
            (self.min_cpu - cpu) if cpu < self.min_cpu else (cpu - self.max_cpu)
        )
        cpu_bandwidth = self.max_cpu - self.min_cpu
        cpu_normalized = cpu_distance / cpu_bandwidth
        cpu = cpu_normalized * 100.0


        memory_distance = (
            (self.min_memory - memory)
            if memory < self.min_memory
            else (memory - self.max_memory)
        )
        memory_bandwidth = self.max_memory - self.min_memory
        memory_normalized = memory_distance / memory_bandwidth
        memory = memory_normalized * 100.0

        return cpu, memory

    def observation(self, action: int, response_time: float, cpu: float, memory: float):
        return np.array(
            [
                action,
                cpu,
                memory,
                response_time,
            ],
            dtype=np.float32,
        )

In [9]:
env = KubernetesEnv()

In [10]:
env.observation_space.sample()

array([73.97517  , 21.329208 , 55.635826 , 81.22282  , 27.32877  ,
       15.381341 ,  4.8464737, 51.462143 , 30.69403  , 62.61727  ,
        5.507853 , 24.718836 , 92.99492  , 83.80141  , 27.520195 ,
       42.68573  , 95.34111  ], dtype=float32)

In [11]:
env.action_space.sample()

np.int64(45)