From e3b82f85a787853ede1ce8862e56677fbc7bd9f1 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Mon, 13 Oct 2025 23:16:10 +0200 Subject: [PATCH 1/2] Support GCP reservations Allow users to specify a specifically-targeted GCP reservation in fleet configurations: ```yaml type: fleet nodes: 1 backends: [gcp] reservation: my-reservation ``` For reservations shared between projects, the full syntax can be used to reference the project that owns the reservation: ```yaml type: fleet nodes: 1 backends: [gcp] reservation: projects/my-proj/reservations/my-reservation ``` `dstack` will locate the specified reservation, suggest offers that match the reservation's properties, and provision instances in the reservation. If there are multiple reservations with the specified name, all of them will be considered for provisioning. Using reservations requires the `compute.reservations.list` permission in the project that owns the reservations. The implementation has only been tested with on-demand reservations. Support for other reservation types — including Future Reservations in Calendar Mode and Future Reservations in AI Hypercomputer — can be confirmed later. 
--- docs/docs/concepts/backends.md | 4 + docs/docs/guides/troubleshooting.md | 2 +- .../_internal/core/backends/aws/compute.py | 13 +- .../_internal/core/backends/azure/compute.py | 15 +- .../_internal/core/backends/base/compute.py | 52 ++++--- .../_internal/core/backends/base/offers.py | 5 +- .../core/backends/datacrunch/compute.py | 15 +- .../_internal/core/backends/gcp/compute.py | 136 +++++++++++++++++- .../_internal/core/backends/gcp/resources.py | 90 +++++++++++- .../_internal/core/backends/nebius/compute.py | 15 +- .../_internal/core/backends/oci/compute.py | 15 +- .../_internal/core/backends/runpod/compute.py | 15 +- src/dstack/_internal/core/models/fleets.py | 2 +- src/dstack/_internal/core/models/profiles.py | 2 +- .../background/tasks/process_instances.py | 6 +- 15 files changed, 319 insertions(+), 68 deletions(-) diff --git a/docs/docs/concepts/backends.md b/docs/docs/concepts/backends.md index e251c3caf..6fe5fd9d7 100644 --- a/docs/docs/concepts/backends.md +++ b/docs/docs/concepts/backends.md @@ -520,6 +520,7 @@ gcloud projects list --format="json(projectId)" compute.networks.updatePolicy compute.regions.get compute.regions.list + compute.reservations.list compute.resourcePolicies.create compute.resourcePolicies.delete compute.routers.list @@ -543,6 +544,9 @@ gcloud projects list --format="json(projectId)" Also, the use of TPUs requires the `serviceAccountUser` role. For TPU VMs, dstack will use the default service account. + If you plan to use shared reservations, the `compute.reservations.list` + permission is required in the project that owns the reservations. + ??? info "Required APIs" First, ensure the required APIs are enabled in your GCP `project_id`. 
diff --git a/docs/docs/guides/troubleshooting.md b/docs/docs/guides/troubleshooting.md index 6a46ec073..ac4ef4bc9 100644 --- a/docs/docs/guides/troubleshooting.md +++ b/docs/docs/guides/troubleshooting.md @@ -111,7 +111,7 @@ one of these features, `dstack` will only select offers from the backends that s are only supported by the `aws`, `azure`, `gcp`, `nebius`, `oci`, and `vultr` backends, as well as SSH fleets. - [Reservations](../reference/dstack.yml/fleet.md#reservation) - are only supported by the `aws` backend. + are only supported by the `aws` and `gcp` backends. #### Cause 8: dstack Sky balance diff --git a/src/dstack/_internal/core/backends/aws/compute.py b/src/dstack/_internal/core/backends/aws/compute.py index b4bc6c670..f973980e0 100644 --- a/src/dstack/_internal/core/backends/aws/compute.py +++ b/src/dstack/_internal/core/backends/aws/compute.py @@ -1,4 +1,5 @@ import threading +from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Any, Callable, Dict, List, Optional, Tuple @@ -34,7 +35,11 @@ get_user_data, merge_tags, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.errors import ( ComputeError, NoCapacityError, @@ -159,10 +164,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability ) return availability_offers - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def _get_offers_cached_key(self, requirements: Requirements) 
-> int: # Requirements is not hashable, so we use a hack to get arguments hash diff --git a/src/dstack/_internal/core/backends/azure/compute.py b/src/dstack/_internal/core/backends/azure/compute.py index c67398829..8b37a72b2 100644 --- a/src/dstack/_internal/core/backends/azure/compute.py +++ b/src/dstack/_internal/core/backends/azure/compute.py @@ -1,8 +1,9 @@ import base64 import enum import re +from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor, as_completed -from typing import Callable, Dict, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from azure.core.credentials import TokenCredential from azure.core.exceptions import ResourceExistsError, ResourceNotFoundError @@ -51,7 +52,11 @@ merge_tags, requires_nvidia_proprietary_kernel_modules, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.consts import DSTACK_OS_IMAGE_WITH_PROPRIETARY_NVIDIA_KERNEL_MODULES from dstack._internal.core.errors import ComputeError, NoCapacityError from dstack._internal.core.models.backends.base import BackendType @@ -108,10 +113,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability ) return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def create_instance( self, diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 6a1f6af4a..1dbf28132 100644 --- 
a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -17,7 +17,7 @@ from gpuhunt import CPUArchitecture from dstack._internal import settings -from dstack._internal.core.backends.base.offers import filter_offers_by_requirements +from dstack._internal.core.backends.base.offers import OfferModifier, filter_offers_by_requirements from dstack._internal.core.consts import ( DSTACK_RUNNER_HTTP_PORT, DSTACK_RUNNER_SSH_PORT, @@ -168,17 +168,13 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability """ pass - def get_offers_modifier( - self, requirements: Requirements - ) -> Optional[ - Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]] - ]: + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: """ - Returns a modifier function that modifies offers before they are filtered by requirements. - Can return `None` to exclude the offer. + Returns functions that modify offers before they are filtered by requirements. + A modifier function can return `None` to exclude the offer. E.g. can be used to set appropriate disk size based on requirements. 
""" - return None + return [] def get_offers_post_filter( self, requirements: Requirements @@ -191,14 +187,7 @@ def get_offers_post_filter( def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]: offers = self._get_all_offers_with_availability_cached() - modifier = self.get_offers_modifier(requirements) - if modifier is not None: - modified_offers = [] - for o in offers: - modified_offer = modifier(o) - if modified_offer is not None: - modified_offers.append(modified_offer) - offers = modified_offers + offers = self.__apply_modifiers(offers, self.get_offers_modifiers(requirements)) offers = filter_offers_by_requirements(offers, requirements) post_filter = self.get_offers_post_filter(requirements) if post_filter is not None: @@ -212,6 +201,20 @@ def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvaila def _get_all_offers_with_availability_cached(self) -> List[InstanceOfferWithAvailability]: return self.get_all_offers_with_availability() + @staticmethod + def __apply_modifiers( + offers: Iterable[InstanceOfferWithAvailability], modifiers: Iterable[OfferModifier] + ) -> list[InstanceOfferWithAvailability]: + modified_offers = [] + for offer in offers: + for modifier in modifiers: + offer = modifier(offer) + if offer is None: + break + else: + modified_offers.append(offer) + return modified_offers + class ComputeWithFilteredOffersCached(ABC): """ @@ -341,6 +344,15 @@ class ComputeWithMultinodeSupport: class ComputeWithReservationSupport: """ Must be subclassed to support provisioning from reservations. + + The following is expected from a backend that supports reservations: + + - `get_offers` respects `Requirements.reservation` if set, and only returns + offers that can be provisioned in the configured reservation. It can + adjust some offer properties such as `availability` and + `availability_zones` if necessary. 
+ - `create_instance` respects `InstanceConfig.reservation` if set, and + provisions the instance in the configured reservation. """ pass @@ -391,6 +403,12 @@ def is_suitable_placement_group( """ pass + def are_placement_groups_compatible_with_reservations(self) -> bool: + """ + Whether placement groups can be used for instances provisioned in reservations. + """ + return True + class ComputeWithGatewaySupport(ABC): """ diff --git a/src/dstack/_internal/core/backends/base/offers.py b/src/dstack/_internal/core/backends/base/offers.py index c081b4271..e3baa53f1 100644 --- a/src/dstack/_internal/core/backends/base/offers.py +++ b/src/dstack/_internal/core/backends/base/offers.py @@ -199,9 +199,12 @@ def choose_disk_size_mib( return round(disk_size_gib * 1024) +OfferModifier = Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]] + + def get_offers_disk_modifier( configurable_disk_size: Range[Memory], requirements: Requirements -) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: +) -> OfferModifier: """ Returns a func that modifies offers disk by setting min value that satisfies both `configurable_disk_size` and `requirements`. 
diff --git a/src/dstack/_internal/core/backends/datacrunch/compute.py b/src/dstack/_internal/core/backends/datacrunch/compute.py index 6b602a6b7..6543567c3 100644 --- a/src/dstack/_internal/core/backends/datacrunch/compute.py +++ b/src/dstack/_internal/core/backends/datacrunch/compute.py @@ -1,4 +1,5 @@ -from typing import Callable, Dict, List, Optional +from collections.abc import Iterable +from typing import Dict, List, Optional from datacrunch import DataCrunchClient from datacrunch.exceptions import APIException @@ -12,7 +13,11 @@ generate_unique_instance_name, get_shim_commands, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.datacrunch.models import DataCrunchConfig from dstack._internal.core.errors import NoCapacityError from dstack._internal.core.models.backends.base import BackendType @@ -59,10 +64,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability offers_with_availability = self._get_offers_with_availability(offers) return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def _get_offers_with_availability( self, offers: List[InstanceOffer] diff --git a/src/dstack/_internal/core/backends/gcp/compute.py b/src/dstack/_internal/core/backends/gcp/compute.py index 3d2af719b..4d1d563c1 100644 --- a/src/dstack/_internal/core/backends/gcp/compute.py +++ b/src/dstack/_internal/core/backends/gcp/compute.py @@ -1,7 +1,9 @@ import concurrent.futures import json 
+import re import threading from collections import defaultdict +from collections.abc import Iterable from dataclasses import dataclass from typing import Callable, Dict, List, Literal, Optional, Tuple @@ -24,6 +26,7 @@ ComputeWithPlacementGroupSupport, ComputeWithPrivateGatewaySupport, ComputeWithPrivilegedSupport, + ComputeWithReservationSupport, ComputeWithVolumeSupport, generate_unique_gateway_instance_name, generate_unique_instance_name, @@ -35,6 +38,7 @@ requires_nvidia_proprietary_kernel_modules, ) from dstack._internal.core.backends.base.offers import ( + OfferModifier, get_catalog_offers, get_offers_disk_modifier, ) @@ -78,8 +82,11 @@ # pd-balanced disks can be 10GB-64TB, but dstack images are 20GB and cannot grow larger # than 32TB because of filesystem settings CONFIGURABLE_DISK_SIZE = Range[Memory](min=Memory.parse("20GB"), max=Memory.parse("32TB")) - - +# Pattern from https://cloud.google.com/compute/docs/instances/reservations-consume#consuming_instances_from_a_specific_reservation +RESERVATION_PATTERN = re.compile( r"projects/(?P<project_id>[a-z0-9-]+)/reservations/(?P<reservation_name>[a-z0-9-]+)" ) +RESOURCE_NAME_PATTERN = re.compile(r"[a-z0-9-]+") TPU_VERSIONS = [tpu.name for tpu in KNOWN_TPUS] @@ -93,6 +100,7 @@ class GCPCompute( ComputeWithCreateInstanceSupport, ComputeWithPrivilegedSupport, ComputeWithMultinodeSupport, + ComputeWithReservationSupport, ComputeWithPlacementGroupSupport, ComputeWithGatewaySupport, ComputeWithPrivateGatewaySupport, @@ -113,8 +121,12 @@ def __init__(self, config: GCPConfig): self.resource_policies_client = compute_v1.ResourcePoliciesClient( credentials=self.credentials ) + self.reservations_client = compute_v1.ReservationsClient(credentials=self.credentials) self._usable_subnets_cache_lock = threading.Lock() self._usable_subnets_cache = TTLCache(maxsize=1, ttl=120) + self._find_reservation_cache_lock = threading.Lock() + # smaller TTL, since we check the reservation's in_use_count, which can change often + self._find_reservation_cache = 
TTLCache(maxsize=8, ttl=20) def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability]: regions = get_or_error(self.config.regions) @@ -149,10 +161,40 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability offers_with_availability[-1].region = region return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + modifiers = [] + + if requirements.reservation: + zone_to_reservation = self._find_reservation(requirements.reservation) + + def reservation_modifier( + offer: InstanceOfferWithAvailability, + ) -> Optional[InstanceOfferWithAvailability]: + if offer.instance.resources.spot: + return None + assert offer.availability_zones is not None + matching_zones = [] + zones_with_capacity = [] + for zone in offer.availability_zones: + reservation = zone_to_reservation.get(zone) + if reservation is not None and _offer_matches_reservation(offer, reservation): + matching_zones.append(zone) + if _reservation_has_capacity(reservation): + zones_with_capacity.append(zone) + if not matching_zones: + return None + offer = offer.copy(deep=True) + if zones_with_capacity: + offer.availability_zones = zones_with_capacity + else: + offer.availability_zones = matching_zones + offer.availability = InstanceAvailability.NOT_AVAILABLE + return offer + + modifiers.append(reservation_modifier) + + modifiers.append(get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)) + return modifiers def terminate_instance( self, instance_id: str, region: str, backend_data: Optional[str] = None @@ -305,6 +347,16 @@ def create_instance( ) for zone in zones: + reservation = None + if instance_config.reservation: + reservation = 
self._find_reservation(instance_config.reservation).get(zone) + if reservation is None: + logger.warning( + "Reservation %s no longer exists in zone %s", + instance_config.reservation, + zone, + ) + continue request = compute_v1.InsertInstanceRequest() request.zone = zone request.project = self.config.project_id @@ -335,6 +387,7 @@ def create_instance( roce_subnetworks=roce_subnets, allocate_public_ip=allocate_public_ip, placement_policy=placement_policy, + reservation=reservation, ) try: # GCP needs some time to return an error in case of no capacity (< 30s). @@ -475,6 +528,11 @@ def is_suitable_placement_group( ) -> bool: return placement_group.configuration.region == instance_offer.region + def are_placement_groups_compatible_with_reservations(self) -> bool: + # Cannot use our own placement policies when provisioning in a reservation. + # Instead, we use the placement policy defined in reservation settings. + return False + def create_gateway( self, configuration: GatewayComputeConfiguration, @@ -880,6 +938,26 @@ def _get_vpc_subnet(self, region: str) -> Optional[str]: usable_subnets=self._list_usable_subnets(), ) + @cachedmethod( + cache=lambda self: self._find_reservation_cache, + lock=lambda self: self._find_reservation_cache_lock, + ) + def _find_reservation(self, configured_name: str) -> dict[str, compute_v1.Reservation]: + if match := RESERVATION_PATTERN.fullmatch(configured_name): + project_id = match.group("project_id") + name = match.group("reservation_name") + elif RESOURCE_NAME_PATTERN.fullmatch(configured_name): + project_id = self.config.project_id + name = configured_name + else: + # misconfigured or non-GCP + return {} + return gcp_resources.find_reservation( + reservations_client=self.reservations_client, + project_id=project_id, + name=name, + ) + def _supported_instances_and_zones( regions: List[str], @@ -933,6 +1011,52 @@ def _has_gpu_quota(quotas: Dict[str, float], resources: Resources) -> bool: return len(resources.gpus) <= 
quotas.get(quota_name, 0) +def _offer_matches_reservation( + offer: InstanceOfferWithAvailability, reservation: compute_v1.Reservation +) -> bool: + if ( + reservation.specific_reservation is None + or reservation.specific_reservation.instance_properties is None + ): + return False + properties = reservation.specific_reservation.instance_properties + if properties.machine_type != offer.instance.name: + return False + accelerators = properties.guest_accelerators or [] + if not accelerators and offer.instance.resources.gpus: + return False + if len(accelerators) > 1: + logger.warning( + "Expected 0 or 1 accelerator types per instance," + f" but {properties.machine_type} has {len(accelerators)}." + f" Ignoring reservation {reservation.self_link}" + ) + return False + if accelerators: + if accelerators[0].accelerator_count != len(offer.instance.resources.gpus): + return False + if ( + offer.instance.resources.gpus + and gcp_resources.find_accelerator_name( + offer.instance.resources.gpus[0].name, + offer.instance.resources.gpus[0].memory_mib, + ) + != accelerators[0].accelerator_type + ): + return False + return True + + +def _reservation_has_capacity(reservation: compute_v1.Reservation) -> bool: + return ( + reservation.specific_reservation is not None + and reservation.specific_reservation.in_use_count is not None + and reservation.specific_reservation.assured_count is not None + and reservation.specific_reservation.in_use_count + < reservation.specific_reservation.assured_count + ) + + def _unique_instance_name(instance: InstanceType) -> str: if instance.resources.spot: name = f"{instance.name}-spot" diff --git a/src/dstack/_internal/core/backends/gcp/resources.py b/src/dstack/_internal/core/backends/gcp/resources.py index 57f2f8548..b3b937cea 100644 --- a/src/dstack/_internal/core/backends/gcp/resources.py +++ b/src/dstack/_internal/core/backends/gcp/resources.py @@ -29,6 +29,31 @@ ] +def find_accelerator_name(gpu_name: str, memory_mib: int) -> Optional[str]: + for 
acc in supported_accelerators: + if gpu_name == acc["gpu_name"] and memory_mib == acc["memory_mb"]: + return acc["accelerator_name"] + return None + + +def sanitize_filter_value(value: str) -> str: + """ + Escape characters that could break the Compute Engine API filter string. + """ + return value.replace("\\", "\\\\").replace('"', '\\"') + + +def get_resource_project(resource_url: str) -> str: + """ + Extract the project ID from a URL like + https://www.googleapis.com/compute/v1/projects/proj-id/zones/us-central1-a/instances/vm-name + """ + matches = re.findall(r"/projects/(?P<project_id>[a-z0-9-]+)/", resource_url) + if not matches: + raise BackendError(f"Invalid resource URL {resource_url}") + return matches[0] + + def get_availability_zones( regions_client: compute_v1.RegionsClient, project_id: str, @@ -123,6 +148,7 @@ def create_instance_struct( roce_subnetworks: Optional[List[Tuple[str, str]]] = None, allocate_public_ip: bool = True, placement_policy: Optional[str] = None, + reservation: Optional[compute_v1.Reservation] = None, ) -> compute_v1.Instance: instance = compute_v1.Instance() instance.name = instance_name @@ -147,6 +173,25 @@ initialize_params.disk_type = f"zones/{zone}/diskTypes/hyperdisk-balanced" disk.initialize_params = initialize_params instance.disks = [disk] + if ( + reservation is not None + and reservation.specific_reservation is not None + and reservation.specific_reservation.instance_properties is not None + and reservation.specific_reservation.instance_properties.local_ssds is not None + ): + for local_ssd in reservation.specific_reservation.instance_properties.local_ssds: + instance.disks.append( + compute_v1.AttachedDisk( + auto_delete=True, + boot=False, + type_="SCRATCH", + initialize_params=compute_v1.AttachedDiskInitializeParams( + disk_type=f"zones/{zone}/diskTypes/local-ssd", + disk_size_gb=local_ssd.disk_size_gb, + ), + interface=local_ssd.interface, + ) + ) if accelerators: instance.guest_accelerators = 
accelerators @@ -162,6 +207,8 @@ def create_instance_struct( if placement_policy is not None: instance.resource_policies = [placement_policy] + elif reservation is not None and "placement" in reservation.resource_policies: + instance.resource_policies = [reservation.resource_policies["placement"]] if spot: instance.scheduling = compute_v1.Scheduling() @@ -187,6 +234,17 @@ def create_instance_struct( ) ] + if reservation is not None: + reservation_project = get_resource_project(reservation.self_link) + instance.reservation_affinity = compute_v1.ReservationAffinity() + instance.reservation_affinity.consume_reservation_type = ( + compute_v1.ReservationAffinity.ConsumeReservationType.SPECIFIC_RESERVATION.name + ) + instance.reservation_affinity.key = "compute.googleapis.com/reservation-name" + instance.reservation_affinity.values = [ + f"projects/{reservation_project}/reservations/{reservation.name}" + ] + return instance @@ -350,11 +408,8 @@ def get_accelerators( return [] accelerator_config = compute_v1.AcceleratorConfig() accelerator_config.accelerator_count = len(gpus) - for acc in supported_accelerators: - if gpus[0].name == acc["gpu_name"] and gpus[0].memory_mib == acc["memory_mb"]: - accelerator_name = acc["accelerator_name"] - break - else: + accelerator_name = find_accelerator_name(gpus[0].name, gpus[0].memory_mib) + if accelerator_name is None: raise ValueError(f"Unsupported GPU: {gpus[0].name} {gpus[0].memory_mib} MiB") accelerator_config.accelerator_type = ( f"projects/{project_id}/zones/{zone}/acceleratorTypes/{accelerator_name}" @@ -362,6 +417,31 @@ def get_accelerators( return [accelerator_config] +def find_reservation( + reservations_client: compute_v1.ReservationsClient, + project_id: str, + name: str, +) -> dict[str, compute_v1.Reservation]: + request = compute_v1.AggregatedListReservationsRequest( + project=project_id, + filter=( + f'(name = "{sanitize_filter_value(name)}")' + ' AND (status = "READY")' + " AND (specificReservationRequired = true)" + 
), + ) + try: + aggregated_reservations = reservations_client.aggregated_list(request=request) + except (google.api_core.exceptions.NotFound, google.api_core.exceptions.Forbidden) as e: + logger.warning("Could not find reservation: %s", e) + return {} + zone_to_reservation = {} + for zone, zone_reservations in aggregated_reservations: + if zone_reservations.reservations: + zone_to_reservation[zone.split("/")[-1]] = zone_reservations.reservations[0] + return zone_to_reservation + + def filter_invalid_labels(labels: Dict[str, str]) -> Dict[str, str]: filtered_labels = {} for k, v in labels.items(): diff --git a/src/dstack/_internal/core/backends/nebius/compute.py b/src/dstack/_internal/core/backends/nebius/compute.py index 7c1335f6b..70a41dedd 100644 --- a/src/dstack/_internal/core/backends/nebius/compute.py +++ b/src/dstack/_internal/core/backends/nebius/compute.py @@ -2,8 +2,9 @@ import random import shlex import time +from collections.abc import Iterable from functools import cached_property -from typing import Callable, List, Optional +from typing import List, Optional from nebius.aio.operation import Operation as SDKOperation from nebius.aio.service_error import RequestError, StatusCode @@ -21,7 +22,11 @@ get_user_data, merge_tags, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.nebius import resources from dstack._internal.core.backends.nebius.fabrics import get_suitable_infiniband_fabrics from dstack._internal.core.backends.nebius.models import NebiusConfig, NebiusServiceAccountCreds @@ -125,10 +130,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability for offer in offers ] - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], 
Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def create_instance( self, diff --git a/src/dstack/_internal/core/backends/oci/compute.py b/src/dstack/_internal/core/backends/oci/compute.py index 90f8981d7..2560988b8 100644 --- a/src/dstack/_internal/core/backends/oci/compute.py +++ b/src/dstack/_internal/core/backends/oci/compute.py @@ -1,6 +1,7 @@ +from collections.abc import Iterable from concurrent.futures import ThreadPoolExecutor from functools import cached_property -from typing import Callable, List, Optional +from typing import List, Optional import oci @@ -13,7 +14,11 @@ generate_unique_instance_name, get_user_data, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.oci import resources from dstack._internal.core.backends.oci.models import OCIConfig from dstack._internal.core.backends.oci.region import make_region_clients_map @@ -96,10 +101,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability return offers_with_availability - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def terminate_instance( self, instance_id: str, region: str, backend_data: Optional[str] = None diff --git a/src/dstack/_internal/core/backends/runpod/compute.py 
b/src/dstack/_internal/core/backends/runpod/compute.py index 9b7fa6e65..355572a09 100644 --- a/src/dstack/_internal/core/backends/runpod/compute.py +++ b/src/dstack/_internal/core/backends/runpod/compute.py @@ -1,7 +1,8 @@ import json import uuid +from collections.abc import Iterable from datetime import timedelta -from typing import Callable, List, Optional +from typing import List, Optional from dstack._internal.core.backends.base.backend import Compute from dstack._internal.core.backends.base.compute import ( @@ -12,7 +13,11 @@ get_docker_commands, get_job_instance_name, ) -from dstack._internal.core.backends.base.offers import get_catalog_offers, get_offers_disk_modifier +from dstack._internal.core.backends.base.offers import ( + OfferModifier, + get_catalog_offers, + get_offers_disk_modifier, +) from dstack._internal.core.backends.runpod.api_client import RunpodApiClient from dstack._internal.core.backends.runpod.models import RunpodConfig from dstack._internal.core.consts import DSTACK_RUNNER_SSH_PORT @@ -72,10 +77,8 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability ] return offers - def get_offers_modifier( - self, requirements: Requirements - ) -> Callable[[InstanceOfferWithAvailability], Optional[InstanceOfferWithAvailability]]: - return get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements) + def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: + return [get_offers_disk_modifier(CONFIGURABLE_DISK_SIZE, requirements)] def run_job( self, diff --git a/src/dstack/_internal/core/models/fleets.py b/src/dstack/_internal/core/models/fleets.py index 596d8ec82..031b9aae4 100644 --- a/src/dstack/_internal/core/models/fleets.py +++ b/src/dstack/_internal/core/models/fleets.py @@ -244,7 +244,7 @@ class InstanceGroupParams(CoreModel): Field( description=( "The existing reservation to use for instance provisioning." 
- " Supports AWS Capacity Reservations and Capacity Blocks" + " Supports AWS Capacity Reservations, AWS Capacity Blocks, and GCP reservations" ) ), ] = None diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index 286b07492..ca37cd360 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -283,7 +283,7 @@ class ProfileParams(CoreModel): Field( description=( "The existing reservation to use for instance provisioning." - " Supports AWS Capacity Reservations and Capacity Blocks" + " Supports AWS Capacity Reservations, AWS Capacity Blocks, and GCP reservations" ) ), ] = None diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index ec7ca8f7e..eff82c02e 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -558,10 +558,14 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No if ( _is_fleet_master_instance(instance) and instance_offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT + and isinstance(compute, ComputeWithPlacementGroupSupport) + and ( + compute.are_placement_groups_compatible_with_reservations() + or instance_configuration.reservation is None + ) and instance.fleet and _is_cloud_cluster(instance.fleet) ): - assert isinstance(compute, ComputeWithPlacementGroupSupport) placement_group_model = _find_suitable_placement_group( placement_groups=placement_group_models, instance_offer=instance_offer, From ee21f04d88dcd849cbe8ee998dc183f14d84d1e7 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Wed, 15 Oct 2025 00:09:40 +0200 Subject: [PATCH 2/2] Tweak `Compute` API for dstack Sky --- src/dstack/_internal/core/backends/base/compute.py | 7 ++++++- src/dstack/_internal/core/backends/gcp/compute.py | 2 +- 
.../_internal/server/background/tasks/process_instances.py | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 1dbf28132..560982933 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -23,6 +23,7 @@ DSTACK_RUNNER_SSH_PORT, DSTACK_SHIM_HTTP_PORT, ) +from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.configurations import LEGACY_REPO_DIR from dstack._internal.core.models.gateways import ( GatewayComputeConfiguration, @@ -403,9 +404,13 @@ def is_suitable_placement_group( """ pass - def are_placement_groups_compatible_with_reservations(self) -> bool: + def are_placement_groups_compatible_with_reservations(self, backend_type: BackendType) -> bool: """ Whether placement groups can be used for instances provisioned in reservations. + + Arguments: + backend_type: matches the backend type of this compute, unless this compute is a proxy + for other backends (dstack Sky) """ return True diff --git a/src/dstack/_internal/core/backends/gcp/compute.py b/src/dstack/_internal/core/backends/gcp/compute.py index 4d1d563c1..efda11266 100644 --- a/src/dstack/_internal/core/backends/gcp/compute.py +++ b/src/dstack/_internal/core/backends/gcp/compute.py @@ -528,7 +528,7 @@ def is_suitable_placement_group( ) -> bool: return placement_group.configuration.region == instance_offer.region - def are_placement_groups_compatible_with_reservations(self) -> bool: + def are_placement_groups_compatible_with_reservations(self, backend_type: BackendType) -> bool: # Cannot use our own placement policies when provisioning in a reservation. # Instead, we use the placement policy defined in reservation settings. 
return False diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index eff82c02e..7a0815f3b 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -560,7 +560,7 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No and instance_offer.backend in BACKENDS_WITH_PLACEMENT_GROUPS_SUPPORT and isinstance(compute, ComputeWithPlacementGroupSupport) and ( - compute.are_placement_groups_compatible_with_reservations() + compute.are_placement_groups_compatible_with_reservations(instance_offer.backend) or instance_configuration.reservation is None ) and instance.fleet