
Commit

Replace kubernetes_asyncio with kr8s 0.7.0 in daskautoscaler_adapt (#757)
jacobtomlinson committed Jul 3, 2023
1 parent cfae6dd commit 8156bfc
Showing 3 changed files with 65 additions and 104 deletions.
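
For orientation before the diffs: the handler now talks to the cluster through the typed kr8s objects in `dask_kubernetes.operator._objects` instead of hand-rolled `CustomObjectsApi` calls. A minimal sketch of the new calling pattern, assuming a hypothetical cluster named "example" in namespace "default" (the annotation key is also a placeholder, not the controller's real constant):

```python
from kr8s.asyncio.objects import Pod

from dask_kubernetes.operator._objects import DaskAutoscaler, DaskWorkerGroup


async def adapt_once():
    # Readiness check: look the scheduler pod up by its labels instead of
    # listing pods through CoreV1Api.
    scheduler = await Pod.get(
        label_selector={
            "dask.org/component": "scheduler",
            "dask.org/cluster-name": "example",  # hypothetical cluster name
        },
    )
    if not await scheduler.ready():
        return

    # Custom resources come back as typed objects rather than raw dicts.
    worker_group = await DaskWorkerGroup.get("example-default", namespace="default")
    autoscaler = await DaskAutoscaler.get("example", namespace="default")

    # Scaling and annotating replace the hand-built merge-patch requests.
    await worker_group.scale(3)
    await autoscaler.annotate({"example.dask.org/cooldown-until": "0"})  # placeholder key
```

The `daskautoscaler_adapt` hunk below follows exactly this shape, with cooldown bookkeeping and scale-down damping layered on top.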
1 change: 1 addition & 0 deletions dask_kubernetes/operator/_objects.py
@@ -78,6 +78,7 @@ class DaskWorkerGroup(APIObject):
     singular = "daskworkergroup"
     namespaced = True
     scalable = True
+    scalable_spec = "worker.replicas"

     async def pods(self) -> List[Pod]:
         return await self.api.get(
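
The new `scalable_spec` attribute tells kr8s where the replica count lives in the resource's spec, so that `worker_group.scale(n)` patches `spec.worker.replicas` rather than the default `spec.replicas`. A hedged sketch of how such a class is declared: the attribute names follow kr8s's APIObject conventions, and the values not visible in this hunk (version, endpoint, kind, plural) are assumptions:

```python
from kr8s.asyncio.objects import APIObject


class DaskWorkerGroup(APIObject):
    # Only singular, namespaced, scalable and scalable_spec are visible in the
    # hunk above; the remaining values are assumed from the CRD's naming.
    version = "kubernetes.dask.org/v1"
    endpoint = "daskworkergroups"
    kind = "DaskWorkerGroup"
    plural = "daskworkergroups"
    singular = "daskworkergroup"
    namespaced = True
    scalable = True
    # With this set, .scale(n) patches spec.worker.replicas on the worker group.
    scalable_spec = "worker.replicas"
```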
166 changes: 63 additions & 103 deletions dask_kubernetes/operator/controller/controller.py
@@ -7,11 +7,16 @@

 import aiohttp
 import kopf
+from kr8s.asyncio.objects import Pod
 import kubernetes_asyncio as kubernetes
 from importlib_metadata import entry_points
 from kubernetes_asyncio.client import ApiException

-from dask_kubernetes.operator._objects import DaskCluster
+from dask_kubernetes.operator._objects import (
+    DaskCluster,
+    DaskAutoscaler,
+    DaskWorkerGroup,
+)
 from dask_kubernetes.common.auth import ClusterAuth
 from dask_kubernetes.common.networking import get_scheduler_address
 from distributed.core import rpc, clean_exception
@@ -863,121 +868,76 @@ async def daskautoscaler_create(name, spec, namespace, logger, patch, **kwargs):

 @kopf.timer("daskautoscaler.kubernetes.dask.org", interval=5.0)
 async def daskautoscaler_adapt(spec, name, namespace, logger, **kwargs):
-    async with kubernetes.client.api_client.ApiClient() as api_client:
-        coreapi = kubernetes.client.CoreV1Api(api_client)
-
-        pod_ready = False
-        try:
-            pods = await coreapi.list_namespaced_pod(
-                namespace=namespace,
-                label_selector=f"dask.org/component=scheduler,dask.org/cluster-name={spec['cluster']}",
-            )
-            scheduler_pod = await coreapi.read_namespaced_pod(
-                pods.items[0].metadata.name, namespace
-            )
-            if scheduler_pod.status.phase == "Running":
-                pod_ready = True
-        except ApiException as e:
-            if e.status != 404:
-                raise e
-
-        if not pod_ready:
-            logger.info("Scheduler not ready, skipping autoscaling")
-            return
-
-        customobjectsapi = kubernetes.client.CustomObjectsApi(api_client)
-        customobjectsapi.api_client.set_default_header(
-            "content-type", "application/merge-patch+json"
-        )
-
-        autoscaler_resource = await customobjectsapi.get_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskautoscalers",
-            namespace=namespace,
-            name=name,
-        )
-
-        worker_group_resource = await customobjectsapi.get_namespaced_custom_object(
-            group="kubernetes.dask.org",
-            version="v1",
-            plural="daskworkergroups",
-            namespace=namespace,
-            name=f"{spec['cluster']}-default",
-        )
-
-        current_replicas = int(worker_group_resource["spec"]["worker"]["replicas"])
-        cooldown_until = float(
-            autoscaler_resource.get("metadata", {})
-            .get("annotations", {})
-            .get(DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION, time.time())
-        )
-
-        # Cooldown autoscaling to prevent thrashing
-        if time.time() < cooldown_until:
-            logger.debug("Autoscaler for %s is in cooldown", spec["cluster"])
-            return
-
-        # Ask the scheduler for the desired number of worker
-        try:
-            desired_workers = await get_desired_workers(
-                scheduler_service_name=f"{spec['cluster']}-scheduler",
-                namespace=namespace,
-                logger=logger,
-            )
-        except SchedulerCommError:
-            logger.error("Unable to get desired number of workers from scheduler.")
-            return
-
-        # Ensure the desired number is within the min and max
-        desired_workers = max(spec["minimum"], desired_workers)
-        desired_workers = min(spec["maximum"], desired_workers)
-
-        if current_replicas > 0:
-            max_scale_down = int(current_replicas * 0.25)
-            max_scale_down = 1 if max_scale_down == 0 else max_scale_down
-            desired_workers = max(current_replicas - max_scale_down, desired_workers)
-
-        # Update the default DaskWorkerGroup
-        if desired_workers != current_replicas:
-            await customobjectsapi.patch_namespaced_custom_object_scale(
-                group="kubernetes.dask.org",
-                version="v1",
-                plural="daskworkergroups",
-                namespace=namespace,
-                name=f"{spec['cluster']}-default",
-                body={"spec": {"replicas": desired_workers}},
-            )
-
-            cooldown_until = time.time() + 15
-
-            await customobjectsapi.patch_namespaced_custom_object(
-                group="kubernetes.dask.org",
-                version="v1",
-                plural="daskautoscalers",
-                namespace=namespace,
-                name=name,
-                body={
-                    "metadata": {
-                        "annotations": {
-                            DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: str(
-                                cooldown_until
-                            )
-                        }
-                    }
-                },
-            )
-
-            logger.info(
-                "Autoscaler updated %s worker count from %d to %d",
-                spec["cluster"],
-                current_replicas,
-                desired_workers,
-            )
-        else:
-            logger.debug(
-                "Not autoscaling %s with %d workers", spec["cluster"], current_replicas
-            )
+    try:
+        scheduler = await Pod.get(
+            label_selector={
+                "dask.org/component": "scheduler",
+                "dask.org/cluster-name": spec["cluster"],
+            },
+        )
+        if not await scheduler.ready():
+            raise ValueError()
+    except ValueError:
+        logger.info("Scheduler not ready, skipping autoscaling")
+        return
+
+    autoscaler = await DaskAutoscaler.get(name, namespace=namespace)
+    worker_group = await DaskWorkerGroup.get(
+        f"{spec['cluster']}-default", namespace=namespace
+    )
+
+    current_replicas = worker_group.replicas
+    cooldown_until = float(
+        autoscaler.annotations.get(
+            DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION, time.time()
+        )
+    )
+
+    # Cooldown autoscaling to prevent thrashing
+    if time.time() < cooldown_until:
+        logger.debug("Autoscaler for %s is in cooldown", spec["cluster"])
+        return
+
+    # Ask the scheduler for the desired number of worker
+    try:
+        desired_workers = await get_desired_workers(
+            scheduler_service_name=f"{spec['cluster']}-scheduler",
+            namespace=namespace,
+            logger=logger,
+        )
+    except SchedulerCommError:
+        logger.error("Unable to get desired number of workers from scheduler.")
+        return
+
+    # Ensure the desired number is within the min and max
+    desired_workers = max(spec["minimum"], desired_workers)
+    desired_workers = min(spec["maximum"], desired_workers)
+
+    if current_replicas > 0:
+        max_scale_down = int(current_replicas * 0.25)
+        max_scale_down = 1 if max_scale_down == 0 else max_scale_down
+        desired_workers = max(current_replicas - max_scale_down, desired_workers)
+
+    # Update the default DaskWorkerGroup
+    if desired_workers != current_replicas:
+        await worker_group.scale(desired_workers)
+
+        cooldown_until = time.time() + 15
+
+        await autoscaler.annotate(
+            {DASK_AUTOSCALER_COOLDOWN_UNTIL_ANNOTATION: str(cooldown_until)}
+        )
+
+        logger.info(
+            "Autoscaler updated %s worker count from %d to %d",
+            spec["cluster"],
+            current_replicas,
+            desired_workers,
+        )
+    else:
+        logger.debug(
+            "Not autoscaling %s with %d workers", spec["cluster"], current_replicas
+        )


 @kopf.timer("daskcluster.kubernetes.dask.org", interval=5.0)
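
Worth noting that the scale-down damping survives the migration unchanged: on each timer tick the controller removes at most 25% of the current workers (but always at least one), regardless of what the scheduler asks for. A small self-contained sketch of that arithmetic; the helper name and the numbers are illustrative only, not part of the controller:

```python
def clamp_desired_workers(desired, current, minimum, maximum):
    # Keep the scheduler's request inside the DaskAutoscaler's bounds.
    desired = max(minimum, desired)
    desired = min(maximum, desired)

    # Never remove more than 25% of current replicas in one tick,
    # but always allow removing at least one worker.
    if current > 0:
        max_scale_down = int(current * 0.25)
        max_scale_down = 1 if max_scale_down == 0 else max_scale_down
        desired = max(current - max_scale_down, desired)
    return desired


# With 40 workers and the scheduler asking for 0, only 10 are removed this tick,
# so successive ticks step the cluster down gradually: 40 -> 30 -> 23 -> 18 ...
assert clamp_desired_workers(0, 40, minimum=0, maximum=100) == 30
```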
2 changes: 1 addition & 1 deletion requirements.txt
@@ -5,4 +5,4 @@ kubernetes-asyncio>=12.0.1
 kopf>=1.35.3
 pykube-ng>=22.9.0
 rich>=12.5.1
-kr8s==0.6.0
+kr8s==0.7.0
