diff --git a/docs/docs/concepts/backends.md b/docs/docs/concepts/backends.md index e251c3caf..6fe5fd9d7 100644 --- a/docs/docs/concepts/backends.md +++ b/docs/docs/concepts/backends.md @@ -520,6 +520,7 @@ gcloud projects list --format="json(projectId)" compute.networks.updatePolicy compute.regions.get compute.regions.list + compute.reservations.list compute.resourcePolicies.create compute.resourcePolicies.delete compute.routers.list @@ -543,6 +544,9 @@ gcloud projects list --format="json(projectId)" Also, the use of TPUs requires the `serviceAccountUser` role. For TPU VMs, dstack will use the default service account. + If you plan to use shared reservations, the `compute.reservations.list` + permission is required in the project that owns the reservations. + ??? info "Required APIs" First, ensure the required APIs are enabled in your GCP `project_id`. diff --git a/docs/docs/guides/troubleshooting.md b/docs/docs/guides/troubleshooting.md index 6a46ec073..ac4ef4bc9 100644 --- a/docs/docs/guides/troubleshooting.md +++ b/docs/docs/guides/troubleshooting.md @@ -111,7 +111,7 @@ one of these features, `dstack` will only select offers from the backends that s are only supported by the `aws`, `azure`, `gcp`, `nebius`, `oci`, and `vultr` backends, as well as SSH fleets. - [Reservations](../reference/dstack.yml/fleet.md#reservation) - are only supported by the `aws` backend. + are only supported by the `aws` and `gcp` backends. 
#### Cause 8: dstack Sky balance diff --git a/src/dstack/_internal/core/backends/aws/compute.py b/src/dstack/_internal/core/backends/aws/compute.py index b4bc6c670..f973980e0 100644 --- a/src/dstack/_internal/core/backends/aws/compute.py +++ b/src/dstack/_internal/core/backends/aws/compute.py @@ -1,4 +1,5 @@ import threading +from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Callable, Dict, List, Optional, Tuple @@ -34,7 +35,11 @@ get_user_data, merge_tags, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.errors import ( ComputeError, NoCapacityError, @@ -159,10 +164,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability ) return availability_offers - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def _get_offers_cached_key(self, requirements: Requirements) -> int: # Requirements is not hashable, so we use a hack to get arguments hash diff --git a/src/dstack/_internal/core/backends/azure/compute.py b/src/dstack/_internal/core/backends/azure/compute.py index c67398829..8b37a72b2 100644 --- a/src/dstack/_internal/core/backends/azure/compute.py +++ b/src/dstack/_internal/core/backends/azure/compute.py @@ -1,8 +1,9 @@ import base64 import enum import re +from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Callable, Dict, List, Optional, Tuple +from typing 
import Dict, List, Optional, Tuple from azure.core.credentials import TokenCredential from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError @@ -51,7 +52,11 @@ merge_tags, requires_nvidia_proprietary_kernel_modules, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.consts import DSTACK_OS_IMAGE_WITH_PROPRIETARY_NVIDIA_KERNEL_MODULES from dstack._internal.core.errors import ComputeError, NoCapacityError from dstack._internal.core.models.backends.base import BackendType @@ -108,10 +113,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability ) return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def create_instance( self, diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 6a1f6af4a..560982933 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -17,12 +17,13 @@ from gpuhunt import CPUArchitecture from dstack._internal import settings -from dstack._internal.core.backends.base.offers import filter_offers_by_requirements +from dstack._internal.core.backends.base.offers import OfferModifier, filter_offers_by_requirements from dstack._internal.core.consts import ( DSTACK_RUNNER_HTTP_PORT, DSTACK_RUNNER_SSH_PORT, DSTACK_SHIM_HTTP_PORT, ) +from dstack._internal.core.models.backends.base import BackendType from 
dstack._internal.core.models.configurations import LEGACY_REPO_DIR from dstack._internal.core.models.gateways import ( GatewayComputeConfiguration, @@ -168,17 +169,13 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability """ pass - def get_offers_modifier( - self, requirements: Requirements - ) -> Optional[ - Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]] - ]: + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: """ - Returns a modifier function that modifies offers before they are filtered by requirements. - Can return `None` to exclude the offer. + Returns functions that modify offers before they are filtered by requirements. + A modifier function can return `None` to exclude the offer. E.g. can be used to set appropriate disk size based on requirements. """ - return None + return [] def get_offers_post_filter( self, requirements: Requirements @@ -191,14 +188,7 @@ def get_offers_post_filter( def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]: offers = self._get_all_offers_with_availability_cached() - modifier = self.get_offers_modifier(requirements) - if modifier is not None: - modified_offers = [] - for o in offers: - modified_offer = modifier(o) - if modified_offer is not None: - modified_offers.append(modified_offer) - offers = modified_offers + offers = self.__apply_modifiers(offers, self.get_offers_modifiers(requirements)) offers = filter_offers_by_requirements(offers, requirements) post_filter = self.get_offers_post_filter(requirements) if post_filter is not None: @@ -212,6 +202,20 @@ def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvaila def _get_all_offers_with_availability_cached(self) -> List[InstanceOfferWithAvailability]: return self.get_all_offers_with_availability() + @staticmethod + def __apply_modifiers( + offers: Iterable[InstanceOfferWithAvailability], modifiers: 
Iterable[OfferModifier] + ) -> list[InstanceOfferWithAvailability]: + modified_offers = [] + for offer in offers: + for modifier in modifiers: + offer = modifier(offer) + if offer is None: + break + else: + modified_offers.append(offer) + return modified_offers + class ComputeWithFilteredOffersCached(ABC): """ @@ -341,6 +345,15 @@ class ComputeWithMultinodeSupport: class ComputeWithReservationSupport: """ Must be subclassed to support provisioning from reservations. + + The following is expected from a backend that supports reservations: + + - `get_offers` respects `Requirements.reservation` if set, and only returns + offers that can be provisioned in the configured reservation. It can + adjust some offer properties such as `availability` and + `availability_zones` if necessary. + - `create_instance` respects `InstanceConfig.reservation` if set, and + provisions the instance in the configured reservation. """ pass @@ -391,6 +404,16 @@ def is_suitable_placement_group( """ pass + def are_placement_groups_compatible_with_reservations(self, backend_type: BackendType) -> bool: + """ + Whether placement groups can be used for instances provisioned in reservations. 
+ + Arguments: + backend_type: matches the backend type of this compute, unless this compute is a proxy + for other backends (dstack Sky) + """ + return True + class ComputeWithGatewaySupport(ABC): """ diff --git a/src/dstack/_internal/core/backends/base/offers.py b/src/dstack/_internal/core/backends/base/offers.py index c081b4271..e3baa53f1 100644 --- a/src/dstack/_internal/core/backends/base/offers.py +++ b/src/dstack/_internal/core/backends/base/offers.py @@ -199,9 +199,12 @@ def choose_disk_size_mib( return round(disk_size_gib * 1024) +OfferModifier = Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]] + + def get_offers_disk_modifier( configurable_disk_size: Range[Memory], requirements: Requirements -) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: +) -> OfferModifier: """ Returns a func that modifies offers disk by setting min value that satisfies both `configurable_disk_size` and `requirements`. diff --git a/src/dstack/_internal/core/backends/datacrunch/compute.py b/src/dstack/_internal/core/backends/datacrunch/compute.py index 6b602a6b7..6543567c3 100644 --- a/src/dstack/_internal/core/backends/datacrunch/compute.py +++ b/src/dstack/_internal/core/backends/datacrunch/compute.py @@ -1,4 +1,5 @@ -from typing import Callable, Dict, List, Optional +from collections.abc import Iterable +from typing import Dict, List, Optional from datacrunch import DataCrunchClient from datacrunch.exceptions import APIException @@ -12,7 +13,11 @@ generate_unique_instance_name, get_shim_commands, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.datacrunch.models import DataCrunchConfig from dstack._internal.core.errors import NoCapacityError from dstack._internal.core.models.backends.base import 
BackendType @@ -59,10 +64,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability offers_with_availability = self._get_offers_with_availability(offers) return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def _get_offers_with_availability( self, offers: List[InstanceOffer] diff --git a/src/dstack/_internal/core/backends/gcp/compute.py b/src/dstack/_internal/core/backends/gcp/compute.py index 3d2af719b..efda11266 100644 --- a/src/dstack/_internal/core/backends/gcp/compute.py +++ b/src/dstack/_internal/core/backends/gcp/compute.py @@ -1,7 +1,9 @@ import concurrent.futures import json +import re import threading from collections import defaultdict +from collections.abc import Iterable from dataclasses import dataclass from typing import Callable, Dict, List, Literal, Optional, Tuple @@ -24,6 +26,7 @@ ComputeWithPlacementGroupSupport, ComputeWithPrivateGatewaySupport, ComputeWithPrivilegedSupport, + ComputeWithReservationSupport, ComputeWithVolumeSupport, generate_unique_gateway_instance_name, generate_unique_instance_name, @@ -35,6 +38,7 @@ requires_nvidia_proprietary_kernel_modules, ) from dstack._internal.core.backends.base.offers import ( + OfferModifier, get_catalog_offers, get_offers_disk_modifier, ) @@ -78,8 +82,11 @@ # pd-balanced disks can be 10GB-64TB, but dstack images are 20GB and cannot grow larger # than 32TB because of filesystem settings CONFIGURABLE_DISK_SIZE = Range[Memory](min=Memory.parse("20GB"), max=Memory.parse("32TB")) - - +# Pattern from https://cloud.google.com/compute/docs/instances/reservations-consume#consuming_instances_from_a_specific_reservation 
+RESERVATION_PATTERN = re.compile( + r"projects/(?P<project_id>[a-z0-9-]+)/reservations/(?P<reservation_name>[a-z0-9-]+)" +) +RESOURCE_NAME_PATTERN = re.compile(r"[a-z0-9-]+") TPU_VERSIONS = [tpu.name for tpu in KNOWN_TPUS] @@ -93,6 +100,7 @@ class GCPCompute( ComputeWithCreateInstanceSupport, ComputeWithPrivilegedSupport, ComputeWithMultinodeSupport, + ComputeWithReservationSupport, ComputeWithPlacementGroupSupport, ComputeWithGatewaySupport, ComputeWithPrivateGatewaySupport, @@ -113,8 +121,12 @@ def __init__(self, config: GCPConfig): self.resource_policies_client = compute_v1.ResourcePoliciesClient( credentials=self.credentials ) + self.reservations_client = compute_v1.ReservationsClient(credentials=self.credentials) self._usable_subnets_cache_lock = threading.Lock() self._usable_subnets_cache = TTLCache(maxsize=1, ttl=120) + self._find_reservation_cache_lock = threading.Lock() + # smaller TTL, since we check the reservation's in_use_count, which can change often + self._find_reservation_cache = TTLCache(maxsize=8, ttl=20) def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability]: regions = get_or_error(self.config.regions) @@ -149,10 +161,40 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability offers_with_availability[-1].region = region return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + modifiers = [] + + if requirements.reservation: + zone_to_reservation = self._find_reservation(requirements.reservation) + + def reservation_modifier( + offer: InstanceOfferWithAvailability, + ) -> Optional[InstanceOfferWithAvailability]: + if offer.instance.resources.spot: + return None + assert offer.availability_zones is not None + matching_zones = [] + 
zones_with_capacity = [] + for zone in offer.availability_zones: + reservation = zone_to_reservation.get(zone) + if reservation is not None and _offer_matches_reservation(offer, reservation): + matching_zones.append(zone) + if _reservation_has_capacity(reservation): + zones_with_capacity.append(zone) + if not matching_zones: + return None + offer = offer.copy(deep=True) + if zones_with_capacity: + offer.availability_zones = zones_with_capacity + else: + offer.availability_zones = matching_zones + offer.availability = InstanceAvailability.NOT_AVAILABLE + return offer + + modifiers.append(reservation_modifier) + + modifiers.append(get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)) + return modifiers def terminate_instance( self, instance_id: str, region: str, backend_data: Optional[str] = None @@ -305,6 +347,16 @@ def create_instance( ) for zone in zones: + reservation = None + if instance_config.reservation: + reservation = self._find_reservation(instance_config.reservation).get(zone) + if reservation is None: + logger.warning( + "Reservation %s no longer exists in zone %s", + instance_config.reservation, + zone, + ) + continue request = compute_v1.InsertInstanceRequest() request.zone = zone request.project = self.config.project_id @@ -335,6 +387,7 @@ def create_instance( roce_subnetworks=roce_subnets, allocate_public_ip=allocate_public_ip, placement_policy=placement_policy, + reservation=reservation, ) try: # GCP needs some time to return an error in case of no capacity (< 30s). @@ -475,6 +528,11 @@ def is_suitable_placement_group( ) -> bool: return placement_group.configuration.region == instance_offer.region + def are_placement_groups_compatible_with_reservations(self, backend_type: BackendType) -> bool: + # Cannot use our own placement policies when provisioning in a reservation. + # Instead, we use the placement policy defined in reservation settings. 
+ return False + def create_gateway( self, configuration: GatewayComputeConfiguration, @@ -880,6 +938,26 @@ def _get_vpc_subnet(self, region: str) -> Optional[str]: usable_subnets=self._list_usable_subnets(), ) + @cachedmethod( + cache=lambda self: self._find_reservation_cache, + lock=lambda self: self._find_reservation_cache_lock, + ) + def _find_reservation(self, configured_name: str) -> dict[str, compute_v1.Reservation]: + if match := RESERVATION_PATTERN.fullmatch(configured_name): + project_id = match.group("project_id") + name = match.group("reservation_name") + elif RESOURCE_NAME_PATTERN.fullmatch(configured_name): + project_id = self.config.project_id + name = configured_name + else: + # misconfigured or non-GCP + return {} + return gcp_resources.find_reservation( + reservations_client=self.reservations_client, + project_id=project_id, + name=name, + ) + def _supported_instances_and_zones( regions: List[str], @@ -933,6 +1011,52 @@ def _has_gpu_quota(quotas: Dict[str, float], resources: Resources) -> bool: return len(resources.gpus) <= quotas.get(quota_name, 0) +def _offer_matches_reservation( + offer: InstanceOfferWithAvailability, reservation: compute_v1.Reservation +) -> bool: + if ( + reservation.specific_reservation is None + or reservation.specific_reservation.instance_properties is None + ): + return False + properties = reservation.specific_reservation.instance_properties + if properties.machine_type != offer.instance.name: + return False + accelerators = properties.guest_accelerators or [] + if not accelerators and offer.instance.resources.gpus: + return False + if len(accelerators) > 1: + logger.warning( + "Expected 0 or 1 accelerator types per instance," + f" but {properties.machine_type} has {len(accelerators)}." 
+ f" Ignoring reservation {reservation.self_link}" + ) + return False + if accelerators: + if accelerators[0].accelerator_count != len(offer.instance.resources.gpus): + return False + if ( + offer.instance.resources.gpus + and gcp_resources.find_accelerator_name( + offer.instance.resources.gpus[0].name, + offer.instance.resources.gpus[0].memory_mib, + ) + != accelerators[0].accelerator_type + ): + return False + return True + + +def _reservation_has_capacity(reservation: compute_v1.Reservation) -> bool: + return ( + reservation.specific_reservation is not None + and reservation.specific_reservation.in_use_count is not None + and reservation.specific_reservation.assured_count is not None + and reservation.specific_reservation.in_use_count + < reservation.specific_reservation.assured_count + ) + + def _unique_instance_name(instance: InstanceType) -> str: if instance.resources.spot: name = f"{instance.name}-spot" diff --git a/src/dstack/_internal/core/backends/gcp/resources.py b/src/dstack/_internal/core/backends/gcp/resources.py index 57f2f8548..b3b937cea 100644 --- a/src/dstack/_internal/core/backends/gcp/resources.py +++ b/src/dstack/_internal/core/backends/gcp/resources.py @@ -29,6 +29,31 @@ ] +def find_accelerator_name(gpu_name: str, memory_mib: int) -> Optional[str]: + for acc in supported_accelerators: + if gpu_name == acc["gpu_name"] and memory_mib == acc["memory_mb"]: + return acc["accelerator_name"] + return None + + +def sanitize_filter_value(value: str) -> str: + """ + Escape characters that could break the Compute Engine API filter string. 
+ """ + return value.replace("\\", "\\\\").replace('"', '\\"') + + +def get_resource_project(resource_url: str) -> str: + """ + Extract the project ID from a URL like + https://www.googleapis.com/compute/v1/projects/proj-id/zones/us-central1-a/instances/vm-name + """ + matches = re.findall(r"/projects/(?P[a-z0-9-]+)/", resource_url) + if not matches: + raise BackendError(f"Invalid resource URL {resource_url}") + return matches[0] + + def get_availability_zones( regions_client: compute_v1.RegionsClient, project_id: str, @@ -123,6 +148,7 @@ def create_instance_struct( roce_subnetworks: Optional[List[Tuple[str, str]]] = None, allocate_public_ip: bool = True, placement_policy: Optional[str] = None, + reservation: Optional[compute_v1.Reservation] = None, ) -> compute_v1.Instance: instance = compute_v1.Instance() instance.name = instance_name @@ -147,6 +173,25 @@ def create_instance_struct( initialize_params.disk_type = f"zones/{zone}/diskTypes/hyperdisk-balanced" disk.initialize_params = initialize_params instance.disks = [disk] + if ( + reservation is not None + and reservation.specific_reservation is not None + and reservation.specific_reservation.instance_properties is not None + and reservation.specific_reservation.instance_properties.local_ssds is not None + ): + for local_ssd in reservation.specific_reservation.instance_properties.local_ssds: + instance.disks.append( + compute_v1.AttachedDisk( + auto_delete=True, + boot=False, + type_="SCRATCH", + initialize_params=compute_v1.AttachedDiskInitializeParams( + disk_type=f"zones/{zone}/diskTypes/local-ssd", + disk_size_gb=local_ssd.disk_size_gb, + ), + interface=local_ssd.interface, + ) + ) if accelerators: instance.guest_accelerators = accelerators @@ -162,6 +207,8 @@ def create_instance_struct( if placement_policy is not None: instance.resource_policies = [placement_policy] + elif reservation is not None and "placement" in reservation.resource_policies: + instance.resource_policies = 
[reservation.resource_policies["placement"]] if spot: instance.scheduling = compute_v1.Scheduling() @@ -187,6 +234,17 @@ def create_instance_struct( ) ] + if reservation is not None: + reservation_project = get_resource_project(reservation.self_link) + instance.reservation_affinity = compute_v1.ReservationAffinity() + instance.reservation_affinity.consume_reservation_type = ( + compute_v1.ReservationAffinity.ConsumeReservationType.SPECIFIC_RESERVATION.name + ) + instance.reservation_affinity.key = "compute.googleapis.com/reservation-name" + instance.reservation_affinity.values = [ + f"projects/{reservation_project}/reservations/{reservation.name}" + ] + return instance @@ -350,11 +408,8 @@ def get_accelerators( return [] accelerator_config = compute_v1.AcceleratorConfig() accelerator_config.accelerator_count = len(gpus) - for acc in supported_accelerators: - if gpus[0].name == acc["gpu_name"] and gpus[0].memory_mib == acc["memory_mb"]: - accelerator_name = acc["accelerator_name"] - break - else: + accelerator_name = find_accelerator_name(gpus[0].name, gpus[0].memory_mib) + if accelerator_name is None: raise ValueError(f"Unsupported GPU: {gpus[0].name} {gpus[0].memory_mib} MiB") accelerator_config.accelerator_type = ( f"projects/{project_id}/zones/{zone}/acceleratorTypes/{accelerator_name}" @@ -362,6 +417,31 @@ def get_accelerators( return [accelerator_config] +def find_reservation( + reservations_client: compute_v1.ReservationsClient, + project_id: str, + name: str, +) -> dict[str, compute_v1.Reservation]: + request = compute_v1.AggregatedListReservationsRequest( + project=project_id, + filter=( + f'(name = "{sanitize_filter_value(name)}")' + ' AND (status = "READY")' + " AND (specificReservationRequired = true)" + ), + ) + try: + aggregated_reservations = reservations_client.aggregated_list(request=request) + except (google.api_core.exceptions.NotFound, google.api_core.exceptions.Forbidden) as e: + logger.warning("Could not find reservation: %s", e) + return {} + 
zone_to_reservation = {} + for zone, zone_reservations in aggregated_reservations: + if zone_reservations.reservations: + zone_to_reservation[zone.split("/")[-1]] = zone_reservations.reservations[0] + return zone_to_reservation + + def filter_invalid_labels(labels: Dict[str, str]) -> Dict[str, str]: filtered_labels = {} for k, v in labels.items(): diff --git a/src/dstack/_internal/core/backends/nebius/compute.py b/src/dstack/_internal/core/backends/nebius/compute.py index 7c1335f6b..70a41dedd 100644 --- a/src/dstack/_internal/core/backends/nebius/compute.py +++ b/src/dstack/_internal/core/backends/nebius/compute.py @@ -2,8 +2,9 @@ import random import shlex import time +from collections.abc import Iterable from functools import cached_property -from typing import Callable, List, Optional +from typing import List, Optional from nebius.aio.operation import Operation as SDKOperation from nebius.aio.service_error import RequestError, StatusCode @@ -21,7 +22,11 @@ get_user_data, merge_tags, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.nebius import resources from dstack._internal.core.backends.nebius.fabrics import get_suitable_infiniband_fabrics from dstack._internal.core.backends.nebius.models import NebiusConfig, NebiusServiceAccountCreds @@ -125,10 +130,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability for offer in offers ] - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def 
create_instance( self, diff --git a/src/dstack/_internal/core/backends/oci/compute.py b/src/dstack/_internal/core/backends/oci/compute.py index 90f8981d7..2560988b8 100644 --- a/src/dstack/_internal/core/backends/oci/compute.py +++ b/src/dstack/_internal/core/backends/oci/compute.py @@ -1,6 +1,7 @@ +from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor from functools import cached_property -from typing import Callable, List, Optional +from typing import List, Optional import oci @@ -13,7 +14,11 @@ generate_unique_instance_name, get_user_data, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.oci import resources from dstack._internal.core.backends.oci.models import OCIConfig from dstack._internal.core.backends.oci.region import make_region_clients_map @@ -96,10 +101,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def terminate_instance( self, instance_id: str, region: str, backend_data: Optional[str] = None diff --git a/src/dstack/_internal/core/backends/runpod/compute.py b/src/dstack/_internal/core/backends/runpod/compute.py index 9b7fa6e65..355572a09 100644 --- a/src/dstack/_internal/core/backends/runpod/compute.py +++ b/src/dstack/_internal/core/backends/runpod/compute.py @@ -1,7 +1,8 @@ import json import uuid +from collections.abc import Iterable from datetime 
import timedelta -from typing import Callable, List, Optional +from typing import List, Optional from dstack._internal.core.backends.base.backend import Compute from dstack._internal.core.backends.base.compute import ( @@ -12,7 +13,11 @@ get_docker_commands, get_job_instance_name, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.runpod.api_client import RunpodApiClient from dstack._internal.core.backends.runpod.models import RunpodConfig from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT @@ -72,10 +77,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability ] return offers - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def run_job( self, diff --git a/src/dstack/_internal/core/models/fleets.py b/src/dstack/_internal/core/models/fleets.py index 596d8ec82..031b9aae4 100644 --- a/src/dstack/_internal/core/models/fleets.py +++ b/src/dstack/_internal/core/models/fleets.py @@ -244,7 +244,7 @@ class InstanceGroupParams(CoreModel): Field( description=( "The existing reservation to use for instance provisioning." 
- " Supports AWS Capacity Reservations and Capacity Blocks" + " Supports AWS Capacity Reservations, AWS Capacity Blocks, and GCP reservations" ) ), ] = None diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index 286b07492..ca37cd360 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -283,7 +283,7 @@ class ProfileParams(CoreModel): Field( description=( "The existing reservation to use for instance provisioning." - " Supports AWS Capacity Reservations and Capacity Blocks" + " Supports AWS Capacity Reservations, AWS Capacity Blocks, and GCP reservations" ) ), ] = None diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index ec7ca8f7e..7a0815f3b 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -558,10 +558,14 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No if ( _is_fleet_master_instance(instance) and instance_offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT + and isinstance(compute, ComputeWithPlacementGroupSupport) + and ( + compute.are_placement_groups_compatible_with_reservations(instance_offer.backend) + or instance_configuration.reservation is None + ) and instance.fleet and _is_cloud_cluster(instance.fleet) ): - assert isinstance(compute, ComputeWithPlacementGroupSupport) placement_group_model = _find_suitable_placement_group( placement_groups=placement_group_models, instance_offer=instance_offer,