Skip to content

Commit

Permalink
Update daskjob_create_components to use kr8s (#781)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson committed Jul 26, 2023
1 parent 1ff7534 commit 4d4755e
Showing 1 changed file with 44 additions and 55 deletions.
99 changes: 44 additions & 55 deletions dask_kubernetes/operator/controller/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -663,62 +663,51 @@ async def daskjob_create_components(
spec, name, namespace, logger, patch, meta, **kwargs
):
logger.info("Creating Dask job components.")
async with kubernetes.client.api_client.ApiClient() as api_client:
customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
corev1api = kubernetes.client.CoreV1Api(api_client)

cluster_name = f"{name}"
labels = _get_labels(meta)
annotations = _get_annotations(meta)
cluster_spec = spec["cluster"]
if "metadata" in cluster_spec:
if "annotations" in cluster_spec["metadata"]:
annotations.update(**cluster_spec["metadata"]["annotations"])
if "labels" in cluster_spec["metadata"]:
labels.update(**cluster_spec["metadata"]["labels"])
cluster_spec = build_cluster_spec(
cluster_name,
cluster_spec["spec"]["worker"],
cluster_spec["spec"]["scheduler"],
annotations,
labels,
)
kopf.adopt(cluster_spec)
await customobjectsapi.create_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=namespace,
body=cluster_spec,
)
logger.info(
f"Cluster {cluster_spec['metadata']['name']} for job {name} created in {namespace}."
)
cluster_name = f"{name}"
labels = _get_labels(meta)
annotations = _get_annotations(meta)
cluster_spec = spec["cluster"]
if "metadata" in cluster_spec:
if "annotations" in cluster_spec["metadata"]:
annotations.update(**cluster_spec["metadata"]["annotations"])
if "labels" in cluster_spec["metadata"]:
labels.update(**cluster_spec["metadata"]["labels"])
cluster_spec = build_cluster_spec(
cluster_name,
cluster_spec["spec"]["worker"],
cluster_spec["spec"]["scheduler"],
annotations,
labels,
)
kopf.adopt(cluster_spec)
cluster = await DaskCluster(cluster_spec, namespace=namespace)
await cluster.create()
logger.info(
f"Cluster {cluster_spec['metadata']['name']} for job {name} created in {namespace}."
)

labels = _get_labels(meta)
annotations = _get_annotations(meta)
job_spec = spec["job"]
if "metadata" in job_spec:
if "annotations" in job_spec["metadata"]:
annotations.update(**job_spec["metadata"]["annotations"])
if "labels" in job_spec["metadata"]:
labels.update(**job_spec["metadata"]["labels"])
job_pod_spec = build_job_pod_spec(
job_name=name,
cluster_name=cluster_name,
namespace=namespace,
spec=job_spec["spec"],
annotations=annotations,
labels=labels,
)
kopf.adopt(job_pod_spec)
await corev1api.create_namespaced_pod(
namespace=namespace,
body=job_pod_spec,
)
patch.status["clusterName"] = cluster_name
patch.status["jobStatus"] = "ClusterCreated"
patch.status["jobRunnerPodName"] = get_job_runner_pod_name(name)
labels = _get_labels(meta)
annotations = _get_annotations(meta)
job_spec = spec["job"]
if "metadata" in job_spec:
if "annotations" in job_spec["metadata"]:
annotations.update(**job_spec["metadata"]["annotations"])
if "labels" in job_spec["metadata"]:
labels.update(**job_spec["metadata"]["labels"])
job_pod_spec = build_job_pod_spec(
job_name=name,
cluster_name=cluster_name,
namespace=namespace,
spec=job_spec["spec"],
annotations=annotations,
labels=labels,
)
kopf.adopt(job_pod_spec)
job_pod = await Pod(job_pod_spec, namespace=namespace)
await job_pod.create()
patch.status["clusterName"] = cluster_name
patch.status["jobStatus"] = "ClusterCreated"
patch.status["jobRunnerPodName"] = get_job_runner_pod_name(name)


@kopf.on.field(
Expand Down

0 comments on commit 4d4755e

Please sign in to comment.