From 3f360051e60f6a09afd4ac30c6caaf270fc4bcaa Mon Sep 17 00:00:00 2001
From: Victor Skvortsov
Date: Wed, 17 Sep 2025 12:27:02 +0500
Subject: [PATCH] Use fleet-combined idle_duration on run apply

When a run provisions a new instance into a fleet, create the instance
with the profile combined from the fleet and run specs rather than the
run's merged_profile alone, so that fleet-level settings such as
idle_duration take effect. The combining logic shared by fleet selection
and provisioning is extracted into
_get_run_profile_and_requirements_in_fleet().

---
 .../tasks/process_submitted_jobs.py | 92 ++++++++++---------
 1 file changed, 51 insertions(+), 41 deletions(-)

diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
index 41f925ca1..e8dc7b2f3 100644
--- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
+++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
@@ -3,7 +3,7 @@
 import math
 import uuid
 from datetime import datetime, timedelta
-from typing import List, Optional, Tuple
+from typing import List, Optional
 
 from sqlalchemy import and_, func, not_, or_, select
 from sqlalchemy.ext.asyncio import AsyncSession
@@ -25,6 +25,7 @@
 from dstack._internal.core.models.profiles import (
     DEFAULT_RUN_TERMINATION_IDLE_TIME,
     CreationPolicy,
+    Profile,
     TerminationPolicy,
 )
 from dstack._internal.core.models.resources import Memory
@@ -34,6 +35,7 @@
     JobRuntimeData,
     JobStatus,
     JobTerminationReason,
+    Requirements,
     Run,
     RunSpec,
 )
@@ -186,7 +188,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
     project = run_model.project
     run = run_model_to_run(run_model)
     run_spec = run.run_spec
-    profile = run_spec.merged_profile
+    run_profile = run_spec.merged_profile
     job = find_job(run.jobs, job_model.replica_num, job_model.job_num)
     multinode = job.job_spec.jobs_per_replica > 1
 
@@ -333,7 +335,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
             job_model.status = JobStatus.PROVISIONING
     else:
         # Assigned no instance, create a new one
-        if profile.creation_policy == CreationPolicy.REUSE:
+        if run_profile.creation_policy == CreationPolicy.REUSE:
            logger.debug("%s: reuse instance failed", fmt(job_model))
            job_model.status = JobStatus.TERMINATING
            job_model.termination_reason = JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY
@@ -362,7 +364,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
             return
 
         logger.info("%s: now is provisioning a new instance", fmt(job_model))
-        job_provisioning_data, offer = run_job_result
+        job_provisioning_data, offer, effective_profile, _ = run_job_result
         job_model.job_provisioning_data = job_provisioning_data.json()
         job_model.status = JobStatus.PROVISIONING
         if fleet_model is None:
@@ -382,12 +384,11 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel):
         instance = _create_instance_model_for_job(
             project=project,
             fleet_model=fleet_model,
-            run_spec=run_spec,
             job_model=job_model,
-            job=job,
             job_provisioning_data=job_provisioning_data,
             offer=offer,
             instance_num=instance_num,
+            profile=effective_profile,
         )
         job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json()
         # Both this task and process_fleets can add instances to fleets.
@@ -546,23 +547,22 @@ async def _find_optimal_fleet_with_offers(
 
         fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price
         candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
-        profile = combine_fleet_and_run_profiles(
-            candidate_fleet.spec.merged_profile, run_spec.merged_profile
-        )
-        fleet_requirements = get_fleet_requirements(candidate_fleet.spec)
-        requirements = combine_fleet_and_run_requirements(
-            fleet_requirements, job.job_spec.requirements
-        )
-        multinode = (
-            candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
-            or job.job_spec.jobs_per_replica > 1
-        )
+        profile = None
+        requirements = None
+        try:
+            profile, requirements = _get_run_profile_and_requirements_in_fleet(
+                job=job,
+                run_spec=run_spec,
+                fleet=candidate_fleet,
+            )
+        except ValueError:
+            pass
         fleet_backend_offers = []
-        if (
-            _check_can_create_new_instance_in_fleet(candidate_fleet)
-            and profile is not None
-            and requirements is not None
-        ):
+        if profile is not None and requirements is not None:
+            multinode = (
+                candidate_fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
+                or job.job_spec.jobs_per_replica > 1
+            )
             fleet_backend_offers = await get_offers_by_requirements(
                 project=project,
                 profile=profile,
@@ -704,7 +704,7 @@ async def _run_job_on_new_instance(
     master_job_provisioning_data: Optional[JobProvisioningData] = None,
     volumes: Optional[List[List[Volume]]] = None,
     fleet_model: Optional[FleetModel] = None,
-) -> Optional[Tuple[JobProvisioningData, InstanceOfferWithAvailability]]:
+) -> Optional[tuple[JobProvisioningData, InstanceOfferWithAvailability, Profile, Requirements]]:
     if volumes is None:
         volumes = []
     profile = run.run_spec.merged_profile
@@ -712,21 +712,14 @@ async def _run_job_on_new_instance(
     fleet = None
     if fleet_model is not None:
         fleet = fleet_model_to_fleet(fleet_model)
-        if not _check_can_create_new_instance_in_fleet(fleet):
-            logger.debug(
-                "%s: cannot fit new instance into fleet %s", fmt(job_model), fleet_model.name
-            )
-            return None
-        profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, profile)
-        if profile is None:
-            logger.debug("%s: cannot combine fleet %s profile", fmt(job_model), fleet_model.name)
-            return None
-        fleet_requirements = get_fleet_requirements(fleet.spec)
-        requirements = combine_fleet_and_run_requirements(fleet_requirements, requirements)
-        if requirements is None:
-            logger.debug(
-                "%s: cannot combine fleet %s requirements", fmt(job_model), fleet_model.name
+        try:
+            profile, requirements = _get_run_profile_and_requirements_in_fleet(
+                job=job,
+                run_spec=run.run_spec,
+                fleet=fleet,
             )
+        except ValueError as e:
+            logger.debug("%s: %s", fmt(job_model), e.args[0])
             return None
 
     # TODO: Respect fleet provisioning properties such as tags
@@ -766,7 +759,7 @@ async def _run_job_on_new_instance(
                 project_ssh_private_key,
                 offer_volumes,
             )
-            return job_provisioning_data, offer
+            return job_provisioning_data, offer, profile, requirements
         except BackendError as e:
             logger.warning(
                 "%s: %s launch in %s/%s failed: %s",
@@ -789,6 +782,25 @@ async def _run_job_on_new_instance(
     return None
 
 
+def _get_run_profile_and_requirements_in_fleet(
+    job: Job,
+    run_spec: RunSpec,
+    fleet: Fleet,
+) -> tuple[Profile, Requirements]:
+    if not _check_can_create_new_instance_in_fleet(fleet):
+        raise ValueError("Cannot fit new instance into fleet")
+    profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile)
+    if profile is None:
+        raise ValueError("Cannot combine fleet profile")
+    fleet_requirements = get_fleet_requirements(fleet.spec)
+    requirements = combine_fleet_and_run_requirements(
+        fleet_requirements, job.job_spec.requirements
+    )
+    if requirements is None:
+        raise ValueError("Cannot combine fleet requirements")
+    return profile, requirements
+
+
 def _check_can_create_new_instance_in_fleet(fleet: Fleet) -> bool:
     if fleet.spec.configuration.ssh_config is not None:
         return False
@@ -857,14 +869,12 @@ async def _get_next_instance_num(session: AsyncSession, fleet_model: FleetModel)
 def _create_instance_model_for_job(
     project: ProjectModel,
     fleet_model: FleetModel,
-    run_spec: RunSpec,
     job_model: JobModel,
-    job: Job,
     job_provisioning_data: JobProvisioningData,
     offer: InstanceOfferWithAvailability,
     instance_num: int,
+    profile: Profile,
 ) -> InstanceModel:
-    profile = run_spec.merged_profile
     if not job_provisioning_data.dockerized:
         # terminate vastai/k8s instances immediately
         termination_policy = TerminationPolicy.DESTROY_AFTER_IDLE