Replace kubernetes_asyncio with kr8s in daskcluster_create_components (
jacobtomlinson committed Jul 26, 2023
1 parent 4d4755e commit fe81094
Showing 1 changed file with 47 additions and 74 deletions.
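For reference, the new code path leans on kr8s's async object API (as shown in the diff below) instead of raw kubernetes_asyncio client calls. A minimal, self-contained sketch of that pattern, where the spec dict and the ensure_deployment helper are hypothetical stand-ins for what build_scheduler_deployment_spec produces in the controller:

import asyncio

from kr8s.asyncio.objects import Deployment


async def ensure_deployment():
    # Hypothetical minimal manifest; in the controller this comes from
    # build_scheduler_deployment_spec().
    spec = {
        "apiVersion": "apps/v1",
        "kind": "Deployment",
        "metadata": {"name": "example-scheduler", "namespace": "default"},
        "spec": {
            "selector": {"matchLabels": {"app": "example"}},
            "template": {
                "metadata": {"labels": {"app": "example"}},
                "spec": {
                    "containers": [
                        {"name": "scheduler", "image": "ghcr.io/dask/dask:latest"}
                    ]
                },
            },
        },
    }
    # Wrap the manifest in a kr8s object, then create it only if it is absent,
    # mirroring the exists()/create() flow in the diff below.
    deployment = await Deployment(spec, namespace="default")
    if not await deployment.exists():
        await deployment.create()


asyncio.run(ensure_deployment())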
121 changes: 47 additions & 74 deletions dask_kubernetes/operator/controller/controller.py
@@ -8,8 +8,7 @@
 import aiohttp
 import kopf
 import kr8s
-from kr8s.asyncio.objects import Pod, Deployment
-import kubernetes_asyncio as kubernetes
+from kr8s.asyncio.objects import Pod, Deployment, Service
 from importlib_metadata import entry_points

 from dask_kubernetes.operator._objects import (
@@ -287,79 +286,52 @@ async def daskcluster_create_components(
 ):
     """When the DaskCluster status.phase goes into Created create the cluster components."""
     logger.info("Creating Dask cluster components.")
-    async with kubernetes.client.api_client.ApiClient() as api_client:
-        api = kubernetes.client.CoreV1Api(api_client)
-        custom_api = kubernetes.client.CustomObjectsApi(api_client)

-        annotations = _get_annotations(meta)
-        labels = _get_labels(meta)
-        scheduler_spec = spec.get("scheduler", {})
-        if "metadata" in scheduler_spec:
-            if "annotations" in scheduler_spec["metadata"]:
-                annotations.update(**scheduler_spec["metadata"]["annotations"])
-            if "labels" in scheduler_spec["metadata"]:
-                labels.update(**scheduler_spec["metadata"]["labels"])
-        data = build_scheduler_deployment_spec(
-            name, namespace, scheduler_spec.get("spec"), annotations, labels
-        )
-        kopf.adopt(data)
-        pod = await api.list_namespaced_pod(
-            namespace=namespace,
-            label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}",
-        )
-        if not pod.items:
-            await kubernetes.client.AppsV1Api(api_client).create_namespaced_deployment(
-                namespace=namespace,
-                body=data,
-            )
-            logger.info(
-                f"Scheduler deployment {data['metadata']['name']} created in {namespace}."
-            )
+    # Create scheduler deployment
+    annotations = _get_annotations(meta)
+    labels = _get_labels(meta)
+    scheduler_spec = spec.get("scheduler", {})
+    if "metadata" in scheduler_spec:
+        if "annotations" in scheduler_spec["metadata"]:
+            annotations.update(**scheduler_spec["metadata"]["annotations"])
+        if "labels" in scheduler_spec["metadata"]:
+            labels.update(**scheduler_spec["metadata"]["labels"])
+    data = build_scheduler_deployment_spec(
+        name, namespace, scheduler_spec.get("spec"), annotations, labels
+    )
+    kopf.adopt(data)
+    scheduler_deployment = await Deployment(data, namespace=namespace)
+    if not await scheduler_deployment.exists():
+        await scheduler_deployment.create()
+    logger.info(
+        f"Scheduler deployment {scheduler_deployment.name} created in {namespace}."
+    )

-        data = build_scheduler_service_spec(
-            name, scheduler_spec.get("service"), annotations, labels
-        )
-        kopf.adopt(data)
-        service = await api.list_namespaced_service(
-            namespace=namespace,
-            label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}",
-        )
-        if not service.items:
-            await api.create_namespaced_service(
-                namespace=namespace,
-                body=data,
-            )
-            logger.info(
-                f"Scheduler service {data['metadata']['name']} created in {namespace}."
-            )
+    # Create scheduler service
+    data = build_scheduler_service_spec(
+        name, scheduler_spec.get("service"), annotations, labels
+    )
+    kopf.adopt(data)
+    scheduler_service = await Service(data, namespace=namespace)
+    if not await scheduler_service.exists():
+        await scheduler_service.create()
+    logger.info(f"Scheduler service {data['metadata']['name']} created in {namespace}.")
+
+    # Create default worker group
+    worker_spec = spec.get("worker", {})
+    annotations = _get_annotations(meta)
+    labels = _get_labels(meta)
+    if "metadata" in worker_spec:
+        if "annotations" in worker_spec["metadata"]:
+            annotations.update(**worker_spec["metadata"]["annotations"])
+        if "labels" in worker_spec["metadata"]:
+            labels.update(**worker_spec["metadata"]["labels"])
+    data = build_default_worker_group_spec(name, worker_spec, annotations, labels)
+    worker_group = await DaskWorkerGroup(data, namespace=namespace)
+    if not await worker_group.exists():
+        await worker_group.create()
+    logger.info(f"Worker group {data['metadata']['name']} created in {namespace}.")

-        worker_spec = spec.get("worker", {})
-        annotations = _get_annotations(meta)
-        labels = _get_labels(meta)
-        if "metadata" in worker_spec:
-            if "annotations" in worker_spec["metadata"]:
-                annotations.update(**worker_spec["metadata"]["annotations"])
-            if "labels" in worker_spec["metadata"]:
-                labels.update(**worker_spec["metadata"]["labels"])
-        data = build_default_worker_group_spec(name, worker_spec, annotations, labels)
-        worker_group = await custom_api.list_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskworkergroups",
-            namespace=namespace,
-            label_selector=f"dask.org/component=workergroup,dask.org/cluster-name={name}",
-        )
-        if not worker_group["items"]:
-            await custom_api.create_namespaced_custom_object(
-                group="kubernetes.dask.org",
-                version="v1",
-                plural="daskworkergroups",
-                namespace=namespace,
-                body=data,
-            )
-            logger.info(
-                f"Worker group {data['metadata']['name']} created in {namespace}."
-            )
     patch.status["phase"] = "Pending"


@@ -382,8 +354,8 @@ async def handle_scheduler_service_status(


 @kopf.on.create("daskworkergroup.kubernetes.dask.org")
-async def daskworkergroup_create(body, logger, **kwargs):
-    wg = await DaskWorkerGroup(body)
+async def daskworkergroup_create(body, namespace, logger, **kwargs):
+    wg = await DaskWorkerGroup(body, namespace=namespace)
     cluster = await wg.cluster()
     await cluster.adopt(wg)
     logger.info(f"Successfully adopted by {cluster.name}")
@@ -393,6 +365,7 @@ async def daskworkergroup_create(body, logger, **kwargs):
         body=body,
         logger=logger,
         new=wg.replicas,
+        namespace=namespace,
         **kwargs,
     )


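The DaskWorkerGroup object that replaces the CustomObjectsApi calls is imported from dask_kubernetes.operator._objects. kr8s wraps custom resources by subclassing its APIObject; the sketch below illustrates the idea only, and the class attribute names shown are assumptions about the kr8s interface rather than a copy of the dask-kubernetes source:

from kr8s.asyncio.objects import APIObject


class DaskWorkerGroup(APIObject):
    # Assumed attribute names describing the custom resource to kr8s.
    version = "kubernetes.dask.org/v1"
    endpoint = "daskworkergroups"
    kind = "DaskWorkerGroup"
    plural = "daskworkergroups"
    singular = "daskworkergroup"
    namespaced = True


# Used as in the diff above, e.g.:
#     worker_group = await DaskWorkerGroup(data, namespace=namespace)
#     if not await worker_group.exists():
#         await worker_group.create()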