Skip to content

Commit

Permalink
Update DaskJob state transitions to use kr8s (#755)
Browse files Browse the repository at this point in the history
* Release 2023.6.1

* Update DaskJob state transitions to use kr8s

* Improve readibility

* Ensure we patch the subresource
  • Loading branch information
jacobtomlinson committed Jul 12, 2023
1 parent fb4a4aa commit c14d751
Showing 1 changed file with 38 additions and 62 deletions.
100 changes: 38 additions & 62 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
DaskCluster,
DaskAutoscaler,
DaskWorkerGroup,
DaskJob,
)
from dask_kubernetes.common.auth import ClusterAuth
from dask_kubernetes.common.networking import get_scheduler_address
Expand Down Expand Up @@ -760,22 +761,17 @@ async def daskjob_create_components(
)
async def handle_runner_status_change_running(meta, namespace, logger, **kwargs):
logger.info("Job now in running")
async with kubernetes.client.api_client.ApiClient() as api_client:
customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
api_client.set_default_header("content-type", "application/merge-patch+json")
await customobjectsapi.patch_namespaced_custom_object_status(
group="kubernetes.dask.org",
version="v1",
plural="daskjobs",
namespace=namespace,
name=meta["labels"]["dask.org/cluster-name"],
body={
"status": {
"jobStatus": "Running",
"startTime": datetime.utcnow().strftime(KUBERNETES_DATETIME_FORMAT),
}
},
)
name = meta["labels"]["dask.org/cluster-name"]
job = await DaskJob.get(name, namespace=namespace)
await job.patch(
{
"status": {
"jobStatus": "Running",
"startTime": datetime.utcnow().strftime(KUBERNETES_DATETIME_FORMAT),
}
},
subresource="status",
)


@kopf.on.field(
Expand All @@ -786,29 +782,19 @@ async def handle_runner_status_change_running(meta, namespace, logger, **kwargs)
)
async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwargs):
logger.info("Job succeeded, deleting Dask cluster.")
async with kubernetes.client.api_client.ApiClient() as api_client:
customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
await customobjectsapi.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=namespace,
name=meta["labels"]["dask.org/cluster-name"],
)
api_client.set_default_header("content-type", "application/merge-patch+json")
await customobjectsapi.patch_namespaced_custom_object_status(
group="kubernetes.dask.org",
version="v1",
plural="daskjobs",
namespace=namespace,
name=meta["labels"]["dask.org/cluster-name"],
body={
"status": {
"jobStatus": "Successful",
"endTime": datetime.utcnow().strftime(KUBERNETES_DATETIME_FORMAT),
}
},
)
name = meta["labels"]["dask.org/cluster-name"]
cluster = await DaskCluster.get(name, namespace=namespace)
await cluster.delete()
job = await DaskJob.get(name, namespace=namespace)
await job.patch(
{
"status": {
"jobStatus": "Successful",
"endTime": datetime.utcnow().strftime(KUBERNETES_DATETIME_FORMAT),
}
},
subresource="status",
)


@kopf.on.field(
Expand All @@ -819,29 +805,19 @@ async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwarg
)
async def handle_runner_status_change_succeeded(meta, namespace, logger, **kwargs):
logger.info("Job failed, deleting Dask cluster.")
async with kubernetes.client.api_client.ApiClient() as api_client:
customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
await customobjectsapi.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=namespace,
name=meta["labels"]["dask.org/cluster-name"],
)
api_client.set_default_header("content-type", "application/merge-patch+json")
await customobjectsapi.patch_namespaced_custom_object_status(
group="kubernetes.dask.org",
version="v1",
plural="daskjobs",
namespace=namespace,
name=meta["labels"]["dask.org/cluster-name"],
body={
"status": {
"jobStatus": "Failed",
"endTime": datetime.utcnow().strftime(KUBERNETES_DATETIME_FORMAT),
}
},
)
name = meta["labels"]["dask.org/cluster-name"]
cluster = await DaskCluster.get(name, namespace=namespace)
await cluster.delete()
job = await DaskJob.get(name, namespace=namespace)
await job.patch(
{
"status": {
"jobStatus": "Failed",
"endTime": datetime.utcnow().strftime(KUBERNETES_DATETIME_FORMAT),
}
},
subresource="status",
)


@kopf.on.create("daskautoscaler.kubernetes.dask.org")
Expand Down

0 comments on commit c14d751

Please sign in to comment.