From ee5de14c0490fa046cc56d4f7d94a09a31289b6f Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 20 Nov 2025 14:58:42 +0500 Subject: [PATCH 01/10] WIP: Support lazy max_offers for get_offers_by_requirements --- .../server/services/backends/__init__.py | 16 +- .../_internal/server/services/offers.py | 139 +++++++++--------- 2 files changed, 79 insertions(+), 76 deletions(-) diff --git a/src/dstack/_internal/server/services/backends/__init__.py b/src/dstack/_internal/server/services/backends/__init__.py index 38350d9ca..f515ac156 100644 --- a/src/dstack/_internal/server/services/backends/__init__.py +++ b/src/dstack/_internal/server/services/backends/__init__.py @@ -1,5 +1,6 @@ import asyncio import heapq +from collections.abc import Iterator from typing import Callable, Coroutine, Dict, List, Optional, Tuple from uuid import UUID @@ -338,11 +339,13 @@ async def get_project_backend_model_by_type_or_error( return backend_model -async def get_instance_offers( - backends: List[Backend], requirements: Requirements, exclude_not_available: bool = False -) -> List[Tuple[Backend, InstanceOfferWithAvailability]]: +async def get_backend_offers( + backends: List[Backend], + requirements: Requirements, + exclude_not_available: bool = False, +) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]: """ - Returns list of instances satisfying minimal resource requirements sorted by price + Yields backend offers satisfying `requirements` sorted by price. """ logger.info("Requesting instance offers from backends: %s", [b.TYPE.value for b in backends]) tasks = [run_async(backend.compute().get_offers, requirements) for backend in backends] @@ -369,10 +372,9 @@ async def get_instance_offers( if not exclude_not_available or offer.availability.is_available() ] ) - # Merge preserving order for every backend + # Merge preserving order for every backend. offers = heapq.merge(*offers_by_backend, key=lambda i: i[1].price) - # Put NOT_AVAILABLE, NO_QUOTA, and BUSY instances at the end, do not sort by price - return sorted(offers, key=lambda i: not i[1].availability.is_available()) + return offers def check_backend_type_available(backend_type: BackendType): diff --git a/src/dstack/_internal/server/services/offers.py b/src/dstack/_internal/server/services/offers.py index ba8f8bae9..50185efa4 100644 --- a/src/dstack/_internal/server/services/offers.py +++ b/src/dstack/_internal/server/services/offers.py @@ -1,3 +1,5 @@ +import itertools +from collections.abc import Iterable, Iterator from typing import List, Literal, Optional, Tuple, Union import gpuhunt @@ -36,17 +38,10 @@ async def get_offers_by_requirements( instance_mounts: bool = False, placement_group: Optional[PlacementGroup] = None, blocks: Union[int, Literal["auto"]] = 1, + max_offers: Optional[int] = None, ) -> List[Tuple[Backend, InstanceOfferWithAvailability]]: backends: List[Backend] = await backends_services.get_project_backends(project=project) - # For backward-compatibility to show offers if users set `backends: [dstack]` - if ( - profile.backends is not None - and len(profile.backends) == 1 - and BackendType.DSTACK in profile.backends - ): - profile.backends = None - backend_types = profile.backends regions = profile.regions availability_zones = profile.availability_zones @@ -98,13 +93,13 @@ async def get_offers_by_requirements( if backend_types is not None: backends = [b for b in backends if b.TYPE in backend_types or b.TYPE == BackendType.DSTACK] - offers = await backends_services.get_instance_offers( + offers = await backends_services.get_backend_offers( backends=backends, requirements=requirements, exclude_not_available=exclude_not_available, ) - offers = filter_offers( + offers = _filter_offers( offers=offers, # Double filtering by backends if backend returns offers for other backend. backend_types=backend_types, @@ -114,67 +109,14 @@ async def get_offers_by_requirements( placement_group=placement_group, ) - if blocks == 1: - return offers - - shareable_offers = [] - for backend, offer in offers: - resources = offer.instance.resources - cpu_count = resources.cpus - gpu_count = len(resources.gpus) - if gpu_count > 0 and resources.gpus[0].vendor == gpuhunt.AcceleratorVendor.GOOGLE: - # TPUs cannot be shared - gpu_count = 1 - divisible, _blocks = is_divisible_into_blocks(cpu_count, gpu_count, blocks) - if not divisible: - continue - offer.total_blocks = _blocks - shareable_offers.append((backend, offer)) - return shareable_offers - - -def filter_offers( - offers: List[Tuple[Backend, InstanceOfferWithAvailability]], - backend_types: Optional[List[BackendType]] = None, - regions: Optional[List[str]] = None, - availability_zones: Optional[List[str]] = None, - instance_types: Optional[List[str]] = None, - placement_group: Optional[PlacementGroup] = None, -) -> List[Tuple[Backend, InstanceOfferWithAvailability]]: - if backend_types is not None: - offers = [(b, o) for b, o in offers if o.backend in backend_types] + if blocks != 1: + offers = _get_shareable_offers(offers, blocks) - if regions is not None: - regions = [r.lower() for r in regions] - offers = [(b, o) for b, o in offers if o.region.lower() in regions] - - if availability_zones is not None: - new_offers = [] - for b, o in offers: - if o.availability_zones is not None: - new_offer = o.copy() - new_offer.availability_zones = [ - z for z in o.availability_zones if z in availability_zones - ] - if new_offer.availability_zones: - new_offers.append((b, new_offer)) - offers = new_offers - - if instance_types is not None: - instance_types = [i.lower() for i in instance_types] - offers = [(b, o) for b, o in offers if o.instance.name.lower() in instance_types] - - if placement_group is not None: - new_offers = [] - for b, o in offers: - compute = b.compute() - if isinstance( - compute, ComputeWithPlacementGroupSupport - ) and compute.is_suitable_placement_group(placement_group, o): - new_offers.append((b, o)) - offers = new_offers + if max_offers is not None: + offers = itertools.islice(offers, max_offers) - return offers + # Put NOT_AVAILABLE and NO_QUOTA offers at the end. + return sorted(offers, key=lambda i: not i[1].availability.is_available()) def is_divisible_into_blocks( @@ -239,3 +181,62 @@ def get_instance_offer_with_restricted_az( if z == master_job_provisioning_data.availability_zone ] return instance_offer + + +def _filter_offers( + offers: Iterable[Tuple[Backend, InstanceOfferWithAvailability]], + backend_types: Optional[List[BackendType]] = None, + regions: Optional[List[str]] = None, + availability_zones: Optional[List[str]] = None, + instance_types: Optional[List[str]] = None, + placement_group: Optional[PlacementGroup] = None, +) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]: + """ + Yields filtered offers. May change offer attributes to match the filters. + """ + if regions is not None: + regions = [r.lower() for r in regions] + if instance_types is not None: + instance_types = [i.lower() for i in instance_types] + + for b, offer in offers: + if backend_types is not None and offer.backend not in backend_types: + continue + if regions is not None and offer.region.lower() not in regions: + continue + if instance_types is not None and offer.instance.name.lower() not in instance_types: + continue + if placement_group is not None: + compute = b.compute() + if not isinstance( + compute, ComputeWithPlacementGroupSupport + ) or not compute.is_suitable_placement_group(placement_group, offer): + continue + if availability_zones is not None: + if offer.availability_zones is None: + continue + offer.availability_zones = [ + z for z in offer.availability_zones if z in availability_zones + ] + yield (b, offer) + + +def _get_shareable_offers( + offers: Iterable[Tuple[Backend, InstanceOfferWithAvailability]], + blocks: Union[int, Literal["auto"]], +) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]: + """ + Yields offers that can be shared with `total_blocks` set. + """ + for backend, offer in offers: + resources = offer.instance.resources + cpu_count = resources.cpus + gpu_count = len(resources.gpus) + if gpu_count > 0 and resources.gpus[0].vendor == gpuhunt.AcceleratorVendor.GOOGLE: + # TPUs cannot be shared + gpu_count = 1 + divisible, total_blocks = is_divisible_into_blocks(cpu_count, gpu_count, blocks) + if not divisible: + continue + offer.total_blocks = total_blocks + yield (backend, offer) From fa15d898c641649f9316d7fd10bbc13cd3ccbbcd Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Thu, 20 Nov 2025 15:49:56 +0500 Subject: [PATCH 02/10] Implement get_offers iterator for ComputeWithAllOffersCached --- .../_internal/core/backends/base/compute.py | 16 +++++++--------- .../_internal/core/backends/base/offers.py | 9 ++++----- src/dstack/_internal/server/services/offers.py | 2 ++ .../_internal/server/services/runs/plan.py | 5 +++++ 4 files changed, 18 insertions(+), 14 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 29129f6cb..b10384eec 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -4,7 +4,7 @@ import string import threading from abc import ABC, abstractmethod -from collections.abc import Iterable +from collections.abc import Iterable, Iterator from enum import Enum from functools import lru_cache from pathlib import Path @@ -189,13 +189,13 @@ def get_offers_post_filter( """ return None - def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]: - offers = self._get_all_offers_with_availability_cached() - offers = self.__apply_modifiers(offers, self.get_offers_modifiers(requirements)) + def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]: + cached_offers = self._get_all_offers_with_availability_cached() + offers = self.__apply_modifiers(cached_offers, self.get_offers_modifiers(requirements)) offers = filter_offers_by_requirements(offers, requirements) post_filter = self.get_offers_post_filter(requirements) if post_filter is not None: - offers = [o for o in offers if post_filter(o)] + offers = (o for o in offers if post_filter(o)) return offers @cachedmethod( @@ -208,16 +208,14 @@ def _get_all_offers_with_availability_cached(self) -> List[InstanceOfferWithAvai @staticmethod def __apply_modifiers( offers: Iterable[InstanceOfferWithAvailability], modifiers: Iterable[OfferModifier] - ) -> list[InstanceOfferWithAvailability]: - modified_offers = [] + ) -> Iterator[InstanceOfferWithAvailability]: for offer in offers: for modifier in modifiers: offer = modifier(offer) if offer is None: break else: - modified_offers.append(offer) - return modified_offers + yield offer class ComputeWithFilteredOffersCached(ABC): diff --git a/src/dstack/_internal/core/backends/base/offers.py b/src/dstack/_internal/core/backends/base/offers.py index 4ecc73bef..de1d7c487 100644 --- a/src/dstack/_internal/core/backends/base/offers.py +++ b/src/dstack/_internal/core/backends/base/offers.py @@ -1,3 +1,4 @@ +from collections.abc import Iterable, Iterator from dataclasses import asdict from typing import Callable, List, Optional, TypeVar @@ -174,16 +175,14 @@ def requirements_to_query_filter(req: Optional[Requirements]) -> gpuhunt.QueryFi def filter_offers_by_requirements( - offers: List[InstanceOfferT], + offers: Iterable[InstanceOfferT], requirements: Optional[Requirements], -) -> List[InstanceOfferT]: +) -> Iterator[InstanceOfferT]: query_filter = requirements_to_query_filter(requirements) - filtered_offers = [] for offer in offers: catalog_item = offer_to_catalog_item(offer) if gpuhunt.matches(catalog_item, q=query_filter): - filtered_offers.append(offer) - return filtered_offers + yield offer def choose_disk_size_mib( diff --git a/src/dstack/_internal/server/services/offers.py b/src/dstack/_internal/server/services/offers.py index 50185efa4..e3b83f821 100644 --- a/src/dstack/_internal/server/services/offers.py +++ b/src/dstack/_internal/server/services/offers.py @@ -116,6 +116,8 @@ async def get_offers_by_requirements( offers = itertools.islice(offers, max_offers) # Put NOT_AVAILABLE and NO_QUOTA offers at the end. + # We have to do this after taking max_offers to avoid processing all offers + # if all/most offers are unavailable. return sorted(offers, key=lambda i: not i[1].availability.is_available()) diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index b19876bd6..504827132 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -59,6 +59,10 @@ DEFAULT_MAX_OFFERS = 50 +# To avoid too many offers from being processed per fleet when searching for optimal fleet. +# Without the limit, time and peak memory usage spike since +# they grow linearly with the number of fleets. +PER_FLEET_MAX_OFFERS = 100 async def get_job_plans( @@ -309,6 +313,7 @@ async def find_optimal_fleet_with_offers( volumes=volumes, privileged=job.job_spec.privileged, instance_mounts=check_run_spec_requires_instance_mounts(run_spec), + max_offers=PER_FLEET_MAX_OFFERS, ) available_backend_offers = _exclude_non_available_backend_offers(backend_offers) From 68bdaf291e61b78437931fed5531811be176fbe1 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Fri, 21 Nov 2025 15:05:22 +0500 Subject: [PATCH 03/10] Refetch backend offers without limit to return all offers for the optimal fleet --- .../_internal/server/services/offers.py | 13 +- .../_internal/server/services/runs/plan.py | 128 +++++++++++------- 2 files changed, 91 insertions(+), 50 deletions(-) diff --git a/src/dstack/_internal/server/services/offers.py b/src/dstack/_internal/server/services/offers.py index e3b83f821..13b093cd1 100644 --- a/src/dstack/_internal/server/services/offers.py +++ b/src/dstack/_internal/server/services/offers.py @@ -194,7 +194,7 @@ def _filter_offers( placement_group: Optional[PlacementGroup] = None, ) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]: """ - Yields filtered offers. May change offer attributes to match the filters. + Yields filtered offers. May return modified offers to match the filters. """ if regions is not None: regions = [r.lower() for r in regions] @@ -217,9 +217,13 @@ def _filter_offers( if availability_zones is not None: if offer.availability_zones is None: continue - offer.availability_zones = [ + new_offer = offer.copy() + new_offer.availability_zones = [ z for z in offer.availability_zones if z in availability_zones ] + if not new_offer.availability_zones: + continue + offer = new_offer yield (b, offer) @@ -240,5 +244,6 @@ def _get_shareable_offers( divisible, total_blocks = is_divisible_into_blocks(cpu_count, gpu_count, blocks) if not divisible: continue - offer.total_blocks = total_blocks - yield (backend, offer) + new_offer = offer.copy() + new_offer.total_blocks = total_blocks + yield (backend, new_offer) diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index 504827132..cf1023c12 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -1,5 +1,5 @@ import math -from typing import List, Optional, Union +from typing import Optional, Union from sqlalchemy import and_, not_, or_, select from sqlalchemy.ext.asyncio import AsyncSession @@ -58,11 +58,11 @@ logger = get_logger(__name__) -DEFAULT_MAX_OFFERS = 50 +_DEFAULT_MAX_OFFERS = 50 # To avoid too many offers from being processed per fleet when searching for optimal fleet. # Without the limit, time and peak memory usage spike since # they grow linearly with the number of fleets. -PER_FLEET_MAX_OFFERS = 100 +_PER_FLEET_MAX_OFFERS = 100 async def get_job_plans( @@ -232,7 +232,7 @@ async def find_optimal_fleet_with_offers( """ if run_model is not None and run_model.fleet is not None: # Using the fleet that was already chosen by the master job - instance_offers = _get_run_fleet_instance_offers( + instance_offers = _get_instance_offers_in_fleet( fleet_model=run_model.fleet, run_spec=run_spec, job=job, @@ -252,7 +252,7 @@ async def find_optimal_fleet_with_offers( # TODO: Consider trying all backend offers and then choosing a fleet. candidate_fleets_with_offers: list[ tuple[ - Optional[FleetModel], + FleetModel, list[tuple[InstanceModel, InstanceOfferWithAvailability]], list[tuple[Backend, InstanceOfferWithAvailability]], int, @@ -269,7 +269,11 @@ async def find_optimal_fleet_with_offers( # Limit multinode runs to cluster fleets to guarantee best connectivity. continue - instance_offers = _get_run_fleet_instance_offers( + if not _run_can_fit_into_fleet(run_spec, candidate_fleet): + logger.debug("Skipping fleet %s from consideration: run cannot fit into fleet") + continue + + instance_offers = _get_instance_offers_in_fleet( fleet_model=candidate_fleet_model, run_spec=run_spec, job=job, @@ -286,35 +290,15 @@ async def find_optimal_fleet_with_offers( available_instance_offers ) - try: - check_can_create_new_cloud_instance_in_fleet(candidate_fleet) - profile, requirements = get_run_profile_and_requirements_in_fleet( - job=job, - run_spec=run_spec, - fleet=candidate_fleet, - ) - except ValueError: - backend_offers = [] - else: - # Master job offers must be in the same cluster as existing instances. - master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( - fleet_model=candidate_fleet_model, - fleet_spec=candidate_fleet.spec, - ) - # Handle multinode for old jobs that don't have requirements.multinode set. - # TODO: Drop multinode param. - multinode = requirements.multinode or is_multinode_job(job) - backend_offers = await get_offers_by_requirements( - project=project, - profile=profile, - requirements=requirements, - multinode=multinode, - master_job_provisioning_data=master_instance_provisioning_data, - volumes=volumes, - privileged=job.job_spec.privileged, - instance_mounts=check_run_spec_requires_instance_mounts(run_spec), - max_offers=PER_FLEET_MAX_OFFERS, - ) + backend_offers = await _get_backend_offers_in_fleet( + project=project, + fleet_model=candidate_fleet_model, + fleet=candidate_fleet, + run_spec=run_spec, + job=job, + volumes=volumes, + max_offers=_PER_FLEET_MAX_OFFERS, + ) available_backend_offers = _exclude_non_available_backend_offers(backend_offers) if exclude_not_available: @@ -323,10 +307,6 @@ async def find_optimal_fleet_with_offers( available_backend_offers ) - if not _run_can_fit_into_fleet(run_spec, candidate_fleet): - logger.debug("Skipping fleet %s from consideration: run cannot fit into fleet") - continue - fleet_priority = ( not has_pool_capacity, min_instance_offer_price, @@ -342,8 +322,10 @@ async def find_optimal_fleet_with_offers( fleet_priority, ) ) + if len(candidate_fleets_with_offers) == 0: return None, [], [] + if ( not FeatureFlags.AUTOCREATED_FLEETS_DISABLED and run_spec.merged_profile.fleets is None @@ -354,8 +336,19 @@ async def find_optimal_fleet_with_offers( # This is for compatibility with non-fleet-first UX when runs created new fleets # if there are no instances to reuse. return None, [], [] + candidate_fleets_with_offers.sort(key=lambda t: t[-1]) - return candidate_fleets_with_offers[0][:3] + optimal_fleet_model, instance_offers, backend_offers = candidate_fleets_with_offers[0][:3] + # Refetch backend offers without limit to return all offers for the optimal fleet. + backend_offers = await _get_backend_offers_in_fleet( + project=project, + fleet_model=optimal_fleet_model, + run_spec=run_spec, + job=job, + volumes=volumes, + max_offers=None, + ) + return optimal_fleet_model, instance_offers, backend_offers def get_run_profile_and_requirements_in_fleet( @@ -399,12 +392,12 @@ async def _select_candidate_fleet_models( return fleet_models_with_instances + fleet_models_without_instances -def _get_run_fleet_instance_offers( +def _get_instance_offers_in_fleet( fleet_model: FleetModel, run_spec: RunSpec, job: Job, master_job_provisioning_data: Optional[JobProvisioningData] = None, - volumes: Optional[List[List[Volume]]] = None, + volumes: Optional[list[list[Volume]]] = None, ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: pool_instances = fleet_model.instances profile = run_spec.merged_profile @@ -466,12 +459,55 @@ def _run_can_fit_into_fleet(run_spec: RunSpec, fleet: Fleet) -> bool: return True +async def _get_backend_offers_in_fleet( + project: ProjectModel, + fleet_model: FleetModel, + run_spec: RunSpec, + job: Job, + volumes: Optional[list[list[Volume]]], + fleet: Optional[Fleet] = None, + max_offers: Optional[int] = None, +) -> list[tuple[Backend, InstanceOfferWithAvailability]]: + if fleet is None: + fleet = fleet_model_to_fleet(fleet_model) + try: + check_can_create_new_cloud_instance_in_fleet(fleet) + profile, requirements = get_run_profile_and_requirements_in_fleet( + job=job, + run_spec=run_spec, + fleet=fleet, + ) + except ValueError: + backend_offers = [] + else: + # Master job offers must be in the same cluster as existing instances. + master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( + fleet_model=fleet_model, + fleet_spec=fleet.spec, + ) + # Handle multinode for old jobs that don't have requirements.multinode set. + # TODO: Drop multinode param. + multinode = requirements.multinode or is_multinode_job(job) + backend_offers = await get_offers_by_requirements( + project=project, + profile=profile, + requirements=requirements, + multinode=multinode, + master_job_provisioning_data=master_instance_provisioning_data, + volumes=volumes, + privileged=job.job_spec.privileged, + instance_mounts=check_run_spec_requires_instance_mounts(run_spec), + max_offers=max_offers, + ) + return backend_offers + + async def _get_pool_offers( session: AsyncSession, project: ProjectModel, run_spec: RunSpec, job: Job, - volumes: List[List[Volume]], + volumes: list[list[Volume]], ) -> list[tuple[InstanceModel, InstanceOfferWithAvailability]]: pool_offers: list[tuple[InstanceModel, InstanceOfferWithAvailability]] = [] detaching_instances_ids = await get_instances_ids_with_detaching_volumes(session) @@ -508,7 +544,7 @@ async def _get_non_fleet_offers( profile: Profile, run_spec: RunSpec, job: Job, - volumes: List[List[Volume]], + volumes: list[list[Volume]], ) -> tuple[ list[tuple[InstanceModel, InstanceOfferWithAvailability]], list[tuple[Backend, InstanceOfferWithAvailability]], @@ -544,7 +580,7 @@ def _get_job_plan( job: Job, max_offers: Optional[int], ) -> JobPlan: - job_offers: List[InstanceOfferWithAvailability] = [] + job_offers: list[InstanceOfferWithAvailability] = [] job_offers.extend(offer for _, offer in instance_offers) if profile.creation_policy == CreationPolicy.REUSE_OR_CREATE: job_offers.extend(offer for _, offer in backend_offers) @@ -552,7 +588,7 @@ def _get_job_plan( remove_job_spec_sensitive_info(job.job_spec) return JobPlan( job_spec=job.job_spec, - offers=job_offers[: (max_offers or DEFAULT_MAX_OFFERS)], + offers=job_offers[: (max_offers or _DEFAULT_MAX_OFFERS)], total_offers=len(job_offers), max_price=max((offer.price for offer in job_offers), default=None), ) From 1ceed0fac5e14a0ba7b6cbe8734ad0e71d125757 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 24 Nov 2025 10:33:19 +0500 Subject: [PATCH 04/10] Drop tensordock compute --- .../core/backends/tensordock/compute.py | 122 ------------------ 1 file changed, 122 deletions(-) delete mode 100644 src/dstack/_internal/core/backends/tensordock/compute.py diff --git a/src/dstack/_internal/core/backends/tensordock/compute.py b/src/dstack/_internal/core/backends/tensordock/compute.py deleted file mode 100644 index 600ed1c73..000000000 --- a/src/dstack/_internal/core/backends/tensordock/compute.py +++ /dev/null @@ -1,122 +0,0 @@ -import json -from typing import List, Optional - -import requests - -from dstack._internal.core.backends.base.backend import Compute -from dstack._internal.core.backends.base.compute import ( - ComputeWithCreateInstanceSupport, - ComputeWithPrivilegedSupport, - generate_unique_instance_name, - get_shim_commands, -) -from dstack._internal.core.backends.base.offers import get_catalog_offers -from dstack._internal.core.backends.tensordock.api_client import TensorDockAPIClient -from dstack._internal.core.backends.tensordock.models import TensorDockConfig -from dstack._internal.core.errors import NoCapacityError -from dstack._internal.core.models.backends.base import BackendType -from dstack._internal.core.models.instances import ( - InstanceAvailability, - InstanceConfiguration, - InstanceOfferWithAvailability, -) -from dstack._internal.core.models.placement import PlacementGroup -from dstack._internal.core.models.runs import JobProvisioningData, Requirements -from dstack._internal.utils.logging import get_logger - -logger = get_logger(__name__) - - -# Undocumented but names of len 60 work -MAX_INSTANCE_NAME_LEN = 60 - - -class TensorDockCompute( - ComputeWithCreateInstanceSupport, - ComputeWithPrivilegedSupport, - Compute, -): - def __init__(self, config: TensorDockConfig): - super().__init__() - self.config = config - self.api_client = TensorDockAPIClient(config.creds.api_key, config.creds.api_token) - - def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]: - offers = get_catalog_offers( - backend=BackendType.TENSORDOCK, - requirements=requirements, - ) - offers = [ - InstanceOfferWithAvailability( - **offer.dict(), availability=InstanceAvailability.AVAILABLE - ) - for offer in offers - ] - return offers - - def create_instance( - self, - instance_offer: InstanceOfferWithAvailability, - instance_config: InstanceConfiguration, - placement_group: Optional[PlacementGroup], - ) -> JobProvisioningData: - instance_name = generate_unique_instance_name( - instance_config, max_length=MAX_INSTANCE_NAME_LEN - ) - commands = get_shim_commands(authorized_keys=instance_config.get_public_keys()) - try: - resp = self.api_client.deploy_single( - instance_name=instance_name, - instance=instance_offer.instance, - cloudinit={ - "ssh_pwauth": False, # disable password auth - "users": [ - "default", - { - "name": "user", - "ssh_authorized_keys": instance_config.get_public_keys(), - }, - ], - "runcmd": [ - ["sh", "-c", " && ".join(commands)], - ], - "write_files": [ - { - "path": "/etc/docker/daemon.json", - "content": json.dumps( - { - "runtimes": { - "nvidia": { - "path": "nvidia-container-runtime", - "runtimeArgs": [], - } - }, - "exec-opts": ["native.cgroupdriver=cgroupfs"], - } - ), - } - ], - }, - ) - except requests.HTTPError as e: - logger.warning("Got error from tensordock: %s", e) - raise NoCapacityError() - return JobProvisioningData( - backend=instance_offer.backend, - instance_type=instance_offer.instance, - instance_id=resp["server"], - hostname=resp["ip"], - internal_ip=None, - region=instance_offer.region, - price=instance_offer.price, - username="user", - ssh_port={int(v): int(k) for k, v in resp["port_forwards"].items()}[22], - dockerized=True, - ssh_proxy=None, - backend_data=None, - ) - - def terminate_instance( - self, instance_id: str, region: str, backend_data: Optional[str] = None - ): - self.api_client.delete_single_if_exists(instance_id) From 99cb6f8db9fb324912f1e6b6c00aa4d7a926b785 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 24 Nov 2025 10:45:29 +0500 Subject: [PATCH 05/10] Update get_offers() signatures --- .../_internal/core/backends/base/compute.py | 6 +-- .../_internal/core/backends/local/compute.py | 42 +++++++++++-------- .../core/backends/template/compute.py.jinja | 7 ++-- 3 files changed, 31 insertions(+), 24 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index b10384eec..2fd2b98ce 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -94,7 +94,7 @@ class Compute(ABC): """ @abstractmethod - def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]: + def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]: """ Returns offers with availability matching `requirements`. If the provider is added to gpuhunt, typically gets offers using `base.offers.get_catalog_offers()` @@ -239,8 +239,8 @@ def get_offers_by_requirements( """ pass - def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]: - return self._get_offers_cached(requirements) + def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]: + return iter(self._get_offers_cached(requirements)) def _get_offers_cached_key(self, requirements: Requirements) -> int: # Requirements is not hashable, so we use a hack to get arguments hash diff --git a/src/dstack/_internal/core/backends/local/compute.py b/src/dstack/_internal/core/backends/local/compute.py index fbee6bef3..9af6bfd58 100644 --- a/src/dstack/_internal/core/backends/local/compute.py +++ b/src/dstack/_internal/core/backends/local/compute.py @@ -1,3 +1,4 @@ +from collections.abc import Iterator from typing import List, Optional from dstack._internal.core.backends.base.compute import ( @@ -18,7 +19,12 @@ ) from dstack._internal.core.models.placement import PlacementGroup from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run -from dstack._internal.core.models.volumes import Volume, VolumeProvisioningData +from dstack._internal.core.models.volumes import ( + Volume, + VolumeAttachmentData, + VolumeProvisioningData, +) +from dstack._internal.utils.common import get_or_error from dstack._internal.utils.logging import get_logger logger = get_logger(__name__) @@ -30,20 +36,18 @@ class LocalCompute( ComputeWithVolumeSupport, Compute, ): - def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]: - return [ - InstanceOfferWithAvailability( - backend=BackendType.LOCAL, - instance=InstanceType( - name="local", - resources=Resources(cpus=4, memory_mib=8192, gpus=[], spot=False), - ), - region="local", - price=0.00, - availability=InstanceAvailability.AVAILABLE, - instance_runtime=InstanceRuntime.RUNNER, - ) - ] + def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]: + yield InstanceOfferWithAvailability( + backend=BackendType.LOCAL, + instance=InstanceType( + name="local", + resources=Resources(cpus=4, memory_mib=8192, gpus=[], spot=False), + ), + region="local", + price=0.00, + availability=InstanceAvailability.AVAILABLE, + instance_runtime=InstanceRuntime.RUNNER, + ) def terminate_instance( self, instance_id: str, region: str, backend_data: Optional[str] = None @@ -98,7 +102,7 @@ def run_job( def register_volume(self, volume: Volume) -> VolumeProvisioningData: return VolumeProvisioningData( - volume_id=volume.volume_id, + volume_id=get_or_error(volume.volume_id), size_gb=volume.configuration.size_gb, ) @@ -111,8 +115,10 @@ def create_volume(self, volume: Volume) -> VolumeProvisioningData: def delete_volume(self, volume: Volume): pass - def attach_volume(self, volume: Volume, provisioning_data: JobProvisioningData): - pass + def attach_volume( + self, volume: Volume, provisioning_data: JobProvisioningData + ) -> VolumeAttachmentData: + return VolumeAttachmentData(device_name=None) def detach_volume( self, volume: Volume, provisioning_data: JobProvisioningData, force: bool = False diff --git a/src/dstack/_internal/core/backends/template/compute.py.jinja b/src/dstack/_internal/core/backends/template/compute.py.jinja index 49b51b0d4..e0a1d33aa 100644 --- a/src/dstack/_internal/core/backends/template/compute.py.jinja +++ b/src/dstack/_internal/core/backends/template/compute.py.jinja @@ -1,3 +1,4 @@ +from collections.abc import Iterator from typing import List, Optional from dstack._internal.core.backends.base.backend import Compute @@ -47,7 +48,7 @@ class {{ backend_name }}Compute( def get_offers( self, requirements: Requirements - ) -> List[InstanceOfferWithAvailability]: + ) -> Iterator[InstanceOfferWithAvailability]: # If the provider is added to gpuhunt, you'd typically get offers # using `get_catalog_offers()` and extend them with availability info. offers = get_catalog_offers( @@ -57,13 +58,13 @@ class {{ backend_name }}Compute( # configurable_disk_size=..., TODO: set in case of boot volume size limits ) # TODO: Add availability info to offers - return [ + return ( InstanceOfferWithAvailability( **offer.dict(), availability=InstanceAvailability.UNKNOWN, ) for offer in offers - ] + ) def create_instance( self, From 9cb8ed05d7461a3e5aef54b316504f140f681335 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 24 Nov 2025 13:35:43 +0500 Subject: [PATCH 06/10] Fix get_backend_offers() --- src/dstack/_internal/core/backends/base/compute.py | 5 +++-- src/dstack/_internal/server/services/backends/__init__.py | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/dstack/_internal/core/backends/base/compute.py b/src/dstack/_internal/core/backends/base/compute.py index 2fd2b98ce..a52ac59fd 100644 --- a/src/dstack/_internal/core/backends/base/compute.py +++ b/src/dstack/_internal/core/backends/base/compute.py @@ -97,8 +97,9 @@ class Compute(ABC): def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]: """ Returns offers with availability matching `requirements`. - If the provider is added to gpuhunt, typically gets offers using `base.offers.get_catalog_offers()` - and extends them with availability info. + If the provider is added to gpuhunt, typically gets offers using + `base.offers.get_catalog_offers()` and extends them with availability info. + It is called from async code in executor. It can block on call but not between yields. """ pass diff --git a/src/dstack/_internal/server/services/backends/__init__.py b/src/dstack/_internal/server/services/backends/__init__.py index f515ac156..58be0722c 100644 --- a/src/dstack/_internal/server/services/backends/__init__.py +++ b/src/dstack/_internal/server/services/backends/__init__.py @@ -366,11 +366,11 @@ async def get_backend_offers( ) continue offers_by_backend.append( - [ + ( (backend, offer) for offer in result if not exclude_not_available or offer.availability.is_available() - ] + ) ) # Merge preserving order for every backend. offers = heapq.merge(*offers_by_backend, key=lambda i: i[1].price) From 27a9532f3531917c1f488ee38f7e33106a2caa19 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 24 Nov 2025 14:47:59 +0500 Subject: [PATCH 07/10] Replace yield with iter for LocalCompute --- .../_internal/core/backends/local/compute.py | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/src/dstack/_internal/core/backends/local/compute.py b/src/dstack/_internal/core/backends/local/compute.py index 9af6bfd58..74d4b8a43 100644 --- a/src/dstack/_internal/core/backends/local/compute.py +++ b/src/dstack/_internal/core/backends/local/compute.py @@ -37,16 +37,20 @@ class LocalCompute( Compute, ): def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]: - yield InstanceOfferWithAvailability( - backend=BackendType.LOCAL, - instance=InstanceType( - name="local", - resources=Resources(cpus=4, memory_mib=8192, gpus=[], spot=False), - ), - region="local", - price=0.00, - availability=InstanceAvailability.AVAILABLE, - instance_runtime=InstanceRuntime.RUNNER, + return iter( + [ + InstanceOfferWithAvailability( + backend=BackendType.LOCAL, + instance=InstanceType( + name="local", + resources=Resources(cpus=4, memory_mib=8192, gpus=[], spot=False), + ), + region="local", + price=0.00, + availability=InstanceAvailability.AVAILABLE, + instance_runtime=InstanceRuntime.RUNNER, + ) + ] ) def terminate_instance( From 02ecec7d88d1ee56816d9aa7a14c4c4fcb62cf80 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 24 Nov 2025 15:49:24 +0500 Subject: [PATCH 08/10] Fix var capture by generator expression --- .../server/services/backends/__init__.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/src/dstack/_internal/server/services/backends/__init__.py b/src/dstack/_internal/server/services/backends/__init__.py index 58be0722c..5b9908e35 100644 --- a/src/dstack/_internal/server/services/backends/__init__.py +++ b/src/dstack/_internal/server/services/backends/__init__.py @@ -1,6 +1,6 @@ import asyncio import heapq -from collections.abc import Iterator +from collections.abc import Iterable, Iterator from typing import Callable, Coroutine, Dict, List, Optional, Tuple from uuid import UUID @@ -347,6 +347,15 @@ async def get_backend_offers( """ Yields backend offers satisfying `requirements` sorted by price. """ + + def get_filtered_offers_with_backends( + backend: Backend, + offers: Iterable[InstanceOfferWithAvailability], + ) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]: + for offer in offers: + if not exclude_not_available or offer.availability.is_available(): + yield (backend, offer) + logger.info("Requesting instance offers from backends: %s", [b.TYPE.value for b in backends]) tasks = [run_async(backend.compute().get_offers, requirements) for backend in backends] offers_by_backend = [] @@ -365,13 +374,7 @@ async def get_backend_offers( exc_info=result, ) continue - offers_by_backend.append( - ( - (backend, offer) - for offer in result - if not exclude_not_available or offer.availability.is_available() - ) - ) + offers_by_backend.append(get_filtered_offers_with_backends(backend, result)) # Merge preserving order for every backend. offers = heapq.merge(*offers_by_backend, key=lambda i: i[1].price) return offers From f49390320a45d6a0200c7aa54d63302143405e19 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 25 Nov 2025 11:28:27 +0500 Subject: [PATCH 09/10] Fix exclude_not_available ignored --- src/dstack/_internal/server/services/runs/plan.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dstack/_internal/server/services/runs/plan.py b/src/dstack/_internal/server/services/runs/plan.py index cf1023c12..58c3529d5 100644 --- a/src/dstack/_internal/server/services/runs/plan.py +++ b/src/dstack/_internal/server/services/runs/plan.py @@ -301,8 +301,6 @@ async def find_optimal_fleet_with_offers( ) available_backend_offers = _exclude_non_available_backend_offers(backend_offers) - if exclude_not_available: - backend_offers = available_backend_offers min_backend_offer_price = _get_min_instance_or_backend_offer_price( available_backend_offers ) @@ -338,7 +336,7 @@ async def find_optimal_fleet_with_offers( return None, [], [] candidate_fleets_with_offers.sort(key=lambda t: t[-1]) - optimal_fleet_model, instance_offers, backend_offers = candidate_fleets_with_offers[0][:3] + optimal_fleet_model, instance_offers = candidate_fleets_with_offers[0][:2] # Refetch backend offers without limit to return all offers for the optimal fleet. backend_offers = await _get_backend_offers_in_fleet( project=project, @@ -348,6 +346,8 @@ async def find_optimal_fleet_with_offers( volumes=volumes, max_offers=None, ) + if exclude_not_available: + backend_offers = _exclude_non_available_backend_offers(backend_offers) return optimal_fleet_model, instance_offers, backend_offers From db286ebf7299b0cf3711fd65470250da43098eb3 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Tue, 25 Nov 2025 11:31:49 +0500 Subject: [PATCH 10/10] Drop tensordock configurator import --- src/dstack/_internal/core/backends/configurators.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/dstack/_internal/core/backends/configurators.py b/src/dstack/_internal/core/backends/configurators.py index 6284dd0a5..eeb91d0ba 100644 --- a/src/dstack/_internal/core/backends/configurators.py +++ b/src/dstack/_internal/core/backends/configurators.py @@ -119,14 +119,6 @@ except ImportError: pass -try: - from dstack._internal.core.backends.tensordock.configurator import ( - TensorDockConfigurator, - ) - - _CONFIGURATOR_CLASSES.append(TensorDockConfigurator) -except ImportError: - pass try: from dstack._internal.core.backends.vastai.configurator import VastAIConfigurator