Skip to content

Commit

Permalink
Migrate KubeCluster component status to kr8s (#792)
Browse files Browse the repository at this point in the history
* Remove unused import

* Get before deletion

* Bump kr8s to 0.8.8
  • Loading branch information
jacobtomlinson committed Jul 31, 2023
1 parent e730a89 commit 8c8827b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 45 deletions.
72 changes: 28 additions & 44 deletions dask_kubernetes/operator/kubecluster/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@
from rich.console import Group
from rich.panel import Panel
from rich.spinner import Spinner
import pykube.exceptions
import kubernetes_asyncio as kubernetes
import yaml
import kr8s
from kr8s.asyncio.objects import Pod, Service

import dask.config
from distributed.core import Status, rpc
Expand All @@ -44,9 +44,7 @@
from dask_kubernetes.aiopykube import HTTPClient, KubeConfig
from dask_kubernetes.aiopykube.dask import (
DaskCluster as AIODaskCluster,
DaskWorkerGroup as AIODaskWorkerGroup,
)
from dask_kubernetes.aiopykube.objects import Pod, Service
from dask_kubernetes.exceptions import CrashLoopBackOffError, SchedulerStartupError
from dask_kubernetes.operator._objects import (
DaskCluster,
Expand Down Expand Up @@ -504,53 +502,39 @@ async def _wait_for_controller(self):
async def _watch_component_status(self):
while True:
# Get DaskCluster status
with suppress(pykube.exceptions.ObjectDoesNotExist):
cluster = await AIODaskCluster.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name)
self._startup_component_status["cluster"] = cluster.obj["status"][
"phase"
]
with suppress(kr8s.NotFoundError):
cluster = await DaskCluster.get(self.name, namespace=self.namespace)
if "status" in cluster.raw and "phase" in cluster.status:
self._startup_component_status["cluster"] = cluster.status.phase

# Get Scheduler Pod status
with suppress(pykube.exceptions.ObjectDoesNotExist):
async with kubernetes.client.api_client.ApiClient() as api_client:
core_api = kubernetes.client.CoreV1Api(api_client)
pods = await core_api.list_namespaced_pod(
namespace=self.namespace,
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
)
if pods.items:
pod = await Pod.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(pods.items[0].metadata.name)
phase = pod.obj["status"]["phase"]
if phase == "Running":
conditions = {
c["type"]: c["status"]
for c in pod.obj["status"]["conditions"]
}
if "Ready" not in conditions or conditions["Ready"] != "True":
phase = "Health Checking"
if "containerStatuses" in pod.obj["status"]:
for container in pod.obj["status"]["containerStatuses"]:
if "waiting" in container["state"]:
phase = container["state"]["waiting"]["reason"]

self._startup_component_status["schedulerpod"] = phase
with suppress(kr8s.NotFoundError):
scheduler_pod = await Pod.get(
namespace=self.namespace,
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={self.name}",
)

phase = scheduler_pod.status.phase
if scheduler_pod.status.phase == "Running":
if not await scheduler_pod.ready():
phase = "Health Checking"
if "container_statuses" in scheduler_pod.status:
for container in scheduler_pod.status.container_statuses:
if "waiting" in container.state:
phase = container.state.waiting.reason

self._startup_component_status["schedulerpod"] = phase

# Get Scheduler Service status
with suppress(pykube.exceptions.ObjectDoesNotExist):
await Service.objects(self.k8s_api).get_by_name(
self.name + "-scheduler"
)
with suppress(kr8s.NotFoundError):
await Service.get(self.name + "-scheduler", namespace=self.namespace)
self._startup_component_status["schedulerservice"] = "Created"

# Get DaskWorkerGroup status
with suppress(pykube.exceptions.ObjectDoesNotExist):
await AIODaskWorkerGroup.objects(
self.k8s_api, namespace=self.namespace
).get_by_name(self.name + "-default")
with suppress(kr8s.NotFoundError):
await DaskWorkerGroup.get(
self.name + "-default", namespace=self.namespace
)
self._startup_component_status["workergroup"] = "Created"

await asyncio.sleep(1)
Expand Down Expand Up @@ -745,7 +729,7 @@ def close(self, timeout=3600):
async def _close(self, timeout=3600):
await super()._close()
if self.shutdown_on_close:
cluster = await DaskCluster(self.name, namespace=self.namespace)
cluster = await DaskCluster.get(self.name, namespace=self.namespace)
try:
await cluster.delete()
except kr8s.NotFoundError:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
kopf>=1.35.3
pykube-ng>=22.9.0
rich>=12.5.1
kr8s==0.8.7
kr8s==0.8.8

0 comments on commit 8c8827b

Please sign in to comment.