Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions src/dstack/_internal/server/background/tasks/process_instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
Retry,
)
from dstack._internal.core.services.profiles import get_retry
from dstack._internal.server import settings as server_settings
from dstack._internal.server.background.tasks.common import get_provisioning_timeout
from dstack._internal.server.db import get_session_ctx
from dstack._internal.server.models import (
Expand Down Expand Up @@ -529,7 +530,9 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
session=session, fleet_id=instance.fleet_id
)

for backend, instance_offer in offers:
# Limit number of offers tried to prevent long-running processing
# in case all offers fail.
for backend, instance_offer in offers[: server_settings.MAX_OFFERS_TRIED]:
if instance_offer.backend not in BACKENDS_WITH_CREATE_INSTANCE_SUPPORT:
continue
compute = backend.compute()
Expand Down Expand Up @@ -578,8 +581,13 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No
extra={"instance_name": instance.name},
)
continue
except NotImplementedError:
# skip a backend without create_instance support, continue with next backend and offer
except Exception:
logger.exception(
"Got exception when launching %s in %s/%s",
instance_offer.instance.name,
instance_offer.backend.value,
instance_offer.region,
)
continue

instance.status = InstanceStatus.PROVISIONING
Expand Down Expand Up @@ -607,10 +615,11 @@ async def _create_instance(session: AsyncSession, instance: InstanceModel) -> No

if not should_retry:
instance.status = InstanceStatus.TERMINATED
instance.termination_reason = "No offers found"
instance.termination_reason = "All offers failed" if offers else "No offers found"
logger.info(
"No offers found. Terminated instance %s",
"Terminated instance %s: %s",
instance.name,
instance.termination_reason,
extra={
"instance_name": instance.name,
"instance_status": InstanceStatus.TERMINATED.value,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from freezegun import freeze_time
from sqlalchemy.ext.asyncio import AsyncSession

from dstack._internal.core.errors import BackendError
from dstack._internal.core.errors import BackendError, ProvisioningError
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.instances import (
Gpu,
Expand All @@ -35,6 +35,8 @@
create_repo,
create_run,
create_user,
get_instance_offer_with_availability,
get_job_provisioning_data,
get_remote_connection_info,
)
from dstack._internal.utils.common import get_current_datetime
Expand Down Expand Up @@ -557,6 +559,68 @@ async def test_creates_instance(
assert instance.total_blocks == expected_blocks
assert instance.busy_blocks == 0

@pytest.mark.parametrize("err", [RuntimeError("Unexpected"), ProvisioningError("Expected")])
async def test_tries_second_offer_if_first_fails(self, session: AsyncSession, err: Exception):
    """Check that a failing offer does not stop provisioning.

    Two mocked backends are registered: AWS, whose ``create_instance``
    raises ``err``, and GCP, whose ``create_instance`` returns provisioning
    data. After one ``process_instances()`` pass the instance must be
    ``PROVISIONING`` on GCP, with exactly one launch attempt made on AWS.
    """
    project = await create_project(session=session)
    instance = await create_instance(
        session=session, project=project, status=InstanceStatus.PENDING
    )

    # AWS: the cheaper offer (1.0), whose launch attempt raises `err`.
    # NOTE(review): presumably offers are tried cheapest-first, so AWS goes first -- confirm.
    aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0)
    aws_compute = Mock(spec=ComputeMockSpec)
    aws_compute.get_offers_cached.return_value = [aws_offer]
    aws_compute.create_instance.side_effect = err
    aws_backend = Mock()
    aws_backend.TYPE = BackendType.AWS
    aws_backend.compute.return_value = aws_compute

    # GCP: the pricier offer (2.0), whose launch attempt succeeds.
    gcp_offer = get_instance_offer_with_availability(backend=BackendType.GCP, price=2.0)
    gcp_compute = Mock(spec=ComputeMockSpec)
    gcp_compute.get_offers_cached.return_value = [gcp_offer]
    gcp_compute.create_instance.return_value = get_job_provisioning_data(
        backend=gcp_offer.backend, region=gcp_offer.region, price=gcp_offer.price
    )
    gcp_backend = Mock()
    gcp_backend.TYPE = BackendType.GCP
    gcp_backend.compute.return_value = gcp_compute

    with patch(
        "dstack._internal.server.services.backends.get_project_backends",
        return_value=[aws_backend, gcp_backend],
    ):
        await process_instances()

    await session.refresh(instance)
    assert instance.status == InstanceStatus.PROVISIONING
    aws_compute.create_instance.assert_called_once()
    assert instance.backend == BackendType.GCP

@pytest.mark.parametrize("err", [RuntimeError("Unexpected"), ProvisioningError("Expected")])
async def test_fails_if_all_offers_fail(self, session: AsyncSession, err: Exception):
    """Check that the instance is terminated when its only offer fails.

    A single mocked AWS backend exposes one offer and raises ``err`` from
    ``create_instance``. After one ``process_instances()`` pass the instance
    must be ``TERMINATED`` with the reason "All offers failed".
    """
    project = await create_project(session=session)
    instance = await create_instance(
        session=session, project=project, status=InstanceStatus.PENDING
    )

    # The only available offer; launching it raises `err`.
    aws_offer = get_instance_offer_with_availability(backend=BackendType.AWS, price=1.0)
    aws_compute = Mock(spec=ComputeMockSpec)
    aws_compute.get_offers_cached.return_value = [aws_offer]
    aws_compute.create_instance.side_effect = err
    aws_backend = Mock()
    aws_backend.TYPE = BackendType.AWS
    aws_backend.compute.return_value = aws_compute

    with patch(
        "dstack._internal.server.services.backends.get_project_backends",
        return_value=[aws_backend],
    ):
        await process_instances()

    await session.refresh(instance)
    assert instance.status == InstanceStatus.TERMINATED
    assert instance.termination_reason == "All offers failed"

async def test_fails_if_no_offers(self, session: AsyncSession):
    """Check that the instance is terminated when there are no backends.

    The project's backend list is patched to be empty, so no offers exist.
    After one ``process_instances()`` pass the instance must be
    ``TERMINATED`` with the reason "No offers found".
    """
    project = await create_project(session=session)
    instance = await create_instance(
        session=session, project=project, status=InstanceStatus.PENDING
    )

    # No backends at all, hence nothing to try.
    with patch(
        "dstack._internal.server.services.backends.get_project_backends",
        return_value=[],
    ):
        await process_instances()

    await session.refresh(instance)
    assert instance.status == InstanceStatus.TERMINATED
    assert instance.termination_reason == "No offers found"


@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
Expand Down