From ecda61a517d09391fd46e6418788c83577abf9c3 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 20 Apr 2026 14:06:41 +0500 Subject: [PATCH] Optimize GCP offers --- .../_internal/core/backends/base/offers.py | 9 ++ .../_internal/core/backends/gcp/compute.py | 114 ++++++++++-------- 2 files changed, 74 insertions(+), 49 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/offers.py b/src/dstack/_internal/core/backends/base/offers.py index 70d453658..6212037d8 100644 --- a/src/dstack/_internal/core/backends/base/offers.py +++ b/src/dstack/_internal/core/backends/base/offers.py @@ -38,7 +38,14 @@ def get_catalog_offers( configurable_disk_size: Range[Memory] = Range[Memory](min=Memory.parse("1GB"), max=None), extra_filter: Optional[Callable[[InstanceOffer], bool]] = None, catalog: Optional[gpuhunt.Catalog] = None, + catalog_item_filter: Optional[Callable[[gpuhunt.CatalogItem], bool]] = None, ) -> List[InstanceOffer]: + """ + Args: + catalog_item_filter: applied to raw catalog items before the conversion to + `InstanceOffer` models. Use it for filtering that can be done on raw catalog fields + to avoid expensive model construction for items that will be discarded. + """ provider = backend.value if backend == BackendType.DATACRUNCH: provider = BackendType.VERDA.value # Backward compatibility @@ -54,6 +61,8 @@ def get_catalog_offers( for item in catalog.query(**asdict(q)): if locations is not None and item.location not in locations: continue + if catalog_item_filter is not None and not catalog_item_filter(item): + continue offer = catalog_item_to_offer(backend, item, requirements, configurable_disk_size) if offer is None: continue diff --git a/src/dstack/_internal/core/backends/gcp/compute.py b/src/dstack/_internal/core/backends/gcp/compute.py index 7a0101ed0..37685303d 100644 --- a/src/dstack/_internal/core/backends/gcp/compute.py +++ b/src/dstack/_internal/core/backends/gcp/compute.py @@ -8,6 +8,7 @@ import google.api_core.exceptions import google.cloud.compute_v1 as compute_v1 +import gpuhunt from cachetools import TTLCache, cachedmethod from google.cloud import tpu_v2 from google.cloud.compute_v1.types.compute import Instance @@ -64,7 +65,6 @@ InstanceConfiguration, InstanceOffer, InstanceOfferWithAvailability, - InstanceType, Resources, ) from dstack._internal.core.models.placement import PlacementGroup, PlacementGroupProvisioningData @@ -136,23 +136,26 @@ def __init__(self, config: GCPConfig): def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability]: regions = get_or_error(self.config.regions) + zones_by_key: Dict[Tuple, List[str]] = {} + catalog_item_filter = _make_catalog_item_filter(regions, zones_by_key) offers = get_catalog_offers( backend=BackendType.GCP, - extra_filter=_supported_instances_and_zones(regions), + catalog_item_filter=catalog_item_filter, ) quotas: Dict[str, Dict[str, float]] = defaultdict(dict) for region in self.regions_client.list(project=self.config.project_id): for quota in region.quotas: quotas[region.name][quota.metric] = quota.limit - quota.usage - offer_keys_to_offers = {} offers_with_availability = [] for offer in offers: region = offer.region[:-2] # strip zone - key = (_unique_instance_name(offer.instance), region) - if key in offer_keys_to_offers: - offer_keys_to_offers[key].availability_zones.append(offer.region) - continue + gpu_name = ( + offer.instance.resources.gpus[0].name if offer.instance.resources.gpus else None + ) + key = _offer_dedup_key( + offer.instance.name, offer.instance.resources.spot, gpu_name, region + ) availability = InstanceAvailability.NO_QUOTA if _has_gpu_quota(quotas[region], offer.instance.resources): availability = InstanceAvailability.UNKNOWN @@ -160,11 +163,10 @@ def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability offer_with_availability = InstanceOfferWithAvailability( **offer.dict(), availability=availability, - availability_zones=[offer.region], + availability_zones=zones_by_key.get(key, []), ) - offer_keys_to_offers[key] = offer_with_availability offers_with_availability.append(offer_with_availability) - offers_with_availability[-1].region = region + offer_with_availability.region = region return offers_with_availability def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]: @@ -992,37 +994,62 @@ def _find_reservation(self, configured_name: str) -> dict[str, compute_v1.Reserv ) -def _supported_instances_and_zones( +def _is_supported_gcp_instance(instance_name: str, gpu_name: Optional[str]) -> bool: + """Check if the instance is supported by dstack.""" + if _is_tpu(instance_name) and not _is_single_host_tpu(instance_name): + return False + for family in [ + "m4-", + "c4-", + "n4-", + "h3-", + "n2-", + "e2-medium", + "e2-standard-", + "e2-highmem-", + "e2-highcpu-", + "m1-", + "a2-", + "a3-", + "g2-", + ]: + if instance_name.startswith(family): + return True + if gpu_name is not None and gpu_name not in {"K80", "P4"}: + return True + return False + + +def _offer_dedup_key( + instance_name: str, spot: bool, gpu_name: Optional[str], region: str +) -> Tuple[str, bool, Optional[str], str]: + """Key for deduplicating GCP per-zone items into per-region offers.""" + return (instance_name, spot, gpu_name, region) + + +def _make_catalog_item_filter( regions: List[str], -) -> Optional[Callable[[InstanceOffer], bool]]: - def _filter(offer: InstanceOffer) -> bool: - # strip zone - if offer.region[:-2] not in regions: + zones_by_key: Dict[Tuple, List[str]], +) -> Callable[[gpuhunt.CatalogItem], bool]: + """ + Returns a filter that checks region, instance support, and deduplicates + per-zone items into per-region offers. Zones are collected in `zones_by_key` + so the caller can attach them to offers later. + """ + seen: set = set() + + def _filter(item: gpuhunt.CatalogItem) -> bool: + region = item.location[:-2] + if region not in regions: return False - # remove multi-host TPUs for initial release - if _is_tpu(offer.instance.name) and not _is_single_host_tpu(offer.instance.name): + if not _is_supported_gcp_instance(item.instance_name, item.gpu_name): return False - for family in [ - "m4-", - "c4-", - "n4-", - "h3-", - "n2-", - "e2-medium", - "e2-standard-", - "e2-highmem-", - "e2-highcpu-", - "m1-", - "a2-", - "a3-", - "g2-", - ]: - if offer.instance.name.startswith(family): - return True - if offer.instance.resources.gpus: - if offer.instance.resources.gpus[0].name not in {"K80", "P4"}: - return True - return False + key = _offer_dedup_key(item.instance_name, item.spot, item.gpu_name, region) + zones_by_key.setdefault(key, []).append(item.location) + if key in seen: + return False + seen.add(key) + return True return _filter @@ -1090,17 +1117,6 @@ def _reservation_has_capacity(reservation: compute_v1.Reservation) -> bool: ) -def _unique_instance_name(instance: InstanceType) -> str: - if instance.resources.spot: - name = f"{instance.name}-spot" - else: - name = instance.name - if not instance.resources.gpus: - return name - gpu = instance.resources.gpus[0] - return f"{name}-{gpu.name}-{gpu.memory_mib}" - - @dataclass class GCPImage: id: str