Skip to content

Commit

Permalink
Replace kubernetes_asyncio with kr8s in daskworkergroup_create (#758)
Browse files Browse the repository at this point in the history
* Replace kubernetes_asyncio with kr8s in daskworkergroup_create

* Increase timeout

* Be explicit with owner kwarg

* Bump kr8s to workaround bug

* Bump kr8s again to workaround bug in Python 3.9 too

* Revert timeout increase

* Reinstate timeout increase

* Switch to kr8s adoption method

* Simplify

* Fix problem with body not being a dict

* Revert workaround due to upstream fix in kr8s 0.8.6

* Fail tests early so we can see tracebacks for those that time out

* Remove maxfail
  • Loading branch information
jacobtomlinson committed Jul 14, 2023
1 parent e3b1786 commit 0cd5def
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 29 deletions.
35 changes: 7 additions & 28 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,38 +383,17 @@ async def handle_scheduler_service_status(


@kopf.on.create("daskworkergroup.kubernetes.dask.org")
async def daskworkergroup_create(spec, name, namespace, logger, **kwargs):
async with kubernetes.client.api_client.ApiClient() as api_client:
api = kubernetes.client.CustomObjectsApi(api_client)
cluster = await api.get_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=namespace,
name=spec["cluster"],
)
new_spec = dict(spec)
kopf.adopt(new_spec, owner=cluster)
api.api_client.set_default_header(
"content-type", "application/merge-patch+json"
)
await api.patch_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskworkergroups",
namespace=namespace,
name=name,
body=new_spec,
)
logger.info(f"Successfully adopted by {spec['cluster']}")
async def daskworkergroup_create(body, logger, **kwargs):
wg = await DaskWorkerGroup(body)
cluster = await wg.cluster()
await cluster.adopt(wg)
logger.info(f"Successfully adopted by {cluster.name}")

del kwargs["new"]
await daskworkergroup_replica_update(
spec=spec,
name=name,
namespace=namespace,
body=body,
logger=logger,
new=spec["worker"]["replicas"],
new=wg.replicas,
**kwargs,
)

Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
kopf>=1.35.3
pykube-ng>=22.9.0
rich>=12.5.1
kr8s==0.8.5
kr8s==0.8.6

0 comments on commit 0cd5def

Please sign in to comment.