Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions src/dstack/_internal/core/backends/base/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import string
import threading
from abc import ABC, abstractmethod
from collections.abc import Iterable
from collections.abc import Iterable, Iterator
from enum import Enum
from functools import lru_cache
from pathlib import Path
Expand Down Expand Up @@ -95,11 +95,12 @@ class Compute(ABC):
"""

@abstractmethod
def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]:
def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]:
"""
Returns offers with availability matching `requirements`.
If the provider is added to gpuhunt, typically gets offers using `base.offers.get_catalog_offers()`
and extends them with availability info.
If the provider is added to gpuhunt, typically gets offers using
`base.offers.get_catalog_offers()` and extends them with availability info.
It is called from async code in executor. It can block on call but not between yields.
"""
pass

Expand Down Expand Up @@ -190,13 +191,13 @@ def get_offers_post_filter(
"""
return None

def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]:
offers = self._get_all_offers_with_availability_cached()
offers = self.__apply_modifiers(offers, self.get_offers_modifiers(requirements))
def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]:
cached_offers = self._get_all_offers_with_availability_cached()
offers = self.__apply_modifiers(cached_offers, self.get_offers_modifiers(requirements))
offers = filter_offers_by_requirements(offers, requirements)
post_filter = self.get_offers_post_filter(requirements)
if post_filter is not None:
offers = [o for o in offers if post_filter(o)]
offers = (o for o in offers if post_filter(o))
return offers

@cachedmethod(
Expand All @@ -209,16 +210,14 @@ def _get_all_offers_with_availability_cached(self) -> List[InstanceOfferWithAvai
@staticmethod
def __apply_modifiers(
offers: Iterable[InstanceOfferWithAvailability], modifiers: Iterable[OfferModifier]
) -> list[InstanceOfferWithAvailability]:
modified_offers = []
) -> Iterator[InstanceOfferWithAvailability]:
for offer in offers:
for modifier in modifiers:
offer = modifier(offer)
if offer is None:
break
else:
modified_offers.append(offer)
return modified_offers
yield offer


class ComputeWithFilteredOffersCached(ABC):
Expand All @@ -242,8 +241,8 @@ def get_offers_by_requirements(
"""
pass

def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]:
return self._get_offers_cached(requirements)
def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]:
return iter(self._get_offers_cached(requirements))

def _get_offers_cached_key(self, requirements: Requirements) -> int:
# Requirements is not hashable, so we use a hack to get arguments hash
Expand Down
9 changes: 4 additions & 5 deletions src/dstack/_internal/core/backends/base/offers.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterable, Iterator
from dataclasses import asdict
from typing import Callable, List, Optional, TypeVar

Expand Down Expand Up @@ -174,16 +175,14 @@ def requirements_to_query_filter(req: Optional[Requirements]) -> gpuhunt.QueryFi


def filter_offers_by_requirements(
offers: List[InstanceOfferT],
offers: Iterable[InstanceOfferT],
requirements: Optional[Requirements],
) -> List[InstanceOfferT]:
) -> Iterator[InstanceOfferT]:
query_filter = requirements_to_query_filter(requirements)
filtered_offers = []
for offer in offers:
catalog_item = offer_to_catalog_item(offer)
if gpuhunt.matches(catalog_item, q=query_filter):
filtered_offers.append(offer)
return filtered_offers
yield offer


def choose_disk_size_mib(
Expand Down
8 changes: 0 additions & 8 deletions src/dstack/_internal/core/backends/configurators.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,6 @@
except ImportError:
pass

try:
from dstack._internal.core.backends.tensordock.configurator import (
TensorDockConfigurator,
)

_CONFIGURATOR_CLASSES.append(TensorDockConfigurator)
except ImportError:
pass

try:
from dstack._internal.core.backends.vastai.configurator import VastAIConfigurator
Expand Down
46 changes: 28 additions & 18 deletions src/dstack/_internal/core/backends/local/compute.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterator
from typing import List, Optional

from dstack._internal.core.backends.base.compute import (
Expand All @@ -18,7 +19,12 @@
)
from dstack._internal.core.models.placement import PlacementGroup
from dstack._internal.core.models.runs import Job, JobProvisioningData, Requirements, Run
from dstack._internal.core.models.volumes import Volume, VolumeProvisioningData
from dstack._internal.core.models.volumes import (
Volume,
VolumeAttachmentData,
VolumeProvisioningData,
)
from dstack._internal.utils.common import get_or_error
from dstack._internal.utils.logging import get_logger

logger = get_logger(__name__)
Expand All @@ -30,20 +36,22 @@ class LocalCompute(
ComputeWithVolumeSupport,
Compute,
):
def get_offers(self, requirements: Requirements) -> List[InstanceOfferWithAvailability]:
return [
InstanceOfferWithAvailability(
backend=BackendType.LOCAL,
instance=InstanceType(
name="local",
resources=Resources(cpus=4, memory_mib=8192, gpus=[], spot=False),
),
region="local",
price=0.00,
availability=InstanceAvailability.AVAILABLE,
instance_runtime=InstanceRuntime.RUNNER,
)
]
def get_offers(self, requirements: Requirements) -> Iterator[InstanceOfferWithAvailability]:
return iter(
[
InstanceOfferWithAvailability(
backend=BackendType.LOCAL,
instance=InstanceType(
name="local",
resources=Resources(cpus=4, memory_mib=8192, gpus=[], spot=False),
),
region="local",
price=0.00,
availability=InstanceAvailability.AVAILABLE,
instance_runtime=InstanceRuntime.RUNNER,
)
]
)

def terminate_instance(
self, instance_id: str, region: str, backend_data: Optional[str] = None
Expand Down Expand Up @@ -98,7 +106,7 @@ def run_job(

def register_volume(self, volume: Volume) -> VolumeProvisioningData:
return VolumeProvisioningData(
volume_id=volume.volume_id,
volume_id=get_or_error(volume.volume_id),
size_gb=volume.configuration.size_gb,
)

Expand All @@ -111,8 +119,10 @@ def create_volume(self, volume: Volume) -> VolumeProvisioningData:
def delete_volume(self, volume: Volume):
pass

def attach_volume(self, volume: Volume, provisioning_data: JobProvisioningData):
pass
def attach_volume(
self, volume: Volume, provisioning_data: JobProvisioningData
) -> VolumeAttachmentData:
return VolumeAttachmentData(device_name=None)

def detach_volume(
self, volume: Volume, provisioning_data: JobProvisioningData, force: bool = False
Expand Down
7 changes: 4 additions & 3 deletions src/dstack/_internal/core/backends/template/compute.py.jinja
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Iterator
from typing import List, Optional

from dstack._internal.core.backends.base.backend import Compute
Expand Down Expand Up @@ -47,7 +48,7 @@ class {{ backend_name }}Compute(

def get_offers(
self, requirements: Requirements
) -> List[InstanceOfferWithAvailability]:
) -> Iterator[InstanceOfferWithAvailability]:
# If the provider is added to gpuhunt, you'd typically get offers
# using `get_catalog_offers()` and extend them with availability info.
offers = get_catalog_offers(
Expand All @@ -57,13 +58,13 @@ class {{ backend_name }}Compute(
# configurable_disk_size=..., TODO: set in case of boot volume size limits
)
# TODO: Add availability info to offers
return [
return (
InstanceOfferWithAvailability(
**offer.dict(),
availability=InstanceAvailability.UNKNOWN,
)
for offer in offers
]
)

def create_instance(
self,
Expand Down
122 changes: 0 additions & 122 deletions src/dstack/_internal/core/backends/tensordock/compute.py

This file was deleted.

33 changes: 19 additions & 14 deletions src/dstack/_internal/server/services/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import heapq
from collections.abc import Iterable, Iterator
from typing import Callable, Coroutine, Dict, List, Optional, Tuple
from uuid import UUID

Expand Down Expand Up @@ -338,12 +339,23 @@ async def get_project_backend_model_by_type_or_error(
return backend_model


async def get_instance_offers(
backends: List[Backend], requirements: Requirements, exclude_not_available: bool = False
) -> List[Tuple[Backend, InstanceOfferWithAvailability]]:
async def get_backend_offers(
backends: List[Backend],
requirements: Requirements,
exclude_not_available: bool = False,
) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]:
"""
Returns list of instances satisfying minimal resource requirements sorted by price
Yields backend offers satisfying `requirements` sorted by price.
"""

def get_filtered_offers_with_backends(
backend: Backend,
offers: Iterable[InstanceOfferWithAvailability],
) -> Iterator[Tuple[Backend, InstanceOfferWithAvailability]]:
for offer in offers:
if not exclude_not_available or offer.availability.is_available():
yield (backend, offer)

logger.info("Requesting instance offers from backends: %s", [b.TYPE.value for b in backends])
tasks = [run_async(backend.compute().get_offers, requirements) for backend in backends]
offers_by_backend = []
Expand All @@ -362,17 +374,10 @@ async def get_instance_offers(
exc_info=result,
)
continue
offers_by_backend.append(
[
(backend, offer)
for offer in result
if not exclude_not_available or offer.availability.is_available()
]
)
# Merge preserving order for every backend
offers_by_backend.append(get_filtered_offers_with_backends(backend, result))
# Merge preserving order for every backend.
offers = heapq.merge(*offers_by_backend, key=lambda i: i[1].price)
# Put NOT_AVAILABLE, NO_QUOTA, and BUSY instances at the end, do not sort by price
return sorted(offers, key=lambda i: not i[1].availability.is_available())
return offers


def check_backend_type_available(backend_type: BackendType):
Expand Down
Loading