# Patch summary
#
# src/dstack/_internal/server/background/tasks/process_instances.py
#   * Imports the server settings module as `server_settings`.
#   * `_create_instance` iterates over `offers[: server_settings.MAX_OFFERS_TRIED]`
#     instead of over every offer.
#   * The `except NotImplementedError` handler, which only skipped to the next
#     offer, is replaced by `except Exception`: the failure is logged with
#     `logger.exception` (offer instance name, backend and region) and the loop
#     continues with the next offer.
#   * In the `if not should_retry` branch, `termination_reason` is set to
#     "All offers failed" when `offers` is non-empty and to "No offers found"
#     otherwise; the info log line now includes that reason.
#
# src/tests/_internal/server/background/tasks/test_process_instances.py
#   * Imports `ProvisioningError`, `get_instance_offer_with_availability` and
#     `get_job_provisioning_data`.
#   * test_tries_second_offer_if_first_fails: the AWS mock's `create_instance`
#     raises (RuntimeError or ProvisioningError) and the GCP mock returns
#     provisioning data; expects status PROVISIONING, backend GCP, and exactly
#     one AWS `create_instance` call.
#   * test_fails_if_all_offers_fail: a single AWS mock raises; expects status
#     TERMINATED with termination reason "All offers failed".
#   * test_fails_if_no_offers: no backends are returned; expects status
#     TERMINATED with termination reason "No offers found".
#
# NOTE(review): the slice is taken before the
# `BACKENDS_WITH_CREATE_INSTANCE_SUPPORT` check, so offers skipped by that check
# appear to count toward the limit -- confirm this is intended.
# NOTE(review): `MAX_OFFERS_TRIED` is not defined in this patch; presumably it
# already exists in the server settings module -- verify.
diff --git a/src/dstack/_internal/server/background/tasks/process_instances.py b/src/dstack/_internal/server/background/tasks/process_instances.py
index 51e9310d7..f4b57a855 100644
--- a/src/dstack/_internal/server/background/tasks/process_instances.py
+++ b/src/dstack/_internal/server/background/tasks/process_instances.py
@@ -64,6 +64,7 @@
     Retry,
 )
 from dstack._internal.core.services.profiles import get_retry
+from dstack._internal.server import settings as server_settings
 from dstack._internal.server.background.tasks.common import get_provisioning_timeout
 from dstack._internal.server.db import get_session_ctx
 from dstack._internal.server.models import (
@@ -529,7 +530,9 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
             session=session, fleet_id=instance.fleet_id
         )
 
-    for backend, instance_offer in offers:
+    # Limit number of offers tried to prevent long-running processing
+    # in case all offers fail.
+    for backend, instance_offer in offers[: server_settings.MAX_OFFERS_TRIED]:
         if instance_offer.backend not in BACKENDS_WITH_CREATE_INSTANCE_SUPPORT:
             continue
         compute = backend.compute()
@@ -578,8 +581,13 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
                 extra={"instance_name": instance.name},
             )
             continue
-        except NotImplementedError:
-            # skip a backend without create_instance support, continue with next backend and offer
+        except Exception:
+            logger.exception(
+                "Got exception when launching %s in %s/%s",
+                instance_offer.instance.name,
+                instance_offer.backend.value,
+                instance_offer.region,
+            )
             continue
 
         instance.status = InstanceStatus.PROVISIONING
@@ -607,10 +615,11 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
 
     if not should_retry:
         instance.status = InstanceStatus.TERMINATED
-        instance.termination_reason = "No offers found"
+        instance.termination_reason = "All offers failed" if offers else "No offers found"
         logger.info(
-            "No offers found. Terminated instance %s",
+            "Terminated instance %s: %s",
             instance.name,
+            instance.termination_reason,
             extra={
                 "instance_name": instance.name,
                 "instance_status": InstanceStatus.TERMINATED.value,
diff --git a/src/tests/_internal/server/background/tasks/test_process_instances.py b/src/tests/_internal/server/background/tasks/test_process_instances.py
index 91f878901..d71f8e9c1 100644
--- a/src/tests/_internal/server/background/tasks/test_process_instances.py
+++ b/src/tests/_internal/server/background/tasks/test_process_instances.py
@@ -8,7 +8,7 @@
 from freezegun import freeze_time
 from sqlalchemy.ext.asyncio import AsyncSession
 
-from dstack._internal.core.errors import BackendError
+from dstack._internal.core.errors import BackendError, ProvisioningError
 from dstack._internal.core.models.backends.base import BackendType
 from dstack._internal.core.models.instances import (
     Gpu,
@@ -35,6 +35,8 @@
     create_repo,
     create_run,
     create_user,
+    get_instance_offer_with_availability,
+    get_job_provisioning_data,
     get_remote_connection_info,
 )
 from dstack._internal.utils.common import get_current_datetime
@@ -557,6 +559,68 @@ async def test_creates_instance(
         assert instance.total_blocks == expected_blocks
         assert instance.busy_blocks == 0
 
+    @pytest.mark.parametrize("err", [RuntimeError("Unexpected"), ProvisioningError("Expected")])
+    async def test_tries_second_offer_if_first_fails(self, session: AsyncSession, err: Exception):
+        project = await create_project(session=session)
+        instance = await create_instance(
+            session=session, project=project, status=InstanceStatus.PENDING
+        )
+        aws_mock = Mock()
+        aws_mock.TYPE = BackendType.AWS
+        offer = get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0)
+        aws_mock.compute.return_value = Mock(spec=ComputeMockSpec)
+        aws_mock.compute.return_value.get_offers_cached.return_value = [offer]
+        aws_mock.compute.return_value.create_instance.side_effect = err
+        gcp_mock = Mock()
+        gcp_mock.TYPE = BackendType.GCP
+        offer = get_instance_offer_with_availability(backend=BackendType.GCP, price=2.0)
+        gcp_mock.compute.return_value = Mock(spec=ComputeMockSpec)
+        gcp_mock.compute.return_value.get_offers_cached.return_value = [offer]
+        gcp_mock.compute.return_value.create_instance.return_value = get_job_provisioning_data(
+            backend=offer.backend, region=offer.region, price=offer.price
+        )
+        with patch("dstack._internal.server.services.backends.get_project_backends") as m:
+            m.return_value = [aws_mock, gcp_mock]
+            await process_instances()
+
+        await session.refresh(instance)
+        assert instance.status == InstanceStatus.PROVISIONING
+        aws_mock.compute.return_value.create_instance.assert_called_once()
+        assert instance.backend == BackendType.GCP
+
+    @pytest.mark.parametrize("err", [RuntimeError("Unexpected"), ProvisioningError("Expected")])
+    async def test_fails_if_all_offers_fail(self, session: AsyncSession, err: Exception):
+        project = await create_project(session=session)
+        instance = await create_instance(
+            session=session, project=project, status=InstanceStatus.PENDING
+        )
+        aws_mock = Mock()
+        aws_mock.TYPE = BackendType.AWS
+        offer = get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0)
+        aws_mock.compute.return_value = Mock(spec=ComputeMockSpec)
+        aws_mock.compute.return_value.get_offers_cached.return_value = [offer]
+        aws_mock.compute.return_value.create_instance.side_effect = err
+        with patch("dstack._internal.server.services.backends.get_project_backends") as m:
+            m.return_value = [aws_mock]
+            await process_instances()
+
+        await session.refresh(instance)
+        assert instance.status == InstanceStatus.TERMINATED
+        assert instance.termination_reason == "All offers failed"
+
+    async def test_fails_if_no_offers(self, session: AsyncSession):
+        project = await create_project(session=session)
+        instance = await create_instance(
+            session=session, project=project, status=InstanceStatus.PENDING
+        )
+        with patch("dstack._internal.server.services.backends.get_project_backends") as m:
+            m.return_value = []
+            await process_instances()
+
+        await session.refresh(instance)
+        assert instance.status == InstanceStatus.TERMINATED
+        assert instance.termination_reason == "No offers found"
+
 
 @pytest.mark.asyncio
 @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)