From 278cbaca450fb8a6d4c1abca38ea1e1acbb569fb Mon Sep 17 00:00:00 2001 From: Reinout Eyckerman Date: Thu, 28 Dec 2023 10:55:25 +0100 Subject: [PATCH] refactor(adaptive-core): improved readability recommendations --- distributed/deploy/adaptive.py | 4 +-- distributed/deploy/adaptive_core.py | 44 +++++++++++++++++------------ 2 files changed, 28 insertions(+), 20 deletions(-) diff --git a/distributed/deploy/adaptive.py b/distributed/deploy/adaptive.py index 3f619641004..28a95a0baad 100644 --- a/distributed/deploy/adaptive.py +++ b/distributed/deploy/adaptive.py @@ -8,7 +8,7 @@ import dask.config from dask.utils import parse_timedelta -from distributed.deploy.adaptive_core import AdaptiveCore +from distributed.deploy.adaptive_core import AdaptiveCore, Recommendation from distributed.protocol import pickle from distributed.utils import log_errors @@ -152,7 +152,7 @@ async def target(self): target_duration=self.target_duration ) - async def recommendations(self, target: int) -> dict: + async def recommendations(self, target: int) -> Recommendation: if len(self.plan) != len(self.requested): # Ensure that the number of planned and requested workers # are in sync before making recommendations. diff --git a/distributed/deploy/adaptive_core.py b/distributed/deploy/adaptive_core.py index 0543e117b69..5d3616d6ae8 100644 --- a/distributed/deploy/adaptive_core.py +++ b/distributed/deploy/adaptive_core.py @@ -5,10 +5,11 @@ from collections import defaultdict, deque from collections.abc import Iterable from datetime import timedelta -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Literal, TypedDict, cast import tlz as toolz from tornado.ioloop import IOLoop +from typing_extensions import NotRequired import dask.config from dask.utils import parse_timedelta @@ -23,6 +24,15 @@ logger = logging.getLogger(__name__) +RecommendationStatus = Literal["up", "down", "same"] + + +class Recommendation(TypedDict): + status: RecommendationStatus + workers: NotRequired[set[WorkerState]] + n: NotRequired[int] + + class AdaptiveCore: """ The core logic for adaptive deployments, with none of the cluster details @@ -169,13 +179,13 @@ async def safe_target(self) -> int: return n - async def scale_down(self, n: int) -> None: + async def scale_down(self, workers: Iterable) -> None: raise NotImplementedError() - async def scale_up(self, workers: Iterable) -> None: + async def scale_up(self, n: int) -> None: raise NotImplementedError() - async def recommendations(self, target: int) -> dict: + async def recommendations(self, target: int) -> Recommendation: """ Make scale up/down recommendations based on current state and target """ @@ -185,11 +195,11 @@ async def recommendations(self, target: int) -> dict: if target == len(plan): self.close_counts.clear() - return {"status": "same"} + return Recommendation(status="same") if target > len(plan): self.close_counts.clear() - return {"status": "up", "n": target} + return Recommendation(status="up", n=target) # target < len(plan) not_yet_arrived = requested - observed @@ -212,9 +222,9 @@ async def recommendations(self, target: int) -> dict: del self.close_counts[k] if firmly_close: - return {"status": "down", "workers": list(firmly_close)} + return Recommendation(status="down", workers=firmly_close) else: - return {"status": "same"} + return Recommendation(status="same") async def adapt(self) -> None: """ @@ -229,18 +239,16 @@ async def adapt(self) -> None: try: target = await self.safe_target() - recommendations = await self.recommendations(target) - - if recommendations["status"] != "same": - self.log.append((time(), dict(recommendations))) + recommendation = await self.recommendations(target) - status = recommendations.pop("status") - if status == "same": + if recommendation["status"] == "same": return - if status == "up": - await self.scale_up(**recommendations) - if status == "down": - await self.scale_down(**recommendations) + else: + self.log.append((time(), cast(dict, recommendation))) + if recommendation["status"] == "up": + await self.scale_up(recommendation["n"]) + elif recommendation["status"] == "down": + await self.scale_down(recommendation["workers"]) except OSError: if status != "down": logger.error("Adaptive stopping due to error", exc_info=True)