diff --git a/infscale/configs/job.py b/infscale/configs/job.py
index 2bd58f0d..d40aca87 100644
--- a/infscale/configs/job.py
+++ b/infscale/configs/job.py
@@ -328,7 +328,7 @@ def max_world_id(self) -> int:
 
         return max_id
 
-    def _server_id(self) -> str | None:
+    def server_id(self) -> str | None:
         """Return server id."""
         for worker in self.workers:
             if worker.is_server:
@@ -338,7 +338,7 @@ def _server_id(self) -> str | None:
 
     def server_ip(self) -> str:
         """Return IP address of server."""
-        server_id = self._server_id()
+        server_id = self.server_id()
         if server_id is None:
             return ""
 
@@ -395,6 +395,20 @@ def is_identical(x: JobConfig, y: JobConfig) -> bool:
     def world_name(world_id: int) -> str:
         """Return world name given a world id."""
        return f"w{world_id}"
+
+    @staticmethod
+    def get_pipeline_identifiers(new_cfg: JobConfig) -> set[str]:
+        """Get pipeline identifiers based on server id."""
+        server_id = new_cfg.server_id()
+
+        wrk_ids = set()
+
+        for wid, worlds_list in new_cfg.flow_graph.items():
+            for world_info in worlds_list:
+                if server_id in world_info.peers:
+                    wrk_ids.add(wid)
+
+        return wrk_ids
 
     @staticmethod
     def merge(base: JobConfig, extra: JobConfig) -> JobConfig:
diff --git a/infscale/controller/planner.py b/infscale/controller/planner.py
index d924e19a..244cb3ef 100644
--- a/infscale/controller/planner.py
+++ b/infscale/controller/planner.py
@@ -16,6 +16,7 @@
 
 """planner.py."""
 
+from dataclasses import dataclass
 import json
 from pathlib import Path
 
@@ -80,6 +81,14 @@ def enumerate(self) -> ExecPlan:
             yield plan
 
 
+@dataclass
+class PipelineData:
+    """PipelineData class."""
+
+    worker_ids: set[str]
+    total_throughput: float
+
+
 class Planner:
     """Planner class."""
 
@@ -90,6 +99,8 @@ def __init__(self, path: str, autoscale: bool) -> None:
         self._autoscale = autoscale
 
         self._colls: dict[str, PlanCollection] = {}
+
+        self.pipeline_data: dict[str, list[PipelineData]] = {}
 
     def build_config(
         self,
@@ -115,8 +126,13 @@ def build_config(
         if solution is None:
             raise InsufficientResources("No placement solution found")
 
-        gen2 = CfgGen2(solution[0], solution[1], source, "cuda", base_cfg)
+        placement, agent_ctxts_list, total_throughput = solution
+
+        gen2 = CfgGen2(placement, agent_ctxts_list, source, "cuda", base_cfg)
         cfg = gen2.generate()
+
+        self._set_pipeline_data(cfg, total_throughput)
+
         return cfg
 
         #####
@@ -129,6 +145,19 @@ def build_config(
         # plan_list = self._colls[source.model].pick_plans(demand)
         # gen = CfgGen(agent_ctxts, source, plan_list, "cuda", base_cfg)
        # return gen.generate()
+
+    def _set_pipeline_data(self, cfg: JobConfig, total_throughput) -> None:
+        """Set pipeline data."""
+        job_id = cfg.job_id
+
+        if job_id not in self.pipeline_data:
+            self.pipeline_data[job_id] = []
+
+        pipeline_identifiers = JobConfig.get_pipeline_identifiers(cfg)
+        prev_identifiers = {wid for data in self.pipeline_data[job_id] for wid in data.worker_ids}
+        new_identifiers = pipeline_identifiers - prev_identifiers
+
+        self.pipeline_data[job_id].append(PipelineData(new_identifiers, total_throughput))
 
     def _search_feasible_placement(
         self,
@@ -138,7 +167,7 @@ def _search_feasible_placement(
         gpu_count: int,
         ctx_list: list[AgentContext],
         dispatcher_on_gpu: bool = True,
-    ) -> tuple[dict, list[AgentContext]] | None:
+    ) -> tuple[dict, list[AgentContext], float] | None:
         # we'd like to search a feasible solution by increasing the number of nodes
         for num_nodes in range(1, len(ctx_list) + 1):
             res = placement.calculate_placement(
             )
             meta = res["meta"]
             if meta["total_throughput"] > demand:
-                return (res, ctx_list[:num_nodes])
+                return (res, ctx_list[:num_nodes], meta["total_throughput"])
 
         return None
 
@@ -156,7 +185,7 @@ def _calculate_placement(
         self,
         agent_ctxts: dict[str, AgentContext],
         demand: float,
         dispatcher_on_gpu: bool = True,
-    ) -> tuple[dict, list[AgentContext]] | None:
+    ) -> tuple[dict, list[AgentContext], float] | None:
         gpu_count_and_nodes: dict[int, list[AgentContext]] = {}
         for ctx in agent_ctxts.values():
             count = ctx.avail_gpu_count()
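
Below is a minimal, standalone sketch (not part of the patch) of the bookkeeping that `_set_pipeline_data` introduces: each plan recorded for a job keeps only the worker ids that did not appear in an earlier plan for that job, together with that plan's total throughput. The `record_pipeline` helper, the module-level `pipeline_data` dict, and the literal job/worker ids are illustrative stand-ins so the example runs without the infscale package.

```python
# Illustrative sketch only: PipelineData mirrors the dataclass added in this
# diff; record_pipeline stands in for Planner._set_pipeline_data, with the job
# config reduced to a plain set of worker ids.
from dataclasses import dataclass


@dataclass
class PipelineData:
    """Worker ids added by one planning step and that plan's total throughput."""

    worker_ids: set[str]
    total_throughput: float


pipeline_data: dict[str, list[PipelineData]] = {}


def record_pipeline(job_id: str, pipeline_ids: set[str], throughput: float) -> None:
    """Append only the workers not seen in earlier entries for this job."""
    history = pipeline_data.setdefault(job_id, [])
    prev_ids = {wid for entry in history for wid in entry.worker_ids}
    history.append(PipelineData(pipeline_ids - prev_ids, throughput))


# First plan places two workers; a later, larger plan adds a third.
record_pipeline("job-1", {"w1", "w2"}, 10.0)
record_pipeline("job-1", {"w1", "w2", "w3"}, 14.0)
print(pipeline_data["job-1"])
# -> [PipelineData(worker_ids={'w1', 'w2'}, total_throughput=10.0),
#     PipelineData(worker_ids={'w3'}, total_throughput=14.0)]   (set order may vary)
```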