7 changes: 6 additions & 1 deletion dask_kubernetes/experimental/__init__.py
@@ -1 +1,6 @@
from .kubecluster import KubeCluster
from .kubecluster import (
KubeCluster,
make_cluster_spec,
make_scheduler_spec,
make_worker_spec,
)
366 changes: 235 additions & 131 deletions dask_kubernetes/experimental/kubecluster.py

Large diffs are not rendered by default.
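
The collapsed kubecluster.py changes add the make_cluster_spec, make_scheduler_spec and make_worker_spec helpers exported above. A rough sketch of how they are used, based on the documentation added later in this PR rather than on the collapsed implementation:

    from dask_kubernetes.experimental import KubeCluster, make_cluster_spec, make_worker_spec

    # Build a DaskCluster spec the same way KubeCluster does internally,
    # tweak it, then create the cluster from the customised spec.
    spec = make_cluster_spec(name="example", n_workers=2)
    spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}
    cluster = KubeCluster(custom_cluster_spec=spec)

    # Build a worker spec for an additional worker group and attach it.
    worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2)
    cluster.add_worker_group(custom_spec=worker_spec)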

12 changes: 11 additions & 1 deletion dask_kubernetes/experimental/tests/test_kubecluster.py
@@ -3,7 +3,7 @@
from dask.distributed import Client
from distributed.utils import TimeoutError

from dask_kubernetes.experimental import KubeCluster
from dask_kubernetes.experimental import KubeCluster, make_cluster_spec


@pytest.fixture
@@ -100,3 +100,13 @@ def test_adapt(kopf_runner, docker_image):
# Need to clean up the DaskAutoscaler object
# See https://github.com/dask/dask-kubernetes/issues/546
cluster.scale(0)


def test_custom_spec(kopf_runner, docker_image):
with kopf_runner:
spec = make_cluster_spec("customspec", image=docker_image)
with KubeCluster(
custom_cluster_spec=spec,
) as cluster:
with Client(cluster) as client:
assert client.submit(lambda x: x + 1, 10).result() == 11
76 changes: 44 additions & 32 deletions dask_kubernetes/operator/operator.py
@@ -28,14 +28,14 @@ class SchedulerCommError(Exception):
pass


def build_scheduler_pod_spec(name, spec):
def build_scheduler_pod_spec(cluster_name, spec):
return {
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": f"{name}-scheduler",
"name": f"{cluster_name}-scheduler",
"labels": {
"dask.org/cluster-name": name,
"dask.org/cluster-name": cluster_name,
"dask.org/component": "scheduler",
"sidecar.istio.io/inject": "false",
},
@@ -44,14 +44,14 @@ def build_scheduler_pod_spec(name, spec):
}


def build_scheduler_service_spec(name, spec):
def build_scheduler_service_spec(cluster_name, spec):
return {
"apiVersion": "v1",
"kind": "Service",
"metadata": {
"name": f"{name}-service",
"name": f"{cluster_name}-scheduler",
"labels": {
"dask.org/cluster-name": name,
"dask.org/cluster-name": cluster_name,
},
},
"spec": spec,
@@ -81,7 +81,7 @@ def build_worker_pod_spec(worker_group_name, namespace, cluster_name, uuid, spec
},
{
"name": "DASK_SCHEDULER_ADDRESS",
"value": f"tcp://{cluster_name}-service.{namespace}.svc.cluster.local:8786",
"value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
},
]
for i in range(len(pod_spec["spec"]["containers"])):
@@ -109,7 +109,7 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec):
env = [
{
"name": "DASK_SCHEDULER_ADDRESS",
"value": f"tcp://{cluster_name}-service.{namespace}.svc.cluster.local:8786",
"value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786",
},
]
for i in range(len(pod_spec["spec"]["containers"])):
@@ -120,13 +120,19 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec):
return pod_spec


def build_worker_group_spec(name, spec):
def build_default_worker_group_spec(cluster_name, spec):
return {
"apiVersion": "kubernetes.dask.org/v1",
"kind": "DaskWorkerGroup",
"metadata": {"name": f"{name}-default-worker-group"},
"metadata": {
"name": f"{cluster_name}-default",
"labels": {
"dask.org/cluster-name": cluster_name,
"dask.org/component": "workergroup",
},
},
"spec": {
"cluster": name,
"cluster": cluster_name,
"worker": spec,
},
}
@@ -136,7 +142,12 @@ def build_cluster_spec(name, worker_spec, scheduler_spec):
return {
"apiVersion": "kubernetes.dask.org/v1",
"kind": "DaskCluster",
"metadata": {"name": name},
"metadata": {
"name": name,
"labels": {
"dask.org/cluster-name": name,
},
},
"spec": {"worker": worker_spec, "scheduler": scheduler_spec},
}

@@ -198,9 +209,7 @@ async def daskcluster_create_components(spec, name, namespace, logger, patch, **
)

worker_spec = spec.get("worker", {})
data = build_worker_group_spec(name, worker_spec)
# TODO: Next line is not needed if we can get worker groups adopted by the cluster
kopf.adopt(data)
data = build_default_worker_group_spec(name, worker_spec)
await custom_api.create_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
@@ -352,7 +361,7 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs):
if workers_needed < 0:
worker_ids = await retire_workers(
n_workers=-workers_needed,
scheduler_service_name=f"{spec['cluster']}-service",
scheduler_service_name=f"{spec['cluster']}-scheduler",
worker_group_name=name,
namespace=namespace,
logger=logger,
@@ -375,7 +384,7 @@ async def daskjob_create(spec, name, namespace, logger, **kwargs):
customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
corev1api = kubernetes.client.CoreV1Api(api_client)

cluster_name = f"{name}-cluster"
cluster_name = f"{name}"
cluster_spec = build_cluster_spec(
cluster_name,
spec["cluster"]["spec"]["worker"],
@@ -406,20 +415,23 @@ async def daskjob_create(spec, name, namespace, logger, **kwargs):
)


@kopf.on.field("pod", field="status.phase", labels={"dask.org/component": "job-runner"})
@kopf.on.field(
"pod",
field="status.phase",
labels={"dask.org/component": "job-runner"},
new="Succeeded",
)
async def handle_runner_status_change(meta, new, namespace, logger, **kwargs):
logger.info(f"Job now in phase {new}.")
if new == "Succeeded":
logger.info("Job succeeded, deleting Dask cluster.")
async with kubernetes.client.api_client.ApiClient() as api_client:
customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
await customobjectsapi.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=namespace,
name=meta["labels"]["dask.org/cluster-name"],
)
logger.info("Job succeeded, deleting Dask cluster.")
async with kubernetes.client.api_client.ApiClient() as api_client:
customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
await customobjectsapi.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace=namespace,
name=meta["labels"]["dask.org/cluster-name"],
)


@kopf.on.create("daskautoscaler")
@@ -455,7 +467,7 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
# Ask the scheduler for the desired number of workers
try:
desired_workers = await get_desired_workers(
scheduler_service_name=f"{spec['cluster']}-service",
scheduler_service_name=f"{spec['cluster']}-scheduler",
namespace=namespace,
logger=logger,
)
@@ -479,6 +491,6 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
version="v1",
plural="daskworkergroups",
namespace=namespace,
name=f"{spec['cluster']}-default-worker-group",
name=f"{spec['cluster']}-default",
body={"spec": {"replicas": desired_workers}},
)
4 changes: 2 additions & 2 deletions dask_kubernetes/operator/tests/resources/simplecluster.yaml
@@ -1,7 +1,7 @@
apiVersion: kubernetes.dask.org/v1
kind: DaskCluster
metadata:
name: simple-cluster
name: simple
namespace: default
spec:
worker:
@@ -51,7 +51,7 @@ spec:
service:
type: ClusterIP
selector:
dask.org/cluster-name: simple-cluster
dask.org/cluster-name: simple
dask.org/component: scheduler
ports:
- name: tcp-comm
2 changes: 1 addition & 1 deletion dask_kubernetes/operator/tests/resources/simplejob.yaml
@@ -64,7 +64,7 @@ spec:
service:
type: ClusterIP
selector:
dask.org/cluster-name: simple-job-cluster
dask.org/cluster-name: simple-job
dask.org/component: scheduler
ports:
- name: tcp-comm
@@ -1,10 +1,10 @@
apiVersion: kubernetes.dask.org/v1
kind: DaskWorkerGroup
metadata:
name: simple-cluster-additional-worker-group
name: simple-additional
namespace: default
spec:
cluster: simple-cluster
cluster: simple
worker:
replicas: 2
spec:
25 changes: 12 additions & 13 deletions dask_kubernetes/operator/tests/test_operator.py
@@ -18,7 +18,7 @@ def gen_cluster(k8s_cluster):
@asynccontextmanager
async def cm():
cluster_path = os.path.join(DIR, "resources", "simplecluster.yaml")
cluster_name = "simple-cluster"
cluster_name = "simple"

# Create cluster resource
k8s_cluster.kubectl("apply", "-f", cluster_path)
@@ -88,9 +88,9 @@ def test_operator_plugins(kopf_runner):
async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster):
with kopf_runner as runner:
async with gen_cluster() as cluster_name:
scheduler_pod_name = "simple-cluster-scheduler"
worker_pod_name = "simple-cluster-default-worker-group-worker"
service_name = "simple-cluster-service"
scheduler_pod_name = "simple-scheduler"
worker_pod_name = "simple-default-worker"
service_name = "simple-scheduler"
while scheduler_pod_name not in k8s_cluster.kubectl("get", "pods"):
await asyncio.sleep(0.1)
while service_name not in k8s_cluster.kubectl("get", "svc"):
@@ -109,14 +109,14 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster):
"scale",
"--replicas=5",
"daskworkergroup",
"simple-cluster-default-worker-group",
"simple-default",
)
await client.wait_for_workers(5)
k8s_cluster.kubectl(
"scale",
"--replicas=3",
"daskworkergroup",
"simple-cluster-default-worker-group",
"simple-default",
)
await client.wait_for_workers(3)

@@ -126,9 +126,9 @@ async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster):
async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster):
with kopf_runner as runner:
async with gen_cluster() as cluster_name:
scheduler_pod_name = "simple-cluster-scheduler"
worker_pod_name = "simple-cluster-default-worker-group-worker"
service_name = "simple-cluster-service"
scheduler_pod_name = "simple-scheduler"
worker_pod_name = "simple-default-worker"
service_name = "simple-scheduler"

while scheduler_pod_name not in k8s_cluster.kubectl("get", "pods"):
await asyncio.sleep(0.1)
@@ -177,23 +177,22 @@ async def test_job(k8s_cluster, kopf_runner, gen_job):
async with gen_job() as job:
assert job

cluster_name = f"{job}-cluster"
runner_name = f"{job}-runner"

# Assert that cluster is created
while cluster_name not in k8s_cluster.kubectl("get", "daskclusters"):
while job not in k8s_cluster.kubectl("get", "daskclusters"):
await asyncio.sleep(0.1)

# Assert job pod is created
while runner_name not in k8s_cluster.kubectl("get", "po"):
while job not in k8s_cluster.kubectl("get", "po"):
await asyncio.sleep(0.1)

# Assert job pod runs to completion (will fail if it doesn't connect to the cluster)
while "Completed" not in k8s_cluster.kubectl("get", "po", runner_name):
await asyncio.sleep(0.1)

# Assert cluster is removed on completion
while cluster_name in k8s_cluster.kubectl("get", "daskclusters"):
while job in k8s_cluster.kubectl("get", "daskclusters"):
await asyncio.sleep(0.1)

assert "A DaskJob has been created" in runner.stdout
48 changes: 48 additions & 0 deletions doc/source/operator_kubecluster.rst
@@ -88,6 +88,51 @@ Additional worker groups can also be deleted in Python.

Any additional worker groups you create will be deleted when the cluster is deleted.

Custom cluster spec
-------------------

The ``KubeCluster`` class accepts a selection of keyword arguments to make it quick and easy to get started; however, the underlying :doc:`DaskCluster <operator_resources>` resource can be much more complex and configured in many ways.
Rather than exposing every possibility via keyword arguments, you can instead pass a valid ``DaskCluster`` resource spec which will be used when creating the cluster.
You can also generate a spec with :func:`make_cluster_spec`, which ``KubeCluster`` uses internally, and then modify it with your custom options.


.. code-block:: python

from dask_kubernetes.experimental import KubeCluster, make_cluster_spec

config = {
"name": "foo",
"n_workers": 2,
"resources":{"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}}
}

cluster = KubeCluster(**config)
# is equivalent to
cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config))
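
The returned spec is a plain dictionary describing a ``DaskCluster`` resource. As a rough guide (the exact contents depend on the keyword arguments and may change while this API is experimental) it is laid out like this:

.. code-block:: python

from dask_kubernetes.experimental import make_cluster_spec

spec = make_cluster_spec(name="foo", n_workers=2)

spec["kind"]                        # "DaskCluster"
spec["metadata"]["name"]            # "foo"
spec["spec"]["worker"]["replicas"]  # 2
spec["spec"]["worker"]["spec"]      # worker pod spec (containers, resources, ...)
spec["spec"]["scheduler"]["spec"]   # scheduler pod spec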

You can also modify the spec before passing it to ``KubeCluster``. For example, if you want to set a ``nodeSelector`` on your worker pods you could do it like this:

.. code-block:: python

from dask_kubernetes.experimental import KubeCluster, make_cluster_spec

spec = make_cluster_spec(name="selector-example", n_workers=2)
spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"}

cluster = KubeCluster(custom_cluster_spec=spec)

The ``cluster.add_worker_group()`` method also supports passing a ``custom_spec`` keyword argument, which can be generated with :func:`make_worker_spec`.

.. code-block:: python

from dask_kubernetes.experimental import KubeCluster, make_worker_spec

cluster = KubeCluster(name="example")

worker_spec = make_worker_spec(
    cluster_name=cluster.name,
    n_workers=2,
    resources={"limits": {"nvidia.com/gpu": 1}},
)
worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"}

cluster.add_worker_group(custom_spec=worker_spec)


.. _api:
@@ -108,3 +153,6 @@ API

.. autoclass:: KubeCluster
:members:

.. autofunction:: make_cluster_spec
.. autofunction:: make_worker_spec