diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py
index 9598a52b0..29129f6cb 100644
--- a/src/dstack/_internal/core/backends/base/compute.py
+++ b/src/dstack/_internal/core/backends/base/compute.py
@@ -111,6 +111,7 @@ def run_job(
         project_ssh_public_key: str,
         project_ssh_private_key: str,
         volumes: List[Volume],
+        placement_group: Optional[PlacementGroup],
     ) -> JobProvisioningData:
         """
         Launches a new instance for the job. It should return `JobProvisioningData` ASAP.
@@ -287,6 +288,7 @@ def run_job(
         project_ssh_public_key: str,
         project_ssh_private_key: str,
         volumes: List[Volume],
+        placement_group: Optional[PlacementGroup],
     ) -> JobProvisioningData:
         """
         The default `run_job()` implementation for all backends that support `create_instance()`.
@@ -303,7 +305,9 @@ def run_job(
         )
         instance_offer = instance_offer.copy()
         self._restrict_instance_offer_az_to_volumes_az(instance_offer, volumes)
-        return self.create_instance(instance_offer, instance_config, placement_group=None)
+        return self.create_instance(
+            instance_offer, instance_config, placement_group=placement_group
+        )
 
     def _restrict_instance_offer_az_to_volumes_az(
         self,
@@ -335,6 +339,7 @@ def run_jobs(
         instance_offer: InstanceOfferWithAvailability,
         project_ssh_public_key: str,
         project_ssh_private_key: str,
+        placement_group: Optional[PlacementGroup],
     ) -> ComputeGroupProvisioningData:
         pass
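Review note: with `placement_group` added to the abstract `run_job()`/`run_jobs()` signatures, a backend that relies on the default `run_job()` gets the server-selected group forwarded into `create_instance()` automatically. A minimal sketch from the backend author's perspective — the class and body here are hypothetical, only the parameter comes from this diff:

```python
from typing import Optional

from dstack._internal.core.models.placement import PlacementGroup


class ExampleCompute:  # hypothetical; would extend ComputeWithCreateInstanceSupport
    def create_instance(
        self, instance_offer, instance_config, placement_group: Optional[PlacementGroup]
    ):
        # The default run_job() now forwards the server-selected group here
        # instead of hardcoding placement_group=None.
        if placement_group is not None:
            print(f"Launching into placement group {placement_group.name}")
```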
diff --git a/src/dstack/_internal/core/backends/kubernetes/compute.py b/src/dstack/_internal/core/backends/kubernetes/compute.py
index cb5f1b499..b09de10df 100644
--- a/src/dstack/_internal/core/backends/kubernetes/compute.py
+++ b/src/dstack/_internal/core/backends/kubernetes/compute.py
@@ -49,6 +49,7 @@
     Resources,
     SSHConnectionParams,
 )
+from dstack._internal.core.models.placement import PlacementGroup
 from dstack._internal.core.models.resources import CPUSpec, GPUSpec, Memory
 from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
 from dstack._internal.core.models.volumes import Volume
@@ -131,6 +132,7 @@ def run_job(
         project_ssh_public_key: str,
         project_ssh_private_key: str,
         volumes: list[Volume],
+        placement_group: Optional[PlacementGroup],
     ) -> JobProvisioningData:
         instance_name = generate_unique_instance_name_for_job(run, job)
         assert run.run_spec.ssh_key_pub is not None
diff --git a/src/dstack/_internal/core/backends/local/compute.py b/src/dstack/_internal/core/backends/local/compute.py
index dff7a3340..fbee6bef3 100644
--- a/src/dstack/_internal/core/backends/local/compute.py
+++ b/src/dstack/_internal/core/backends/local/compute.py
@@ -79,6 +79,7 @@ def run_job(
         project_ssh_public_key: str,
         project_ssh_private_key: str,
         volumes: List[Volume],
+        placement_group: Optional[PlacementGroup],
     ) -> JobProvisioningData:
         return JobProvisioningData(
             backend=instance_offer.backend,
diff --git a/src/dstack/_internal/core/backends/runpod/compute.py b/src/dstack/_internal/core/backends/runpod/compute.py
index e78ffa1f1..bd5ae0e8c 100644
--- a/src/dstack/_internal/core/backends/runpod/compute.py
+++ b/src/dstack/_internal/core/backends/runpod/compute.py
@@ -36,6 +36,7 @@
     InstanceOfferWithAvailability,
     SSHKey,
 )
+from dstack._internal.core.models.placement import PlacementGroup
 from dstack._internal.core.models.resources import Memory, Range
 from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
 from dstack._internal.core.models.volumes import Volume, VolumeProvisioningData
@@ -109,6 +110,7 @@ def run_job(
         project_ssh_public_key: str,
         project_ssh_private_key: str,
         volumes: List[Volume],
+        placement_group: Optional[PlacementGroup],
     ) -> JobProvisioningData:
         assert run.run_spec.ssh_key_pub is not None
         instance_config = InstanceConfiguration(
@@ -216,6 +218,7 @@ def run_jobs(
         instance_offer: InstanceOfferWithAvailability,
         project_ssh_public_key: str,
         project_ssh_private_key: str,
+        placement_group: Optional[PlacementGroup],
     ) -> ComputeGroupProvisioningData:
         master_job_configuration = job_configurations[0]
         master_job = master_job_configuration.job
diff --git a/src/dstack/_internal/core/backends/template/compute.py.jinja b/src/dstack/_internal/core/backends/template/compute.py.jinja
index 2a90692cb..49b51b0d4 100644
--- a/src/dstack/_internal/core/backends/template/compute.py.jinja
+++ b/src/dstack/_internal/core/backends/template/compute.py.jinja
@@ -83,6 +83,7 @@ class {{ backend_name }}Compute(
         project_ssh_public_key: str,
         project_ssh_private_key: str,
         volumes: List[Volume],
+        placement_group: Optional[PlacementGroup],
     ) -> JobProvisioningData:
         # TODO: Implement if create_instance() is not implemented. Delete otherwise.
         raise NotImplementedError()
diff --git a/src/dstack/_internal/core/backends/vastai/compute.py b/src/dstack/_internal/core/backends/vastai/compute.py
index ed350cfe7..abd0ee5bb 100644
--- a/src/dstack/_internal/core/backends/vastai/compute.py
+++ b/src/dstack/_internal/core/backends/vastai/compute.py
@@ -20,6 +20,7 @@
     InstanceOfferWithAvailability,
     InstanceRuntime,
 )
+from dstack._internal.core.models.placement import PlacementGroup
 from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
 from dstack._internal.core.models.volumes import Volume
 from dstack._internal.utils.logging import get_logger
@@ -82,6 +83,7 @@ def run_job(
         project_ssh_public_key: str,
         project_ssh_private_key: str,
         volumes: List[Volume],
+        placement_group: Optional[PlacementGroup],
     ) -> JobProvisioningData:
         instance_name = generate_unique_instance_name_for_job(
             run, job, max_length=MAX_INSTANCE_NAME_LEN
diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py
index 88a9970b8..d7cab3970 100644
--- a/src/dstack/_internal/server/background/tasks/process_instances.py
+++ b/src/dstack/_internal/server/background/tasks/process_instances.py
@@ -2,7 +2,7 @@
 import datetime
 import logging
 from datetime import timedelta
-from typing import Any, Dict, List, Optional, cast
+from typing import Any, Dict, Optional, cast
 
 import requests
 from paramiko.pkey import PKey
@@ -17,7 +17,6 @@
     ComputeWithCreateInstanceSupport,
     ComputeWithPlacementGroupSupport,
     GoArchType,
-    generate_unique_placement_group_name,
     get_dstack_runner_binary_path,
     get_dstack_shim_binary_path,
     get_dstack_working_dir,
@@ -34,24 +33,18 @@
 from dstack._internal.core.errors import (
     BackendError,
     NotYetTerminated,
-    PlacementGroupNotSupportedError,
     ProvisioningError,
 )
 from dstack._internal.core.models.backends.base import BackendType
 from dstack._internal.core.models.fleets import InstanceGroupPlacement
 from dstack._internal.core.models.instances import (
     InstanceAvailability,
-    InstanceOffer,
     InstanceOfferWithAvailability,
     InstanceRuntime,
     InstanceStatus,
     RemoteConnectionInfo,
     SSHKey,
 )
-from dstack._internal.core.models.placement import (
-    PlacementGroupConfiguration,
-    PlacementStrategy,
-)
 from dstack._internal.core.models.profiles import (
     TerminationPolicy,
 )
@@ -66,7 +59,6 @@
     InstanceHealthCheckModel,
     InstanceModel,
     JobModel,
-    PlacementGroupModel,
     ProjectModel,
 )
 from dstack._internal.server.schemas.instances import InstanceCheck
@@ -75,6 +67,8 @@
 from dstack._internal.server.services.fleets import (
     fleet_model_to_fleet,
     get_create_instance_offers,
+    is_cloud_cluster,
+    is_fleet_master_instance,
 )
 from dstack._internal.server.services.instances import (
     get_instance_configuration,
@@ -86,10 +80,15 @@
 )
 from dstack._internal.server.services.locking import get_locker
 from dstack._internal.server.services.logging import fmt
-from dstack._internal.server.services.offers import is_divisible_into_blocks
+from dstack._internal.server.services.offers import (
+    get_instance_offer_with_restricted_az,
+    is_divisible_into_blocks,
+)
 from dstack._internal.server.services.placement import (
+    find_or_create_suitable_placement_group,
     get_fleet_placement_group_models,
-    placement_group_model_to_placement_group,
+    get_placement_group_model_for_instance,
+    placement_group_model_to_placement_group_optional,
     schedule_fleet_placement_groups_deletion,
 )
 from dstack._internal.server.services.runner import client as runner_client
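The rewritten import block above pulls in `get_fleet_placement_group_models`, whose `fleet_id` parameter becomes `Optional` later in this diff. That is what lets `_create_instance` drop its `if instance.fleet_id:` branch. A minimal usage sketch (inside any async function):

```python
# Instances without a fleet simply get no placement groups back.
placement_group_models = await get_fleet_placement_group_models(
    session=session,
    fleet_id=None,  # e.g. instance.fleet_id for a fleetless instance
)
assert placement_group_models == []
```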
@@ -549,39 +548,22 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
         )
         return
-    placement_group_models = []
-    placement_group_model = None
-    if instance.fleet_id:
-        placement_group_models = await get_fleet_placement_group_models(
-            session=session,
-            fleet_id=instance.fleet_id,
-        )
-        # The placement group is determined when provisioning the master instance
-        # and used for all other instances in the fleet.
-        if not _is_fleet_master_instance(instance):
-            if placement_group_models:
-                placement_group_model = placement_group_models[0]
-            if len(placement_group_models) > 1:
-                logger.error(
-                    (
-                        "Expected 0 or 1 placement groups associated with fleet %s, found %s."
-                        " An incorrect placement group might have been selected for instance %s"
-                    ),
-                    instance.fleet_id,
-                    len(placement_group_models),
-                    instance.name,
-                )
-
+    # The placement group is determined when provisioning the master instance
+    # and used for all other instances in the fleet.
+    placement_group_models = await get_fleet_placement_group_models(
+        session=session,
+        fleet_id=instance.fleet_id,
+    )
+    placement_group_model = get_placement_group_model_for_instance(
+        placement_group_models=placement_group_models,
+        instance_model=instance,
+    )
     offers = await get_create_instance_offers(
         project=instance.project,
         profile=profile,
         requirements=requirements,
         fleet_model=instance.fleet,
-        placement_group=(
-            placement_group_model_to_placement_group(placement_group_model)
-            if placement_group_model
-            else None
-        ),
+        placement_group=placement_group_model_to_placement_group_optional(placement_group_model),
         blocks="auto" if instance.total_blocks is None else instance.total_blocks,
         exclude_not_available=True,
     )
@@ -595,31 +577,26 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
         assert isinstance(compute, ComputeWithCreateInstanceSupport)
         instance_offer = _get_instance_offer_for_instance(instance_offer, instance)
         if (
-            _is_fleet_master_instance(instance)
+            instance.fleet
+            and is_cloud_cluster(instance.fleet)
+            and is_fleet_master_instance(instance)
             and instance_offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT
             and isinstance(compute, ComputeWithPlacementGroupSupport)
             and (
                 compute.are_placement_groups_compatible_with_reservations(instance_offer.backend)
                 or instance_configuration.reservation is None
             )
-            and instance.fleet
-            and _is_cloud_cluster(instance.fleet)
         ):
-            placement_group_model = _find_suitable_placement_group(
+            placement_group_model = await find_or_create_suitable_placement_group(
+                fleet_model=instance.fleet,
                 placement_groups=placement_group_models,
                 instance_offer=instance_offer,
                 compute=compute,
             )
-            if placement_group_model is None:
-                placement_group_model = await _create_placement_group(
-                    fleet_model=instance.fleet,
-                    master_instance_offer=instance_offer,
-                    compute=compute,
-                )
-                if placement_group_model is None:  # error occurred
-                    continue
-                session.add(placement_group_model)
-                placement_group_models.append(placement_group_model)
+            if placement_group_model is None:  # error occurred
+                continue
+            session.add(placement_group_model)
+            placement_group_models.append(placement_group_model)
         logger.debug(
             "Trying %s in %s/%s for $%0.4f per hour",
             instance_offer.instance.name,
@@ -632,11 +609,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
                 compute.create_instance,
                 instance_offer,
                 instance_configuration,
-                (
-                    placement_group_model_to_placement_group(placement_group_model)
-                    if placement_group_model
-                    else None
-                ),
+                placement_group_model_to_placement_group_optional(placement_group_model),
             )
         except BackendError as e:
             logger.warning(
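The gate in the hunk above decides when the master instance should look up or create a placement group. A condensed restatement for review purposes (the helper name `_needs_placement_group` is illustrative, not part of the diff):

```python
def _needs_placement_group(instance, offer, compute, reservation) -> bool:
    # Only the master instance of a cloud cluster fleet, on a backend that
    # supports placement groups, and only if placement groups are compatible
    # with the requested reservation (or there is none).
    return (
        instance.fleet is not None
        and is_cloud_cluster(instance.fleet)
        and is_fleet_master_instance(instance)
        and offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT
        and isinstance(compute, ComputeWithPlacementGroupSupport)
        and (
            compute.are_placement_groups_compatible_with_reservations(offer.backend)
            or reservation is None
        )
    )
```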
@@ -675,7 +648,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
                     "instance_status": InstanceStatus.PROVISIONING.value,
                 },
             )
-            if instance.fleet_id and _is_fleet_master_instance(instance):
+            if instance.fleet_id and is_fleet_master_instance(instance):
                 # Clean up placement groups that did not end up being used.
                 # Flush to update still uncommitted placement groups.
                 await session.flush()
@@ -689,11 +662,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
         return
 
     _mark_terminated(instance, "All offers failed" if offers else "No offers found")
-    if (
-        instance.fleet
-        and _is_fleet_master_instance(instance)
-        and _is_cloud_cluster(instance.fleet)
-    ):
+    if instance.fleet and is_fleet_master_instance(instance) and is_cloud_cluster(instance.fleet):
        # Do not attempt to deploy other instances, as they won't determine the correct cluster
        # backend, region, and placement group without a successfully deployed master instance
        for sibling_instance in instance.fleet.instances:
@@ -1030,24 +999,12 @@ def _need_to_wait_fleet_provisioning(instance: InstanceModel) -> bool:
     if instance.fleet is None:
         return False
     if (
-        _is_fleet_master_instance(instance)
+        is_fleet_master_instance(instance)
         or instance.fleet.instances[0].job_provisioning_data is not None
         or instance.fleet.instances[0].status == InstanceStatus.TERMINATED
     ):
         return False
-    return _is_cloud_cluster(instance.fleet)
-
-
-def _is_fleet_master_instance(instance: InstanceModel) -> bool:
-    return instance.fleet is not None and instance.id == instance.fleet.instances[0].id
-
-
-def _is_cloud_cluster(fleet_model: FleetModel) -> bool:
-    fleet = fleet_model_to_fleet(fleet_model)
-    return (
-        fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
-        and fleet.spec.configuration.ssh_config is None
-    )
+    return is_cloud_cluster(instance.fleet)
 
 
 def _get_instance_offer_for_instance(
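After the master instance provisions, groups that did not end up being used are cleaned up. A paraphrase of the pattern (the `except_placement_group_ids` form is taken from the analogous call added to `process_submitted_jobs.py` below):

```python
# Flush so freshly added PlacementGroupModel rows receive IDs, then schedule
# deletion of every fleet placement group except the one actually used.
await session.flush()
await schedule_fleet_placement_groups_deletion(
    session=session,
    fleet_id=instance.fleet_id,
    except_placement_group_ids=(
        [placement_group_model.id] if placement_group_model is not None else []
    ),
)
```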
@@ -1056,103 +1013,15 @@ def _get_instance_offer_for_instance(
 ) -> InstanceOfferWithAvailability:
     if instance.fleet is None:
         return instance_offer
-    fleet = fleet_model_to_fleet(instance.fleet)
     master_instance = instance.fleet.instances[0]
     master_job_provisioning_data = get_instance_provisioning_data(master_instance)
-    instance_offer = instance_offer.copy()
-    if (
-        fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
-        and master_job_provisioning_data is not None
-        and master_job_provisioning_data.availability_zone is not None
-    ):
-        if instance_offer.availability_zones is None:
-            instance_offer.availability_zones = [master_job_provisioning_data.availability_zone]
-        instance_offer.availability_zones = [
-            z
-            for z in instance_offer.availability_zones
-            if z == master_job_provisioning_data.availability_zone
-        ]
-    return instance_offer
-
-
-def _find_suitable_placement_group(
-    placement_groups: List[PlacementGroupModel],
-    instance_offer: InstanceOffer,
-    compute: ComputeWithPlacementGroupSupport,
-) -> Optional[PlacementGroupModel]:
-    for pg in placement_groups:
-        if compute.is_suitable_placement_group(
-            placement_group_model_to_placement_group(pg), instance_offer
-        ):
-            return pg
-    return None
-
-
-async def _create_placement_group(
-    fleet_model: FleetModel,
-    master_instance_offer: InstanceOffer,
-    compute: ComputeWithPlacementGroupSupport,
-) -> Optional[PlacementGroupModel]:
-    placement_group_model = PlacementGroupModel(
-        # TODO: generate the name in Compute.create_placement_group to allow
-        # backend-specific name length limits
-        name=generate_unique_placement_group_name(
-            project_name=fleet_model.project.name,
-            fleet_name=fleet_model.name,
-        ),
-        project=fleet_model.project,
-        fleet=fleet_model,
-        configuration=PlacementGroupConfiguration(
-            backend=master_instance_offer.backend,
-            region=master_instance_offer.region,
-            placement_strategy=PlacementStrategy.CLUSTER,
-        ).json(),
-    )
-    placement_group = placement_group_model_to_placement_group(placement_group_model)
-    logger.debug(
-        "Creating placement group %s in %s/%s",
-        placement_group.name,
-        placement_group.configuration.backend.value,
-        placement_group.configuration.region,
-    )
-    try:
-        pgpd = await run_async(
-            compute.create_placement_group,
-            placement_group_model_to_placement_group(placement_group_model),
-            master_instance_offer,
-        )
-    except PlacementGroupNotSupportedError:
-        logger.debug(
-            "Skipping offer %s because placement group not supported",
-            master_instance_offer.instance.name,
-        )
-        return None
-    except BackendError as e:
-        logger.warning(
-            "Failed to create placement group %s in %s/%s: %r",
-            placement_group.name,
-            placement_group.configuration.backend.value,
-            placement_group.configuration.region,
-            e,
-        )
-        return None
-    except Exception:
-        logger.exception(
-            "Got exception when creating placement group %s in %s/%s",
-            placement_group.name,
-            placement_group.configuration.backend.value,
-            placement_group.configuration.region,
-        )
-        return None
-    logger.info(
-        "Created placement group %s in %s/%s",
-        placement_group.name,
-        placement_group.configuration.backend.value,
-        placement_group.configuration.region,
-    )
-    placement_group_model.provisioning_data = pgpd.json()
-    return placement_group_model
+    fleet = fleet_model_to_fleet(instance.fleet)
+    if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
+        return get_instance_offer_with_restricted_az(
+            instance_offer=instance_offer,
+            master_job_provisioning_data=master_job_provisioning_data,
+        )
+    return instance_offer
diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py
index fb2deed3e..72c5417c1 100644
--- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py
+++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py
@@ -59,6 +59,7 @@
     find_job,
     get_job_attached_volumes,
     get_job_runtime_data,
+    is_master_job,
     job_model_to_job_submission,
 )
 from dstack._internal.server.services.locking import get_locker
@@ -178,12 +179,15 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
 
     initial_status = job_model.status
     if initial_status in [JobStatus.PROVISIONING, JobStatus.PULLING]:
-        # Wait until all other jobs in the replica are provisioned
         for other_job in run.jobs:
             if (
                 other_job.job_spec.replica_num == job.job_spec.replica_num
                 and other_job.job_submissions[-1].status == JobStatus.SUBMITTED
             ):
+                logger.debug(
+                    "%s: waiting for all jobs in the replica to be provisioned",
+                    fmt(job_model),
+                )
                 job_model.last_processed_at = common_utils.get_current_datetime()
                 await session.commit()
                 return
@@ -466,7 +470,7 @@ def _should_wait_for_other_nodes(run: Run, job: Job, job_model: JobModel) -> boo
         )
         return True
     if (
-        job.job_spec.job_num == 0
+        is_master_job(job)
         and run.run_spec.merged_profile.startup_order == StartupOrder.WORKERS_FIRST
     ):
         for other_job in run.jobs:
diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py
index df1cce72f..64ba0b031 100644
--- a/src/dstack/_internal/server/background/tasks/process_runs.py
+++ b/src/dstack/_internal/server/background/tasks/process_runs.py
@@ -32,6 +32,7 @@
     find_job,
     get_job_specs_from_run_spec,
     group_jobs_by_replica_latest,
+    is_master_job,
 )
 from dstack._internal.server.services.locking import get_locker
 from dstack._internal.server.services.prometheus.client_metrics import run_metrics
@@ -606,6 +607,6 @@ def _should_stop_on_master_done(run: Run) -> bool:
     if run.run_spec.merged_profile.stop_criteria != StopCriteria.MASTER_DONE:
         return False
     for job in run.jobs:
-        if job.job_spec.job_num == 0 and job.job_submissions[-1].status == JobStatus.DONE:
+        if is_master_job(job) and job.job_submissions[-1].status == JobStatus.DONE:
             return True
     return False
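For context, the two predicates these hunks start using; they are added verbatim to `services/jobs/__init__.py` further down in this diff:

```python
def is_multinode_job(job: Job) -> bool:
    return job.job_spec.jobs_per_replica > 1


def is_master_job(job: Job) -> bool:
    return job.job_spec.job_num == 0
```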
diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
index 9f281a75b..d26f26d70 100644
--- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
+++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py
@@ -13,10 +13,14 @@
 from dstack._internal.core.backends.base.backend import Backend
 from dstack._internal.core.backends.base.compute import (
     ComputeWithGroupProvisioningSupport,
+    ComputeWithPlacementGroupSupport,
     ComputeWithVolumeSupport,
 )
 from dstack._internal.core.backends.base.models import JobConfiguration
-from dstack._internal.core.backends.features import BACKENDS_WITH_GROUP_PROVISIONING_SUPPORT
+from dstack._internal.core.backends.features import (
+    BACKENDS_WITH_GROUP_PROVISIONING_SUPPORT,
+    BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT,
+)
 from dstack._internal.core.errors import BackendError, ServerClientError
 from dstack._internal.core.models.common import NetworkMode
 from dstack._internal.core.models.compute_groups import ComputeGroupProvisioningData
@@ -74,6 +78,7 @@
     generate_fleet_name,
     get_fleet_requirements,
     get_next_instance_num,
+    is_cloud_cluster,
 )
 from dstack._internal.server.services.instances import (
     filter_pool_instances,
@@ -89,10 +94,19 @@
     get_job_configured_volume_models,
     get_job_configured_volumes,
     get_job_runtime_data,
+    is_master_job,
+    is_multinode_job,
 )
 from dstack._internal.server.services.locking import get_locker, string_to_lock_id
 from dstack._internal.server.services.logging import fmt
 from dstack._internal.server.services.offers import get_offers_by_requirements
+from dstack._internal.server.services.placement import (
+    find_or_create_suitable_placement_group,
+    get_fleet_placement_group_models,
+    get_placement_group_model_for_job,
+    placement_group_model_to_placement_group_optional,
+    schedule_fleet_placement_groups_deletion,
+)
 from dstack._internal.server.services.requirements.combine import (
     combine_fleet_and_run_profiles,
     combine_fleet_and_run_requirements,
@@ -347,6 +361,7 @@ async def _process_submitted_job(
         await session.commit()
         return
 
+    jobs_to_provision = _get_jobs_to_provision(job, replica_jobs, job_model)
     # TODO: Volume attachment for compute groups is not yet supported since
     # currently supported compute groups (e.g. Runpod) don't need explicit volume attachment.
     need_volume_attachment = True
@@ -369,17 +384,6 @@ async def _process_submitted_job(
         await session.commit()
         return
 
-    jobs_to_provision = [job]
-    if (
-        multinode
-        and job.job_spec.job_num == 0
-        # job_model.waiting_master_job is not set for legacy jobs.
-        # In this case compute group provisioning not supported
-        # and jobs always provision one-by-one.
-        and job_model.waiting_master_job is not None
-    ):
-        jobs_to_provision = replica_jobs
-
     master_instance_provisioning_data = (
         await _fetch_fleet_with_master_instance_provisioning_data(
             exit_stack=exit_stack,
@@ -392,6 +396,7 @@ async def _process_submitted_job(
         master_job_provisioning_data or master_instance_provisioning_data
     )
     run_job_result = await _run_jobs_on_new_instances(
+        session=session,
         project=project,
         fleet_model=fleet_model,
         job_model=job_model,
@@ -436,11 +441,6 @@ async def _process_submitted_job(
     else:
         provisioned_jobs = [job]
         jpds = [provisioning_data]
-        if len(jobs_to_provision) > 1:
-            # Tried provisioning multiple jobs but provisioned only one.
-            # Allow other jobs to provision one-by-one.
-            for replica_job_model in replica_job_models:
-                replica_job_model.waiting_master_job = False
 
     logger.info("%s: provisioned %s new instance(s)", fmt(job_model), len(provisioned_jobs))
     provisioned_job_models = _get_job_models_for_jobs(run_model.jobs, provisioned_jobs)
@@ -482,6 +482,8 @@ async def _process_submitted_job(
         provisioned_job_model.used_instance_id = instance.id
         provisioned_job_model.last_processed_at = common_utils.get_current_datetime()
 
+    _allow_other_replica_jobs_to_provision(job_model, replica_job_models, jobs_to_provision)
+
     volumes_ids = sorted([v.id for vs in volume_models for v in vs])
     if need_volume_attachment:
         # Take lock to prevent attaching volumes that are to be deleted.
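The removals above move the `jobs_to_provision` computation ahead of volume handling and defer the release of waiting replica jobs until after provisioning succeeds. A simplified flow, assuming the helpers defined later in this file:

```python
# The master job of a multinode replica may provision all jobs at once...
jobs_to_provision = _get_jobs_to_provision(job, replica_jobs, job_model)
# ...and if multiple jobs were attempted but only one instance came up, the
# remaining jobs are released to provision one-by-one:
_allow_other_replica_jobs_to_provision(job_model, replica_job_models, jobs_to_provision)
```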
@@ -634,7 +636,7 @@ async def _find_optimal_fleet_with_offers(
     for candidate_fleet_model in fleet_models:
         candidate_fleet = fleet_model_to_fleet(candidate_fleet_model)
         if (
-            job.job_spec.jobs_per_replica > 1
+            is_multinode_job(job)
             and candidate_fleet.spec.configuration.placement != InstanceGroupPlacement.CLUSTER
         ):
             # Limit multinode runs to cluster fleets to guarantee best connectivity.
@@ -671,7 +673,7 @@ async def _find_optimal_fleet_with_offers(
         )
         # Handle multinode for old jobs that don't have requirements.multinode set.
         # TODO: Drop multinode param.
-        multinode = requirements.multinode or job.job_spec.jobs_per_replica > 1
+        multinode = requirements.multinode or is_multinode_job(job)
         fleet_backend_offers = await get_offers_by_requirements(
             project=project,
             profile=profile,
@@ -757,7 +759,7 @@ async def _fetch_fleet_with_master_instance_provisioning_data(
     job: Job,
 ) -> Optional[JobProvisioningData]:
     master_instance_provisioning_data = None
-    if job.job_spec.job_num == 0 and fleet_model is not None:
+    if is_master_job(job) and fleet_model is not None:
         fleet = fleet_model_to_fleet(fleet_model)
         if fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER:
             # To avoid violating fleet placement cluster during master provisioning,
@@ -774,7 +776,10 @@ async def _fetch_fleet_with_master_instance_provisioning_data(
             .outerjoin(FleetModel.instances)
             .where(
                 FleetModel.id == fleet_model.id,
-                InstanceModel.id.is_(None),
+                or_(
+                    InstanceModel.id.is_(None),
+                    InstanceModel.deleted == True,
+                ),
             )
             .with_for_update(key_share=True, of=FleetModel)
             .execution_options(populate_existing=True)
@@ -786,8 +791,12 @@ async def _fetch_fleet_with_master_instance_provisioning_data(
     else:
         res = await session.execute(
             select(FleetModel)
-            .where(FleetModel.id == fleet_model.id)
-            .options(joinedload(FleetModel.instances))
+            .join(FleetModel.instances)
+            .where(
+                FleetModel.id == fleet_model.id,
+                InstanceModel.deleted == False,
+            )
+            .options(contains_eager(FleetModel.instances))
             .execution_options(populate_existing=True)
         )
         fleet_model = res.unique().scalar_one()
@@ -841,7 +850,7 @@ def _get_fleet_instances_with_pool_offers(
     pool_instances = fleet_model.instances
     instances_with_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]]
     profile = run_spec.merged_profile
-    multinode = job.job_spec.jobs_per_replica > 1
+    multinode = is_multinode_job(job)
     nonshared_instances = filter_pool_instances(
         pool_instances=pool_instances,
         profile=profile,
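The query changes above make soft-deleted instances invisible when deciding whether this master job is the fleet's first. A paraphrase of the lock query (assumes `fleet_id` is bound; `deleted` is a boolean column):

```python
from sqlalchemy import or_, select

# Lock the fleet only if it has no live instances: either no instance rows
# at all, or only rows already marked deleted.
stmt = (
    select(FleetModel)
    .outerjoin(FleetModel.instances)
    .where(
        FleetModel.id == fleet_id,
        or_(InstanceModel.id.is_(None), InstanceModel.deleted == True),  # noqa: E712
    )
)
```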
@@ -912,7 +921,36 @@ async def _assign_job_to_fleet_instance(
     return instance
 
 
+def _get_jobs_to_provision(job: Job, replica_jobs: list[Job], job_model: JobModel) -> list[Job]:
+    """
+    Returns only the passed job for non-master jobs,
+    and all replica jobs for the master job in multinode setups.
+    """
+    jobs_to_provision = [job]
+    if (
+        is_multinode_job(job)
+        and is_master_job(job)
+        # job_model.waiting_master_job is not set for legacy jobs.
+        # In that case, compute group provisioning is not supported
+        # and jobs always provision one-by-one.
+        and job_model.waiting_master_job is not None
+    ):
+        jobs_to_provision = replica_jobs
+    return jobs_to_provision
+
+
+def _allow_other_replica_jobs_to_provision(
+    job_model: JobModel,
+    replica_job_models: list[JobModel],
+    jobs_to_provision: list[Job],
+):
+    if len(jobs_to_provision) > 1:
+        logger.debug("%s: allowing replica jobs to be provisioned one-by-one", fmt(job_model))
+        for replica_job_model in replica_job_models:
+            replica_job_model.waiting_master_job = False
+
+
 async def _run_jobs_on_new_instances(
+    session: AsyncSession,
     project: ProjectModel,
     job_model: JobModel,
     run: Run,
@@ -956,7 +994,17 @@ async def _run_jobs_on_new_instances(
         return None
 
     # TODO: Respect fleet provisioning properties such as tags
-    multinode = requirements.multinode or job.job_spec.jobs_per_replica > 1
+    # The placement group is determined when provisioning the master instance
+    # and used for all other instances in the fleet.
+    placement_group_models = await get_fleet_placement_group_models(
+        session=session,
+        fleet_id=fleet_model.id if fleet_model else None,
+    )
+    placement_group_model = get_placement_group_model_for_job(
+        placement_group_models=placement_group_models,
+        fleet_model=fleet_model,
+    )
+    multinode = requirements.multinode or is_multinode_job(job)
     offers = await get_offers_by_requirements(
         project=project,
         profile=profile,
@@ -967,6 +1015,7 @@ async def _run_jobs_on_new_instances(
         volumes=volumes,
         privileged=job.job_spec.privileged,
         instance_mounts=check_run_spec_requires_instance_mounts(run.run_spec),
+        placement_group=placement_group_model_to_placement_group_optional(placement_group_model),
     )
     # Limit number of offers tried to prevent long-running processing
     # in case all offers fail.
@@ -982,6 +1031,27 @@ async def _run_jobs_on_new_instances(
         offer_volumes = _get_offer_volumes(volumes, offer)
         job_configurations = [JobConfiguration(job=j, volumes=offer_volumes) for j in jobs]
         compute = backend.compute()
+        if (
+            fleet_model is not None
+            and len(fleet_model.instances) == 0
+            and is_cloud_cluster(fleet_model)
+            and offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT
+            and isinstance(compute, ComputeWithPlacementGroupSupport)
+            and (
+                compute.are_placement_groups_compatible_with_reservations(offer.backend)
+                or job.job_spec.requirements.reservation is None
+            )
+        ):
+            placement_group_model = await find_or_create_suitable_placement_group(
+                fleet_model=fleet_model,
+                placement_groups=placement_group_models,
+                instance_offer=offer,
+                compute=compute,
+            )
+            if placement_group_model is None:  # error occurred
+                continue
+            session.add(placement_group_model)
+            placement_group_models.append(placement_group_model)
         try:
             if len(jobs) > 1 and offer.backend in BACKENDS_WITH_GROUP_PROVISIONING_SUPPORT:
                 assert isinstance(compute, ComputeWithGroupProvisioningSupport)
@@ -992,6 +1062,7 @@ async def _run_jobs_on_new_instances(
                     offer,
                     project_ssh_public_key,
                     project_ssh_private_key,
+                    placement_group_model_to_placement_group_optional(placement_group_model),
                 )
                 return cgpd, offer, profile, requirements
             else:
@@ -1003,6 +1074,7 @@ async def _run_jobs_on_new_instances(
                     project_ssh_public_key,
                     project_ssh_private_key,
                     offer_volumes,
+                    placement_group_model_to_placement_group_optional(placement_group_model),
                 )
                 return jpd, offer, profile, requirements
         except BackendError as e:
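`get_placement_group_model_for_job` (defined in `services/placement.py` below) keeps group creation with the first job only. Its semantics, restated as a sketch:

```python
# Non-empty fleet: reuse an existing group; empty fleet: return None so the
# provisioning loop above creates a suitable group per offer.
active = [i for i in fleet_model.instances if not i.deleted]
placement_group_model = (
    placement_group_models[0] if active and placement_group_models else None
)
```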
@@ -1024,6 +1096,18 @@ async def _run_jobs_on_new_instances(
                 offer.region,
             )
             continue
+        finally:
+            if fleet_model is not None and len(fleet_model.instances) == 0:
+                # Clean up placement groups that did not end up being used.
+                # Flush to update still uncommitted placement groups.
+                await session.flush()
+                await schedule_fleet_placement_groups_deletion(
+                    session=session,
+                    fleet_id=fleet_model.id,
+                    except_placement_group_ids=(
+                        [placement_group_model.id] if placement_group_model is not None else []
+                    ),
+                )
 
     return None
diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py
index 7ee0bbfab..5bf4b48bb 100644
--- a/src/dstack/_internal/server/services/fleets.py
+++ b/src/dstack/_internal/server/services/fleets.py
@@ -643,6 +643,18 @@ def is_fleet_empty(fleet_model: FleetModel) -> bool:
     return len(active_instances) == 0
 
 
+def is_cloud_cluster(fleet_model: FleetModel) -> bool:
+    fleet = fleet_model_to_fleet(fleet_model)
+    return (
+        fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
+        and fleet.spec.configuration.ssh_config is None
+    )
+
+
+def is_fleet_master_instance(instance: InstanceModel) -> bool:
+    return instance.fleet is not None and instance.id == instance.fleet.instances[0].id
+
+
 def get_fleet_requirements(fleet_spec: FleetSpec) -> Requirements:
     profile = fleet_spec.merged_profile
     requirements = Requirements(
diff --git a/src/dstack/_internal/server/services/jobs/__init__.py b/src/dstack/_internal/server/services/jobs/__init__.py
index 4d4f75e75..e20a033a6 100644
--- a/src/dstack/_internal/server/services/jobs/__init__.py
+++ b/src/dstack/_internal/server/services/jobs/__init__.py
@@ -202,6 +202,14 @@ def delay_job_instance_termination(job_model: JobModel):
     job_model.remove_at = common.get_current_datetime() + timedelta(seconds=15)
 
 
+def is_multinode_job(job: Job) -> bool:
+    return job.job_spec.jobs_per_replica > 1
+
+
+def is_master_job(job: Job) -> bool:
+    return job.job_spec.job_num == 0
+
+
 def _get_job_configurator(run_spec: RunSpec, secrets: Dict[str, str]) -> JobConfigurator:
     configuration_type = RunConfigurationType(run_spec.configuration.type)
     configurator_class = _configuration_type_to_configurator_class_map[configuration_type]
diff --git a/src/dstack/_internal/server/services/offers.py b/src/dstack/_internal/server/services/offers.py
index 1b2525449..ba8f8bae9 100644
--- a/src/dstack/_internal/server/services/offers.py
+++ b/src/dstack/_internal/server/services/offers.py
@@ -220,3 +220,22 @@ def generate_shared_offer(
         blocks=blocks,
         total_blocks=total_blocks,
     )
+
+
+def get_instance_offer_with_restricted_az(
+    instance_offer: InstanceOfferWithAvailability,
+    master_job_provisioning_data: Optional[JobProvisioningData],
+) -> InstanceOfferWithAvailability:
+    instance_offer = instance_offer.copy()
+    if (
+        master_job_provisioning_data is not None
+        and master_job_provisioning_data.availability_zone is not None
+    ):
+        if instance_offer.availability_zones is None:
+            instance_offer.availability_zones = [master_job_provisioning_data.availability_zone]
+        instance_offer.availability_zones = [
+            z
+            for z in instance_offer.availability_zones
+            if z == master_job_provisioning_data.availability_zone
+        ]
+    return instance_offer
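A worked example for `get_instance_offer_with_restricted_az` (values hypothetical):

```python
offer = instance_offer.copy()
offer.availability_zones = ["us-east-1a", "us-east-1b"]
# Suppose the master instance landed in "us-east-1b":
restricted = get_instance_offer_with_restricted_az(offer, master_jpd)
assert restricted.availability_zones == ["us-east-1b"]
# If the offer had availability_zones=None, it is narrowed to exactly the
# master's zone; with master_jpd=None the offer is returned unchanged.
```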
+ " An incorrect placement group might have been selected for instance %s" + ), + instance_model.fleet_id, + len(placement_group_models), + instance_model.name, + ) + return placement_group_model + + +def get_placement_group_model_for_job( + placement_group_models: list[PlacementGroupModel], + fleet_model: Optional[FleetModel], +) -> Optional[PlacementGroupModel]: + """ + Returns any fleet placement group for jobs that provision + in non-empty fleets and `None` for empty fleets. + This is so that only the first job creates placement groups. + """ + placement_group_model = None + active_instances = [] + if fleet_model is not None: + active_instances = [i for i in fleet_model.instances if not i.deleted] + if len(active_instances) > 0 and len(placement_group_models) > 0: + placement_group_model = placement_group_models[0] + return placement_group_model + + +async def find_or_create_suitable_placement_group( + fleet_model: FleetModel, + placement_groups: List[PlacementGroupModel], + instance_offer: InstanceOffer, + compute: ComputeWithPlacementGroupSupport, +) -> Optional[PlacementGroupModel]: + placement_group_model = find_suitable_placement_group( + placement_groups=placement_groups, + instance_offer=instance_offer, + compute=compute, ) + if placement_group_model is None: + placement_group_model = await create_placement_group( + fleet_model=fleet_model, + master_instance_offer=instance_offer, + compute=compute, + ) + return placement_group_model -def get_placement_group_configuration( - placement_group_model: PlacementGroupModel, -) -> PlacementGroupConfiguration: - return PlacementGroupConfiguration.__response__.parse_raw(placement_group_model.configuration) +def find_suitable_placement_group( + placement_groups: List[PlacementGroupModel], + instance_offer: InstanceOffer, + compute: ComputeWithPlacementGroupSupport, +) -> Optional[PlacementGroupModel]: + for pg in placement_groups: + if compute.is_suitable_placement_group( + placement_group_model_to_placement_group(pg), instance_offer + ): + return pg + return None -def get_placement_group_provisioning_data( - placement_group_model: PlacementGroupModel, -) -> Optional[PlacementGroupProvisioningData]: - if placement_group_model.provisioning_data is None: +async def create_placement_group( + fleet_model: FleetModel, + master_instance_offer: InstanceOffer, + compute: ComputeWithPlacementGroupSupport, +) -> Optional[PlacementGroupModel]: + placement_group_model = PlacementGroupModel( + # TODO: generate the name in Compute.create_placement_group to allow + # backend-specific name length limits + name=generate_unique_placement_group_name( + project_name=fleet_model.project.name, + fleet_name=fleet_model.name, + ), + project=fleet_model.project, + fleet=fleet_model, + configuration=PlacementGroupConfiguration( + backend=master_instance_offer.backend, + region=master_instance_offer.region, + placement_strategy=PlacementStrategy.CLUSTER, + ).json(), + ) + placement_group = placement_group_model_to_placement_group(placement_group_model) + logger.debug( + "Creating placement group %s in %s/%s", + placement_group.name, + placement_group.configuration.backend.value, + placement_group.configuration.region, + ) + try: + pgpd = await run_async( + compute.create_placement_group, + placement_group_model_to_placement_group(placement_group_model), + master_instance_offer, + ) + except PlacementGroupNotSupportedError: + logger.debug( + "Skipping offer %s because placement group not supported", + master_instance_offer.instance.name, + ) return None - return 
PlacementGroupProvisioningData.__response__.parse_raw( - placement_group_model.provisioning_data + except BackendError as e: + logger.warning( + "Failed to create placement group %s in %s/%s: %r", + placement_group.name, + placement_group.configuration.backend.value, + placement_group.configuration.region, + e, + ) + return None + except Exception: + logger.exception( + "Got exception when creating placement group %s in %s/%s", + placement_group.name, + placement_group.configuration.backend.value, + placement_group.configuration.region, + ) + return None + logger.info( + "Created placement group %s in %s/%s", + placement_group.name, + placement_group.configuration.backend.value, + placement_group.configuration.region, ) + placement_group_model.provisioning_data = pgpd.json() + return placement_group_model + + +def _is_fleet_master_instance(instance: InstanceModel) -> bool: + return instance.fleet is not None and instance.id == instance.fleet.instances[0].id diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 870b378a8..cff08e5b0 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -84,6 +84,7 @@ get_job_configured_volumes, get_jobs_from_run_spec, group_jobs_by_replica_latest, + is_multinode_job, job_model_to_job_submission, stop_runner, ) @@ -862,7 +863,7 @@ async def _get_pool_offers( detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) pool_instances = await get_pool_instances(session, project) pool_instances = [i for i in pool_instances if i.id not in detaching_instances_ids] - multinode = job.job_spec.jobs_per_replica > 1 + multinode = is_multinode_job(job) shared_instances_with_offers = get_shared_pool_instances_with_offers( pool_instances=pool_instances, diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py index c92b8a230..c22b45f0a 100644 --- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py @@ -15,8 +15,8 @@ InstanceAvailability, InstanceStatus, ) +from dstack._internal.core.models.placement import PlacementGroup from dstack._internal.core.models.profiles import Profile -from dstack._internal.core.models.resources import Range, ResourcesSpec from dstack._internal.core.models.runs import ( JobStatus, JobTerminationReason, @@ -35,6 +35,7 @@ ComputeGroupModel, InstanceModel, JobModel, + PlacementGroupModel, VolumeAttachmentModel, ) from dstack._internal.server.settings import JobNetworkMode @@ -52,6 +53,7 @@ get_fleet_spec, get_instance_offer_with_availability, get_job_provisioning_data, + get_placement_group_provisioning_data, get_run_spec, get_volume_provisioning_data, ) @@ -814,17 +816,6 @@ async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers fleet_spec = get_fleet_spec() fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1) await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet") - # Need a second non-empty fleet to have two-stage processing - fleet2 = await create_fleet( - session=session, project=project, spec=fleet_spec, name="fleet2" - ) - await create_instance( - session=session, - project=project, - fleet=fleet2, - instance_num=0, - status=InstanceStatus.BUSY, - ) run = await create_run( session=session, project=project, @@ -856,20 +847,6 @@ 
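Usage note for the `_optional` converter added above — call sites shed their inline `None` checks:

```python
pg = placement_group_model_to_placement_group_optional(maybe_model)
# is equivalent to the pattern this diff removes everywhere:
pg = (
    placement_group_model_to_placement_group(maybe_model)
    if maybe_model is not None
    else None
)
```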
diff --git a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py
index c92b8a230..c22b45f0a 100644
--- a/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py
+++ b/src/tests/_internal/server/background/tasks/test_process_submitted_jobs.py
@@ -15,8 +15,8 @@
     InstanceAvailability,
     InstanceStatus,
 )
+from dstack._internal.core.models.placement import PlacementGroup
 from dstack._internal.core.models.profiles import Profile
-from dstack._internal.core.models.resources import Range, ResourcesSpec
 from dstack._internal.core.models.runs import (
     JobStatus,
     JobTerminationReason,
@@ -35,6 +35,7 @@
     ComputeGroupModel,
     InstanceModel,
     JobModel,
+    PlacementGroupModel,
     VolumeAttachmentModel,
 )
 from dstack._internal.server.settings import JobNetworkMode
@@ -52,6 +53,7 @@
     get_fleet_spec,
     get_instance_offer_with_availability,
     get_job_provisioning_data,
+    get_placement_group_provisioning_data,
     get_run_spec,
     get_volume_provisioning_data,
 )
@@ -814,17 +816,6 @@ async def test_does_not_assign_job_to_elastic_empty_fleet_without_backend_offers
         fleet_spec = get_fleet_spec()
         fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=1)
         await create_fleet(session=session, project=project, spec=fleet_spec, name="fleet")
-        # Need a second non-empty fleet to have two-stage processing
-        fleet2 = await create_fleet(
-            session=session, project=project, spec=fleet_spec, name="fleet2"
-        )
-        await create_instance(
-            session=session,
-            project=project,
-            fleet=fleet2,
-            instance_num=0,
-            status=InstanceStatus.BUSY,
-        )
         run = await create_run(
             session=session,
             project=project,
@@ -856,20 +847,6 @@ async def test_assigns_job_to_elastic_empty_fleet_with_backend_offers_if_fleets_
         fleet1 = await create_fleet(
             session=session, project=project, spec=fleet_spec1, name="fleet"
         )
-        # Need a second non-empty fleet to have two-stage processing
-        fleet_spec2 = get_fleet_spec()
-        # Empty resources intersection to return no backend offers
-        fleet_spec2.configuration.resources = ResourcesSpec(cpu=Range(min=0, max=0))
-        fleet2 = await create_fleet(
-            session=session, project=project, spec=fleet_spec2, name="fleet2"
-        )
-        await create_instance(
-            session=session,
-            project=project,
-            fleet=fleet2,
-            instance_num=0,
-            status=InstanceStatus.BUSY,
-        )
         run = await create_run(
             session=session,
             project=project,
@@ -1243,6 +1220,55 @@ async def test_provisioning_master_job_respects_cluster_placement_in_non_empty_f
         await session.refresh(job)
         assert job.status == JobStatus.PROVISIONING
 
+    @pytest.mark.asyncio
+    @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
+    async def test_creates_placement_group(self, test_db, session: AsyncSession):
+        project = await create_project(session=session)
+        user = await create_user(session=session)
+        repo = await create_repo(session=session, project_id=project.id)
+        fleet_spec = get_fleet_spec()
+        fleet_spec.configuration.placement = InstanceGroupPlacement.CLUSTER
+        fleet_spec.configuration.nodes = FleetNodesSpec(min=0, target=0, max=None)
+        fleet = await create_fleet(session=session, project=project, spec=fleet_spec)
+        run_spec = get_run_spec(run_name="test-run", repo_id=repo.name)
+        run = await create_run(
+            session=session,
+            project=project,
+            repo=repo,
+            user=user,
+            fleet=fleet,
+            run_name="test-run",
+            run_spec=run_spec,
+        )
+        job = await create_job(
+            session=session,
+            run=run,
+            instance_assigned=True,
+        )
+        offer = get_instance_offer_with_availability()
+        with patch("dstack._internal.server.services.backends.get_project_backends") as m:
+            backend_mock = Mock()
+            backend_mock.TYPE = BackendType.AWS
+            compute_mock = Mock(spec=ComputeMockSpec)
+            backend_mock.compute.return_value = compute_mock
+            m.return_value = [backend_mock]
+            compute_mock.get_offers.return_value = [offer]
+            compute_mock.run_job.return_value = get_job_provisioning_data()
+            compute_mock.create_placement_group.return_value = (
+                get_placement_group_provisioning_data()
+            )
+            await process_submitted_jobs()
+            m.assert_called_once()
+            compute_mock.get_offers.assert_called_once()
+            compute_mock.run_job.assert_called_once()
+            compute_mock.create_placement_group.assert_called_once()
+            pg_arg = compute_mock.run_job.call_args[0][6]
+            assert isinstance(pg_arg, PlacementGroup)
+        placement_group = (await session.execute(select(PlacementGroupModel))).scalar()
+        assert placement_group is not None
+        await session.refresh(job)
+        assert job.status == JobStatus.PROVISIONING
+
     @pytest.mark.parametrize(
         ["job_network_mode", "blocks", "multinode", "network_mode", "constraints_are_set"],