From 102ea98047fb6feccf8ad966174f4099b6982f28 Mon Sep 17 00:00:00 2001 From: Rares Gaia Date: Mon, 13 Oct 2025 09:09:47 +0300 Subject: [PATCH] feat: pipeline data We need to save pipeline data so that we can identify its worker identifiers and total throughput. For that, whenever a new config is generated, we extract this data and save it in a list. Whenever we need to scale in, we get the latest pipeline data that was added to the config and use it to decide whether that pipeline can be removed based on its data. --- infscale/configs/job.py | 18 +++++++++++++++-- infscale/controller/planner.py | 37 ++++++++++++++++++++++++++++++---- 2 files changed, 49 insertions(+), 6 deletions(-) diff --git a/infscale/configs/job.py b/infscale/configs/job.py index 2bd58f0d..d40aca87 100644 --- a/infscale/configs/job.py +++ b/infscale/configs/job.py @@ -328,7 +328,7 @@ def max_world_id(self) -> int: return max_id - def _server_id(self) -> str | None: + def server_id(self) -> str | None: """Return server id.""" for worker in self.workers: if worker.is_server: @@ -338,7 +338,7 @@ def _server_id(self) -> str | None: def server_ip(self) -> str: """Return IP address of server.""" - server_id = self._server_id() + server_id = self.server_id() if server_id is None: return "" @@ -395,6 +395,20 @@ def is_identical(x: JobConfig, y: JobConfig) -> bool: def world_name(world_id: int) -> str: """Return world name given a world id.""" return f"w{world_id}" + + @staticmethod + def get_pipeline_identifiers(new_cfg: JobConfig) -> set[str]: + """Get pipeline identifiers based on server id.""" + server_id = new_cfg.server_id() + + wrk_ids = set() + + for wid, worlds_list in new_cfg.flow_graph.items(): + for world_info in worlds_list: + if server_id in world_info.peers: + wrk_ids.add(wid) + + return wrk_ids @staticmethod def merge(base: JobConfig, extra: JobConfig) -> JobConfig: diff --git a/infscale/controller/planner.py b/infscale/controller/planner.py index d924e19a..244cb3ef 100644 --- 
a/infscale/controller/planner.py +++ b/infscale/controller/planner.py @@ -16,6 +16,7 @@ """planner.py.""" +from dataclasses import dataclass import json from pathlib import Path @@ -80,6 +81,14 @@ def enumerate(self) -> ExecPlan: yield plan +@dataclass +class PipelineData: + """PipelineData class.""" + + worker_ids: set[str] + total_throughput: float + + class Planner: """Planner class.""" @@ -90,6 +99,8 @@ def __init__(self, path: str, autoscale: bool) -> None: self._autoscale = autoscale self._colls: dict[str, PlanCollection] = {} + + self.pipeline_data: dict[str, list[PipelineData]] = {} def build_config( self, @@ -115,8 +126,13 @@ def build_config( if solution is None: raise InsufficientResources("No placement solution found") - gen2 = CfgGen2(solution[0], solution[1], source, "cuda", base_cfg) + placement, agent_ctxts_list, total_throughput = solution + + gen2 = CfgGen2(placement, agent_ctxts_list, source, "cuda", base_cfg) cfg = gen2.generate() + + self._set_pipeline_data(cfg, total_throughput) + return cfg ##### @@ -129,6 +145,19 @@ def build_config( # plan_list = self._colls[source.model].pick_plans(demand) # gen = CfgGen(agent_ctxts, source, plan_list, "cuda", base_cfg) # return gen.generate() + + def _set_pipeline_data(self, cfg: JobConfig, total_throughput) -> None: + """Set pipeline data.""" + job_id = cfg.job_id + + if job_id not in self.pipeline_data: + self.pipeline_data[job_id] = [] + + pipeline_identifiers = JobConfig.get_pipeline_identifiers(cfg) + prev_identifiers = {wid for data in self.pipeline_data[job_id] for wid in data.worker_ids} + new_identifiers = pipeline_identifiers - prev_identifiers + + self.pipeline_data[job_id].append(PipelineData(new_identifiers, total_throughput)) def _search_feasible_placement( self, @@ -138,7 +167,7 @@ def _search_feasible_placement( gpu_count: int, ctx_list: list[AgentContext], dispatcher_on_gpu: bool = True, - ) -> tuple[dict, list[AgentContext]] | None: + ) -> tuple[dict, list[AgentContext], float] | None: # 
we'd like to search a feasible solution by increasing the number of nodes for num_nodes in range(1, len(ctx_list) + 1): res = placement.calculate_placement( @@ -146,7 +175,7 @@ def _search_feasible_placement( ) meta = res["meta"] if meta["total_throughput"] > demand: - return (res, ctx_list[:num_nodes]) + return (res, ctx_list[:num_nodes], meta["total_throughput"]) return None @@ -156,7 +185,7 @@ def _calculate_placement( agent_ctxts: dict[str, AgentContext], demand: float, dispatcher_on_gpu: bool = True, - ) -> tuple[dict, list[AgentContext]] | None: + ) -> tuple[dict, list[AgentContext], float] | None: gpu_count_and_nodes: dict[int, list[AgentContext]] = {} for ctx in agent_ctxts.values(): count = ctx.avail_gpu_count()