Skip to content

Commit

Permalink
Change autoscaler adoption to use kr8s (#765)
Browse files Browse the repository at this point in the history
* Release 2023.7.0

* Make autoscaler adoption more explicit with kr8s

* More minor tweaks to improve readability

* Update objects to use dot notation

* Even more simplification
  • Loading branch information
jacobtomlinson committed Jul 13, 2023
1 parent 35d42e5 commit 7493355
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
8 changes: 4 additions & 4 deletions dask_kubernetes/operator/_objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async def pods(self) -> List[Pod]:
Pod.endpoint,
label_selector=",".join(
[
f"dask.org/cluster-name={self.spec['cluster']}",
f"dask.org/cluster-name={self.spec.cluster}",
"dask.org/component=worker",
f"dask.org/workergroup-name={self.name}",
]
Expand All @@ -98,7 +98,7 @@ async def deployments(self) -> List[Deployment]:
Deployment.endpoint,
label_selector=",".join(
[
f"dask.org/cluster-name={self.spec['cluster']}",
f"dask.org/cluster-name={self.spec.cluster}",
"dask.org/component=worker",
f"dask.org/workergroup-name={self.name}",
]
Expand All @@ -107,7 +107,7 @@ async def deployments(self) -> List[Deployment]:
)

async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec["cluster"], namespace=self.namespace)
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)


class DaskAutoscaler(APIObject):
Expand All @@ -119,7 +119,7 @@ class DaskAutoscaler(APIObject):
namespaced = True

async def cluster(self) -> DaskCluster:
return await DaskCluster.get(self.spec["cluster"], namespace=self.namespace)
return await DaskCluster.get(self.spec.cluster, namespace=self.namespace)


class DaskJob(APIObject):
Expand Down
9 changes: 5 additions & 4 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,11 +831,12 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwarg


@kopf.on.create("daskautoscaler.kubernetes.dask.org")
async def daskautoscaler_create(name, spec, namespace, logger, patch, **kwargs):
async def daskautoscaler_create(body, logger, **_):
"""When an autoscaler is created make it a child of the associated cluster for cascade deletion."""
cluster = await DaskCluster.get(spec["cluster"], namespace=namespace)
kopf.adopt(patch, owner=cluster.raw)
logger.info(f"Autoscaler {name} successfully adopted by cluster {spec['cluster']}")
autoscaler = await DaskAutoscaler(body)
cluster = await autoscaler.cluster()
await cluster.adopt(autoscaler)
logger.info(f"Autoscaler {autoscaler.name} adopted by cluster {cluster.name}")


@kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0)
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
kopf>=1.35.3
pykube-ng>=22.9.0
rich>=12.5.1
kr8s==0.8.4
kr8s==0.8.5

0 comments on commit 7493355

Please sign in to comment.