Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ci/install-deps.sh
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#!/bin/bash

set -e

curl -L https://istio.io/downloadIstio | sh -
mv istio-*/bin/istioctl /usr/local/bin/istioctl

Expand Down
16 changes: 11 additions & 5 deletions dask_kubernetes/experimental/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ class KubeCluster(Cluster):
List of environment variables to pass to worker pod.
Can be a list of dicts using the same structure as k8s envs
or a single dictionary of key/value pairs
worker_command: List[str] | str
The command to use when starting the worker.
If command consists of multiple words it should be passed as a list of strings.
Defaults to ``"dask-worker"``.
auth: List[ClusterAuth] (optional)
Configuration methods to attempt in order. Defaults to
``[InCluster(), KubeConfig()]``.
Expand Down Expand Up @@ -127,6 +131,7 @@ def __init__(
n_workers=3,
resources={},
env=[],
worker_command="dask-worker",
auth=ClusterAuth.DEFAULT,
port_forward_cluster_ip=None,
create_mode=CreateMode.CREATE_OR_CONNECT,
Expand All @@ -138,6 +143,9 @@ def __init__(
self.n_workers = n_workers
self.resources = resources
self.env = env
self.worker_command = worker_command
if isinstance(self.worker_command, str):
self.worker_command = self.worker_command.split(" ")
self.auth = auth
self.port_forward_cluster_ip = port_forward_cluster_ip
self.create_mode = create_mode
Expand Down Expand Up @@ -535,6 +543,8 @@ def _build_worker_spec(self, service_name):
# If they gave us a list, assume its a list of dicts and already ready to go
env = self.env

args = self.worker_command + ["--name", "$(DASK_WORKER_NAME)"]

return {
"cluster": self.cluster_name,
"replicas": self.n_workers,
Expand All @@ -543,11 +553,7 @@ def _build_worker_spec(self, service_name):
{
"name": "worker",
"image": self.image,
"args": [
"dask-worker",
"--name",
"$(DASK_WORKER_NAME)",
],
"args": args,
"env": env,
"resources": self.resources,
}
Expand Down
11 changes: 11 additions & 0 deletions dask_kubernetes/experimental/tests/test_kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,17 @@ def test_kubecluster(cluster):
assert client.submit(lambda x: x + 1, 10).result() == 11


def test_custom_worker_command(kopf_runner, docker_image):
with kopf_runner:
with KubeCluster(
name="customworker",
image=docker_image,
worker_command=["python", "-m", "distributed.cli.dask_worker"],
) as cluster:
with Client(cluster) as client:
assert client.submit(lambda x: x + 1, 10).result() == 11


def test_multiple_clusters(kopf_runner, docker_image):
with kopf_runner:
with KubeCluster(name="bar", image=docker_image) as cluster1:
Expand Down