diff --git a/docs/docs/reference/cli/dstack/offer.md b/docs/docs/reference/cli/dstack/offer.md index fb56e73a4..ac84308ae 100644 --- a/docs/docs/reference/cli/dstack/offer.md +++ b/docs/docs/reference/cli/dstack/offer.md @@ -4,6 +4,9 @@ Displays available offers (hardware configurations) from configured backends or The output shows backend, region, instance type, resources, spot availability, and pricing. +!!! info "Experimental" + `dstack offer` command is currently an experimental feature. Backward compatibility is not guaranteed across releases. + ## Usage This command accepts most of the same arguments as [`dstack apply`](apply.md). diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 64ba0b031..4ab2633d9 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -38,10 +38,12 @@ from dstack._internal.server.services.prometheus.client_metrics import run_metrics from dstack._internal.server.services.runs import ( fmt, - is_replica_registered, process_terminating_run, - retry_run_replica_jobs, run_model_to_run, +) +from dstack._internal.server.services.runs.replicas import ( + is_replica_registered, + retry_run_replica_jobs, scale_run_replicas, ) from dstack._internal.server.services.secrets import get_project_secrets_mapping diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index d26f26d70..60e72a1af 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -1,12 +1,11 @@ import asyncio import itertools -import math import uuid from contextlib import AsyncExitStack from datetime import datetime, timedelta from typing import List, Optional, Union -from sqlalchemy import and_, func, not_, or_, select +from sqlalchemy import func, or_, select from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import contains_eager, joinedload, load_only, noload, selectinload @@ -25,7 +24,6 @@ from dstack._internal.core.models.common import NetworkMode from dstack._internal.core.models.compute_groups import ComputeGroupProvisioningData from dstack._internal.core.models.fleets import ( - Fleet, FleetConfiguration, FleetNodesSpec, FleetSpec, @@ -48,7 +46,6 @@ JobTerminationReason, Requirements, Run, - RunSpec, ) from dstack._internal.core.models.volumes import Volume from dstack._internal.core.services.profiles import get_termination @@ -74,23 +71,20 @@ ) from dstack._internal.server.services.backends import get_project_backend_by_type_or_error from dstack._internal.server.services.fleets import ( + check_can_create_new_cloud_instance_in_fleet, fleet_model_to_fleet, generate_fleet_name, - get_fleet_requirements, + get_fleet_master_instance_provisioning_data, get_next_instance_num, is_cloud_cluster, ) from dstack._internal.server.services.instances import ( - filter_pool_instances, - get_instance_offer, get_instance_provisioning_data, - get_shared_pool_instances_with_offers, ) from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, find_job, find_jobs, - get_instances_ids_with_detaching_volumes, get_job_configured_volume_models, get_job_configured_volumes, get_job_runtime_data, @@ -107,14 +101,19 @@ placement_group_model_to_placement_group_optional, 
schedule_fleet_placement_groups_deletion, ) -from dstack._internal.server.services.requirements.combine import ( - combine_fleet_and_run_profiles, - combine_fleet_and_run_requirements, -) from dstack._internal.server.services.runs import ( - check_run_spec_requires_instance_mounts, run_model_to_run, ) +from dstack._internal.server.services.runs.plan import ( + find_optimal_fleet_with_offers, + get_run_candidate_fleet_models_filters, + get_run_profile_and_requirements_in_fleet, + select_run_candidate_fleet_models_with_filters, +) +from dstack._internal.server.services.runs.spec import ( + check_run_spec_requires_instance_mounts, + get_nodes_required_num, +) from dstack._internal.server.services.volumes import ( volume_model_to_volume, ) @@ -286,7 +285,7 @@ async def _process_submitted_job( # Then, the job runs on the assigned instance or a new instance is provisioned. # This is needed to avoid holding instances lock for a long time. if not job_model.instance_assigned: - fleet_filters, instance_filters = await _get_candidate_fleet_models_filters( + fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters( session=session, project=project, run_model=run_model, @@ -295,10 +294,11 @@ async def _process_submitted_job( ( fleet_models_with_instances, fleet_models_without_instances, - ) = await _select_fleet_models_with_filters( + ) = await select_run_candidate_fleet_models_with_filters( session=session, fleet_filters=fleet_filters, instance_filters=instance_filters, + lock_instances=True, ) instances_ids = sorted( itertools.chain.from_iterable( @@ -319,7 +319,7 @@ async def _process_submitted_job( instance_filters=instance_filters, ) fleet_models = fleet_models_with_instances + fleet_models_without_instances - fleet_model, fleet_instances_with_offers = await _find_optimal_fleet_with_offers( + fleet_model, fleet_instances_with_offers, _ = await find_optimal_fleet_with_offers( project=project, fleet_models=fleet_models, run_model=run_model, @@ -327,6 +327,7 @@ async def _process_submitted_job( job=job, master_job_provisioning_data=master_job_provisioning_data, volumes=volumes, + exclude_not_available=True, ) if fleet_model is None: if run_spec.merged_profile.fleets is not None: @@ -511,68 +512,6 @@ async def _process_submitted_job( await session.commit() -async def _get_candidate_fleet_models_filters( - session: AsyncSession, - project: ProjectModel, - run_model: RunModel, - run_spec: RunSpec, -) -> tuple[list, list]: - # If another job freed the instance but is still trying to detach volumes, - # do not provision on it to prevent attaching volumes that are currently detaching. - detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) - fleet_filters = [ - FleetModel.project_id == project.id, - FleetModel.deleted == False, - ] - if run_model.fleet is not None: - fleet_filters.append(FleetModel.id == run_model.fleet_id) - if run_spec.merged_profile.fleets is not None: - fleet_filters.append(FleetModel.name.in_(run_spec.merged_profile.fleets)) - instance_filters = [ - InstanceModel.deleted == False, - InstanceModel.id.not_in(detaching_instances_ids), - ] - return fleet_filters, instance_filters - - -async def _select_fleet_models_with_filters( - session: AsyncSession, fleet_filters: list, instance_filters: list -) -> tuple[list[FleetModel], list[FleetModel]]: - # Selecting fleets in two queries since Postgres does not allow - # locking nullable side of an outer join. So, first lock instances with inner join. 
- # Then select left out fleets without instances. - res = await session.execute( - select(FleetModel) - .join(FleetModel.instances) - .where(*fleet_filters) - .where(*instance_filters) - .options(contains_eager(FleetModel.instances)) - .order_by(InstanceModel.id) # take locks in order - .with_for_update(key_share=True, of=InstanceModel) - .execution_options(populate_existing=True) - ) - fleet_models_with_instances = list(res.unique().scalars().all()) - fleet_models_with_instances_ids = [f.id for f in fleet_models_with_instances] - res = await session.execute( - select(FleetModel) - .outerjoin(FleetModel.instances) - .where( - *fleet_filters, - FleetModel.id.not_in(fleet_models_with_instances_ids), - ) - .where( - or_( - InstanceModel.id.is_(None), - not_(and_(*instance_filters)), - ) - ) - .options(noload(FleetModel.instances)) - .execution_options(populate_existing=True) - ) - fleet_models_without_instances = list(res.unique().scalars().all()) - return fleet_models_with_instances, fleet_models_without_instances - - async def _refetch_fleet_models_with_instances( session: AsyncSession, fleets_ids: list[uuid.UUID], @@ -597,161 +536,6 @@ async def _refetch_fleet_models_with_instances( return fleet_models -async def _find_optimal_fleet_with_offers( - project: ProjectModel, - fleet_models: list[FleetModel], - run_model: RunModel, - run_spec: RunSpec, - job: Job, - master_job_provisioning_data: Optional[JobProvisioningData], - volumes: Optional[list[list[Volume]]], -) -> tuple[Optional[FleetModel], list[tuple[InstanceModel, InstanceOfferWithAvailability]]]: - if run_model.fleet is not None: - # Using the fleet that was already chosen by the master job - fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers( - fleet_model=run_model.fleet, - run_spec=run_spec, - job=job, - master_job_provisioning_data=master_job_provisioning_data, - volumes=volumes, - ) - return run_model.fleet, fleet_instances_with_pool_offers - - nodes_required_num = _get_nodes_required_num_for_run(run_spec) - # The current strategy is first to consider fleets that can accommodate - # the run without additional provisioning and choose the one with the cheapest pool offer. - # Then choose a fleet with the cheapest pool offer among all fleets with pool offers. - # If there are no fleets with pool offers, choose a fleet with a cheapest backend offer. - # Fallback to autocreated fleet if fleets have no pool or backend offers. - # TODO: Consider trying all backend offers and then choosing a fleet. - candidate_fleets_with_offers: list[ - tuple[ - Optional[FleetModel], - list[tuple[InstanceModel, InstanceOfferWithAvailability]], - int, - int, - tuple[int, float, float], - ] - ] = [] - for candidate_fleet_model in fleet_models: - candidate_fleet = fleet_model_to_fleet(candidate_fleet_model) - if ( - is_multinode_job(job) - and candidate_fleet.spec.configuration.placement != InstanceGroupPlacement.CLUSTER - ): - # Limit multinode runs to cluster fleets to guarantee best connectivity. - continue - - fleet_instances_with_pool_offers = _get_fleet_instances_with_pool_offers( - fleet_model=candidate_fleet_model, - run_spec=run_spec, - job=job, - # No need to pass master_job_provisioning_data for master job - # as all pool offers are suitable. 
- master_job_provisioning_data=None, - volumes=volumes, - ) - fleet_has_pool_capacity = nodes_required_num <= len(fleet_instances_with_pool_offers) - fleet_cheapest_pool_offer = math.inf - if len(fleet_instances_with_pool_offers) > 0: - fleet_cheapest_pool_offer = fleet_instances_with_pool_offers[0][1].price - - try: - _check_can_create_new_instance_in_fleet(candidate_fleet) - profile, requirements = _get_run_profile_and_requirements_in_fleet( - job=job, - run_spec=run_spec, - fleet=candidate_fleet, - ) - except ValueError: - fleet_backend_offers = [] - else: - # Master job offers must be in the same cluster as existing instances. - master_instance_provisioning_data = _get_fleet_master_instance_provisioning_data( - fleet_model=candidate_fleet_model, - fleet_spec=candidate_fleet.spec, - ) - # Handle multinode for old jobs that don't have requirements.multinode set. - # TODO: Drop multinode param. - multinode = requirements.multinode or is_multinode_job(job) - fleet_backend_offers = await get_offers_by_requirements( - project=project, - profile=profile, - requirements=requirements, - exclude_not_available=True, - multinode=multinode, - master_job_provisioning_data=master_instance_provisioning_data, - volumes=volumes, - privileged=job.job_spec.privileged, - instance_mounts=check_run_spec_requires_instance_mounts(run_spec), - ) - - fleet_cheapest_backend_offer = math.inf - if len(fleet_backend_offers) > 0: - fleet_cheapest_backend_offer = fleet_backend_offers[0][1].price - - if not _run_can_fit_into_fleet(run_spec, candidate_fleet): - logger.debug("Skipping fleet %s from consideration: run cannot fit into fleet") - continue - - fleet_priority = ( - not fleet_has_pool_capacity, - fleet_cheapest_pool_offer, - fleet_cheapest_backend_offer, - ) - candidate_fleets_with_offers.append( - ( - candidate_fleet_model, - fleet_instances_with_pool_offers, - len(fleet_instances_with_pool_offers), - len(fleet_backend_offers), - fleet_priority, - ) - ) - if len(candidate_fleets_with_offers) == 0: - return None, [] - if ( - not FeatureFlags.AUTOCREATED_FLEETS_DISABLED - and run_spec.merged_profile.fleets is None - and all(t[2] == 0 and t[3] == 0 for t in candidate_fleets_with_offers) - ): - # If fleets are not specified and no fleets have available pool - # or backend offers, create a new fleet. - # This is for compatibility with non-fleet-first UX when runs created new fleets - # if there are no instances to reuse. - return None, [] - candidate_fleets_with_offers.sort(key=lambda t: t[-1]) - return candidate_fleets_with_offers[0][:2] - - -def _get_nodes_required_num_for_run(run_spec: RunSpec) -> int: - nodes_required_num = 1 - if run_spec.configuration.type == "task": - nodes_required_num = run_spec.configuration.nodes - elif ( - run_spec.configuration.type == "service" - and run_spec.configuration.replicas.min is not None - ): - nodes_required_num = run_spec.configuration.replicas.min - return nodes_required_num - - -def _get_fleet_master_instance_provisioning_data( - fleet_model: FleetModel, - fleet_spec: FleetSpec, -) -> Optional[JobProvisioningData]: - master_instance_provisioning_data = None - if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER: - # Offers for master jobs must be in the same cluster as existing instances. 
- fleet_instance_models = [im for im in fleet_model.instances if not im.deleted] - if len(fleet_instance_models) > 0: - master_instance_model = fleet_instance_models[0] - master_instance_provisioning_data = JobProvisioningData.__response__.parse_raw( - master_instance_model.job_provisioning_data - ) - return master_instance_provisioning_data - - async def _fetch_fleet_with_master_instance_provisioning_data( exit_stack: AsyncExitStack, session: AsyncSession, @@ -800,86 +584,13 @@ async def _fetch_fleet_with_master_instance_provisioning_data( .execution_options(populate_existing=True) ) fleet_model = res.unique().scalar_one() - master_instance_provisioning_data = _get_fleet_master_instance_provisioning_data( + master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( fleet_model=fleet_model, fleet_spec=fleet.spec, ) return master_instance_provisioning_data -def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool: - """ - Returns `False` if the run cannot fit into fleet for sure. - This is helpful heuristic to avoid even considering fleets too small for a run. - A run may not fit even if this function returns `True`. - This will lead to some jobs failing due to exceeding `nodes.max` - or more than `nodes.max` instances being provisioned - and eventually removed by the fleet consolidation logic. - """ - # No check for cloud fleets with blocks > 1 since we don't know - # how many jobs such fleets can accommodate. - nodes_required_num = _get_nodes_required_num_for_run(run_spec) - if ( - fleet.spec.configuration.nodes is not None - and fleet.spec.configuration.blocks == 1 - and fleet.spec.configuration.nodes.max is not None - ): - busy_instances = [i for i in fleet.instances if i.busy_blocks > 0] - fleet_available_capacity = fleet.spec.configuration.nodes.max - len(busy_instances) - if fleet_available_capacity < nodes_required_num: - return False - elif fleet.spec.configuration.ssh_config is not None: - # Currently assume that each idle block can run a job. - # TODO: Take resources / eligible offers into account. 
- total_idle_blocks = 0 - for instance in fleet.instances: - total_blocks = instance.total_blocks or 1 - total_idle_blocks += total_blocks - instance.busy_blocks - if total_idle_blocks < nodes_required_num: - return False - return True - - -def _get_fleet_instances_with_pool_offers( - fleet_model: FleetModel, - run_spec: RunSpec, - job: Job, - master_job_provisioning_data: Optional[JobProvisioningData] = None, - volumes: Optional[List[List[Volume]]] = None, -) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: - pool_instances = fleet_model.instances - instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] - profile = run_spec.merged_profile - multinode = is_multinode_job(job) - nonshared_instances = filter_pool_instances( - pool_instances=pool_instances, - profile=profile, - requirements=job.job_spec.requirements, - status=InstanceStatus.IDLE, - fleet_model=fleet_model, - multinode=multinode, - master_job_provisioning_data=master_job_provisioning_data, - volumes=volumes, - shared=False, - ) - instances_with_offers = [ - (instance, common_utils.get_or_error(get_instance_offer(instance))) - for instance in nonshared_instances - ] - shared_instances_with_offers = get_shared_pool_instances_with_offers( - pool_instances=pool_instances, - profile=profile, - requirements=job.job_spec.requirements, - idle_only=True, - fleet_model=fleet_model, - multinode=multinode, - volumes=volumes, - ) - instances_with_offers.extend(shared_instances_with_offers) - instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0) - return instances_with_offers - - async def _assign_job_to_fleet_instance( session: AsyncSession, fleet_model: Optional[FleetModel], @@ -983,8 +694,8 @@ async def _run_jobs_on_new_instances( if fleet_model is not None: fleet = fleet_model_to_fleet(fleet_model) try: - _check_can_create_new_instance_in_fleet(fleet) - profile, requirements = _get_run_profile_and_requirements_in_fleet( + check_can_create_new_cloud_instance_in_fleet(fleet) + profile, requirements = get_run_profile_and_requirements_in_fleet( job=job, run_spec=run.run_spec, fleet=fleet, @@ -1111,43 +822,6 @@ async def _run_jobs_on_new_instances( return None -def _get_run_profile_and_requirements_in_fleet( - job: Job, - run_spec: RunSpec, - fleet: Fleet, -) -> tuple[Profile, Requirements]: - profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile) - if profile is None: - raise ValueError("Cannot combine fleet profile") - fleet_requirements = get_fleet_requirements(fleet.spec) - requirements = combine_fleet_and_run_requirements( - fleet_requirements, job.job_spec.requirements - ) - if requirements is None: - raise ValueError("Cannot combine fleet requirements") - return profile, requirements - - -def _check_can_create_new_instance_in_fleet(fleet: Fleet): - if not _can_create_new_instance_in_fleet(fleet): - raise ValueError("Cannot fit new instance into fleet") - - -def _can_create_new_instance_in_fleet(fleet: Fleet) -> bool: - if fleet.spec.configuration.ssh_config is not None: - return False - active_instances = [i for i in fleet.instances if i.status.is_active()] - # nodes.max is a soft limit that can be exceeded when provisioning concurrently. - # The fleet consolidation logic will remove redundant nodes eventually. 
- if ( - fleet.spec.configuration.nodes is not None - and fleet.spec.configuration.nodes.max is not None - and len(active_instances) >= fleet.spec.configuration.nodes.max - ): - return False - return True - - async def _create_fleet_model_for_job( exit_stack: AsyncExitStack, session: AsyncSession, @@ -1157,7 +831,7 @@ async def _create_fleet_model_for_job( placement = InstanceGroupPlacement.ANY if run.run_spec.configuration.type == "task" and run.run_spec.configuration.nodes > 1: placement = InstanceGroupPlacement.CLUSTER - nodes = _get_nodes_required_num_for_run(run.run_spec) + nodes = get_nodes_required_num(run.run_spec) lock_namespace = f"fleet_names_{project.name}" if is_db_sqlite(): # Start new transaction to see committed changes after lock diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 5bf4b48bb..3a5329f6e 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -41,7 +41,7 @@ SpotPolicy, ) from dstack._internal.core.models.resources import ResourcesSpec -from dstack._internal.core.models.runs import Requirements, get_policy_map +from dstack._internal.core.models.runs import JobProvisioningData, Requirements, get_policy_map from dstack._internal.core.models.users import GlobalRole from dstack._internal.core.services import validate_dstack_resource_name from dstack._internal.core.services.diff import ModelDiff, copy_model, diff_models @@ -680,6 +680,42 @@ def get_next_instance_num(taken_instance_nums: set[int]) -> int: instance_num += 1 +def get_fleet_master_instance_provisioning_data( + fleet_model: FleetModel, + fleet_spec: FleetSpec, +) -> Optional[JobProvisioningData]: + master_instance_provisioning_data = None + if fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER: + # Offers for master jobs must be in the same cluster as existing instances. + fleet_instance_models = [im for im in fleet_model.instances if not im.deleted] + if len(fleet_instance_models) > 0: + master_instance_model = fleet_instance_models[0] + master_instance_provisioning_data = JobProvisioningData.__response__.parse_raw( + master_instance_model.job_provisioning_data + ) + return master_instance_provisioning_data + + +def can_create_new_cloud_instance_in_fleet(fleet: Fleet) -> bool: + if fleet.spec.configuration.ssh_config is not None: + return False + active_instances = [i for i in fleet.instances if i.status.is_active()] + # nodes.max is a soft limit that can be exceeded when provisioning concurrently. + # The fleet consolidation logic will remove redundant nodes eventually. 
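+ # A new cloud instance is allowed only while the active instance count is below nodes.max.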
+ if ( + fleet.spec.configuration.nodes is not None + and fleet.spec.configuration.nodes.max is not None + and len(active_instances) >= fleet.spec.configuration.nodes.max + ): + return False + return True + + +def check_can_create_new_cloud_instance_in_fleet(fleet: Fleet): + if not can_create_new_cloud_instance_in_fleet(fleet): + raise ValueError("Cannot fit new cloud instance into fleet") + + async def _create_fleet( session: AsyncSession, project: ProjectModel, diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py index e20a033a6..314bacb42 100644 --- a/src/dstack/_internal/server/services/jobs/__init__.py +++ b/src/dstack/_internal/server/services/jobs/__init__.py @@ -735,6 +735,10 @@ async def get_job_attached_volumes( return job_volumes +def remove_job_spec_sensitive_info(spec: JobSpec): + spec.ssh_key = None + + def _get_job_mount_point_attached_volume( volumes: List[Volume], job_provisioning_data: JobProvisioningData, diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs/__init__.py similarity index 66% rename from src/dstack/_internal/server/services/runs.py rename to src/dstack/_internal/server/services/runs/__init__.py index cff08e5b0..76702ac45 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs/__init__.py @@ -18,26 +18,12 @@ ServerClientError, ) from dstack._internal.core.models.common import ApplyAction -from dstack._internal.core.models.configurations import ( - RUN_PRIORITY_DEFAULT, - AnyRunConfiguration, - ServiceConfiguration, -) -from dstack._internal.core.models.instances import ( - InstanceAvailability, - InstanceOfferWithAvailability, - InstanceStatus, -) from dstack._internal.core.models.profiles import ( - CreationPolicy, RetryEvent, ) -from dstack._internal.core.models.repos.virtual import DEFAULT_VIRTUAL_REPO_ID, VirtualRunRepoData from dstack._internal.core.models.runs import ( - LEGACY_REPO_DIR, ApplyRunPlanInput, Job, - JobPlan, JobSpec, JobStatus, JobSubmission, @@ -51,13 +37,6 @@ RunTerminationReason, ServiceSpec, ) -from dstack._internal.core.models.volumes import ( - InstanceMountPoint, - Volume, -) -from dstack._internal.core.services import validate_dstack_resource_name -from dstack._internal.core.services.diff import diff_models -from dstack._internal.server import settings from dstack._internal.server.db import get_db, is_db_postgres, is_db_sqlite from dstack._internal.server.models import ( FleetModel, @@ -70,31 +49,27 @@ ) from dstack._internal.server.services import repos as repos_services from dstack._internal.server.services import services -from dstack._internal.server.services.docker import is_valid_docker_volume_target -from dstack._internal.server.services.instances import ( - filter_pool_instances, - get_instance_offer, - get_pool_instances, - get_shared_pool_instances_with_offers, -) from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, delay_job_instance_termination, - get_instances_ids_with_detaching_volumes, get_job_configured_volumes, get_jobs_from_run_spec, - group_jobs_by_replica_latest, - is_multinode_job, job_model_to_job_submission, + remove_job_spec_sensitive_info, stop_runner, ) from dstack._internal.server.services.locking import get_locker, string_to_lock_id from dstack._internal.server.services.logging import fmt -from dstack._internal.server.services.offers import get_offers_by_requirements from 
dstack._internal.server.services.plugins import apply_plugin_policies from dstack._internal.server.services.probes import is_probe_ready from dstack._internal.server.services.projects import list_user_project_models from dstack._internal.server.services.resources import set_resources_defaults +from dstack._internal.server.services.runs.plan import get_job_plans +from dstack._internal.server.services.runs.spec import ( + can_update_run_spec, + check_can_update_run_spec, + validate_run_spec_and_set_defaults, +) from dstack._internal.server.services.secrets import get_project_secrets_mapping from dstack._internal.server.services.users import get_user_model_by_name from dstack._internal.utils.logging import get_logger @@ -108,8 +83,6 @@ JobTerminationReason.FAILED_TO_START_DUE_TO_NO_CAPACITY, } -DEFAULT_MAX_OFFERS = 50 - async def list_user_runs( session: AsyncSession, @@ -320,14 +293,12 @@ async def get_plan( spec=effective_run_spec, ) effective_run_spec = RunSpec.parse_obj(effective_run_spec.dict()) - _validate_run_spec_and_set_defaults( + validate_run_spec_and_set_defaults( user=user, run_spec=effective_run_spec, legacy_default_working_dir=legacy_default_working_dir, ) - profile = effective_run_spec.merged_profile - creation_policy = profile.creation_policy current_resource = None action = ApplyAction.CREATE @@ -341,67 +312,18 @@ async def get_plan( # For backward compatibility (current_resource may has been submitted before # some fields, e.g., CPUSpec.arch, were added) set_resources_defaults(current_resource.run_spec.configuration.resources) - if not current_resource.status.is_finished() and _can_update_run_spec( + if not current_resource.status.is_finished() and can_update_run_spec( current_resource.run_spec, effective_run_spec ): action = ApplyAction.UPDATE - secrets = await get_project_secrets_mapping(session=session, project=project) - jobs = await get_jobs_from_run_spec( - run_spec=effective_run_spec, - secrets=secrets, - replica_num=0, - ) - - volumes = await get_job_configured_volumes( - session=session, - project=project, - run_spec=effective_run_spec, - job_num=0, - ) - - pool_offers = await _get_pool_offers( + job_plans = await get_job_plans( session=session, project=project, - run_spec=effective_run_spec, - job=jobs[0], - volumes=volumes, + profile=profile, + run_spec=run_spec, + max_offers=max_offers, ) - effective_run_spec.run_name = "dry-run" # will regenerate jobs on submission - - # Get offers once for all jobs - offers = [] - if creation_policy == CreationPolicy.REUSE_OR_CREATE: - offers = await get_offers_by_requirements( - project=project, - profile=profile, - requirements=jobs[0].job_spec.requirements, - exclude_not_available=False, - multinode=jobs[0].job_spec.jobs_per_replica > 1, - volumes=volumes, - privileged=jobs[0].job_spec.privileged, - instance_mounts=check_run_spec_requires_instance_mounts(effective_run_spec), - ) - - job_plans = [] - for job in jobs: - job_offers: List[InstanceOfferWithAvailability] = [] - job_offers.extend(pool_offers) - job_offers.extend(offer for _, offer in offers) - job_offers.sort(key=lambda offer: not offer.availability.is_available()) - - job_spec = job.job_spec - _remove_job_spec_sensitive_info(job_spec) - - job_plan = JobPlan( - job_spec=job_spec, - offers=job_offers[: (max_offers or DEFAULT_MAX_OFFERS)], - total_offers=len(job_offers), - max_price=max((offer.price for offer in job_offers), default=None), - ) - job_plans.append(job_plan) - - effective_run_spec.run_name = run_spec.run_name # restore run_name run_plan = RunPlan( 
project_name=project.name, user=user.name, @@ -430,7 +352,7 @@ async def apply_plan( ) # Spec must be copied by parsing to calculate merged_profile run_spec = RunSpec.parse_obj(run_spec.dict()) - _validate_run_spec_and_set_defaults( + validate_run_spec_and_set_defaults( user=user, run_spec=run_spec, legacy_default_working_dir=legacy_default_working_dir ) if run_spec.run_name is None: @@ -457,7 +379,7 @@ async def apply_plan( # some fields, e.g., CPUSpec.arch, were added) set_resources_defaults(current_resource.run_spec.configuration.resources) try: - _check_can_update_run_spec(current_resource.run_spec, run_spec) + check_can_update_run_spec(current_resource.run_spec, run_spec) except ServerClientError: # The except is only needed to raise an appropriate error if run is active if not current_resource.status.is_finished(): @@ -499,7 +421,7 @@ async def submit_run( project: ProjectModel, run_spec: RunSpec, ) -> Run: - _validate_run_spec_and_set_defaults(user, run_spec) + validate_run_spec_and_set_defaults(user, run_spec) repo = await _get_run_repo_or_error( session=session, project=project, @@ -791,7 +713,7 @@ def _get_run_jobs_with_submissions( # Use the spec from the latest submission. Submissions can have different specs job_spec = JobSpec.__response__.parse_raw(job_model.job_spec_data) if not include_sensitive: - _remove_job_spec_sensitive_info(job_spec) + remove_job_spec_sensitive_info(job_spec) jobs.append(Job(job_spec=job_spec, job_submissions=submissions)) return jobs @@ -851,51 +773,6 @@ def _get_run_fleet(run_model: RunModel) -> Optional[RunFleet]: ) -async def _get_pool_offers( - session: AsyncSession, - project: ProjectModel, - run_spec: RunSpec, - job: Job, - volumes: List[List[Volume]], -) -> list[InstanceOfferWithAvailability]: - pool_offers: list[InstanceOfferWithAvailability] = [] - - detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) - pool_instances = await get_pool_instances(session, project) - pool_instances = [i for i in pool_instances if i.id not in detaching_instances_ids] - multinode = is_multinode_job(job) - - shared_instances_with_offers = get_shared_pool_instances_with_offers( - pool_instances=pool_instances, - profile=run_spec.merged_profile, - requirements=job.job_spec.requirements, - volumes=volumes, - multinode=multinode, - ) - for _, offer in shared_instances_with_offers: - pool_offers.append(offer) - - nonshared_instances = filter_pool_instances( - pool_instances=pool_instances, - profile=run_spec.merged_profile, - requirements=job.job_spec.requirements, - multinode=multinode, - volumes=volumes, - shared=False, - ) - for instance in nonshared_instances: - offer = get_instance_offer(instance) - if offer is None: - continue - offer.availability = InstanceAvailability.BUSY - if instance.status == InstanceStatus.IDLE: - offer.availability = InstanceAvailability.IDLE - pool_offers.append(offer) - - pool_offers.sort(key=lambda offer: offer.price) - return pool_offers - - async def _generate_run_name( session: AsyncSession, project: ProjectModel, @@ -916,13 +793,6 @@ async def _generate_run_name( idx += 1 -def check_run_spec_requires_instance_mounts(run_spec: RunSpec) -> bool: - return any( - isinstance(mp, InstanceMountPoint) and not mp.optional - for mp in run_spec.configuration.volumes - ) - - async def _validate_run( session: AsyncSession, user: UserModel, @@ -995,150 +865,6 @@ def _get_job_submission_cost(job_submission: JobSubmission) -> float: return job_submission.job_provisioning_data.price * duration_hours -def 
_validate_run_spec_and_set_defaults( - user: UserModel, run_spec: RunSpec, legacy_default_working_dir: bool = False -): - # This function may set defaults for null run_spec values, - # although most defaults are resolved when building job_spec - # so that we can keep both the original user-supplied value (null in run_spec) - # and the default in job_spec. - # If a property is stored in job_spec - resolve the default there. - # Server defaults are preferable over client defaults so that - # the defaults depend on the server version, not the client version. - if run_spec.run_name is not None: - validate_dstack_resource_name(run_spec.run_name) - for mount_point in run_spec.configuration.volumes: - if not is_valid_docker_volume_target(mount_point.path): - raise ServerClientError(f"Invalid volume mount path: {mount_point.path}") - if run_spec.repo_id is None and run_spec.repo_data is not None: - raise ServerClientError("repo_data must not be set if repo_id is not set") - if run_spec.repo_id is not None and run_spec.repo_data is None: - raise ServerClientError("repo_id must not be set if repo_data is not set") - # Some run_spec parameters have to be set here and not in the model defaults since - # the client may not pass them or pass null, but they must be always present, e.g. for runner. - if run_spec.repo_id is None: - run_spec.repo_id = DEFAULT_VIRTUAL_REPO_ID - if run_spec.repo_data is None: - run_spec.repo_data = VirtualRunRepoData() - if ( - run_spec.merged_profile.utilization_policy is not None - and run_spec.merged_profile.utilization_policy.time_window - > settings.SERVER_METRICS_RUNNING_TTL_SECONDS - ): - raise ServerClientError( - f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_RUNNING_TTL_SECONDS}s" - ) - if isinstance(run_spec.configuration, ServiceConfiguration): - if run_spec.merged_profile.schedule and run_spec.configuration.replicas.min == 0: - raise ServerClientError( - "Scheduled services with autoscaling to zero are not supported" - ) - if len(run_spec.configuration.probes) > settings.MAX_PROBES_PER_JOB: - raise ServerClientError( - f"Cannot configure more than {settings.MAX_PROBES_PER_JOB} probes" - ) - if any( - p.timeout is not None and p.timeout > settings.MAX_PROBE_TIMEOUT - for p in run_spec.configuration.probes - ): - raise ServerClientError( - f"Probe timeout cannot be longer than {settings.MAX_PROBE_TIMEOUT}s" - ) - if run_spec.configuration.priority is None: - run_spec.configuration.priority = RUN_PRIORITY_DEFAULT - set_resources_defaults(run_spec.configuration.resources) - if run_spec.ssh_key_pub is None: - if user.ssh_public_key: - run_spec.ssh_key_pub = user.ssh_public_key - else: - raise ServerClientError("ssh_key_pub must be set if the user has no ssh_public_key") - if run_spec.configuration.working_dir is None and legacy_default_working_dir: - run_spec.configuration.working_dir = LEGACY_REPO_DIR - - -_UPDATABLE_SPEC_FIELDS = ["configuration_path", "configuration"] -_TYPE_SPECIFIC_UPDATABLE_SPEC_FIELDS = { - "service": [ - # rolling deployment - "repo_data", - "repo_code_hash", - "file_archives", - "working_dir", - ], -} -_CONF_UPDATABLE_FIELDS = ["priority"] -_TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS = { - "dev-environment": ["inactivity_duration"], - "service": [ - # in-place - "replicas", - "scaling", - # rolling deployment - # NOTE: keep this list in sync with the "Rolling deployment" section in services.md - "port", - "probes", - "resources", - "volumes", - "docker", - "files", - "image", - "user", - "privileged", - "entrypoint", - 
"working_dir", - "python", - "nvcc", - "single_branch", - "env", - "shell", - "commands", - ], -} - - -def _can_update_run_spec(current_run_spec: RunSpec, new_run_spec: RunSpec) -> bool: - try: - _check_can_update_run_spec(current_run_spec, new_run_spec) - except ServerClientError as e: - logger.debug("Run cannot be updated: %s", repr(e)) - return False - return True - - -def _check_can_update_run_spec(current_run_spec: RunSpec, new_run_spec: RunSpec): - spec_diff = diff_models(current_run_spec, new_run_spec) - changed_spec_fields = list(spec_diff.keys()) - updatable_spec_fields = _UPDATABLE_SPEC_FIELDS + _TYPE_SPECIFIC_UPDATABLE_SPEC_FIELDS.get( - new_run_spec.configuration.type, [] - ) - for key in changed_spec_fields: - if key not in updatable_spec_fields: - raise ServerClientError( - f"Failed to update fields {changed_spec_fields}." - f" Can only update {updatable_spec_fields}." - ) - _check_can_update_configuration(current_run_spec.configuration, new_run_spec.configuration) - - -def _check_can_update_configuration( - current: AnyRunConfiguration, new: AnyRunConfiguration -) -> None: - if current.type != new.type: - raise ServerClientError( - f"Configuration type changed from {current.type} to {new.type}, cannot update" - ) - updatable_fields = _CONF_UPDATABLE_FIELDS + _TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS.get( - new.type, [] - ) - diff = diff_models(current, new) - changed_fields = list(diff.keys()) - for key in changed_fields: - if key not in updatable_fields: - raise ServerClientError( - f"Failed to update fields {changed_fields}. Can only update {updatable_fields}" - ) - - async def process_terminating_run(session: AsyncSession, run_model: RunModel): """ Used by both `process_runs` and `stop_run` to process a TERMINATING run. @@ -1200,136 +926,10 @@ async def process_terminating_run(session: AsyncSession, run_model: RunModel): ) -async def scale_run_replicas(session: AsyncSession, run_model: RunModel, replicas_diff: int): - if replicas_diff == 0: - # nothing to do - return - - logger.info( - "%s: scaling %s %s replica(s)", - fmt(run_model), - "UP" if replicas_diff > 0 else "DOWN", - abs(replicas_diff), - ) - - # lists of (importance, is_out_of_date, replica_num, jobs) - active_replicas = [] - inactive_replicas = [] - - for replica_num, replica_jobs in group_jobs_by_replica_latest(run_model.jobs): - statuses = set(job.status for job in replica_jobs) - deployment_num = replica_jobs[0].deployment_num # same for all jobs - is_out_of_date = deployment_num < run_model.deployment_num - if {JobStatus.TERMINATING, *JobStatus.finished_statuses()} & statuses: - # if there are any terminating or finished jobs, the replica is inactive - inactive_replicas.append((0, is_out_of_date, replica_num, replica_jobs)) - elif JobStatus.SUBMITTED in statuses: - # if there are any submitted jobs, the replica is active and has the importance of 0 - active_replicas.append((0, is_out_of_date, replica_num, replica_jobs)) - elif {JobStatus.PROVISIONING, JobStatus.PULLING} & statuses: - # if there are any provisioning or pulling jobs, the replica is active and has the importance of 1 - active_replicas.append((1, is_out_of_date, replica_num, replica_jobs)) - elif not is_replica_registered(replica_jobs): - # all jobs are running, but not receiving traffic, the replica is active and has the importance of 2 - active_replicas.append((2, is_out_of_date, replica_num, replica_jobs)) - else: - # all jobs are running and ready, the replica is active and has the importance of 3 - active_replicas.append((3, is_out_of_date, 
replica_num, replica_jobs)) - - # sort by is_out_of_date (up-to-date first), importance (desc), and replica_num (asc) - active_replicas.sort(key=lambda r: (r[1], -r[0], r[2])) - run_spec = RunSpec.__response__.parse_raw(run_model.run_spec) - - if replicas_diff < 0: - for _, _, _, replica_jobs in reversed(active_replicas[-abs(replicas_diff) :]): - # scale down the less important replicas first - for job in replica_jobs: - if job.status.is_finished() or job.status == JobStatus.TERMINATING: - continue - job.status = JobStatus.TERMINATING - job.termination_reason = JobTerminationReason.SCALED_DOWN - # background task will process the job later - else: - scheduled_replicas = 0 - - # rerun inactive replicas - for _, _, _, replica_jobs in inactive_replicas: - if scheduled_replicas == replicas_diff: - break - await retry_run_replica_jobs(session, run_model, replica_jobs, only_failed=False) - scheduled_replicas += 1 - - secrets = await get_project_secrets_mapping( - session=session, - project=run_model.project, - ) - - for replica_num in range( - len(active_replicas) + scheduled_replicas, len(active_replicas) + replicas_diff - ): - # FIXME: Handle getting image configuration errors or skip it. - jobs = await get_jobs_from_run_spec( - run_spec=run_spec, - secrets=secrets, - replica_num=replica_num, - ) - for job in jobs: - job_model = create_job_model_for_new_submission( - run_model=run_model, - job=job, - status=JobStatus.SUBMITTED, - ) - session.add(job_model) - - -async def retry_run_replica_jobs( - session: AsyncSession, run_model: RunModel, latest_jobs: List[JobModel], *, only_failed: bool -): - # FIXME: Handle getting image configuration errors or skip it. - secrets = await get_project_secrets_mapping( - session=session, - project=run_model.project, - ) - new_jobs = await get_jobs_from_run_spec( - run_spec=RunSpec.__response__.parse_raw(run_model.run_spec), - secrets=secrets, - replica_num=latest_jobs[0].replica_num, - ) - assert len(new_jobs) == len(latest_jobs), ( - "Changing the number of jobs within a replica is not yet supported" - ) - for job_model, new_job in zip(latest_jobs, new_jobs): - if not (job_model.status.is_finished() or job_model.status == JobStatus.TERMINATING): - if only_failed: - # No need to resubmit, skip - continue - # The job is not finished, but we have to retry all jobs. 
Terminate it - job_model.status = JobStatus.TERMINATING - job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER - - new_job_model = create_job_model_for_new_submission( - run_model=run_model, - job=new_job, - status=JobStatus.SUBMITTED, - ) - # dirty hack to avoid passing all job submissions - new_job_model.submission_num = job_model.submission_num + 1 - session.add(new_job_model) - - def is_job_ready(probes: Iterable[ProbeModel], probe_specs: Iterable[ProbeSpec]) -> bool: return all(is_probe_ready(probe, probe_spec) for probe, probe_spec in zip(probes, probe_specs)) -def is_replica_registered(jobs: list[JobModel]) -> bool: - # Only job_num=0 is supposed to receive service requests - return jobs[0].registered - - -def _remove_job_spec_sensitive_info(spec: JobSpec): - spec.ssh_key = None - - def _get_next_triggered_at(run_spec: RunSpec) -> Optional[datetime]: if run_spec.merged_profile.schedule is None: return None diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py new file mode 100644 index 000000000..b19876bd6 --- /dev/null +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -0,0 +1,603 @@ +import math +from typing import List, Optional, Union + +from sqlalchemy import and_, not_, or_, select +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import contains_eager, noload + +from dstack._internal.core.backends.base.backend import Backend +from dstack._internal.core.models.fleets import Fleet, InstanceGroupPlacement +from dstack._internal.core.models.instances import ( + InstanceAvailability, + InstanceOfferWithAvailability, + InstanceStatus, +) +from dstack._internal.core.models.profiles import CreationPolicy, Profile +from dstack._internal.core.models.runs import ( + Job, + JobPlan, + JobProvisioningData, + Requirements, + RunSpec, +) +from dstack._internal.core.models.volumes import Volume +from dstack._internal.server.models import FleetModel, InstanceModel, ProjectModel, RunModel +from dstack._internal.server.services.fleets import ( + check_can_create_new_cloud_instance_in_fleet, + fleet_model_to_fleet, + get_fleet_master_instance_provisioning_data, + get_fleet_requirements, +) +from dstack._internal.server.services.instances import ( + filter_pool_instances, + get_instance_offer, + get_pool_instances, + get_shared_pool_instances_with_offers, +) +from dstack._internal.server.services.jobs import ( + get_instances_ids_with_detaching_volumes, + get_job_configured_volumes, + get_jobs_from_run_spec, + is_multinode_job, + remove_job_spec_sensitive_info, +) +from dstack._internal.server.services.offers import get_offers_by_requirements +from dstack._internal.server.services.requirements.combine import ( + combine_fleet_and_run_profiles, + combine_fleet_and_run_requirements, +) +from dstack._internal.server.services.runs.spec import ( + check_run_spec_requires_instance_mounts, + get_nodes_required_num, +) +from dstack._internal.server.services.secrets import get_project_secrets_mapping +from dstack._internal.settings import FeatureFlags +from dstack._internal.utils import common as common_utils +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + + +DEFAULT_MAX_OFFERS = 50 + + +async def get_job_plans( + session: AsyncSession, + project: ProjectModel, + profile: Profile, + run_spec: RunSpec, + max_offers: Optional[int], +) -> list[JobPlan]: + run_name = run_spec.run_name + if run_spec.run_name is None: + # Set/unset dummy run name to generate job 
names for run plan. + run_spec.run_name = "dry-run" + + secrets = await get_project_secrets_mapping(session=session, project=project) + jobs = await get_jobs_from_run_spec( + run_spec=run_spec, + secrets=secrets, + replica_num=0, + ) + volumes = await get_job_configured_volumes( + session=session, + project=project, + run_spec=run_spec, + job_num=0, + ) + candidate_fleet_models = await _select_candidate_fleet_models( + session=session, + project=project, + run_model=None, + run_spec=run_spec, + ) + fleet_model, instance_offers, backend_offers = await find_optimal_fleet_with_offers( + project=project, + fleet_models=candidate_fleet_models, + run_model=None, + run_spec=run_spec, + job=jobs[0], + master_job_provisioning_data=None, + volumes=volumes, + exclude_not_available=False, + ) + if _should_force_non_fleet_offers(run_spec) or ( + not FeatureFlags.AUTOCREATED_FLEETS_DISABLED + and profile.fleets is None + and fleet_model is None + ): + # Keep the old behavior returning all offers irrespective of fleets. + # Needed for supporting offers with autocreated fleets flow (and for `dstack offer`). + # TODO: Consider dropping when autocreated fleets are dropped. + instance_offers, backend_offers = await _get_non_fleet_offers( + session=session, + project=project, + profile=profile, + run_spec=run_spec, + job=jobs[0], + volumes=volumes, + ) + + job_plans = [] + for job in jobs: + job_plan = _get_job_plan( + instance_offers=instance_offers, + backend_offers=backend_offers, + profile=profile, + job=job, + max_offers=max_offers, + ) + job_plans.append(job_plan) + + run_spec.run_name = run_name + return job_plans + + +async def get_run_candidate_fleet_models_filters( + session: AsyncSession, + project: ProjectModel, + run_model: Optional[RunModel], + run_spec: RunSpec, +) -> tuple[list, list]: + """ + Returns ORM fleet and instance filters for selecting run candidate fleet models with instances. + """ + # If another job freed the instance but is still trying to detach volumes, + # do not provision on it to prevent attaching volumes that are currently detaching. + detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) + fleet_filters = [ + FleetModel.project_id == project.id, + FleetModel.deleted == False, + ] + if run_model is not None and run_model.fleet is not None: + fleet_filters.append(FleetModel.id == run_model.fleet_id) + if run_spec.merged_profile.fleets is not None: + fleet_filters.append(FleetModel.name.in_(run_spec.merged_profile.fleets)) + instance_filters = [ + InstanceModel.deleted == False, + InstanceModel.id.not_in(detaching_instances_ids), + ] + return fleet_filters, instance_filters + + +async def select_run_candidate_fleet_models_with_filters( + session: AsyncSession, + fleet_filters: list, + instance_filters: list, + lock_instances: bool, +) -> tuple[list[FleetModel], list[FleetModel]]: + # Selecting fleets in two queries since Postgres does not allow + # locking nullable side of an outer join. So, first lock instances with inner join. + # Then select left out fleets without instances. 
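+ # The first query joins instances and, when lock_instances is set, takes row-level locks on them.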
+ stmt = ( + select(FleetModel) + .join(FleetModel.instances) + .where(*fleet_filters) + .where(*instance_filters) + .options(contains_eager(FleetModel.instances)) + .execution_options(populate_existing=True) + ) + if lock_instances: + stmt = stmt.order_by(InstanceModel.id).with_for_update( # take locks in order + key_share=True, of=InstanceModel + ) + res = await session.execute(stmt) + fleet_models_with_instances = list(res.unique().scalars().all()) + fleet_models_with_instances_ids = [f.id for f in fleet_models_with_instances] + res = await session.execute( + select(FleetModel) + .outerjoin(FleetModel.instances) + .where( + *fleet_filters, + FleetModel.id.not_in(fleet_models_with_instances_ids), + ) + .where( + or_( + InstanceModel.id.is_(None), + not_(and_(*instance_filters)), + ) + ) + .options(noload(FleetModel.instances)) + .execution_options(populate_existing=True) + ) + fleet_models_without_instances = list(res.unique().scalars().all()) + return fleet_models_with_instances, fleet_models_without_instances + + +async def find_optimal_fleet_with_offers( + project: ProjectModel, + fleet_models: list[FleetModel], + run_model: Optional[RunModel], + run_spec: RunSpec, + job: Job, + master_job_provisioning_data: Optional[JobProvisioningData], + volumes: Optional[list[list[Volume]]], + exclude_not_available: bool, +) -> tuple[ + Optional[FleetModel], + list[tuple[InstanceModel, InstanceOfferWithAvailability]], + list[tuple[Backend, InstanceOfferWithAvailability]], +]: + """ + Finds the optimal fleet for the run among the given fleets and returns + the fleet model, pool offers with instances, and backend offers. + Returns empty backend offers if run_model.fleet is set since + backend offer from this function are needed only for run plan. + Only available offers are considered for selecting fleets but may return + either available or all offers depending on `exclude_not_available`. + """ + if run_model is not None and run_model.fleet is not None: + # Using the fleet that was already chosen by the master job + instance_offers = _get_run_fleet_instance_offers( + fleet_model=run_model.fleet, + run_spec=run_spec, + job=job, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + ) + if exclude_not_available: + instance_offers = _exclude_non_available_instance_offers(instance_offers) + return run_model.fleet, instance_offers, [] + + nodes_required_num = get_nodes_required_num(run_spec) + # The current strategy is first to consider fleets that can accommodate + # the run without additional provisioning and choose the one with the cheapest pool offer. + # Then choose a fleet with the cheapest pool offer among all fleets with pool offers. + # If there are no fleets with pool offers, choose a fleet with a cheapest backend offer. + # Fallback to autocreated fleet if fleets have no pool or backend offers. + # TODO: Consider trying all backend offers and then choosing a fleet. + candidate_fleets_with_offers: list[ + tuple[ + Optional[FleetModel], + list[tuple[InstanceModel, InstanceOfferWithAvailability]], + list[tuple[Backend, InstanceOfferWithAvailability]], + int, + int, + tuple[int, float, float], + ] + ] = [] + for candidate_fleet_model in fleet_models: + candidate_fleet = fleet_model_to_fleet(candidate_fleet_model) + if ( + is_multinode_job(job) + and candidate_fleet.spec.configuration.placement != InstanceGroupPlacement.CLUSTER + ): + # Limit multinode runs to cluster fleets to guarantee best connectivity. 
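+ # Skipping here only drops this fleet from consideration; remaining candidates are still evaluated.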
+ continue + + instance_offers = _get_run_fleet_instance_offers( + fleet_model=candidate_fleet_model, + run_spec=run_spec, + job=job, + # No need to pass master_job_provisioning_data for master job + # as all pool offers are suitable. + master_job_provisioning_data=None, + volumes=volumes, + ) + available_instance_offers = _exclude_non_available_instance_offers(instance_offers) + if exclude_not_available: + instance_offers = available_instance_offers + has_pool_capacity = nodes_required_num <= len(available_instance_offers) + min_instance_offer_price = _get_min_instance_or_backend_offer_price( + available_instance_offers + ) + + try: + check_can_create_new_cloud_instance_in_fleet(candidate_fleet) + profile, requirements = get_run_profile_and_requirements_in_fleet( + job=job, + run_spec=run_spec, + fleet=candidate_fleet, + ) + except ValueError: + backend_offers = [] + else: + # Master job offers must be in the same cluster as existing instances. + master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( + fleet_model=candidate_fleet_model, + fleet_spec=candidate_fleet.spec, + ) + # Handle multinode for old jobs that don't have requirements.multinode set. + # TODO: Drop multinode param. + multinode = requirements.multinode or is_multinode_job(job) + backend_offers = await get_offers_by_requirements( + project=project, + profile=profile, + requirements=requirements, + multinode=multinode, + master_job_provisioning_data=master_instance_provisioning_data, + volumes=volumes, + privileged=job.job_spec.privileged, + instance_mounts=check_run_spec_requires_instance_mounts(run_spec), + ) + + available_backend_offers = _exclude_non_available_backend_offers(backend_offers) + if exclude_not_available: + backend_offers = available_backend_offers + min_backend_offer_price = _get_min_instance_or_backend_offer_price( + available_backend_offers + ) + + if not _run_can_fit_into_fleet(run_spec, candidate_fleet): + logger.debug("Skipping fleet %s from consideration: run cannot fit into fleet") + continue + + fleet_priority = ( + not has_pool_capacity, + min_instance_offer_price, + min_backend_offer_price, + ) + candidate_fleets_with_offers.append( + ( + candidate_fleet_model, + instance_offers, + backend_offers, + len(available_instance_offers), + len(available_backend_offers), + fleet_priority, + ) + ) + if len(candidate_fleets_with_offers) == 0: + return None, [], [] + if ( + not FeatureFlags.AUTOCREATED_FLEETS_DISABLED + and run_spec.merged_profile.fleets is None + and all(t[3] == 0 and t[4] == 0 for t in candidate_fleets_with_offers) + ): + # If fleets are not specified and no fleets have available pool + # or backend offers, create a new fleet. + # This is for compatibility with non-fleet-first UX when runs created new fleets + # if there are no instances to reuse. 
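+ # Callers treat an empty result as a signal to fall back to the autocreated-fleet flow.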
+ return None, [], [] + candidate_fleets_with_offers.sort(key=lambda t: t[-1]) + return candidate_fleets_with_offers[0][:3] + + +def get_run_profile_and_requirements_in_fleet( + job: Job, + run_spec: RunSpec, + fleet: Fleet, +) -> tuple[Profile, Requirements]: + profile = combine_fleet_and_run_profiles(fleet.spec.merged_profile, run_spec.merged_profile) + if profile is None: + raise ValueError("Cannot combine fleet profile") + fleet_requirements = get_fleet_requirements(fleet.spec) + requirements = combine_fleet_and_run_requirements( + fleet_requirements, job.job_spec.requirements + ) + if requirements is None: + raise ValueError("Cannot combine fleet requirements") + return profile, requirements + + +async def _select_candidate_fleet_models( + session: AsyncSession, + project: ProjectModel, + run_model: Optional[RunModel], + run_spec: RunSpec, +) -> list[FleetModel]: + fleet_filters, instance_filters = await get_run_candidate_fleet_models_filters( + session=session, + project=project, + run_model=run_model, + run_spec=run_spec, + ) + ( + fleet_models_with_instances, + fleet_models_without_instances, + ) = await select_run_candidate_fleet_models_with_filters( + session=session, + fleet_filters=fleet_filters, + instance_filters=instance_filters, + lock_instances=False, + ) + return fleet_models_with_instances + fleet_models_without_instances + + +def _get_run_fleet_instance_offers( + fleet_model: FleetModel, + run_spec: RunSpec, + job: Job, + master_job_provisioning_data: Optional[JobProvisioningData] = None, + volumes: Optional[List[List[Volume]]] = None, +) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: + pool_instances = fleet_model.instances + profile = run_spec.merged_profile + multinode = is_multinode_job(job) + nonshared_instances = filter_pool_instances( + pool_instances=pool_instances, + profile=profile, + requirements=job.job_spec.requirements, + fleet_model=fleet_model, + multinode=multinode, + master_job_provisioning_data=master_job_provisioning_data, + volumes=volumes, + shared=False, + ) + instances_with_offers = _get_offers_from_instances(nonshared_instances) + shared_instances_with_offers = get_shared_pool_instances_with_offers( + pool_instances=pool_instances, + profile=profile, + requirements=job.job_spec.requirements, + fleet_model=fleet_model, + multinode=multinode, + volumes=volumes, + ) + instances_with_offers.extend(shared_instances_with_offers) + instances_with_offers.sort(key=lambda o: o[0].price or 0) + return instances_with_offers + + +def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool: + """ + Returns `False` if the run cannot fit into fleet for sure. + This is helpful heuristic to avoid even considering fleets too small for a run. + A run may not fit even if this function returns `True`. + This will lead to some jobs failing due to exceeding `nodes.max` + or more than `nodes.max` instances being provisioned + and eventually removed by the fleet consolidation logic. + """ + # No check for cloud fleets with blocks > 1 since we don't know + # how many jobs such fleets can accommodate. 
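+ # Such fleets skip both branches below and are always treated as able to fit the run.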
+    nodes_required_num = get_nodes_required_num(run_spec)
+    if (
+        fleet.spec.configuration.nodes is not None
+        and fleet.spec.configuration.blocks == 1
+        and fleet.spec.configuration.nodes.max is not None
+    ):
+        busy_instances = [i for i in fleet.instances if i.busy_blocks > 0]
+        fleet_available_capacity = fleet.spec.configuration.nodes.max - len(busy_instances)
+        if fleet_available_capacity < nodes_required_num:
+            return False
+    elif fleet.spec.configuration.ssh_config is not None:
+        # Currently assume that each idle block can run a job.
+        # TODO: Take resources / eligible offers into account.
+        total_idle_blocks = 0
+        for instance in fleet.instances:
+            total_blocks = instance.total_blocks or 1
+            total_idle_blocks += total_blocks - instance.busy_blocks
+        if total_idle_blocks < nodes_required_num:
+            return False
+    return True
+
+
+async def _get_pool_offers(
+    session: AsyncSession,
+    project: ProjectModel,
+    run_spec: RunSpec,
+    job: Job,
+    volumes: List[List[Volume]],
+) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]:
+    pool_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = []
+    detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session)
+    pool_instances = await get_pool_instances(session, project)
+    pool_instances = [i for i in pool_instances if i.id not in detaching_instances_ids]
+    multinode = is_multinode_job(job)
+    shared_instances_with_offers = get_shared_pool_instances_with_offers(
+        pool_instances=pool_instances,
+        profile=run_spec.merged_profile,
+        requirements=job.job_spec.requirements,
+        volumes=volumes,
+        multinode=multinode,
+    )
+    for offer in shared_instances_with_offers:
+        pool_offers.append(offer)
+
+    nonshared_instances = filter_pool_instances(
+        pool_instances=pool_instances,
+        profile=run_spec.merged_profile,
+        requirements=job.job_spec.requirements,
+        multinode=multinode,
+        volumes=volumes,
+        shared=False,
+    )
+    nonshared_instances_with_offers = _get_offers_from_instances(nonshared_instances)
+    pool_offers.extend(nonshared_instances_with_offers)
+    pool_offers.sort(key=lambda o: o[1].price)
+    return pool_offers
+
+
+async def _get_non_fleet_offers(
+    session: AsyncSession,
+    project: ProjectModel,
+    profile: Profile,
+    run_spec: RunSpec,
+    job: Job,
+    volumes: List[List[Volume]],
+) -> tuple[
+    list[tuple[InstanceModel, InstanceOfferWithAvailability]],
+    list[tuple[Backend, InstanceOfferWithAvailability]],
+]:
+    """
+    Returns instance and backend offers for job irrespective of fleets,
+    i.e. all pool instances and project backends matching the spec.
+ """ + instance_offers = await _get_pool_offers( + session=session, + project=project, + run_spec=run_spec, + job=job, + volumes=volumes, + ) + backend_offers = await get_offers_by_requirements( + project=project, + profile=profile, + requirements=job.job_spec.requirements, + exclude_not_available=False, + multinode=is_multinode_job(job), + volumes=volumes, + privileged=job.job_spec.privileged, + instance_mounts=check_run_spec_requires_instance_mounts(run_spec), + ) + return instance_offers, backend_offers + + +def _get_job_plan( + instance_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]], + backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]], + profile: Profile, + job: Job, + max_offers: Optional[int], +) -> JobPlan: + job_offers: List[InstanceOfferWithAvailability] = [] + job_offers.extend(offer for _, offer in instance_offers) + if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE: + job_offers.extend(offer for _, offer in backend_offers) + job_offers.sort(key=lambda offer: not offer.availability.is_available()) + remove_job_spec_sensitive_info(job.job_spec) + return JobPlan( + job_spec=job.job_spec, + offers=job_offers[: (max_offers or DEFAULT_MAX_OFFERS)], + total_offers=len(job_offers), + max_price=max((offer.price for offer in job_offers), default=None), + ) + + +def _should_force_non_fleet_offers(run_spec: RunSpec) -> bool: + # A hack to force non-fleet offers for `dstack offer` command that uses + # get run plan API to show offers and the only way to distinguish it is commands. + # Assuming real runs will not use such commands. + return run_spec.configuration.type == "task" and run_spec.configuration.commands == [":"] + + +def _get_offers_from_instances( + instances: list[InstanceModel], +) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: + instances_with_offers = [] + for instance in instances: + offer = common_utils.get_or_error(get_instance_offer(instance)) + offer.availability = InstanceAvailability.BUSY + if instance.status == InstanceStatus.IDLE: + offer.availability = InstanceAvailability.IDLE + instances_with_offers.append((instance, offer)) + return instances_with_offers + + +def _get_min_instance_or_backend_offer_price( + offers: Union[ + list[tuple[InstanceModel, InstanceOfferWithAvailability]], + list[tuple[Backend, InstanceOfferWithAvailability]], + ], +) -> float: + min_offer_price = math.inf + if len(offers) > 0: + min_offer_price = offers[0][1].price + return min_offer_price + + +def _exclude_non_available_instance_offers( + instance_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]], +) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: + return [ + (instance, offer) + for instance, offer in instance_offers + if offer.availability.is_available() + ] + + +def _exclude_non_available_backend_offers( + backend_offers: list[tuple[Backend, InstanceOfferWithAvailability]], +) -> list[tuple[Backend, InstanceOfferWithAvailability]]: + return [ + (backend, offer) for backend, offer in backend_offers if offer.availability.is_available() + ] diff --git a/src/dstack/_internal/server/services/runs/replicas.py b/src/dstack/_internal/server/services/runs/replicas.py new file mode 100644 index 000000000..b1c33c90c --- /dev/null +++ b/src/dstack/_internal/server/services/runs/replicas.py @@ -0,0 +1,135 @@ +from typing import List + +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.core.models.runs import JobStatus, JobTerminationReason, RunSpec +from 
dstack._internal.server.models import JobModel, RunModel +from dstack._internal.server.services.jobs import ( + get_jobs_from_run_spec, + group_jobs_by_replica_latest, +) +from dstack._internal.server.services.logging import fmt +from dstack._internal.server.services.runs import create_job_model_for_new_submission, logger +from dstack._internal.server.services.secrets import get_project_secrets_mapping + + +async def retry_run_replica_jobs( + session: AsyncSession, run_model: RunModel, latest_jobs: List[JobModel], *, only_failed: bool +): + # FIXME: Handle getting image configuration errors or skip it. + secrets = await get_project_secrets_mapping( + session=session, + project=run_model.project, + ) + new_jobs = await get_jobs_from_run_spec( + run_spec=RunSpec.__response__.parse_raw(run_model.run_spec), + secrets=secrets, + replica_num=latest_jobs[0].replica_num, + ) + assert len(new_jobs) == len(latest_jobs), ( + "Changing the number of jobs within a replica is not yet supported" + ) + for job_model, new_job in zip(latest_jobs, new_jobs): + if not (job_model.status.is_finished() or job_model.status == JobStatus.TERMINATING): + if only_failed: + # No need to resubmit, skip + continue + # The job is not finished, but we have to retry all jobs. Terminate it + job_model.status = JobStatus.TERMINATING + job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + + new_job_model = create_job_model_for_new_submission( + run_model=run_model, + job=new_job, + status=JobStatus.SUBMITTED, + ) + # dirty hack to avoid passing all job submissions + new_job_model.submission_num = job_model.submission_num + 1 + session.add(new_job_model) + + +def is_replica_registered(jobs: list[JobModel]) -> bool: + # Only job_num=0 is supposed to receive service requests + return jobs[0].registered + + +async def scale_run_replicas(session: AsyncSession, run_model: RunModel, replicas_diff: int): + if replicas_diff == 0: + # nothing to do + return + + logger.info( + "%s: scaling %s %s replica(s)", + fmt(run_model), + "UP" if replicas_diff > 0 else "DOWN", + abs(replicas_diff), + ) + + # lists of (importance, is_out_of_date, replica_num, jobs) + active_replicas = [] + inactive_replicas = [] + + for replica_num, replica_jobs in group_jobs_by_replica_latest(run_model.jobs): + statuses = set(job.status for job in replica_jobs) + deployment_num = replica_jobs[0].deployment_num # same for all jobs + is_out_of_date = deployment_num < run_model.deployment_num + if {JobStatus.TERMINATING, *JobStatus.finished_statuses()} & statuses: + # if there are any terminating or finished jobs, the replica is inactive + inactive_replicas.append((0, is_out_of_date, replica_num, replica_jobs)) + elif JobStatus.SUBMITTED in statuses: + # if there are any submitted jobs, the replica is active and has the importance of 0 + active_replicas.append((0, is_out_of_date, replica_num, replica_jobs)) + elif {JobStatus.PROVISIONING, JobStatus.PULLING} & statuses: + # if there are any provisioning or pulling jobs, the replica is active and has the importance of 1 + active_replicas.append((1, is_out_of_date, replica_num, replica_jobs)) + elif not is_replica_registered(replica_jobs): + # all jobs are running, but not receiving traffic, the replica is active and has the importance of 2 + active_replicas.append((2, is_out_of_date, replica_num, replica_jobs)) + else: + # all jobs are running and ready, the replica is active and has the importance of 3 + active_replicas.append((3, is_out_of_date, replica_num, replica_jobs)) + + # sort by 
is_out_of_date (up-to-date first), importance (desc), and replica_num (asc) + active_replicas.sort(key=lambda r: (r[1], -r[0], r[2])) + run_spec = RunSpec.__response__.parse_raw(run_model.run_spec) + + if replicas_diff < 0: + for _, _, _, replica_jobs in reversed(active_replicas[-abs(replicas_diff) :]): + # scale down the less important replicas first + for job in replica_jobs: + if job.status.is_finished() or job.status == JobStatus.TERMINATING: + continue + job.status = JobStatus.TERMINATING + job.termination_reason = JobTerminationReason.SCALED_DOWN + # background task will process the job later + else: + scheduled_replicas = 0 + + # rerun inactive replicas + for _, _, _, replica_jobs in inactive_replicas: + if scheduled_replicas == replicas_diff: + break + await retry_run_replica_jobs(session, run_model, replica_jobs, only_failed=False) + scheduled_replicas += 1 + + secrets = await get_project_secrets_mapping( + session=session, + project=run_model.project, + ) + + for replica_num in range( + len(active_replicas) + scheduled_replicas, len(active_replicas) + replicas_diff + ): + # FIXME: Handle getting image configuration errors or skip it. + jobs = await get_jobs_from_run_spec( + run_spec=run_spec, + secrets=secrets, + replica_num=replica_num, + ) + for job in jobs: + job_model = create_job_model_for_new_submission( + run_model=run_model, + job=job, + status=JobStatus.SUBMITTED, + ) + session.add(job_model) diff --git a/src/dstack/_internal/server/services/runs/spec.py b/src/dstack/_internal/server/services/runs/spec.py new file mode 100644 index 000000000..3ac859c0b --- /dev/null +++ b/src/dstack/_internal/server/services/runs/spec.py @@ -0,0 +1,177 @@ +from dstack._internal.core.errors import ServerClientError +from dstack._internal.core.models.configurations import RUN_PRIORITY_DEFAULT, ServiceConfiguration +from dstack._internal.core.models.repos.virtual import DEFAULT_VIRTUAL_REPO_ID, VirtualRunRepoData +from dstack._internal.core.models.runs import LEGACY_REPO_DIR, AnyRunConfiguration, RunSpec +from dstack._internal.core.models.volumes import InstanceMountPoint +from dstack._internal.core.services import validate_dstack_resource_name +from dstack._internal.core.services.diff import diff_models +from dstack._internal.server import settings +from dstack._internal.server.models import UserModel +from dstack._internal.server.services.docker import is_valid_docker_volume_target +from dstack._internal.server.services.resources import set_resources_defaults +from dstack._internal.utils.logging import get_logger + +logger = get_logger(__name__) + + +_UPDATABLE_SPEC_FIELDS = ["configuration_path", "configuration"] +_TYPE_SPECIFIC_UPDATABLE_SPEC_FIELDS = { + "service": [ + # rolling deployment + "repo_data", + "repo_code_hash", + "file_archives", + "working_dir", + ], +} +_CONF_UPDATABLE_FIELDS = ["priority"] +_TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS = { + "dev-environment": ["inactivity_duration"], + "service": [ + # in-place + "replicas", + "scaling", + # rolling deployment + # NOTE: keep this list in sync with the "Rolling deployment" section in services.md + "port", + "probes", + "resources", + "volumes", + "docker", + "files", + "image", + "user", + "privileged", + "entrypoint", + "working_dir", + "python", + "nvcc", + "single_branch", + "env", + "shell", + "commands", + ], +} + + +def validate_run_spec_and_set_defaults( + user: UserModel, run_spec: RunSpec, legacy_default_working_dir: bool = False +): + # This function may set defaults for null run_spec values, + # although most defaults 
are resolved when building job_spec + # so that we can keep both the original user-supplied value (null in run_spec) + # and the default in job_spec. + # If a property is stored in job_spec - resolve the default there. + # Server defaults are preferable over client defaults so that + # the defaults depend on the server version, not the client version. + if run_spec.run_name is not None: + validate_dstack_resource_name(run_spec.run_name) + for mount_point in run_spec.configuration.volumes: + if not is_valid_docker_volume_target(mount_point.path): + raise ServerClientError(f"Invalid volume mount path: {mount_point.path}") + if run_spec.repo_id is None and run_spec.repo_data is not None: + raise ServerClientError("repo_data must not be set if repo_id is not set") + if run_spec.repo_id is not None and run_spec.repo_data is None: + raise ServerClientError("repo_id must not be set if repo_data is not set") + # Some run_spec parameters have to be set here and not in the model defaults since + # the client may not pass them or pass null, but they must be always present, e.g. for runner. + if run_spec.repo_id is None: + run_spec.repo_id = DEFAULT_VIRTUAL_REPO_ID + if run_spec.repo_data is None: + run_spec.repo_data = VirtualRunRepoData() + if ( + run_spec.merged_profile.utilization_policy is not None + and run_spec.merged_profile.utilization_policy.time_window + > settings.SERVER_METRICS_RUNNING_TTL_SECONDS + ): + raise ServerClientError( + f"Maximum utilization_policy.time_window is {settings.SERVER_METRICS_RUNNING_TTL_SECONDS}s" + ) + if isinstance(run_spec.configuration, ServiceConfiguration): + if run_spec.merged_profile.schedule and run_spec.configuration.replicas.min == 0: + raise ServerClientError( + "Scheduled services with autoscaling to zero are not supported" + ) + if len(run_spec.configuration.probes) > settings.MAX_PROBES_PER_JOB: + raise ServerClientError( + f"Cannot configure more than {settings.MAX_PROBES_PER_JOB} probes" + ) + if any( + p.timeout is not None and p.timeout > settings.MAX_PROBE_TIMEOUT + for p in run_spec.configuration.probes + ): + raise ServerClientError( + f"Probe timeout cannot be longer than {settings.MAX_PROBE_TIMEOUT}s" + ) + if run_spec.configuration.priority is None: + run_spec.configuration.priority = RUN_PRIORITY_DEFAULT + set_resources_defaults(run_spec.configuration.resources) + if run_spec.ssh_key_pub is None: + if user.ssh_public_key: + run_spec.ssh_key_pub = user.ssh_public_key + else: + raise ServerClientError("ssh_key_pub must be set if the user has no ssh_public_key") + if run_spec.configuration.working_dir is None and legacy_default_working_dir: + run_spec.configuration.working_dir = LEGACY_REPO_DIR + + +def check_can_update_run_spec(current_run_spec: RunSpec, new_run_spec: RunSpec): + spec_diff = diff_models(current_run_spec, new_run_spec) + changed_spec_fields = list(spec_diff.keys()) + updatable_spec_fields = _UPDATABLE_SPEC_FIELDS + _TYPE_SPECIFIC_UPDATABLE_SPEC_FIELDS.get( + new_run_spec.configuration.type, [] + ) + for key in changed_spec_fields: + if key not in updatable_spec_fields: + raise ServerClientError( + f"Failed to update fields {changed_spec_fields}." + f" Can only update {updatable_spec_fields}." 
+ ) + _check_can_update_configuration(current_run_spec.configuration, new_run_spec.configuration) + + +def can_update_run_spec(current_run_spec: RunSpec, new_run_spec: RunSpec) -> bool: + try: + check_can_update_run_spec(current_run_spec, new_run_spec) + except ServerClientError as e: + logger.debug("Run cannot be updated: %s", repr(e)) + return False + return True + + +def get_nodes_required_num(run_spec: RunSpec) -> int: + nodes_required_num = 1 + if run_spec.configuration.type == "task": + nodes_required_num = run_spec.configuration.nodes + elif ( + run_spec.configuration.type == "service" + and run_spec.configuration.replicas.min is not None + ): + nodes_required_num = run_spec.configuration.replicas.min + return nodes_required_num + + +def check_run_spec_requires_instance_mounts(run_spec: RunSpec) -> bool: + return any( + isinstance(mp, InstanceMountPoint) and not mp.optional + for mp in run_spec.configuration.volumes + ) + + +def _check_can_update_configuration( + current: AnyRunConfiguration, new: AnyRunConfiguration +) -> None: + if current.type != new.type: + raise ServerClientError( + f"Configuration type changed from {current.type} to {new.type}, cannot update" + ) + updatable_fields = _CONF_UPDATABLE_FIELDS + _TYPE_SPECIFIC_CONF_UPDATABLE_FIELDS.get( + new.type, [] + ) + diff = diff_models(current, new) + changed_fields = list(diff.keys()) + for key in changed_fields: + if key not in updatable_fields: + raise ServerClientError( + f"Failed to update fields {changed_fields}. Can only update {updatable_fields}" + ) diff --git a/src/tests/_internal/server/services/test_runs.py b/src/tests/_internal/server/services/test_runs.py index 4ff1b87c1..8d86111ad 100644 --- a/src/tests/_internal/server/services/test_runs.py +++ b/src/tests/_internal/server/services/test_runs.py @@ -17,7 +17,7 @@ RunModel, ) from dstack._internal.server.services.jobs import check_can_attach_job_volumes -from dstack._internal.server.services.runs import scale_run_replicas +from dstack._internal.server.services.runs.replicas import scale_run_replicas from dstack._internal.server.testing.common import ( create_job, create_project,