Skip to content

Commit

Permalink
Replace kubernetes_asyncio with kr8s in daskautoscaler_create (#756)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson committed Jun 30, 2023
1 parent 2aa43ed commit cfae6dd
Showing 1 changed file with 4 additions and 24 deletions.
28 changes: 4 additions & 24 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -854,31 +854,11 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwarg


@kopf.on.create("daskautoscaler.kubernetes.dask.org")
async def daskautoscaler_create(spec, name, namespace, logger, **kwargs):
async def daskautoscaler_create(name, spec, namespace, logger, patch, **kwargs):
"""When an autoscaler is created make it a child of the associated cluster for cascade deletion."""
async with kubernetes.client.api_client.ApiClient() as api_client:
api = kubernetes.client.CustomObjectsApi(api_client)
cluster = await api.get_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=namespace,
name=spec["cluster"],
)
new_spec = dict(spec)
kopf.adopt(new_spec, owner=cluster)
api.api_client.set_default_header(
"content-type", "application/merge-patch+json"
)
await api.patch_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace=namespace,
name=name,
body=new_spec,
)
logger.info(f"Successfully adopted by {spec['cluster']}")
cluster = await DaskCluster.get(spec["cluster"], namespace=namespace)
kopf.adopt(patch, owner=cluster.raw)
logger.info(f"Autoscaler {name} successfully adopted by cluster {spec['cluster']}")


@kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0)
Expand Down

0 comments on commit cfae6dd

Please sign in to comment.