Skip to content

Commit

Permalink
Add checks for scheduler, service, and worker groups (#719)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt711 committed May 19, 2023
1 parent ada3a2d commit 5bfb527
Showing 1 changed file with 30 additions and 10 deletions.
40 changes: 30 additions & 10 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,6 @@ async def daskcluster_create_components(

annotations = _get_annotations(meta)
labels = _get_labels(meta)
# TODO Check for existing scheduler pod
scheduler_spec = spec.get("scheduler", {})
if "metadata" in scheduler_spec:
if "annotations" in scheduler_spec["metadata"]:
Expand All @@ -275,21 +274,32 @@ async def daskcluster_create_components(
name, scheduler_spec.get("spec"), annotations, labels
)
kopf.adopt(data)
await api.create_namespaced_pod(
pod = await api.list_namespaced_pod(
namespace=namespace,
body=data,
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}",
)
logger.info(f"Scheduler pod {data['metadata']['name']} created in {namespace}.")
if not pod.items:
await api.create_namespaced_pod(
namespace=namespace,
body=data,
)
logger.info(
f"Scheduler pod {data['metadata']['name']} created in {namespace}."
)

# TODO Check for existing scheduler service
data = build_scheduler_service_spec(
name, scheduler_spec.get("service"), annotations, labels
)
kopf.adopt(data)
await api.create_namespaced_service(
service = await api.list_namespaced_service(
namespace=namespace,
body=data,
label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={name}",
)
if not pod.items:
await api.create_namespaced_service(
namespace=namespace,
body=data,
)
logger.info(
f"Scheduler service {data['metadata']['name']} created in {namespace}."
)
Expand All @@ -303,14 +313,24 @@ async def daskcluster_create_components(
if "labels" in worker_spec["metadata"]:
labels.update(**worker_spec["metadata"]["labels"])
data = build_default_worker_group_spec(name, worker_spec, annotations, labels)
await custom_api.create_namespaced_custom_object(
worker_group = await custom_api.list_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskworkergroups",
namespace=namespace,
body=data,
label_selector=f"dask.org/component=workergroup,dask.org/cluster-name={name}",
)
logger.info(f"Worker group {data['metadata']['name']} created in {namespace}.")
if not worker_group["items"]:
await custom_api.create_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskworkergroups",
namespace=namespace,
body=data,
)
logger.info(
f"Worker group {data['metadata']['name']} created in {namespace}."
)
patch.status["phase"] = "Pending"


Expand Down

0 comments on commit 5bfb527

Please sign in to comment.