From e145e8e36b4d87b6ef0083d1b0f53d1114a4684b Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Tue, 24 Mar 2026 14:02:36 +0545 Subject: [PATCH 1/3] Fix multiple jobs assigned to same instance --- .../pipeline_tasks/jobs_submitted.py | 30 +++++++++++++++++-- .../pipeline_tasks/jobs_terminating.py | 2 +- .../scheduled_tasks/submitted_jobs.py | 29 +++++++++++------- .../scheduled_tasks/terminating_jobs.py | 2 +- .../_internal/server/services/instances.py | 25 +++++++++++++++- 5 files changed, 72 insertions(+), 16 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index 9e93e7b2a..c7ba495e9 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -98,6 +98,7 @@ get_instance_offer, get_instance_provisioning_data, switch_instance_status, + try_atomic_busy_blocks_increment, ) from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, @@ -574,13 +575,36 @@ async def _apply_assignment_result( await _reset_job_lock_for_retry(session=session, item=item) return - instance_model, current_offer = current_instance_offers[0] + instance_model = None + current_offer = None + for candidate_instance, candidate_offer in current_instance_offers: + if await try_atomic_busy_blocks_increment( + session=session, + instance_id=candidate_instance.id, + blocks=candidate_offer.blocks, + ): + instance_model = candidate_instance + current_offer = candidate_offer + break + if instance_model is None or current_offer is None: + await _reset_job_lock_for_retry(session=session, item=item) + return + + # Refetch instance to get updated busy_blocks from the atomic update + res = await session.execute( + select(InstanceModel) + .where(InstanceModel.id == instance_model.id) + .options(joinedload(InstanceModel.volume_attachments)) + ) + instance_model = res.unique().scalar_one() + _assign_instance_to_job( session=session, job_model=job_model, instance_model=instance_model, offer=current_offer, multinode=context.multinode, + busy_blocks_already_updated=True, ) await _mark_job_processed(session=session, job_model=job_model) @@ -871,6 +895,7 @@ def _assign_instance_to_job( instance_model: InstanceModel, offer: InstanceOfferWithAvailability, multinode: bool, + busy_blocks_already_updated: bool = False, ) -> None: job_model.fleet_id = instance_model.fleet_id job_model.instance_assigned = True @@ -880,7 +905,8 @@ def _assign_instance_to_job( job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json() switch_instance_status(session, instance_model, InstanceStatus.BUSY) - instance_model.busy_blocks += offer.blocks + if not busy_blocks_already_updated: + instance_model.busy_blocks += offer.blocks events.emit( session, ( diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 5f1a0f74b..75d52f98d 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -620,7 +620,7 @@ async def _process_terminating_job( result.job_update_map["volumes_detached_at"] = NOW_PLACEHOLDER instance_update_map = get_or_error(result.instance_update_map) - busy_blocks = instance_model.busy_blocks - _get_job_occupied_blocks(jrd) + busy_blocks = max(0, instance_model.busy_blocks - _get_job_occupied_blocks(jrd)) instance_update_map["busy_blocks"] = busy_blocks if instance_model.status != InstanceStatus.BUSY or jpd is None or not jpd.dockerized: if instance_model.status not in InstanceStatus.finished_statuses(): diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index eaa452380..8c9acdee7 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -93,6 +93,7 @@ format_instance_blocks_for_event, get_instance_provisioning_data, switch_instance_status, + try_atomic_busy_blocks_increment, ) from dstack._internal.server.services.jobs import ( check_can_attach_job_volumes, @@ -543,15 +544,14 @@ async def _find_assignment_fleet_with_offers( await exit_stack.enter_async_context( get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids) ) - if is_db_sqlite(): - fleets_with_instances_ids = [f.id for f in fleet_models_with_instances] - fleet_models_with_instances = await _refetch_fleet_models_with_instances( - session=session, - fleets_ids=fleets_with_instances_ids, - instances_ids=instances_ids, - fleet_filters=fleet_filters, - instance_filters=instance_filters, - ) + fleets_with_instances_ids = [f.id for f in fleet_models_with_instances] + fleet_models_with_instances = await _refetch_fleet_models_with_instances( + session=session, + fleets_ids=fleets_with_instances_ids, + instances_ids=instances_ids, + fleet_filters=fleet_filters, + instance_filters=instance_filters, + ) fleet_models = fleet_models_with_instances + fleet_models_without_instances fleet_model, fleet_instances_with_offers, _ = await find_optimal_fleet_with_offers( project=context.project, @@ -1078,7 +1078,15 @@ async def _assign_existing_instance_to_job( return instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0) - instance, offer = instances_with_offers[0] + for instance, offer in instances_with_offers: + if await try_atomic_busy_blocks_increment( + session=session, instance_id=instance.id, blocks=offer.blocks + ): + break + else: + job_model.instance_assigned = False + return + # Reload InstanceModel with volume attachments res = await session.execute( select(InstanceModel) @@ -1087,7 +1095,6 @@ async def _assign_existing_instance_to_job( ) instance = res.unique().scalar_one() switch_instance_status(session, instance, InstanceStatus.BUSY) - instance.busy_blocks += offer.blocks job_model.instance = instance job_model.used_instance_id = instance.id diff --git a/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py index ab25c2c7d..44c644dca 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py @@ -194,7 +194,7 @@ async def _process_terminating_job( volume_models=volume_models, ) - instance_model.busy_blocks -= _get_job_occupied_blocks(jrd) + instance_model.busy_blocks = max(0, instance_model.busy_blocks - _get_job_occupied_blocks(jrd)) if instance_model.status != InstanceStatus.BUSY or jpd is None or not jpd.dockerized: # Terminate instances that: # - have not finished provisioning yet diff --git a/src/dstack/_internal/server/services/instances.py b/src/dstack/_internal/server/services/instances.py index d54ec8b68..7fd373b99 100644 --- a/src/dstack/_internal/server/services/instances.py +++ b/src/dstack/_internal/server/services/instances.py @@ -5,7 +5,7 @@ from typing import Dict, List, Literal, Optional, Union import gpuhunt -from sqlalchemy import and_, exists, false, or_, select +from sqlalchemy import and_, exists, false, or_, select, update from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm import joinedload, load_only @@ -63,6 +63,29 @@ logger = get_logger(__name__) +async def try_atomic_busy_blocks_increment( + session: AsyncSession, + instance_id: uuid.UUID, + blocks: int, +) -> bool: + """ + Atomically increment busy_blocks if the instance has capacity. + Returns True if the update affected a row, False otherwise. + """ + res = await session.execute( + update(InstanceModel) + .where( + InstanceModel.id == instance_id, + or_( + InstanceModel.total_blocks.is_(None), + InstanceModel.busy_blocks + blocks <= InstanceModel.total_blocks, + ), + ) + .values(busy_blocks=InstanceModel.busy_blocks + blocks) + ) + return res.rowcount > 0 + + def switch_instance_status( session: AsyncSession, instance_model: InstanceModel, From 84387167a5e8e8ad5b37b49f211b7a06ac8dadd0 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 25 Mar 2026 06:06:53 +0545 Subject: [PATCH 2/3] Fix multiple jobs assigned to same instance --- .../pipeline_tasks/jobs_submitted.py | 22 +++++-------------- .../pipeline_tasks/jobs_terminating.py | 2 +- .../scheduled_tasks/submitted_jobs.py | 10 ++++----- .../scheduled_tasks/terminating_jobs.py | 2 +- 4 files changed, 12 insertions(+), 24 deletions(-) diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py index c7ba495e9..5f5ac0eeb 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_submitted.py @@ -575,18 +575,12 @@ async def _apply_assignment_result( await _reset_job_lock_for_retry(session=session, item=item) return - instance_model = None - current_offer = None - for candidate_instance, candidate_offer in current_instance_offers: - if await try_atomic_busy_blocks_increment( - session=session, - instance_id=candidate_instance.id, - blocks=candidate_offer.blocks, - ): - instance_model = candidate_instance - current_offer = candidate_offer - break - if instance_model is None or current_offer is None: + instance_model, current_offer = current_instance_offers[0] + if not await try_atomic_busy_blocks_increment( + session=session, + instance_id=instance_model.id, + blocks=current_offer.blocks, + ): await _reset_job_lock_for_retry(session=session, item=item) return @@ -604,7 +598,6 @@ async def _apply_assignment_result( instance_model=instance_model, offer=current_offer, multinode=context.multinode, - busy_blocks_already_updated=True, ) await _mark_job_processed(session=session, job_model=job_model) @@ -895,7 +888,6 @@ def _assign_instance_to_job( instance_model: InstanceModel, offer: InstanceOfferWithAvailability, multinode: bool, - busy_blocks_already_updated: bool = False, ) -> None: job_model.fleet_id = instance_model.fleet_id job_model.instance_assigned = True @@ -905,8 +897,6 @@ def _assign_instance_to_job( job_model.job_runtime_data = _prepare_job_runtime_data(offer, multinode).json() switch_instance_status(session, instance_model, InstanceStatus.BUSY) - if not busy_blocks_already_updated: - instance_model.busy_blocks += offer.blocks events.emit( session, ( diff --git a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py index 75d52f98d..5f1a0f74b 100644 --- a/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py +++ b/src/dstack/_internal/server/background/pipeline_tasks/jobs_terminating.py @@ -620,7 +620,7 @@ async def _process_terminating_job( result.job_update_map["volumes_detached_at"] = NOW_PLACEHOLDER instance_update_map = get_or_error(result.instance_update_map) - busy_blocks = max(0, instance_model.busy_blocks - _get_job_occupied_blocks(jrd)) + busy_blocks = instance_model.busy_blocks - _get_job_occupied_blocks(jrd) instance_update_map["busy_blocks"] = busy_blocks if instance_model.status != InstanceStatus.BUSY or jpd is None or not jpd.dockerized: if instance_model.status not in InstanceStatus.finished_statuses(): diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 8c9acdee7..17791f929 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -1078,12 +1078,10 @@ async def _assign_existing_instance_to_job( return instances_with_offers.sort(key=lambda instance_with_offer: instance_with_offer[0].price or 0) - for instance, offer in instances_with_offers: - if await try_atomic_busy_blocks_increment( - session=session, instance_id=instance.id, blocks=offer.blocks - ): - break - else: + instance, offer = instances_with_offers[0] + if not await try_atomic_busy_blocks_increment( + session=session, instance_id=instance.id, blocks=offer.blocks + ): job_model.instance_assigned = False return diff --git a/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py index 44c644dca..ab25c2c7d 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py @@ -194,7 +194,7 @@ async def _process_terminating_job( volume_models=volume_models, ) - instance_model.busy_blocks = max(0, instance_model.busy_blocks - _get_job_occupied_blocks(jrd)) + instance_model.busy_blocks -= _get_job_occupied_blocks(jrd) if instance_model.status != InstanceStatus.BUSY or jpd is None or not jpd.dockerized: # Terminate instances that: # - have not finished provisioning yet From 365cce62aec3041a1802d3ade0a92422480d9343 Mon Sep 17 00:00:00 2001 From: Bihan Rana Date: Wed, 25 Mar 2026 06:27:52 +0545 Subject: [PATCH 3/3] Fix multiple jobs assigned to same instance --- .../scheduled_tasks/submitted_jobs.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index 17791f929..f65b3fff0 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -544,14 +544,15 @@ async def _find_assignment_fleet_with_offers( await exit_stack.enter_async_context( get_locker(get_db().dialect_name).lock_ctx(InstanceModel.__tablename__, instances_ids) ) - fleets_with_instances_ids = [f.id for f in fleet_models_with_instances] - fleet_models_with_instances = await _refetch_fleet_models_with_instances( - session=session, - fleets_ids=fleets_with_instances_ids, - instances_ids=instances_ids, - fleet_filters=fleet_filters, - instance_filters=instance_filters, - ) + if is_db_sqlite(): + fleets_with_instances_ids = [f.id for f in fleet_models_with_instances] + fleet_models_with_instances = await _refetch_fleet_models_with_instances( + session=session, + fleets_ids=fleets_with_instances_ids, + instances_ids=instances_ids, + fleet_filters=fleet_filters, + instance_filters=instance_filters, + ) fleet_models = fleet_models_with_instances + fleet_models_without_instances fleet_model, fleet_instances_with_offers, _ = await find_optimal_fleet_with_offers( project=context.project,