Skip to content

Commit

Permalink
Take into account multiple replicas when discovering jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
iszulcdeepsense committed Mar 1, 2023
1 parent 5e37869 commit 9d2e53f
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 20 deletions.
25 changes: 18 additions & 7 deletions src/monitor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from typing import Callable, Iterable

from kubernetes import client
from kubernetes.client import V1ObjectMeta, ApiException
from kubernetes.client import V1ObjectMeta, V1PodStatus, ApiException

from lifecycle.config import Config
from lifecycle.monitor.base import JobMonitor
Expand All @@ -15,7 +15,7 @@
from racetrack_commons.entities.dto import JobDto, JobStatus
from racetrack_client.log.logs import get_logger

from utils import k8s_api_client, K8S_JOB_NAME_LABEL, K8S_JOB_VERSION_LABEL, \
from utils import get_recent_job_pod, k8s_api_client, K8S_JOB_NAME_LABEL, K8S_JOB_VERSION_LABEL, \
K8S_NAMESPACE, K8S_JOB_RESOURCE_LABEL, get_job_deployments, get_job_pods

logger = get_logger(__name__)
Expand All @@ -35,21 +35,31 @@ def list_jobs(self, config: Config) -> Iterable[JobDto]:

with wrap_context('listing Kubernetes API'):
deployments = get_job_deployments(apps_api)
pods = get_job_pods(core_api)
pods_by_job = get_job_pods(core_api)

for resource_name, deployment in deployments.items():
pod = pods.get(resource_name)
if pod is None:
pods = pods_by_job.get(resource_name)
if pods is None or len(pods) == 0:
continue

metadata: V1ObjectMeta = pod.metadata
recent_pod = get_recent_job_pod(pods)
metadata: V1ObjectMeta = recent_pod.metadata
job_name = metadata.labels.get(K8S_JOB_NAME_LABEL)
job_version = metadata.labels.get(K8S_JOB_VERSION_LABEL)
if not (job_name and job_version):
continue

start_timestamp = datetime_to_timestamp(pod.metadata.creation_timestamp)
start_timestamp = datetime_to_timestamp(recent_pod.metadata.creation_timestamp)
internal_name = f'{resource_name}.{K8S_NAMESPACE}.svc:7000'

replica_internal_names = []
for pod in pods:
pod_status: V1PodStatus = pod.status
pod_ip_dns: str = pod_status.pod_ip.replace('.', '-')
replica_internal_names.append(
f'{pod_ip_dns}.{resource_name}.{K8S_NAMESPACE}.svc:7000'
)

job = JobDto(
name=job_name,
version=job_version,
Expand All @@ -60,6 +70,7 @@ def list_jobs(self, config: Config) -> Iterable[JobDto]:
internal_name=internal_name,
error=None,
infrastructure_target=self.infrastructure_name,
replica_internal_names=replica_internal_names,
)
try:
job_url = self._get_internal_job_url(job)
Expand Down
2 changes: 1 addition & 1 deletion src/plugin-manifest.yaml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
name: kubernetes-infrastructure
version: 1.2.1
version: 1.2.2
url: https://github.com/TheRacetrack/plugin-kubernetes-infrastructure
25 changes: 13 additions & 12 deletions src/utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations
from collections import defaultdict
import os
from typing import List, Dict

from kubernetes import client
from kubernetes.client import V1ObjectMeta, V1Pod, V1Deployment
from kubernetes.client import V1ObjectMeta, V1Pod, V1Deployment, V1PodStatus
from kubernetes.config import load_incluster_config

K8S_NAMESPACE = os.environ.get('JOB_K8S_NAMESPACE', 'racetrack')
Expand All @@ -16,30 +17,29 @@ def k8s_api_client() -> client.ApiClient:
return client.ApiClient()


def get_recent_job_pod(pods: list[V1Pod]) -> V1Pod:
    """Return the most recently created pod that is not being terminated.

    :param pods: candidate pods of a single job; must not be empty.
    :return: the alive pod with the greatest ``metadata.creation_timestamp``.
    :raises AssertionError: if ``pods`` is empty, or every pod in it is
        already marked for deletion.
    """
    # Raised explicitly instead of via `assert`, so the validation is not
    # stripped when Python runs with -O; the exception type is unchanged.
    if not pods:
        raise AssertionError('no pod found with expected job label')

    # Ignore Terminating pods, i.e. those with a deletion timestamp set.
    pods_alive = [pod for pod in pods if pod.metadata.deletion_timestamp is None]
    if not pods_alive:
        raise AssertionError('no alive pod found with expected job label')

    # sorted() is stable, so among equal timestamps the last listed pod wins.
    return sorted(pods_alive, key=lambda pod: pod.metadata.creation_timestamp)[-1]


def get_job_pod_names(pods: list[V1Pod]) -> list[str]:
    """Return the names of the given pods that are not being terminated.

    :param pods: candidate pods; must not be empty.
    :return: ``metadata.name`` of every alive pod, in input order.
    :raises AssertionError: if ``pods`` is empty, or every pod in it is
        already marked for deletion.
    """
    # Raised explicitly instead of via `assert`, so the validation is not
    # stripped when Python runs with -O; the exception type is unchanged.
    if not pods:
        raise AssertionError('empty pods list')

    # Ignore Terminating pods, i.e. those with a deletion timestamp set.
    alive_names = [
        pod.metadata.name
        for pod in pods
        if pod.metadata.deletion_timestamp is None
    ]
    if not alive_names:
        raise AssertionError('no alive pod found')
    return alive_names


def get_job_deployments(apps_api: client.AppsV1Api) -> Dict[str, V1Deployment]:
def get_job_deployments(apps_api: client.AppsV1Api) -> dict[str, V1Deployment]:
job_deployments = {}
_continue = None # pointer to the query in case of multiple pages
while True:
ret = apps_api.list_namespaced_deployment(K8S_NAMESPACE, limit=100, _continue=_continue)
deployments: List[V1Deployment] = ret.items
deployments: list[V1Deployment] = ret.items

for deployment in deployments:
metadata: V1ObjectMeta = deployment.metadata
Expand All @@ -54,12 +54,13 @@ def get_job_deployments(apps_api: client.AppsV1Api) -> Dict[str, V1Deployment]:
return job_deployments


def get_job_pods(core_api: client.CoreV1Api) -> Dict[str, V1Pod]:
job_pods = {}
def get_job_pods(core_api: client.CoreV1Api) -> dict[str, list[V1Pod]]:
"""Return mapping: resource name (job_name & job_version) -> list of pods"""
job_pods = defaultdict(list)
_continue = None # pointer to the query in case of multiple pages
while True:
ret = core_api.list_namespaced_pod(K8S_NAMESPACE, limit=100, _continue=_continue)
pods: List[V1Pod] = ret.items
pods: list[V1Pod] = ret.items

for pod in pods:
metadata: V1ObjectMeta = pod.metadata
Expand All @@ -71,7 +72,7 @@ def get_job_pods(core_api: client.CoreV1Api) -> Dict[str, V1Pod]:

if K8S_JOB_RESOURCE_LABEL in metadata.labels:
name = metadata.labels[K8S_JOB_RESOURCE_LABEL]
job_pods[name] = pod
job_pods[name].append(pod)

_continue = ret.metadata._continue
if _continue is None:
Expand Down

0 comments on commit 9d2e53f

Please sign in to comment.