Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dask_kubernetes/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def kopf_runner(k8s_cluster):
@pytest.fixture(scope="session")
def docker_image():
image_name = "dask-kubernetes:dev"
subprocess.check_output(["docker", "build", "-t", image_name, "./ci/"])
subprocess.run(["docker", "build", "-t", image_name, "./ci/"], check=True)
return image_name


Expand All @@ -40,7 +40,9 @@ def k8s_cluster(kind_cluster, docker_image):
def install_istio(k8s_cluster):
if bool(os.environ.get("TEST_ISTIO", False)):
check_dependency("istioctl")
subprocess.check_output(["istioctl", "install", "--set", "profile=demo", "-y"])
subprocess.run(
["istioctl", "install", "--set", "profile=demo", "-y"], check=True
)
k8s_cluster.kubectl(
"label", "namespace", "default", "istio-injection=enabled", "--overwrite"
)
Expand Down
3 changes: 1 addition & 2 deletions dask_kubernetes/experimental/kubecluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,6 @@ def __init__(
shutdown_on_close=None,
**kwargs,
):
self.name = name
self.namespace = namespace or namespace_default()
self.image = image
self.n_workers = n_workers
Expand All @@ -146,7 +145,7 @@ def __init__(

self._instances.add(self)

super().__init__(**kwargs)
super().__init__(name=name, **kwargs)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
Expand Down
6 changes: 4 additions & 2 deletions dask_kubernetes/helm/helmcluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,13 @@ def __init__(
worker_name="worker",
node_host=None,
node_port=None,
name=None,
**kwargs,
):
self.release_name = release_name
self.namespace = namespace or namespace_default()
self.name = self.release_name + "." + self.namespace
if name is None:
name = self.release_name + "." + self.namespace
check_dependency("helm")
check_dependency("kubectl")
status = subprocess.run(
Expand All @@ -113,7 +115,7 @@ def __init__(
self.node_host = node_host
self.node_port = node_port

super().__init__(**kwargs)
super().__init__(name=name, **kwargs)
if not self.asynchronous:
self._loop_runner.start()
self.sync(self._start)
Expand Down
18 changes: 10 additions & 8 deletions dask_kubernetes/helm/tests/test_helm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
@pytest.fixture(scope="session")
def chart_repo():
repo_name = "dask"
subprocess.check_output(
["helm", "repo", "add", repo_name, "https://helm.dask.org/"]
subprocess.run(
["helm", "repo", "add", repo_name, "https://helm.dask.org/"], check=True
)
subprocess.check_output(["helm", "repo", "update"])
subprocess.run(["helm", "repo", "update"], check=True)
return repo_name


Expand All @@ -51,7 +51,7 @@ def test_namespace():

@pytest.fixture(scope="session") # Creating this fixture is slow so we should reuse it.
def release(k8s_cluster, chart_name, test_namespace, release_name, config_path):
subprocess.check_output(
subprocess.run(
[
"helm",
"install",
Expand All @@ -63,10 +63,11 @@ def release(k8s_cluster, chart_name, test_namespace, release_name, config_path):
"--wait",
"-f",
config_path,
]
],
check=True,
)
# Scale back the additional workers group for now
subprocess.check_output(
subprocess.run(
[
"kubectl",
"scale",
Expand All @@ -75,10 +76,11 @@ def release(k8s_cluster, chart_name, test_namespace, release_name, config_path):
"deployment",
f"{release_name}-dask-worker-foo",
"--replicas=0",
]
],
check=True,
)
yield release_name
subprocess.check_output(["helm", "delete", "-n", test_namespace, release_name])
subprocess.run(["helm", "delete", "-n", test_namespace, release_name], check=True)


@pytest_asyncio.fixture
Expand Down