From 443fab34b7b966a3fbd35f7d6ee327d264373777 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 17 Feb 2025 10:44:26 +0500 Subject: [PATCH 1/3] Fix and test offers and pool instances filtering --- src/dstack/_internal/core/models/runs.py | 6 +- .../_internal/server/services/offers.py | 14 +- src/dstack/_internal/server/services/pools.py | 6 +- src/dstack/_internal/server/testing/common.py | 2 + .../_internal/server/services/test_offers.py | 167 ++++++++++++++++++ .../_internal/server/services/test_pools.py | 106 ++++++++++- 6 files changed, 287 insertions(+), 14 deletions(-) create mode 100644 src/tests/_internal/server/services/test_offers.py diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 5c95ad2cf..0ce2670b0 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -150,9 +150,9 @@ def pretty_repr(self) -> str: class Requirements(CoreModel): # TODO: Make requirements' fields required resources: ResourcesSpec - max_price: Optional[float] - spot: Optional[bool] - reservation: Optional[str] + max_price: Optional[float] = None + spot: Optional[bool] = None + reservation: Optional[str] = None def pretty_format(self, resources_only: bool = False): res = self.resources.pretty_format() diff --git a/src/dstack/_internal/server/services/offers.py b/src/dstack/_internal/server/services/offers.py index 44bda7b0f..a0dcd7b48 100644 --- a/src/dstack/_internal/server/services/offers.py +++ b/src/dstack/_internal/server/services/offers.py @@ -50,35 +50,35 @@ async def get_offers_by_requirements( if volumes: mount_point_volumes = volumes[0] volumes_backend_types = [v.configuration.backend for v in mount_point_volumes] - if not backend_types: + if backend_types is None: backend_types = volumes_backend_types backend_types = [b for b in backend_types if b in volumes_backend_types] volumes_regions = [v.configuration.region for v in mount_point_volumes] - if not regions: + if regions is None: regions = volumes_regions regions = [r for r in regions if r in volumes_regions] if multinode: - if not backend_types: + if backend_types is None: backend_types = BACKENDS_WITH_MULTINODE_SUPPORT backend_types = [b for b in backend_types if b in BACKENDS_WITH_MULTINODE_SUPPORT] if privileged or instance_mounts: - if not backend_types: + if backend_types is None: backend_types = BACKENDS_WITH_CREATE_INSTANCE_SUPPORT backend_types = [b for b in backend_types if b in BACKENDS_WITH_CREATE_INSTANCE_SUPPORT] if profile.reservation is not None: - if not backend_types: + if backend_types is None: backend_types = BACKENDS_WITH_RESERVATION_SUPPORT backend_types = [b for b in backend_types if b in BACKENDS_WITH_RESERVATION_SUPPORT] # For multi-node, restrict backend and region. # The default behavior is to provision all nodes in the same backend and region. if master_job_provisioning_data is not None: - if not backend_types: + if backend_types is None: backend_types = [master_job_provisioning_data.get_base_backend()] - if not regions: + if regions is None: regions = [master_job_provisioning_data.region] backend_types = [ b for b in backend_types if b == master_job_provisioning_data.get_base_backend() diff --git a/src/dstack/_internal/server/services/pools.py b/src/dstack/_internal/server/services/pools.py index d21560188..18b801edd 100644 --- a/src/dstack/_internal/server/services/pools.py +++ b/src/dstack/_internal/server/services/pools.py @@ -462,19 +462,19 @@ def filter_pool_instances( zones = [z for z in zones if z in volume_zones] if multinode: - if not backend_types: + if backend_types is None: backend_types = BACKENDS_WITH_MULTINODE_SUPPORT backend_types = [b for b in backend_types if b in BACKENDS_WITH_MULTINODE_SUPPORT] # For multi-node, restrict backend and region. # The default behavior is to provision all nodes in the same backend and region. if master_job_provisioning_data is not None: - if not backend_types: + if backend_types is None: backend_types = [master_job_provisioning_data.get_base_backend()] backend_types = [ b for b in backend_types if b == master_job_provisioning_data.get_base_backend() ] - if not regions: + if regions is None: regions = [master_job_provisioning_data.region] regions = [r for r in regions if r == master_job_provisioning_data.region] diff --git a/src/dstack/_internal/server/testing/common.py b/src/dstack/_internal/server/testing/common.py index bb155c5a9..1b2d23de3 100644 --- a/src/dstack/_internal/server/testing/common.py +++ b/src/dstack/_internal/server/testing/common.py @@ -591,6 +591,7 @@ def get_instance_offer_with_availability( spot: bool = False, blocks: int = 1, total_blocks: int = 1, + availability_zones: Optional[List[str]] = None, ): gpus = [Gpu(name="T4", memory_mib=16384, vendor=gpuhunt.AcceleratorVendor.NVIDIA)] * gpu_count return InstanceOfferWithAvailability( @@ -609,6 +610,7 @@ def get_instance_offer_with_availability( region=region, price=1, availability=InstanceAvailability.AVAILABLE, + availability_zones=availability_zones, blocks=blocks, total_blocks=total_blocks, ) diff --git a/src/tests/_internal/server/services/test_offers.py b/src/tests/_internal/server/services/test_offers.py new file mode 100644 index 000000000..8c97a0e4f --- /dev/null +++ b/src/tests/_internal/server/services/test_offers.py @@ -0,0 +1,167 @@ +from unittest.mock import Mock, patch + +import pytest + +from dstack._internal.core.models.backends.base import BackendType +from dstack._internal.core.models.profiles import Profile +from dstack._internal.core.models.resources import ResourcesSpec +from dstack._internal.core.models.runs import Requirements +from dstack._internal.server.services.offers import get_offers_by_requirements +from dstack._internal.server.testing.common import ( + get_instance_offer_with_availability, + get_volume, + get_volume_configuration, +) + + +class TestGetOffersByRequirements: + @pytest.mark.asyncio + async def test_returns_all_offers(self): + profile = Profile(name="test") + requirements = Requirements(resources=ResourcesSpec()) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + aws_backend_mock = Mock() + aws_backend_mock.TYPE = BackendType.AWS + aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS) + aws_backend_mock.compute.return_value.get_offers_cached.return_value = [aws_offer] + runpod_backend_mock = Mock() + runpod_backend_mock.TYPE = BackendType.RUNPOD + runpod_offer = get_instance_offer_with_availability(backend=BackendType.RUNPOD) + runpod_backend_mock.compute.return_value.get_offers_cached.return_value = [ + runpod_offer + ] + m.return_value = [aws_backend_mock, runpod_backend_mock] + res = await get_offers_by_requirements( + project=Mock(), + profile=profile, + requirements=requirements, + ) + m.assert_awaited_once() + assert res == [(aws_backend_mock, aws_offer), (runpod_backend_mock, runpod_offer)] + + @pytest.mark.asyncio + async def test_returns_multinode_offers(self): + profile = Profile(name="test") + requirements = Requirements(resources=ResourcesSpec()) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + aws_backend_mock = Mock() + aws_backend_mock.TYPE = BackendType.AWS + aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS) + aws_backend_mock.compute.return_value.get_offers_cached.return_value = [aws_offer] + runpod_backend_mock = Mock() + runpod_backend_mock.TYPE = BackendType.RUNPOD + runpod_offer = get_instance_offer_with_availability(backend=BackendType.RUNPOD) + runpod_backend_mock.compute.return_value.get_offers_cached.return_value = [ + runpod_offer + ] + m.return_value = [aws_backend_mock, runpod_backend_mock] + res = await get_offers_by_requirements( + project=Mock(), + profile=profile, + requirements=requirements, + multinode=True, + ) + m.assert_awaited_once() + assert res == [(aws_backend_mock, aws_offer)] + + @pytest.mark.asyncio + async def test_returns_volume_offers(self): + profile = Profile(name="test") + requirements = Requirements(resources=ResourcesSpec()) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + aws_backend_mock = Mock() + aws_backend_mock.TYPE = BackendType.AWS + aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS) + aws_backend_mock.compute.return_value.get_offers_cached.return_value = [aws_offer] + runpod_backend_mock = Mock() + runpod_backend_mock.TYPE = BackendType.RUNPOD + runpod_offer1 = get_instance_offer_with_availability( + backend=BackendType.RUNPOD, region="eu" + ) + runpod_offer2 = get_instance_offer_with_availability( + backend=BackendType.RUNPOD, region="us" + ) + runpod_backend_mock.compute.return_value.get_offers_cached.return_value = [ + runpod_offer1, + runpod_offer2, + ] + m.return_value = [aws_backend_mock, runpod_backend_mock] + res = await get_offers_by_requirements( + project=Mock(), + profile=profile, + requirements=requirements, + volumes=[ + [ + get_volume( + configuration=get_volume_configuration( + backend=BackendType.RUNPOD, region="us" + ) + ) + ] + ], + ) + m.assert_awaited_once() + assert res == [(runpod_backend_mock, runpod_offer2)] + + @pytest.mark.asyncio + async def test_returns_az_offers(self): + profile = Profile(name="test", availability_zones=["az1", "az3"]) + requirements = Requirements(resources=ResourcesSpec()) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + aws_backend_mock = Mock() + aws_backend_mock.TYPE = BackendType.AWS + aws_offer1 = get_instance_offer_with_availability( + backend=BackendType.AWS, availability_zones=["az1"] + ) + aws_offer2 = get_instance_offer_with_availability( + backend=BackendType.AWS, availability_zones=["az2"] + ) + aws_offer3 = get_instance_offer_with_availability( + backend=BackendType.AWS, availability_zones=["az2", "az3"] + ) + expected_aws_offer3 = aws_offer3.copy() + expected_aws_offer3.availability_zones = ["az3"] + aws_offer4 = get_instance_offer_with_availability( + backend=BackendType.AWS, availability_zones=None + ) + aws_backend_mock.compute.return_value.get_offers_cached.return_value = [ + aws_offer1, + aws_offer2, + aws_offer3, + aws_offer4, + ] + m.return_value = [aws_backend_mock] + res = await get_offers_by_requirements( + project=Mock(), + profile=profile, + requirements=requirements, + ) + m.assert_awaited_once() + assert res == [(aws_backend_mock, aws_offer1), (aws_backend_mock, expected_aws_offer3)] + + @pytest.mark.asyncio + async def test_returns_no_offers_for_multinode_instance_mounts_and_non_multinode_backend(self): + # Regression test for https://github.com/dstackai/dstack/issues/2211 + profile = Profile(name="test", backends=[BackendType.RUNPOD]) + requirements = Requirements(resources=ResourcesSpec()) + with patch("dstack._internal.server.services.backends.get_project_backends") as m: + aws_backend_mock = Mock() + aws_backend_mock.TYPE = BackendType.AWS + aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS) + aws_backend_mock.compute.return_value.get_offers_cached.return_value = [aws_offer] + runpod_backend_mock = Mock() + runpod_backend_mock.TYPE = BackendType.RUNPOD + runpod_offer = get_instance_offer_with_availability(backend=BackendType.RUNPOD) + runpod_backend_mock.compute.return_value.get_offers_cached.return_value = [ + runpod_offer + ] + m.return_value = [aws_backend_mock, runpod_backend_mock] + res = await get_offers_by_requirements( + project=Mock(), + profile=profile, + requirements=requirements, + multinode=True, + instance_mounts=True, + ) + m.assert_awaited_once() + assert res == [] diff --git a/src/tests/_internal/server/services/test_pools.py b/src/tests/_internal/server/services/test_pools.py index b02e38374..1dc96583a 100644 --- a/src/tests/_internal/server/services/test_pools.py +++ b/src/tests/_internal/server/services/test_pools.py @@ -7,11 +7,115 @@ from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.instances import InstanceStatus, InstanceType, Resources from dstack._internal.core.models.pools import Instance +from dstack._internal.core.models.profiles import Profile from dstack._internal.server.models import InstanceModel -from dstack._internal.server.testing.common import create_project, create_user +from dstack._internal.server.testing.common import ( + create_instance, + create_pool, + create_project, + create_user, + get_volume, + get_volume_configuration, +) from dstack._internal.utils.common import get_current_datetime +class TestFilterPoolInstances: + # TODO: Refactor filter_pool_instances to not depend on InstanceModel and simplify tests + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_returns_all_instances(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + pool = await create_pool(session=session, project=project) + aws_instance = await create_instance( + session=session, + project=project, + pool=pool, + backend=BackendType.AWS, + ) + runpod_instance = await create_instance( + session=session, + project=project, + pool=pool, + backend=BackendType.RUNPOD, + ) + instances = [aws_instance, runpod_instance] + res = services_pools.filter_pool_instances( + pool_instances=instances, + profile=Profile(name="test"), + ) + assert res == instances + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_returns_multinode_instances(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + pool = await create_pool(session=session, project=project) + aws_instance = await create_instance( + session=session, + project=project, + pool=pool, + backend=BackendType.AWS, + ) + runpod_instance = await create_instance( + session=session, + project=project, + pool=pool, + backend=BackendType.RUNPOD, + ) + instances = [aws_instance, runpod_instance] + res = services_pools.filter_pool_instances( + pool_instances=instances, + profile=Profile(name="test"), + multinode=True, + ) + assert res == [aws_instance] + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + async def test_returns_volume_instances(self, test_db, session: AsyncSession): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + pool = await create_pool(session=session, project=project) + aws_instance = await create_instance( + session=session, + project=project, + pool=pool, + backend=BackendType.AWS, + ) + runpod_instance1 = await create_instance( + session=session, + project=project, + pool=pool, + backend=BackendType.RUNPOD, + region="eu", + ) + runpod_instance2 = await create_instance( + session=session, + project=project, + pool=pool, + backend=BackendType.RUNPOD, + region="us", + ) + instances = [aws_instance, runpod_instance1, runpod_instance2] + res = services_pools.filter_pool_instances( + pool_instances=instances, + profile=Profile(name="test"), + volumes=[ + [ + get_volume( + configuration=get_volume_configuration( + backend=BackendType.RUNPOD, region="us" + ) + ) + ] + ], + ) + assert res == [runpod_instance2] + + class TestGenerateInstanceName: @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) From d8b2018d20ba337377ce9ff586065d96923ee0b1 Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 17 Feb 2025 11:30:29 +0500 Subject: [PATCH 2/3] Fix fleet plan offers --- .../_internal/server/background/tasks/process_instances.py | 2 +- src/dstack/_internal/server/services/fleets.py | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index fa798bdc4..cee25c3c4 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -507,9 +507,9 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No project=instance.project, profile=profile, requirements=requirements, - exclude_not_available=True, fleet_model=instance.fleet, blocks="auto" if instance.total_blocks is None else instance.total_blocks, + exclude_not_available=True, ) if not offers and should_retry: diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index fc06ad49a..405b6df70 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -257,6 +257,7 @@ async def get_plan( project=project, profile=spec.merged_profile, requirements=_get_fleet_requirements(spec), + fleet_spec=spec, blocks=spec.configuration.blocks, ) offers = [offer for _, offer in offers_with_backends] @@ -277,12 +278,15 @@ async def get_create_instance_offers( project: ProjectModel, profile: Profile, requirements: Requirements, - exclude_not_available=False, + fleet_spec: Optional[FleetSpec] = None, fleet_model: Optional[FleetModel] = None, blocks: Union[int, Literal["auto"]] = 1, + exclude_not_available: bool = False, ) -> List[Tuple[Backend, InstanceOfferWithAvailability]]: multinode = False master_job_provisioning_data = None + if fleet_spec is not None: + multinode = fleet_spec.configuration.placement == InstanceGroupPlacement.CLUSTER if fleet_model is not None: fleet = fleet_model_to_fleet(fleet_model) multinode = fleet.spec.configuration.placement == InstanceGroupPlacement.CLUSTER From f8031f6f7e519015b664106885c9ca3ae4d0238c Mon Sep 17 00:00:00 2001 From: Victor Skvortsov Date: Mon, 17 Feb 2025 11:30:58 +0500 Subject: [PATCH 3/3] Fix cluster fleets --- .../_internal/server/background/tasks/process_instances.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py index cee25c3c4..e34e4af2f 100644 --- a/src/dstack/_internal/server/background/tasks/process_instances.py +++ b/src/dstack/_internal/server/background/tasks/process_instances.py @@ -915,9 +915,8 @@ def _get_instance_offer_for_instance( instance_offer.availability_zones = [ z for z in instance_offer.availability_zones - if instance_offer.availability_zones == master_job_provisioning_data.availability_zone + if z == master_job_provisioning_data.availability_zone ] - return instance_offer