Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
get_instance_offer,
get_instance_provisioning_data,
switch_instance_status,
try_atomic_busy_blocks_increment,
)
from dstack._internal.server.services.jobs import (
check_can_attach_job_volumes,
Expand Down Expand Up @@ -575,6 +576,22 @@ async def _apply_assignment_result(
return

instance_model, current_offer = current_instance_offers[0]
if not await try_atomic_busy_blocks_increment(
session=session,
instance_id=instance_model.id,
blocks=current_offer.blocks,
):
await _reset_job_lock_for_retry(session=session, item=item)
return

# Refetch instance to get updated busy_blocks from the atomic update
res = await session.execute(
select(InstanceModel)
.where(InstanceModel.id == instance_model.id)
.options(joinedload(InstanceModel.volume_attachments))
)
instance_model = res.unique().scalar_one()

_assign_instance_to_job(
session=session,
job_model=job_model,
Expand Down Expand Up @@ -880,7 +897,6 @@ def _assign_instance_to_job(
job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json()

switch_instance_status(session, instance_model, InstanceStatus.BUSY)
instance_model.busy_blocks += offer.blocks
events.emit(
session,
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
format_instance_blocks_for_event,
get_instance_provisioning_data,
switch_instance_status,
try_atomic_busy_blocks_increment,
)
from dstack._internal.server.services.jobs import (
check_can_attach_job_volumes,
Expand Down Expand Up @@ -1079,6 +1080,12 @@ async def _assign_existing_instance_to_job(

instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0)
instance, offer = instances_with_offers[0]
if not await try_atomic_busy_blocks_increment(
session=session, instance_id=instance.id, blocks=offer.blocks
):
job_model.instance_assigned = False
return

# Reload InstanceModel with volume attachments
res = await session.execute(
select(InstanceModel)
Expand All @@ -1087,7 +1094,6 @@ async def _assign_existing_instance_to_job(
)
instance = res.unique().scalar_one()
switch_instance_status(session, instance, InstanceStatus.BUSY)
instance.busy_blocks += offer.blocks

job_model.instance = instance
job_model.used_instance_id = instance.id
Expand Down
25 changes: 24 additions & 1 deletion src/dstack/_internal/server/services/instances.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import Dict, List, Literal, Optional, Union

import gpuhunt
from sqlalchemy import and_, exists, false, or_, select
from sqlalchemy import and_, exists, false, or_, select, update
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import joinedload, load_only

Expand Down Expand Up @@ -63,6 +63,29 @@
logger = get_logger(__name__)


async def try_atomic_busy_blocks_increment(
    session: AsyncSession,
    instance_id: uuid.UUID,
    blocks: int,
) -> bool:
    """
    Atomically increment `busy_blocks` if the instance has capacity.

    The capacity check and the increment are issued as a single conditional
    UPDATE statement, so the check is evaluated by the database against the
    stored row values rather than against values previously loaded in Python.

    A row matches when its `id` equals `instance_id` and either
    `total_blocks` is NULL or `busy_blocks + blocks <= total_blocks`.
    This function does not commit; the caller owns the transaction.

    Args:
        session: Session used to execute the UPDATE.
        instance_id: ID of the instance whose `busy_blocks` should be increased.
        blocks: Number of blocks to add to `busy_blocks`.

    Returns:
        True if the update affected a row, False otherwise (no row with this
        ID, or the capacity condition did not hold).
    """
    # Function-scope imports keep this block self-contained: neither name is
    # guaranteed to be imported at module level.
    from typing import cast

    from sqlalchemy.engine import CursorResult

    res = await session.execute(
        update(InstanceModel)
        .where(
            InstanceModel.id == instance_id,
            or_(
                InstanceModel.total_blocks.is_(None),
                InstanceModel.busy_blocks + blocks <= InstanceModel.total_blocks,
            ),
        )
        .values(busy_blocks=InstanceModel.busy_blocks + blocks)
    )
    # `execute()` is annotated as returning the generic `Result`, which does not
    # declare `rowcount`. Per the SQLAlchemy docs, DML statements yield a
    # `CursorResult`, so narrow the static type here; `cast` is a runtime no-op.
    return cast(CursorResult, res).rowcount > 0

Check failure on line 86 in src/dstack/_internal/server/services/instances.py

View workflow job for this annotation

GitHub Actions / build-artifacts / python-test (ubuntu-latest, 3.10)

Cannot access attribute "rowcount" for class "Result[Any]"   Attribute "rowcount" is unknown (reportAttributeAccessIssue)

Check failure on line 86 in src/dstack/_internal/server/services/instances.py

View workflow job for this annotation

GitHub Actions / build-artifacts / python-test (ubuntu-latest, 3.13)

Cannot access attribute "rowcount" for class "Result[Any]"   Attribute "rowcount" is unknown (reportAttributeAccessIssue)

Check failure on line 86 in src/dstack/_internal/server/services/instances.py

View workflow job for this annotation

GitHub Actions / build-artifacts / python-test (ubuntu-latest, 3.11)

Cannot access attribute "rowcount" for class "Result[Any]"   Attribute "rowcount" is unknown (reportAttributeAccessIssue)

Check failure on line 86 in src/dstack/_internal/server/services/instances.py

View workflow job for this annotation

GitHub Actions / build-artifacts / python-test (ubuntu-latest, 3.9)

Cannot access attribute "rowcount" for class "Result[Any]"   Attribute "rowcount" is unknown (reportAttributeAccessIssue)

Check failure on line 86 in src/dstack/_internal/server/services/instances.py

View workflow job for this annotation

GitHub Actions / build-artifacts / python-test (macos-latest, 3.11)

Cannot access attribute "rowcount" for class "Result[Any]"   Attribute "rowcount" is unknown (reportAttributeAccessIssue)

Check failure on line 86 in src/dstack/_internal/server/services/instances.py

View workflow job for this annotation

GitHub Actions / build-artifacts / python-test (macos-latest, 3.13)

Cannot access attribute "rowcount" for class "Result[Any]"   Attribute "rowcount" is unknown (reportAttributeAccessIssue)


def switch_instance_status(
session: AsyncSession,
instance_model: InstanceModel,
Expand Down
Loading