Add batch worker provisioning capability to the Operator (#748)
* Add batch worker provisioning to the Operator

* Expose config options to controller, and scale workers in batches

* Improve scaling logic

* Default to workers_needed if not batching

* Typo *workerAllocation

* Add a test
Matt711 committed Jul 13, 2023
1 parent c14d751 commit 35d42e5
Showing 6 changed files with 72 additions and 4 deletions.
6 changes: 6 additions & 0 deletions dask_kubernetes/kubernetes.yaml
@@ -31,6 +31,12 @@ kubernetes:
protocol: "tcp://"
dashboard_address: ":8787"

# Dask Operator Controller options
controller:
worker-allocation:
batch-size: null
delay: null

# Timeout to wait for the scheduler service to be up (in seconds)
# Set it to 0 to wait indefinitely (not recommended)
scheduler-service-wait-timeout: 30
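Both new options default to `null`, which preserves the previous behaviour of creating every worker deployment in a single pass. A minimal sketch (not part of this commit) of how these keys could be overridden from Python via `dask.config`, using the option names introduced above:

```python
import dask.config

# A sketch, not part of this commit: opt into batched worker provisioning by
# overriding the two new controller options. With a batch size set, the
# controller creates at most that many worker deployments per pass and then
# waits `delay` seconds before continuing.
with dask.config.set(
    {
        "kubernetes.controller.worker-allocation.batch-size": 10,
        "kubernetes.controller.worker-allocation.delay": 5,  # seconds
    }
):
    print(dask.config.get("kubernetes.controller.worker-allocation.batch-size"))  # 10
    print(dask.config.get("kubernetes.controller.worker-allocation.delay"))  # 5
```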
18 changes: 14 additions & 4 deletions dask_kubernetes/operator/controller/controller.py
@@ -22,6 +22,7 @@
from dask_kubernetes.common.networking import get_scheduler_address
from distributed.core import rpc, clean_exception
from distributed.protocol.pickle import dumps
import dask.config

_ANNOTATION_NAMESPACES_TO_IGNORE = (
"kopf.zalando.org",
@@ -632,8 +633,12 @@ async def daskworkergroup_replica_update(
annotations.update(**worker_spec["metadata"]["annotations"])
if "labels" in worker_spec["metadata"]:
labels.update(**worker_spec["metadata"]["labels"])

SIZE = dask.config.get("kubernetes.controller.worker-allocation.batch-size")
DELAY = dask.config.get("kubernetes.controller.worker-allocation.delay")
batch_size = min(workers_needed, SIZE) if SIZE else workers_needed
if workers_needed > 0:
for _ in range(workers_needed):
for _ in range(batch_size):
data = build_worker_deployment_spec(
worker_group_name=name,
namespace=namespace,
@@ -651,9 +656,14 @@
namespace=namespace,
body=data,
)
logger.info(
f"Scaled worker group {name} up to {desired_workers} workers."
)
if SIZE:
if workers_needed > SIZE:
raise kopf.TemporaryError(
"Added maximum number of workers for this batch but still need to create more workers, "
f"waiting for {DELAY} seconds before continuing.",
delay=DELAY,
)
logger.info(f"Scaled worker group {name} up to {desired_workers} workers.")
if workers_needed < 0:
worker_ids = await retire_workers(
n_workers=-workers_needed,
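The batching leans on kopf's retry semantics: raising `kopf.TemporaryError` with a `delay` makes kopf re-invoke the handler after that many seconds, so each invocation creates at most one batch until `workers_needed` is satisfied. A standalone sketch of the same pattern, with hypothetical resource and handler names, to illustrate the control flow (not the real `daskworkergroup` handler):

```python
import kopf

BATCH_SIZE = 5  # stand-ins for the worker-allocation.batch-size / .delay options
DELAY = 5


@kopf.on.update("example.org", "v1", "hypotheticalworkergroups")
async def scale_up(spec, status, **kwargs):
    # Hypothetical handler illustrating the batching pattern used above.
    workers_needed = spec.get("replicas", 0) - (status or {}).get("created", 0)
    batch_size = min(workers_needed, BATCH_SIZE) if BATCH_SIZE else workers_needed
    for _ in range(batch_size):
        ...  # create one worker deployment per iteration
    if BATCH_SIZE and workers_needed > BATCH_SIZE:
        # kopf re-invokes this handler after DELAY seconds, so the remaining
        # workers are created by later batches.
        raise kopf.TemporaryError(
            "Batch limit reached; more workers still needed.", delay=DELAY
        )
```

As in the controller, when no batch size is configured the sketch falls back to creating all `workers_needed` deployments in one pass.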
39 changes: 39 additions & 0 deletions dask_kubernetes/operator/controller/tests/test_controller.py
@@ -8,6 +8,7 @@
import pytest
import yaml
from dask.distributed import Client
import dask.config

from kr8s.asyncio.objects import Pod, Deployment, Service
from dask_kubernetes.operator.controller import (
@@ -405,6 +406,44 @@ async def test_recreate_worker_pods(k8s_cluster, kopf_runner, gen_cluster):
)


@pytest.mark.asyncio
async def test_simplecluster_batched_worker_deployments(
k8s_cluster, kopf_runner, gen_cluster
):
with kopf_runner as runner:
with dask.config.set(
{
"kubernetes.controller.worker-allocation.batch-size": 1,
"kubernetes.controller.worker-allocation.delay": 5,
}
):
async with gen_cluster() as (cluster_name, ns):
scheduler_deployment_name = "simple-scheduler"
worker_pod_name = "simple-default-worker"
service_name = "simple-scheduler"
while scheduler_deployment_name not in k8s_cluster.kubectl(
"get", "deployments", "-n", ns
):
await asyncio.sleep(0.1)
while service_name not in k8s_cluster.kubectl("get", "svc", "-n", ns):
await asyncio.sleep(0.1)
while worker_pod_name not in k8s_cluster.kubectl(
"get", "pods", "-n", ns
):
await asyncio.sleep(0.1)

with k8s_cluster.port_forward(
f"service/{service_name}", 8786, "-n", ns
) as port:
async with Client(
f"tcp://localhost:{port}", asynchronous=True
) as client:
await client.wait_for_workers(2)
futures = client.map(lambda x: x + 1, range(10))
total = client.submit(sum, futures)
assert (await total) == sum(map(lambda x: x + 1, range(10)))


def _get_job_status(k8s_cluster, ns):
return json.loads(
k8s_cluster.kubectl(
@@ -50,6 +50,8 @@ The following table lists the configurable parameters of the Dask-kubernetes-operator chart and their default values.
| `metrics.worker.podMonitor.jobLabel` | The label to use to retrieve the job name from. | `""` |
| `metrics.worker.podMonitor.podTargetLabels` | PodTargetLabels transfers labels on the Kubernetes Pod onto the target. | `["dask.org/cluster-name", "dask.org/workergroup-name"]` |
| `metrics.worker.podMonitor.metricRelabelings` | MetricRelabelConfigs to apply to samples before ingestion. | `[]` |
| `workerAllocation.size` | Maximum number of worker deployments to create in each batch. | `"nil"` |
| `workerAllocation.delay` | Seconds to wait between batches of worker creation. | `"nil"` |



@@ -32,6 +32,13 @@ spec:
{{- toYaml .Values.securityContext | nindent 12 }}
image: "{{ .Values.image.name }}:{{ .Values.image.tag }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
env:
- name: DASK_KUBERNETES__CONTROLLER__WORKER_ALLOCATION__BATCH_SIZE
value:
{{- toYaml .Values.workerAllocation.size | nindent 16 }}
- name: DASK_KUBERNETES__CONTROLLER__WORKER_ALLOCATION__DELAY
value:
{{- toYaml .Values.workerAllocation.delay | nindent 16 }}
args:
- --liveness=http://0.0.0.0:8080/healthz
{{- with .Values.kopfArgs }}
@@ -89,3 +89,7 @@ metrics:
- dask.org/cluster-name
- dask.org/workergroup-name
metricRelabelings: [] # MetricRelabelConfigs to apply to samples before ingestion.

workerAllocation:
size: nil
delay: nil
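In the chart, the `workerAllocation.*` values above are injected into the controller container as `DASK_KUBERNETES__CONTROLLER__WORKER_ALLOCATION__*` environment variables (see the deployment template hunk), which Dask's configuration system maps onto the `kubernetes.controller.worker-allocation.*` keys read in `controller.py`. A sketch of that mapping with hypothetical values, assuming Dask's standard environment-variable convention (double underscores denote nesting):

```python
import os

# A sketch of how the chart values reach the controller's config lookups.
# Hypothetical example values; set the variables before dask is imported,
# or call dask.config.refresh() afterwards as done here.
os.environ["DASK_KUBERNETES__CONTROLLER__WORKER_ALLOCATION__BATCH_SIZE"] = "10"
os.environ["DASK_KUBERNETES__CONTROLLER__WORKER_ALLOCATION__DELAY"] = "5"

import dask.config

dask.config.refresh()  # re-collect environment variables into dask.config
print(dask.config.get("kubernetes.controller.worker-allocation.batch-size"))  # 10
```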
