Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/dstack/_internal/core/backends/base/offers.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,14 @@ def get_catalog_offers(
configurable_disk_size: Range[Memory] = Range[Memory](min=Memory.parse("1GB"), max=None),
extra_filter: Optional[Callable[[InstanceOffer], bool]] = None,
catalog: Optional[gpuhunt.Catalog] = None,
catalog_item_filter: Optional[Callable[[gpuhunt.CatalogItem], bool]] = None,
) -> List[InstanceOffer]:
"""
Args:
catalog_item_filter: applied to raw catalog items before the conversion to
`InstanceOffer` models. Use it for filtering that can be done on raw catalog fields
to avoid expensive model construction for items that will be discarded.
"""
provider = backend.value
if backend == BackendType.DATACRUNCH:
provider = BackendType.VERDA.value # Backward compatibility
Expand All @@ -54,6 +61,8 @@ def get_catalog_offers(
for item in catalog.query(**asdict(q)):
if locations is not None and item.location not in locations:
continue
if catalog_item_filter is not None and not catalog_item_filter(item):
continue
offer = catalog_item_to_offer(backend, item, requirements, configurable_disk_size)
if offer is None:
continue
Expand Down
114 changes: 65 additions & 49 deletions src/dstack/_internal/core/backends/gcp/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import google.api_core.exceptions
import google.cloud.compute_v1 as compute_v1
import gpuhunt
from cachetools import TTLCache, cachedmethod
from google.cloud import tpu_v2
from google.cloud.compute_v1.types.compute import Instance
Expand Down Expand Up @@ -64,7 +65,6 @@
InstanceConfiguration,
InstanceOffer,
InstanceOfferWithAvailability,
InstanceType,
Resources,
)
from dstack._internal.core.models.placement import PlacementGroup, PlacementGroupProvisioningData
Expand Down Expand Up @@ -136,35 +136,37 @@ def __init__(self, config: GCPConfig):

def get_all_offers_with_availability(self) -> List[InstanceOfferWithAvailability]:
regions = get_or_error(self.config.regions)
zones_by_key: Dict[Tuple, List[str]] = {}
catalog_item_filter = _make_catalog_item_filter(regions, zones_by_key)
offers = get_catalog_offers(
backend=BackendType.GCP,
extra_filter=_supported_instances_and_zones(regions),
catalog_item_filter=catalog_item_filter,
)
quotas: Dict[str, Dict[str, float]] = defaultdict(dict)
for region in self.regions_client.list(project=self.config.project_id):
for quota in region.quotas:
quotas[region.name][quota.metric] = quota.limit - quota.usage

offer_keys_to_offers = {}
offers_with_availability = []
for offer in offers:
region = offer.region[:-2] # strip zone
key = (_unique_instance_name(offer.instance), region)
if key in offer_keys_to_offers:
offer_keys_to_offers[key].availability_zones.append(offer.region)
continue
gpu_name = (
offer.instance.resources.gpus[0].name if offer.instance.resources.gpus else None
)
key = _offer_dedup_key(
offer.instance.name, offer.instance.resources.spot, gpu_name, region
)
availability = InstanceAvailability.NO_QUOTA
if _has_gpu_quota(quotas[region], offer.instance.resources):
availability = InstanceAvailability.UNKNOWN
# todo quotas: cpu, memory, global gpu, tpu
offer_with_availability = InstanceOfferWithAvailability(
**offer.dict(),
availability=availability,
availability_zones=[offer.region],
availability_zones=zones_by_key.get(key, []),
)
offer_keys_to_offers[key] = offer_with_availability
offers_with_availability.append(offer_with_availability)
offers_with_availability[-1].region = region
offer_with_availability.region = region
return offers_with_availability

def get_offers_modifiers(self, requirements: Requirements) -> Iterable[OfferModifier]:
Expand Down Expand Up @@ -992,37 +994,62 @@ def _find_reservation(self, configured_name: str) -> dict[str, compute_v1.Reserv
)


def _supported_instances_and_zones(
def _is_supported_gcp_instance(instance_name: str, gpu_name: Optional[str]) -> bool:
"""Check if the instance is supported by dstack."""
if _is_tpu(instance_name) and not _is_single_host_tpu(instance_name):
return False
for family in [
"m4-",
"c4-",
"n4-",
"h3-",
"n2-",
"e2-medium",
"e2-standard-",
"e2-highmem-",
"e2-highcpu-",
"m1-",
"a2-",
"a3-",
"g2-",
]:
if instance_name.startswith(family):
return True
if gpu_name is not None and gpu_name not in {"K80", "P4"}:
return True
return False


def _offer_dedup_key(
instance_name: str, spot: bool, gpu_name: Optional[str], region: str
) -> Tuple[str, bool, Optional[str], str]:
"""Key for deduplicating GCP per-zone items into per-region offers."""
return (instance_name, spot, gpu_name, region)


def _make_catalog_item_filter(
regions: List[str],
) -> Optional[Callable[[InstanceOffer], bool]]:
def _filter(offer: InstanceOffer) -> bool:
# strip zone
if offer.region[:-2] not in regions:
zones_by_key: Dict[Tuple, List[str]],
) -> Callable[[gpuhunt.CatalogItem], bool]:
"""
Returns a filter that checks region, instance support, and deduplicates
per-zone items into per-region offers. Zones are collected in `zones_by_key`
so the caller can attach them to offers later.
"""
seen: set = set()

def _filter(item: gpuhunt.CatalogItem) -> bool:
region = item.location[:-2]
if region not in regions:
return False
# remove multi-host TPUs for initial release
if _is_tpu(offer.instance.name) and not _is_single_host_tpu(offer.instance.name):
if not _is_supported_gcp_instance(item.instance_name, item.gpu_name):
return False
for family in [
"m4-",
"c4-",
"n4-",
"h3-",
"n2-",
"e2-medium",
"e2-standard-",
"e2-highmem-",
"e2-highcpu-",
"m1-",
"a2-",
"a3-",
"g2-",
]:
if offer.instance.name.startswith(family):
return True
if offer.instance.resources.gpus:
if offer.instance.resources.gpus[0].name not in {"K80", "P4"}:
return True
return False
key = _offer_dedup_key(item.instance_name, item.spot, item.gpu_name, region)
zones_by_key.setdefault(key, []).append(item.location)
if key in seen:
return False
seen.add(key)
return True

return _filter

Expand Down Expand Up @@ -1090,17 +1117,6 @@ def _reservation_has_capacity(reservation: compute_v1.Reservation) -> bool:
)


def _unique_instance_name(instance: InstanceType) -> str:
if instance.resources.spot:
name = f"{instance.name}-spot"
else:
name = instance.name
if not instance.resources.gpus:
return name
gpu = instance.resources.gpus[0]
return f"{name}-{gpu.name}-{gpu.memory_mib}"


@dataclass
class GCPImage:
id: str
Expand Down
Loading