diff --git a/ci/install-deps.sh b/ci/install-deps.sh index 3d632a650..fce727469 100755 --- a/ci/install-deps.sh +++ b/ci/install-deps.sh @@ -1,5 +1,7 @@ #!/bin/bash +set -e + curl -L https://istio.io/downloadIstio | sh - mv istio-*/bin/istioctl /usr/local/bin/istioctl diff --git a/dask_kubernetes/experimental/kubecluster.py b/dask_kubernetes/experimental/kubecluster.py index b97f988be..5d300b079 100644 --- a/dask_kubernetes/experimental/kubecluster.py +++ b/dask_kubernetes/experimental/kubecluster.py @@ -64,6 +64,10 @@ class KubeCluster(Cluster): List of environment variables to pass to worker pod. Can be a list of dicts using the same structure as k8s envs or a single dictionary of key/value pairs + worker_command: List[str] | str + The command to use when starting the worker. + If command consists of multiple words it should be passed as a list of strings. + Defaults to ``"dask-worker"``. auth: List[ClusterAuth] (optional) Configuration methods to attempt in order. Defaults to ``[InCluster(), KubeConfig()]``. @@ -127,6 +131,7 @@ def __init__( n_workers=3, resources={}, env=[], + worker_command="dask-worker", auth=ClusterAuth.DEFAULT, port_forward_cluster_ip=None, create_mode=CreateMode.CREATE_OR_CONNECT, @@ -138,6 +143,9 @@ def __init__( self.n_workers = n_workers self.resources = resources self.env = env + self.worker_command = worker_command + if isinstance(self.worker_command, str): + self.worker_command = self.worker_command.split(" ") self.auth = auth self.port_forward_cluster_ip = port_forward_cluster_ip self.create_mode = create_mode @@ -535,6 +543,8 @@ def _build_worker_spec(self, service_name): # If they gave us a list, assume its a list of dicts and already ready to go env = self.env + args = self.worker_command + ["--name", "$(DASK_WORKER_NAME)"] + return { "cluster": self.cluster_name, "replicas": self.n_workers, @@ -543,11 +553,7 @@ def _build_worker_spec(self, service_name): { "name": "worker", "image": self.image, - "args": [ - "dask-worker", - "--name", - "$(DASK_WORKER_NAME)", - ], + "args": args, "env": env, "resources": self.resources, } diff --git a/dask_kubernetes/experimental/tests/test_kubecluster.py b/dask_kubernetes/experimental/tests/test_kubecluster.py index eda01110b..2e6b10f52 100644 --- a/dask_kubernetes/experimental/tests/test_kubecluster.py +++ b/dask_kubernetes/experimental/tests/test_kubecluster.py @@ -19,6 +19,17 @@ def test_kubecluster(cluster): assert client.submit(lambda x: x + 1, 10).result() == 11 +def test_custom_worker_command(kopf_runner, docker_image): + with kopf_runner: + with KubeCluster( + name="customworker", + image=docker_image, + worker_command=["python", "-m", "distributed.cli.dask_worker"], + ) as cluster: + with Client(cluster) as client: + assert client.submit(lambda x: x + 1, 10).result() == 11 + + def test_multiple_clusters(kopf_runner, docker_image): with kopf_runner: with KubeCluster(name="bar", image=docker_image) as cluster1: