Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ def pretty_repr(self) -> str:
class Requirements(CoreModel):
# TODO: Make requirements' fields required
resources: ResourcesSpec
max_price: Optional[float]
spot: Optional[bool]
reservation: Optional[str]
max_price: Optional[float] = None
spot: Optional[bool] = None
reservation: Optional[str] = None

def pretty_format(self, resources_only: bool = False):
res = self.resources.pretty_format()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,9 +507,9 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
project=instance.project,
profile=profile,
requirements=requirements,
exclude_not_available=True,
fleet_model=instance.fleet,
blocks="auto" if instance.total_blocks is None else instance.total_blocks,
exclude_not_available=True,
)

if not offers and should_retry:
Expand Down Expand Up @@ -915,9 +915,8 @@ def _get_instance_offer_for_instance(
instance_offer.availability_zones = [
z
for z in instance_offer.availability_zones
if instance_offer.availability_zones == master_job_provisioning_data.availability_zone
if z == master_job_provisioning_data.availability_zone
]

return instance_offer


Expand Down
6 changes: 5 additions & 1 deletion src/dstack/_internal/server/services/fleets.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ async def get_plan(
project=project,
profile=spec.merged_profile,
requirements=_get_fleet_requirements(spec),
fleet_spec=spec,
blocks=spec.configuration.blocks,
)
offers = [offer for _, offer in offers_with_backends]
Expand All @@ -277,12 +278,15 @@ async def get_create_instance_offers(
project: ProjectModel,
profile: Profile,
requirements: Requirements,
exclude_not_available=False,
fleet_spec: Optional[FleetSpec] = None,
fleet_model: Optional[FleetModel] = None,
blocks: Union[int, Literal["auto"]] = 1,
exclude_not_available: bool = False,
) -> List[Tuple[Backend, InstanceOfferWithAvailability]]:
multinode = False
master_job_provisioning_data = None
if fleet_spec is not None:
multinode = fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER
if fleet_model is not None:
fleet = fleet_model_to_fleet(fleet_model)
multinode = fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER
Expand Down
14 changes: 7 additions & 7 deletions src/dstack/_internal/server/services/offers.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,35 @@ async def get_offers_by_requirements(
if volumes:
mount_point_volumes = volumes[0]
volumes_backend_types = [v.configuration.backend for v in mount_point_volumes]
if not backend_types:
if backend_types is None:
backend_types = volumes_backend_types
backend_types = [b for b in backend_types if b in volumes_backend_types]
volumes_regions = [v.configuration.region for v in mount_point_volumes]
if not regions:
if regions is None:
regions = volumes_regions
regions = [r for r in regions if r in volumes_regions]

if multinode:
if not backend_types:
if backend_types is None:
backend_types = BACKENDS_WITH_MULTINODE_SUPPORT
backend_types = [b for b in backend_types if b in BACKENDS_WITH_MULTINODE_SUPPORT]

if privileged or instance_mounts:
if not backend_types:
if backend_types is None:
backend_types = BACKENDS_WITH_CREATE_INSTANCE_SUPPORT
backend_types = [b for b in backend_types if b in BACKENDS_WITH_CREATE_INSTANCE_SUPPORT]

if profile.reservation is not None:
if not backend_types:
if backend_types is None:
backend_types = BACKENDS_WITH_RESERVATION_SUPPORT
backend_types = [b for b in backend_types if b in BACKENDS_WITH_RESERVATION_SUPPORT]

# For multi-node, restrict backend and region.
# The default behavior is to provision all nodes in the same backend and region.
if master_job_provisioning_data is not None:
if not backend_types:
if backend_types is None:
backend_types = [master_job_provisioning_data.get_base_backend()]
if not regions:
if regions is None:
regions = [master_job_provisioning_data.region]
backend_types = [
b for b in backend_types if b == master_job_provisioning_data.get_base_backend()
Expand Down
6 changes: 3 additions & 3 deletions src/dstack/_internal/server/services/pools.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,19 +462,19 @@ def filter_pool_instances(
zones = [z for z in zones if z in volume_zones]

if multinode:
if not backend_types:
if backend_types is None:
backend_types = BACKENDS_WITH_MULTINODE_SUPPORT
backend_types = [b for b in backend_types if b in BACKENDS_WITH_MULTINODE_SUPPORT]

# For multi-node, restrict backend and region.
# The default behavior is to provision all nodes in the same backend and region.
if master_job_provisioning_data is not None:
if not backend_types:
if backend_types is None:
backend_types = [master_job_provisioning_data.get_base_backend()]
backend_types = [
b for b in backend_types if b == master_job_provisioning_data.get_base_backend()
]
if not regions:
if regions is None:
regions = [master_job_provisioning_data.region]
regions = [r for r in regions if r == master_job_provisioning_data.region]

Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/server/testing/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,7 @@ def get_instance_offer_with_availability(
spot: bool = False,
blocks: int = 1,
total_blocks: int = 1,
availability_zones: Optional[List[str]] = None,
):
gpus = [Gpu(name="T4", memory_mib=16384, vendor=gpuhunt.AcceleratorVendor.NVIDIA)] * gpu_count
return InstanceOfferWithAvailability(
Expand All @@ -609,6 +610,7 @@ def get_instance_offer_with_availability(
region=region,
price=1,
availability=InstanceAvailability.AVAILABLE,
availability_zones=availability_zones,
blocks=blocks,
total_blocks=total_blocks,
)
Expand Down
167 changes: 167 additions & 0 deletions src/tests/_internal/server/services/test_offers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
from unittest.mock import Mock, patch

import pytest

from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.profiles import Profile
from dstack._internal.core.models.resources import ResourcesSpec
from dstack._internal.core.models.runs import Requirements
from dstack._internal.server.services.offers import get_offers_by_requirements
from dstack._internal.server.testing.common import (
get_instance_offer_with_availability,
get_volume,
get_volume_configuration,
)


class TestGetOffersByRequirements:
    """Tests for ``get_offers_by_requirements`` against mocked project backends.

    Every test patches ``get_project_backends`` so that it yields ``Mock``
    backends whose ``compute().get_offers_cached()`` returns a fixed list of
    offers, and then checks which ``(backend, offer)`` pairs are returned.
    """

    # Patch target shared by all tests in this class.
    _GET_PROJECT_BACKENDS = "dstack._internal.server.services.backends.get_project_backends"

    @staticmethod
    def _make_backend(backend_type, offers):
        """Return a ``Mock`` backend of ``backend_type`` serving ``offers`` from its cache."""
        backend = Mock()
        backend.TYPE = backend_type
        backend.compute.return_value.get_offers_cached.return_value = offers
        return backend

    @pytest.mark.asyncio
    async def test_returns_all_offers(self):
        """Without extra filters, offers from every backend are returned in order."""
        profile = Profile(name="test")
        requirements = Requirements(resources=ResourcesSpec())
        with patch(self._GET_PROJECT_BACKENDS) as get_backends:
            aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS)
            runpod_offer = get_instance_offer_with_availability(backend=BackendType.RUNPOD)
            aws_backend = self._make_backend(BackendType.AWS, [aws_offer])
            runpod_backend = self._make_backend(BackendType.RUNPOD, [runpod_offer])
            get_backends.return_value = [aws_backend, runpod_backend]
            res = await get_offers_by_requirements(
                project=Mock(),
                profile=profile,
                requirements=requirements,
            )
            get_backends.assert_awaited_once()
            assert res == [(aws_backend, aws_offer), (runpod_backend, runpod_offer)]

    @pytest.mark.asyncio
    async def test_returns_multinode_offers(self):
        """With ``multinode=True`` only the AWS offer is expected; RunPod's is dropped."""
        profile = Profile(name="test")
        requirements = Requirements(resources=ResourcesSpec())
        with patch(self._GET_PROJECT_BACKENDS) as get_backends:
            aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS)
            runpod_offer = get_instance_offer_with_availability(backend=BackendType.RUNPOD)
            aws_backend = self._make_backend(BackendType.AWS, [aws_offer])
            runpod_backend = self._make_backend(BackendType.RUNPOD, [runpod_offer])
            get_backends.return_value = [aws_backend, runpod_backend]
            res = await get_offers_by_requirements(
                project=Mock(),
                profile=profile,
                requirements=requirements,
                multinode=True,
            )
            get_backends.assert_awaited_once()
            assert res == [(aws_backend, aws_offer)]

    @pytest.mark.asyncio
    async def test_returns_volume_offers(self):
        """A RunPod volume in region ``us`` limits results to the RunPod ``us`` offer."""
        profile = Profile(name="test")
        requirements = Requirements(resources=ResourcesSpec())
        with patch(self._GET_PROJECT_BACKENDS) as get_backends:
            aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS)
            runpod_eu_offer = get_instance_offer_with_availability(
                backend=BackendType.RUNPOD, region="eu"
            )
            runpod_us_offer = get_instance_offer_with_availability(
                backend=BackendType.RUNPOD, region="us"
            )
            aws_backend = self._make_backend(BackendType.AWS, [aws_offer])
            runpod_backend = self._make_backend(
                BackendType.RUNPOD, [runpod_eu_offer, runpod_us_offer]
            )
            get_backends.return_value = [aws_backend, runpod_backend]
            volume_configuration = get_volume_configuration(
                backend=BackendType.RUNPOD, region="us"
            )
            mount_point_volumes = [get_volume(configuration=volume_configuration)]
            res = await get_offers_by_requirements(
                project=Mock(),
                profile=profile,
                requirements=requirements,
                volumes=[mount_point_volumes],
            )
            get_backends.assert_awaited_once()
            assert res == [(runpod_backend, runpod_us_offer)]

    @pytest.mark.asyncio
    async def test_returns_az_offers(self):
        """Profile availability zones filter offers and narrow each offer's zone list."""
        profile = Profile(name="test", availability_zones=["az1", "az3"])
        requirements = Requirements(resources=ResourcesSpec())
        with patch(self._GET_PROJECT_BACKENDS) as get_backends:
            az1_offer = get_instance_offer_with_availability(
                backend=BackendType.AWS, availability_zones=["az1"]
            )
            az2_offer = get_instance_offer_with_availability(
                backend=BackendType.AWS, availability_zones=["az2"]
            )
            az2_az3_offer = get_instance_offer_with_availability(
                backend=BackendType.AWS, availability_zones=["az2", "az3"]
            )
            # Take the copy before the call so the expectation is built from
            # the offer as it was handed to the backend mock.
            expected_az3_offer = az2_az3_offer.copy()
            expected_az3_offer.availability_zones = ["az3"]
            no_az_offer = get_instance_offer_with_availability(
                backend=BackendType.AWS, availability_zones=None
            )
            aws_backend = self._make_backend(
                BackendType.AWS,
                [az1_offer, az2_offer, az2_az3_offer, no_az_offer],
            )
            get_backends.return_value = [aws_backend]
            res = await get_offers_by_requirements(
                project=Mock(),
                profile=profile,
                requirements=requirements,
            )
            get_backends.assert_awaited_once()
            assert res == [(aws_backend, az1_offer), (aws_backend, expected_az3_offer)]

    @pytest.mark.asyncio
    async def test_returns_no_offers_for_multinode_instance_mounts_and_non_multinode_backend(self):
        """A RunPod-only profile with multinode and instance mounts must yield no offers."""
        # Regression test for https://github.com/dstackai/dstack/issues/2211
        profile = Profile(name="test", backends=[BackendType.RUNPOD])
        requirements = Requirements(resources=ResourcesSpec())
        with patch(self._GET_PROJECT_BACKENDS) as get_backends:
            aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS)
            runpod_offer = get_instance_offer_with_availability(backend=BackendType.RUNPOD)
            aws_backend = self._make_backend(BackendType.AWS, [aws_offer])
            runpod_backend = self._make_backend(BackendType.RUNPOD, [runpod_offer])
            get_backends.return_value = [aws_backend, runpod_backend]
            res = await get_offers_by_requirements(
                project=Mock(),
                profile=profile,
                requirements=requirements,
                multinode=True,
                instance_mounts=True,
            )
            get_backends.assert_awaited_once()
            assert res == []
Loading