Skip to content

Commit

Permalink
Migrate worker deployment test to use kr8s (#743)
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobtomlinson committed Jun 26, 2023
1 parent 6009f16 commit 5ad6367
Showing 1 changed file with 21 additions and 32 deletions.
53 changes: 21 additions & 32 deletions dask_kubernetes/operator/controller/tests/test_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,39 +380,28 @@ async def test_recreate_scheduler_pod(k8s_cluster, kopf_runner, gen_cluster):
async def test_recreate_worker_pods(k8s_cluster, kopf_runner, gen_cluster):
with kopf_runner as runner:
async with gen_cluster() as (cluster_name, ns):
scheduler_deployment_name = "simple-scheduler"
worker_deployment_name = "simple-default-worker"
service_name = "simple-scheduler"
while scheduler_deployment_name not in k8s_cluster.kubectl(
"get", "pods", "-n", ns
):
await asyncio.sleep(0.1)
while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
await asyncio.sleep(0.1)
while worker_deployment_name not in k8s_cluster.kubectl(
"get", "pods", "-n", ns
):
await asyncio.sleep(0.1)
k8s_cluster.kubectl(
"delete",
"pods",
"-l",
"dask.org/cluster-name=simple,dask.org/component=worker",
"-n",
ns,
)
k8s_cluster.kubectl(
"wait",
"--for=condition=Ready",
"-l",
"dask.org/cluster-name=simple,dask.org/component=worker",
"pod",
"-n",
ns,
"--timeout=60s",
cluster = await DaskCluster.get(cluster_name, namespace=ns)
# Get the default worker group
while not (wgs := await cluster.worker_groups()):
await asyncio.sleep(0.1)
[wg] = wgs
# Wait for worker Pods to be created
while not (pods := await wg.pods()):
await asyncio.sleep(0.1)
# Store number of workers
n_pods = len(pods)
# Wait for worker Pods to be ready
await asyncio.gather(
*[pod.wait(conditions="condition=Ready", timeout=60) for pod in pods]
)
assert worker_deployment_name in k8s_cluster.kubectl(
"get", "pods", "-n", ns
# Delete a worker Pod
await pods[0].delete()
# Wait for Pods to be recreated
while len((pods := await wg.pods())) < n_pods:
await asyncio.sleep(0.1)
# Wait for worker Pods to be ready
await asyncio.gather(
*[pod.wait(conditions="condition=Ready", timeout=60) for pod in pods]
)


Expand Down

0 comments on commit 5ad6367

Please sign in to comment.