diff --git a/dask_kubernetes/experimental/__init__.py b/dask_kubernetes/experimental/__init__.py index 2c14f9559..b9909f972 100644 --- a/dask_kubernetes/experimental/__init__.py +++ b/dask_kubernetes/experimental/__init__.py @@ -1 +1,6 @@ -from .kubecluster import KubeCluster +from .kubecluster import ( + KubeCluster, + make_cluster_spec, + make_scheduler_spec, + make_worker_spec, +) diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index edd438b2c..4090f57a7 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -22,7 +22,6 @@ from dask_kubernetes.common.auth import ClusterAuth from dask_kubernetes.common.utils import namespace_default from dask_kubernetes.operator import ( - build_cluster_spec, wait_for_service, ) @@ -91,6 +90,9 @@ class KubeCluster(Cluster): then it is likely the controller isn't running or is malfunctioning and we time out and clean up with a useful error. Defaults to ``60`` seconds. + custom_cluster_spec: dict (optional) + A dictionary representation of a ``DaskCluster`` resource object which will be used to create + the cluster instead of generating one from the other keyword arguments. 
**kwargs: dict Additional keyword arguments to pass to LocalCluster @@ -131,7 +133,7 @@ class KubeCluster(Cluster): def __init__( self, - name, + name=None, namespace=None, image="ghcr.io/dask/dask:latest", n_workers=3, @@ -143,6 +145,7 @@ def __init__( create_mode=CreateMode.CREATE_OR_CONNECT, shutdown_on_close=None, resource_timeout=60, + custom_cluster_spec=None, **kwargs, ): self.namespace = namespace or namespace_default() @@ -158,6 +161,10 @@ def __init__( self.create_mode = create_mode self.shutdown_on_close = shutdown_on_close self._resource_timeout = resource_timeout + self._custom_cluster_spec = custom_cluster_spec + + if self._custom_cluster_spec: + name = self._custom_cluster_spec["metadata"]["name"] self._instances.add(self) @@ -166,10 +173,6 @@ def __init__( self._loop_runner.start() self.sync(self._start) - @property - def cluster_name(self): - return f"{self.name}-cluster" - @property def dashboard_link(self): host = self.scheduler_address.split("://")[1].split("/")[0].split(":")[0] @@ -181,13 +184,13 @@ async def _start(self): if cluster_exists and self.create_mode == CreateMode.CREATE_ONLY: raise ValueError( - f"Cluster {self.cluster_name} already exists and create mode is '{CreateMode.CREATE_ONLY}'" + f"Cluster {self.name} already exists and create mode is '{CreateMode.CREATE_ONLY}'" ) elif cluster_exists: await self._connect_cluster() elif not cluster_exists and self.create_mode == CreateMode.CONNECT_ONLY: raise ValueError( - f"Cluster {self.cluster_name} doesn't and create mode is '{CreateMode.CONNECT_ONLY}'" + f"Cluster {self.name} doesn't and create mode is '{CreateMode.CONNECT_ONLY}'" ) else: await self._create_cluster() @@ -201,12 +204,17 @@ async def _create_cluster(self): core_api = kubernetes.client.CoreV1Api(api_client) custom_objects_api = kubernetes.client.CustomObjectsApi(api_client) - service_name = f"{self.name}-cluster-service" - cluster_name = f"{self.name}-cluster" - worker_spec = self._build_worker_spec(service_name) - 
scheduler_spec = self._build_scheduler_spec(cluster_name) - - data = build_cluster_spec(cluster_name, worker_spec, scheduler_spec) + if not self._custom_cluster_spec: + data = make_cluster_spec( + name=self.name, + env=self.env, + resources=self.resources, + worker_command=self.worker_command, + n_workers=self.n_workers, + image=self.image, + ) + else: + data = self._custom_cluster_spec try: await custom_objects_api.create_namespaced_custom_object( group="kubernetes.dask.org", @@ -227,13 +235,13 @@ async def _create_cluster(self): except TimeoutError as e: await self._close() raise e - await wait_for_scheduler(cluster_name, self.namespace) - await wait_for_service(core_api, f"{cluster_name}-service", self.namespace) + await wait_for_scheduler(self.name, self.namespace) + await wait_for_service(core_api, f"{self.name}-scheduler", self.namespace) scheduler_address = await self._get_scheduler_address() await wait_for_scheduler_comm(scheduler_address) self.scheduler_comm = rpc(scheduler_address) dashboard_address = await get_scheduler_address( - f"{self.name}-cluster-service", + f"{self.name}-scheduler", self.namespace, port_name="http-dashboard", ) @@ -256,8 +264,8 @@ async def _connect_cluster(self): self.env = container_spec["env"] else: self.env = {} - service_name = f'{cluster_spec["metadata"]["name"]}-service' - await wait_for_scheduler(self.cluster_name, self.namespace) + service_name = f"{cluster_spec['metadata']['name']}-scheduler" + await wait_for_scheduler(self.name, self.namespace) await wait_for_service(core_api, service_name, self.namespace) scheduler_address = await self._get_scheduler_address() await wait_for_scheduler_comm(scheduler_address) @@ -278,14 +286,13 @@ async def _get_cluster(self): version="v1", plural="daskclusters", namespace=self.namespace, - name=self.cluster_name, + name=self.name, ) except kubernetes.client.exceptions.ApiException as e: return None async def _get_scheduler_address(self): - service_name = f"{self.name}-cluster-service" 
- address = await get_scheduler_address(service_name, self.namespace) + address = await get_scheduler_address(f"{self.name}-scheduler", self.namespace) return address async def _wait_for_controller(self): @@ -299,7 +306,7 @@ async def _wait_for_controller(self): version="v1", plural="daskclusters", namespace=self.namespace, - field_selector=f"metadata.name={self.cluster_name}", + field_selector=f"metadata.name={self.name}", timeout_seconds=self._resource_timeout, ): cluster = event["object"] @@ -316,10 +323,10 @@ def get_logs(self): Examples -------- >>> cluster.get_logs() - {'foo-cluster-scheduler': ..., - 'foo-cluster-default-worker-group-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ..., - 'foo-cluster-default-worker-group-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ..., - 'foo-cluster-default-worker-group-worker-d65bee23bdae423b8d40c5da7a1065b6': ...} + {'foo': ..., + 'foo-default-worker-0269dbfa0cfd4a22bcd9d92ae032f4d2': ..., + 'foo-default-worker-7c1ccb04cd0e498fb21babaedd00e5d4': ..., + 'foo-default-worker-d65bee23bdae423b8d40c5da7a1065b6': ...} Each log will be a string of all logs for that container. To view it is recommeded that you print each log. 
>>> print(cluster.get_logs()["testdask-scheduler-5c8ffb6b7b-sjgrg"]) @@ -339,7 +346,7 @@ async def _get_logs(self): pods = await core_api.list_namespaced_pod( namespace=self.namespace, - label_selector=f"dask.org/cluster-name={self.name}-cluster", + label_selector=f"dask.org/cluster-name={self.name}", ) for pod in pods.items: @@ -360,7 +367,16 @@ async def _get_logs(self): return logs - def add_worker_group(self, name, n_workers=3, image=None, resources=None, env=None): + def add_worker_group( + self, + name, + n_workers=3, + image=None, + resources=None, + worker_command=None, + env=None, + custom_spec=None, + ): """Create a dask worker group by name Parameters @@ -379,6 +395,9 @@ def add_worker_group(self, name, n_workers=3, image=None, resources=None, env=No env: List[dict] List of environment variables to pass to worker pod. If ommitted will use the cluster default. + custom_spec: dict (optional) + A dictionary representation of a worker spec which will be used to create the ``DaskWorkerGroup`` instead + of generating one from the other keyword arguments. 
Examples -------- @@ -390,20 +409,38 @@ def add_worker_group(self, name, n_workers=3, image=None, resources=None, env=No n_workers=n_workers, image=image, resources=resources, + worker_command=worker_command, env=env, + custom_spec=custom_spec, ) async def _add_worker_group( - self, name, n_workers=3, image=None, resources=None, env=None + self, + name, + n_workers=3, + image=None, + resources=None, + worker_command=None, + env=None, + custom_spec=None, ): - service_name = f"{self.cluster_name}-service" - spec = self._build_worker_spec(service_name) + if custom_spec is not None: + spec = custom_spec + else: + spec = make_worker_spec( + cluster_name=self.name, + env=env or self.env, + resources=resources or self.resources, + worker_command=worker_command or self.worker_command, + n_workers=n_workers or self.n_workers, + image=image or self.image, + ) data = { "apiVersion": "kubernetes.dask.org/v1", "kind": "DaskWorkerGroup", - "metadata": {"name": f"{self.name}-cluster-{name}"}, + "metadata": {"name": f"{self.name}-{name}"}, "spec": { - "cluster": f"{self.name}-cluster", + "cluster": f"{self.name}", "worker": spec, }, } @@ -440,7 +477,7 @@ async def _delete_worker_group(self, name): version="v1", plural="daskworkergroups", namespace=self.namespace, - name=f"{self.name}-cluster-{name}", + name=f"{self.name}-{name}", ) def close(self, timeout=3600): @@ -457,13 +494,13 @@ async def _close(self, timeout=None): version="v1", plural="daskclusters", namespace=self.namespace, - name=self.cluster_name, + name=self.name, ) start = time.time() while (await self._get_cluster()) is not None: if time.time() > start + timeout: raise TimeoutError( - f"Timed out deleting cluster resource {self.cluster_name}" + f"Timed out deleting cluster resource {self.name}" ) await asyncio.sleep(1) @@ -505,7 +542,7 @@ async def _scale(self, n, worker_group="default"): version="v1", plural="daskworkergroups", namespace=self.namespace, - name=f"{self.name}-cluster-{worker_group}-worker-group", + 
name=f"{self.name}-{worker_group}", body={"spec": {"replicas": n}}, ) @@ -552,108 +589,17 @@ async def _adapt(self, minimum=None, maximum=None): "kind": "DaskAutoscaler", "metadata": { "name": self.name, - "dask.org/cluster-name": self.cluster_name, + "dask.org/cluster-name": self.name, "dask.org/component": "autoscaler", }, "spec": { - "cluster": self.cluster_name, + "cluster": self.name, "minimum": minimum, "maximum": maximum, }, }, ) - def _build_scheduler_spec(self, cluster_name): - # TODO: Take the values provided in the current class constructor - # and build a DaskWorker compatible dict - if isinstance(self.env, dict): - env = [{"name": key, "value": value} for key, value in self.env.items()] - else: - # If they gave us a list, assume its a list of dicts and already ready to go - env = self.env - - return { - "spec": { - "containers": [ - { - "name": "scheduler", - "image": self.image, - "args": ["dask-scheduler", "--host", "0.0.0.0"], - "env": env, - "resources": self.resources, - "ports": [ - { - "name": "tcp-comm", - "containerPort": 8786, - "protocol": "TCP", - }, - { - "name": "http-dashboard", - "containerPort": 8787, - "protocol": "TCP", - }, - ], - "readinessProbe": { - "httpGet": {"port": "http-dashboard", "path": "/health"}, - "initialDelaySeconds": 5, - "periodSeconds": 10, - }, - "livenessProbe": { - "httpGet": {"port": "http-dashboard", "path": "/health"}, - "initialDelaySeconds": 15, - "periodSeconds": 20, - }, - } - ] - }, - "service": { - "type": "ClusterIP", - "selector": { - "dask.org/cluster-name": cluster_name, - "dask.org/component": "scheduler", - }, - "ports": [ - { - "name": "tcp-comm", - "protocol": "TCP", - "port": 8786, - "targetPort": "tcp-comm", - }, - { - "name": "http-dashboard", - "protocol": "TCP", - "port": 8787, - "targetPort": "http-dashboard", - }, - ], - }, - } - - def _build_worker_spec(self, service_name): - if isinstance(self.env, dict): - env = [{"name": key, "value": value} for key, value in self.env.items()] - 
else: - # If they gave us a list, assume its a list of dicts and already ready to go - env = self.env - - args = self.worker_command + ["--name", "$(DASK_WORKER_NAME)"] - - return { - "cluster": self.cluster_name, - "replicas": self.n_workers, - "spec": { - "containers": [ - { - "name": "worker", - "image": self.image, - "args": args, - "env": env, - "resources": self.resources, - } - ] - }, - } - def __enter__(self): return self @@ -678,6 +624,164 @@ def from_name(cls, name, **kwargs): return cls(name=name, create_mode=CreateMode.CONNECT_ONLY, **kwargs) +def make_cluster_spec( + name, + image="ghcr.io/dask/dask:latest", + n_workers=None, + resources=None, + env=None, + worker_command="dask-worker", +): + """Generate a ``DaskCluster`` kubernetes resource. + + Populate a template with some common options to generate a ``DaskCluster`` kubernetes resource. + + Parameters + ---------- + name: str + Name of the cluster + image: str (optional) + Container image to use for the scheduler and workers + n_workers: int (optional) + Number of workers in the default worker group + resources: dict (optional) + Resource limits to set on scheduler and workers + env: dict (optional) + Environment variables to set on scheduler and workers + worker_command: str (optional) + Worker command to use when starting the workers + """ + return { + "apiVersion": "kubernetes.dask.org/v1", + "kind": "DaskCluster", + "metadata": {"name": name}, + "spec": { + "worker": make_worker_spec( + cluster_name=name, + env=env, + resources=resources, + worker_command=worker_command, + n_workers=n_workers, + image=image, + ), + "scheduler": make_scheduler_spec( + cluster_name=name, + env=env, + resources=resources, + image=image, + ), + }, + } + + +def make_worker_spec( + cluster_name, + image="ghcr.io/dask/dask:latest", + n_workers=3, + resources=None, + env=None, + worker_command="dask-worker", +): + if isinstance(env, dict): + env = [{"name": key, "value": value} for key, value in env.items()] + else: + 
# If they gave us a list, assume its a list of dicts and already ready to go + env = env + + if isinstance(worker_command, str): + worker_command = worker_command.split(" ") + + args = worker_command + ["--name", "$(DASK_WORKER_NAME)"] + + return { + "cluster": cluster_name, + "replicas": n_workers, + "spec": { + "containers": [ + { + "name": "worker", + "image": image, + "args": args, + "env": env, + "resources": resources, + } + ] + }, + } + + +def make_scheduler_spec( + cluster_name, + env=None, + resources=None, + image="ghcr.io/dask/dask:latest", +): + # TODO: Take the values provided in the current class constructor + # and build a DaskWorker compatible dict + if isinstance(env, dict): + env = [{"name": key, "value": value} for key, value in env.items()] + else: + # If they gave us a list, assume its a list of dicts and already ready to go + env = env + + return { + "spec": { + "containers": [ + { + "name": "scheduler", + "image": image, + "args": ["dask-scheduler", "--host", "0.0.0.0"], + "env": env, + "resources": resources, + "ports": [ + { + "name": "tcp-comm", + "containerPort": 8786, + "protocol": "TCP", + }, + { + "name": "http-dashboard", + "containerPort": 8787, + "protocol": "TCP", + }, + ], + "readinessProbe": { + "httpGet": {"port": "http-dashboard", "path": "/health"}, + "initialDelaySeconds": 5, + "periodSeconds": 10, + }, + "livenessProbe": { + "httpGet": {"port": "http-dashboard", "path": "/health"}, + "initialDelaySeconds": 15, + "periodSeconds": 20, + }, + } + ] + }, + "service": { + "type": "ClusterIP", + "selector": { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "scheduler", + }, + "ports": [ + { + "name": "tcp-comm", + "protocol": "TCP", + "port": 8786, + "targetPort": "tcp-comm", + }, + { + "name": "http-dashboard", + "protocol": "TCP", + "port": 8787, + "targetPort": "http-dashboard", + }, + ], + }, + } + + @atexit.register def reap_clusters(): async def _reap_clusters(): diff --git 
a/dask_kubernetes/experimental/tests/test_kubecluster.py b/dask_kubernetes/experimental/tests/test_kubecluster.py index 92aa420f8..d5d31e39d 100644 --- a/dask_kubernetes/experimental/tests/test_kubecluster.py +++ b/dask_kubernetes/experimental/tests/test_kubecluster.py @@ -3,7 +3,7 @@ from dask.distributed import Client from distributed.utils import TimeoutError -from dask_kubernetes.experimental import KubeCluster +from dask_kubernetes.experimental import KubeCluster, make_cluster_spec @pytest.fixture @@ -100,3 +100,13 @@ def test_adapt(kopf_runner, docker_image): # Need to clean up the DaskAutoscaler object # See https://github.com/dask/dask-kubernetes/issues/546 cluster.scale(0) + + +def test_custom_spec(kopf_runner, docker_image): + with kopf_runner: + spec = make_cluster_spec("customspec", image=docker_image) + with KubeCluster( + custom_cluster_spec=spec, + ) as cluster: + with Client(cluster) as client: + assert client.submit(lambda x: x + 1, 10).result() == 11 diff --git a/dask_kubernetes/operator/operator.py b/dask_kubernetes/operator/operator.py index 50c1c711b..e39e1922a 100644 --- a/dask_kubernetes/operator/operator.py +++ b/dask_kubernetes/operator/operator.py @@ -28,14 +28,14 @@ class SchedulerCommError(Exception): pass -def build_scheduler_pod_spec(name, spec): +def build_scheduler_pod_spec(cluster_name, spec): return { "apiVersion": "v1", "kind": "Pod", "metadata": { - "name": f"{name}-scheduler", + "name": f"{cluster_name}-scheduler", "labels": { - "dask.org/cluster-name": name, + "dask.org/cluster-name": cluster_name, "dask.org/component": "scheduler", "sidecar.istio.io/inject": "false", }, @@ -44,14 +44,14 @@ def build_scheduler_pod_spec(name, spec): } -def build_scheduler_service_spec(name, spec): +def build_scheduler_service_spec(cluster_name, spec): return { "apiVersion": "v1", "kind": "Service", "metadata": { - "name": f"{name}-service", + "name": f"{cluster_name}-scheduler", "labels": { - "dask.org/cluster-name": name, + 
"dask.org/cluster-name": cluster_name, }, }, "spec": spec, @@ -81,7 +81,7 @@ def build_worker_pod_spec(worker_group_name, namespace, cluster_name, uuid, spec }, { "name": "DASK_SCHEDULER_ADDRESS", - "value": f"tcp://{cluster_name}-service.{namespace}.svc.cluster.local:8786", + "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786", }, ] for i in range(len(pod_spec["spec"]["containers"])): @@ -109,7 +109,7 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec): env = [ { "name": "DASK_SCHEDULER_ADDRESS", - "value": f"tcp://{cluster_name}-service.{namespace}.svc.cluster.local:8786", + "value": f"tcp://{cluster_name}-scheduler.{namespace}.svc.cluster.local:8786", }, ] for i in range(len(pod_spec["spec"]["containers"])): @@ -120,13 +120,19 @@ def build_job_pod_spec(job_name, cluster_name, namespace, spec): return pod_spec -def build_worker_group_spec(name, spec): +def build_default_worker_group_spec(cluster_name, spec): return { "apiVersion": "kubernetes.dask.org/v1", "kind": "DaskWorkerGroup", - "metadata": {"name": f"{name}-default-worker-group"}, + "metadata": { + "name": f"{cluster_name}-default", + "labels": { + "dask.org/cluster-name": cluster_name, + "dask.org/component": "workergroup", + }, + }, "spec": { - "cluster": name, + "cluster": cluster_name, "worker": spec, }, } @@ -136,7 +142,12 @@ def build_cluster_spec(name, worker_spec, scheduler_spec): return { "apiVersion": "kubernetes.dask.org/v1", "kind": "DaskCluster", - "metadata": {"name": name}, + "metadata": { + "name": name, + "labels": { + "dask.org/cluster-name": name, + }, + }, "spec": {"worker": worker_spec, "scheduler": scheduler_spec}, } @@ -198,9 +209,7 @@ async def daskcluster_create_components(spec, name, namespace, logger, patch, ** ) worker_spec = spec.get("worker", {}) - data = build_worker_group_spec(name, worker_spec) - # TODO: Next line is not needed if we can get worker groups adopted by the cluster - kopf.adopt(data) + data = 
build_default_worker_group_spec(name, worker_spec) await custom_api.create_namespaced_custom_object( group="kubernetes.dask.org", version="v1", @@ -352,7 +361,7 @@ async def daskworkergroup_update(spec, name, namespace, logger, **kwargs): if workers_needed < 0: worker_ids = await retire_workers( n_workers=-workers_needed, - scheduler_service_name=f"{spec['cluster']}-service", + scheduler_service_name=f"{spec['cluster']}-scheduler", worker_group_name=name, namespace=namespace, logger=logger, @@ -375,7 +384,7 @@ async def daskjob_create(spec, name, namespace, logger, **kwargs): customobjectsapi = kubernetes.client.CustomObjectsApi(api_client) corev1api = kubernetes.client.CoreV1Api(api_client) - cluster_name = f"{name}-cluster" + cluster_name = f"{name}" cluster_spec = build_cluster_spec( cluster_name, spec["cluster"]["spec"]["worker"], @@ -406,20 +415,23 @@ async def daskjob_create(spec, name, namespace, logger, **kwargs): ) -@kopf.on.field("pod", field="status.phase", labels={"dask.org/component": "job-runner"}) +@kopf.on.field( + "pod", + field="status.phase", + labels={"dask.org/component": "job-runner"}, + new="Succeeded", +) async def handle_runner_status_change(meta, new, namespace, logger, **kwargs): - logger.info(f"Job now in phase {new}.") - if new == "Succeeded": - logger.info("Job succeeded, deleting Dask cluster.") - async with kubernetes.client.api_client.ApiClient() as api_client: - customobjectsapi = kubernetes.client.CustomObjectsApi(api_client) - await customobjectsapi.delete_namespaced_custom_object( - group="kubernetes.dask.org", - version="v1", - plural="daskclusters", - namespace=namespace, - name=meta["labels"]["dask.org/cluster-name"], - ) + logger.info("Job succeeded, deleting Dask cluster.") + async with kubernetes.client.api_client.ApiClient() as api_client: + customobjectsapi = kubernetes.client.CustomObjectsApi(api_client) + await customobjectsapi.delete_namespaced_custom_object( + group="kubernetes.dask.org", + version="v1", + 
plural="daskclusters", + namespace=namespace, + name=meta["labels"]["dask.org/cluster-name"], + ) @kopf.on.create("daskautoscaler") @@ -455,7 +467,7 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): # Ask the scheduler for the desired number of worker try: desired_workers = await get_desired_workers( - scheduler_service_name=f"{spec['cluster']}-service", + scheduler_service_name=f"{spec['cluster']}-scheduler", namespace=namespace, logger=logger, ) @@ -479,6 +491,6 @@ async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs): version="v1", plural="daskworkergroups", namespace=namespace, - name=f"{spec['cluster']}-default-worker-group", + name=f"{spec['cluster']}-default", body={"spec": {"replicas": desired_workers}}, ) diff --git a/dask_kubernetes/operator/tests/resources/simplecluster.yaml b/dask_kubernetes/operator/tests/resources/simplecluster.yaml index ee52c0eab..441cd362d 100644 --- a/dask_kubernetes/operator/tests/resources/simplecluster.yaml +++ b/dask_kubernetes/operator/tests/resources/simplecluster.yaml @@ -1,7 +1,7 @@ apiVersion: kubernetes.dask.org/v1 kind: DaskCluster metadata: - name: simple-cluster + name: simple namespace: default spec: worker: @@ -51,7 +51,7 @@ spec: service: type: ClusterIP selector: - dask.org/cluster-name: simple-cluster + dask.org/cluster-name: simple dask.org/component: scheduler ports: - name: tcp-comm diff --git a/dask_kubernetes/operator/tests/resources/simplejob.yaml b/dask_kubernetes/operator/tests/resources/simplejob.yaml index e931df292..c9bf33eba 100644 --- a/dask_kubernetes/operator/tests/resources/simplejob.yaml +++ b/dask_kubernetes/operator/tests/resources/simplejob.yaml @@ -64,7 +64,7 @@ spec: service: type: ClusterIP selector: - dask.org/cluster-name: simple-job-cluster + dask.org/cluster-name: simple-job dask.org/component: scheduler ports: - name: tcp-comm diff --git a/dask_kubernetes/operator/tests/resources/simpleworkergroup.yaml 
b/dask_kubernetes/operator/tests/resources/simpleworkergroup.yaml index 3a4e41ffd..3ea5772d8 100644 --- a/dask_kubernetes/operator/tests/resources/simpleworkergroup.yaml +++ b/dask_kubernetes/operator/tests/resources/simpleworkergroup.yaml @@ -1,10 +1,10 @@ apiVersion: kubernetes.dask.org/v1 kind: DaskWorkerGroup metadata: - name: simple-cluster-additional-worker-group + name: simple-additional namespace: default spec: - cluster: simple-cluster + cluster: simple worker: replicas: 2 spec: diff --git a/dask_kubernetes/operator/tests/test_operator.py b/dask_kubernetes/operator/tests/test_operator.py index fe908d13b..45ffa5ce5 100644 --- a/dask_kubernetes/operator/tests/test_operator.py +++ b/dask_kubernetes/operator/tests/test_operator.py @@ -18,7 +18,7 @@ def gen_cluster(k8s_cluster): @asynccontextmanager async def cm(): cluster_path = os.path.join(DIR, "resources", "simplecluster.yaml") - cluster_name = "simple-cluster" + cluster_name = "simple" # Create cluster resource k8s_cluster.kubectl("apply", "-f", cluster_path) @@ -88,9 +88,9 @@ def test_operator_plugins(kopf_runner): async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): with kopf_runner as runner: async with gen_cluster() as cluster_name: - scheduler_pod_name = "simple-cluster-scheduler" - worker_pod_name = "simple-cluster-default-worker-group-worker" - service_name = "simple-cluster-service" + scheduler_pod_name = "simple-scheduler" + worker_pod_name = "simple-default-worker" + service_name = "simple-scheduler" while scheduler_pod_name not in k8s_cluster.kubectl("get", "pods"): await asyncio.sleep(0.1) while service_name not in k8s_cluster.kubectl("get", "svc"): @@ -109,14 +109,14 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): "scale", "--replicas=5", "daskworkergroup", - "simple-cluster-default-worker-group", + "simple-default", ) await client.wait_for_workers(5) k8s_cluster.kubectl( "scale", "--replicas=3", "daskworkergroup", - 
"simple-cluster-default-worker-group", + "simple-default", ) await client.wait_for_workers(3) @@ -126,9 +126,9 @@ async def test_scalesimplecluster(k8s_cluster, kopf_runner, gen_cluster): async def test_simplecluster(k8s_cluster, kopf_runner, gen_cluster): with kopf_runner as runner: async with gen_cluster() as cluster_name: - scheduler_pod_name = "simple-cluster-scheduler" - worker_pod_name = "simple-cluster-default-worker-group-worker" - service_name = "simple-cluster-service" + scheduler_pod_name = "simple-scheduler" + worker_pod_name = "simple-default-worker" + service_name = "simple-scheduler" while scheduler_pod_name not in k8s_cluster.kubectl("get", "pods"): await asyncio.sleep(0.1) @@ -177,15 +177,14 @@ async def test_job(k8s_cluster, kopf_runner, gen_job): async with gen_job() as job: assert job - cluster_name = f"{job}-cluster" runner_name = f"{job}-runner" # Assert that cluster is created - while cluster_name not in k8s_cluster.kubectl("get", "daskclusters"): + while job not in k8s_cluster.kubectl("get", "daskclusters"): await asyncio.sleep(0.1) # Assert job pod is created - while runner_name not in k8s_cluster.kubectl("get", "po"): + while job not in k8s_cluster.kubectl("get", "po"): await asyncio.sleep(0.1) # Assert job pod runs to completion (will fail if doesn't connect to cluster) @@ -193,7 +192,7 @@ async def test_job(k8s_cluster, kopf_runner, gen_job): await asyncio.sleep(0.1) # Assert cluster is removed on completion - while cluster_name in k8s_cluster.kubectl("get", "daskclusters"): + while job in k8s_cluster.kubectl("get", "daskclusters"): await asyncio.sleep(0.1) assert "A DaskJob has been created" in runner.stdout diff --git a/doc/source/operator_kubecluster.rst b/doc/source/operator_kubecluster.rst index 74ead87e8..268bf81ae 100644 --- a/doc/source/operator_kubecluster.rst +++ b/doc/source/operator_kubecluster.rst @@ -88,6 +88,51 @@ Additional worker groups can also be deleted in Python. 
Any additional worker groups you create will be deleted when the cluster is deleted. +Custom cluster spec +------------------- + +The ``KubeCluster`` class can take a selection of keyword arguments to make it quick and easy to get started, however the underlying :doc:`DaskCluster <operator_resources>` resource can be much more complex and configured in many ways. +Rather than exposing every possibility via keyword arguments, you can instead pass a valid ``DaskCluster`` resource spec which will be used when creating the cluster. +You can also generate a spec with :func:`make_cluster_spec` which ``KubeCluster`` uses internally and then modify it with your custom options. + + +.. code-block:: python + + from dask_kubernetes.experimental import KubeCluster, make_cluster_spec + + config = { + "name": "foo", + "n_workers": 2, + "resources": {"requests": {"memory": "2Gi"}, "limits": {"memory": "64Gi"}} + } + + cluster = KubeCluster(**config) + # is equivalent to + cluster = KubeCluster(custom_cluster_spec=make_cluster_spec(**config)) + +You can also modify the spec before passing it to ``KubeCluster``, for example if you want to set ``nodeSelector`` on your worker pods you could do it like this: + +.. code-block:: python + + from dask_kubernetes.experimental import KubeCluster, make_cluster_spec + + spec = make_cluster_spec(name="selector-example", n_workers=2) + spec["spec"]["worker"]["spec"]["nodeSelector"] = {"disktype": "ssd"} + + cluster = KubeCluster(custom_cluster_spec=spec) + +The ``cluster.add_worker_group()`` method also supports passing a ``custom_spec`` keyword argument which can be generated with :func:`make_worker_spec`. + +.. 
code-block:: python + + from dask_kubernetes.experimental import KubeCluster, make_worker_spec + + cluster = KubeCluster(name="example") + + worker_spec = make_worker_spec(cluster_name=cluster.name, n_workers=2, resources={"limits": {"nvidia.com/gpu": 1}}) + worker_spec["spec"]["nodeSelector"] = {"cloud.google.com/gke-nodepool": "gpu-node-pool"} + + cluster.add_worker_group("gpu", custom_spec=worker_spec) .. _api: @@ -108,3 +153,6 @@ API .. autoclass:: KubeCluster :members: + +.. autofunction:: make_cluster_spec +.. autofunction:: make_worker_spec diff --git a/doc/source/operator_resources.rst b/doc/source/operator_resources.rst index 951131099..84fa0621b 100644 --- a/doc/source/operator_resources.rst +++ b/doc/source/operator_resources.rst @@ -54,7 +54,7 @@ Let's create an example called ``cluster.yaml`` with the following configuration apiVersion: kubernetes.dask.org/v1 kind: DaskCluster metadata: - name: simple-cluster + name: simple spec: worker: replicas: 2 @@ -97,7 +97,7 @@ Let's create an example called ``cluster.yaml`` with the following configuration service: type: NodePort selector: - dask.org/cluster-name: simple-cluster + dask.org/cluster-name: simple dask.org/component: scheduler ports: - name: tcp-comm @@ -114,7 +114,7 @@ Editing this file will change the default configuration of you Dask cluster. See .. code-block:: console $ kubectl apply -f cluster.yaml - daskcluster.kubernetes.dask.org/simple-cluster created + daskcluster.kubernetes.dask.org/simple created We can list our clusters: @@ -122,15 +122,15 @@ We can list our clusters: $ kubectl get daskclusters NAME AGE - simple-cluster 47s + simple 47s To connect to this Dask cluster we can use the service that was created for us. .. 
code-block:: console - $ kubectl get svc -l dask.org/cluster-name=simple-cluster + $ kubectl get svc -l dask.org/cluster-name=simple NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE - simple-cluster-service ClusterIP 10.96.85.120 8786/TCP,8787/TCP 86s + simple-scheduler ClusterIP 10.96.85.120 <none> 8786/TCP,8787/TCP 86s We can see here that port ``8786`` has been exposed for the Dask communication along with ``8787`` for the Dashboard. @@ -139,7 +139,7 @@ For this quick example we could use ``kubectl`` to port forward the service to y .. code-block:: console - $ kubectl port-forward svc/simple-cluster-service 8786:8786 + $ kubectl port-forward svc/simple-scheduler 8786:8786 Forwarding from 127.0.0.1:8786 -> 8786 Forwarding from [::1]:8786 -> 8786 @@ -156,12 +156,12 @@ We can also list all of the pods created by the operator to run our cluster. .. code-block:: console - $ kubectl get po -l dask.org/cluster-name=simple-cluster + $ kubectl get po -l dask.org/cluster-name=simple NAME READY STATUS RESTARTS AGE - simple-cluster-default-worker-group-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 104s - simple-cluster-default-worker-group-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 104s - simple-cluster-default-worker-group-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 104s - simple-cluster-scheduler 1/1 Running 0 104s + simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 104s + simple-default-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 104s + simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 104s + simple-scheduler 1/1 Running 0 104s The workers we see here are created by our clusters default ``workergroup`` resource that was also created by the operator. @@ -169,31 +169,31 @@ You can scale the ``workergroup`` like you would a ``Deployment`` or ``ReplicaSe .. 
code-block:: console - $ kubectl scale --replicas=5 daskworkergroup simple-cluster-default-worker-group - daskworkergroup.kubernetes.dask.org/simple-cluster-default-worker-group scaled + $ kubectl scale --replicas=5 daskworkergroup simple-default + daskworkergroup.kubernetes.dask.org/simple-default scaled We can verify that new pods have been created. .. code-block:: console - $ kubectl get po -l dask.org/cluster-name=simple-cluster + $ kubectl get po -l dask.org/cluster-name=simple NAME READY STATUS RESTARTS AGE - simple-cluster-default-worker-group-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 5m26s - simple-cluster-default-worker-group-worker-a52bf313590f432d9dc7395875583b52 1/1 Running 0 27s - simple-cluster-default-worker-group-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 5m26s - simple-cluster-default-worker-group-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 5m26s - simple-cluster-default-worker-group-worker-f4223a45b49d49288195c540c32f0fc0 1/1 Running 0 27s - simple-cluster-scheduler 1/1 Running 0 5m26s + simple-default-worker-13f4f0d13bbc40a58cfb81eb374f26c3 1/1 Running 0 5m26s + simple-default-worker-a52bf313590f432d9dc7395875583b52 1/1 Running 0 27s + simple-default-worker-aa79dfae83264321a79f1f0ffe91f700 1/1 Running 0 5m26s + simple-default-worker-f13c4f2103e14c2d86c1b272cd138fe6 1/1 Running 0 5m26s + simple-default-worker-f4223a45b49d49288195c540c32f0fc0 1/1 Running 0 27s + simple-scheduler 1/1 Running 0 5m26s Finally we can delete the cluster either by deleting the manifest we applied before, or directly by name: .. 
code-block:: console $ kubectl delete -f cluster.yaml - daskcluster.kubernetes.dask.org "simple-cluster" deleted + daskcluster.kubernetes.dask.org "simple" deleted - $ kubectl delete daskcluster simple-cluster - daskcluster.kubernetes.dask.org "simple-cluster" deleted + $ kubectl delete daskcluster simple + daskcluster.kubernetes.dask.org "simple" deleted DaskWorkerGroup --------------- @@ -246,9 +246,9 @@ Let's create an example called ``highmemworkers.yaml`` with the following config apiVersion: kubernetes.dask.org/v1 kind: DaskWorkerGroup metadata: - name: simple-cluster-highmem-worker-group + name: simple-highmem spec: - cluster: simple-cluster + cluster: simple worker: replicas: 2 spec: @@ -276,16 +276,16 @@ See the :ref:`config`. Now apply ``highmemworkers.yaml`` .. code-block:: console $ kubectl apply -f highmemworkers.yaml - daskworkergroup.kubernetes.dask.org/simple-cluster-highmem-worker-group created + daskworkergroup.kubernetes.dask.org/simple-highmem created We can list our clusters: .. code-block:: console $ kubectl get daskworkergroups - NAME AGE - simple-cluster-default-worker-group 2 hours - simple-cluster-highmem-worker-group 47s + NAME AGE + simple-default 2 hours + simple-highmem 47s We don't need to worry about deleting this worker group seperately, because it has joined the existing cluster Kubernetes will delete it when the ``DaskCluster`` resource is deleted. 
@@ -407,7 +407,7 @@ Let's create an example called ``job.yaml`` with the following configuration: service: type: ClusterIP selector: - dask.org/cluster-name: simple-job-cluster + dask.org/cluster-name: simple-job dask.org/component: scheduler ports: - name: tcp-comm @@ -433,10 +433,10 @@ Now if we check our cluster resources we should see our job and cluster pods bei $ kubectl get pods NAME READY STATUS RESTARTS AGE - simple-job-cluster-scheduler 1/1 Running 0 8s + simple-job-scheduler 1/1 Running 0 8s simple-job-runner 1/1 Running 0 8s - simple-job-cluster-default-worker-group-worker-1f6c670fba 1/1 Running 0 8s - simple-job-cluster-default-worker-group-worker-791f93d9ec 1/1 Running 0 8s + simple-job-default-worker-1f6c670fba 1/1 Running 0 8s + simple-job-default-worker-791f93d9ec 1/1 Running 0 8s Our runner pod will be doing whatever we configured it to do. In our example you can see we just create a simple ``dask.distributed.Client`` object like this: @@ -461,9 +461,9 @@ go into a ``Completed`` state and the Dask cluster will be cleaned up automatica $ kubectl get pods NAME READY STATUS RESTARTS AGE simple-job-runner 0/1 Completed 0 14s - simple-job-cluster-scheduler 1/1 Terminating 0 14s - simple-job-cluster-default-worker-group-worker-1f6c670fba 1/1 Terminating 0 14s - simple-job-cluster-default-worker-group-worker-791f93d9ec 1/1 Terminating 0 14s + simple-job-scheduler 1/1 Terminating 0 14s + simple-job-default-worker-1f6c670fba 1/1 Terminating 0 14s + simple-job-default-worker-791f93d9ec 1/1 Terminating 0 14s When you delete the ``DaskJob`` resource everything is deleted automatically, whether that's just the ``Completed`` runner pod left over after a successful run or a full Dask cluster and runner that is still running. @@ -530,16 +530,16 @@ and then update the number of replicas in the default ``DaskWorkerGroup``.
apiVersion: kubernetes.dask.org/v1 kind: DaskAutoscaler metadata: - name: simple-cluster-autoscaler + name: simple spec: - cluster: "simple-cluster" + cluster: "simple" minimum: 1 # we recommend always having a minimum of 1 worker so that an idle cluster can start working on tasks immediately maximum: 10 # you can place a hard limit on the number of workers regardless of what the scheduler requests .. code-block:: console $ kubectl apply -f autoscaler.yaml - daskautoscaler.kubernetes.dask.org/simple-cluster-autoscaler created + daskautoscaler.kubernetes.dask.org/simple created You can end the autoscaling at any time by deleting the resource. The number of workers will remain at whatever the autoscaler last set it to. @@ -547,7 +547,7 @@ set it to. .. code-block:: console $ kubectl delete -f autoscaler.yaml - daskautoscaler.kubernetes.dask.org/simple-cluster-autoscaler deleted + daskautoscaler.kubernetes.dask.org/simple deleted .. note::