diff --git a/src/dstack/_internal/server/background/tasks/process_placement_groups.py b/src/dstack/_internal/server/background/tasks/process_placement_groups.py index 1b24f07cc..3bfd1f20f 100644 --- a/src/dstack/_internal/server/background/tasks/process_placement_groups.py +++ b/src/dstack/_internal/server/background/tasks/process_placement_groups.py @@ -28,6 +28,7 @@ async def process_placement_groups(): PlacementGroupModel.deleted == False, PlacementGroupModel.id.not_in(lockset), ) + .order_by(PlacementGroupModel.id) # take locks in order .with_for_update(skip_locked=True) ) placement_group_models = res.scalars().all() diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 092988e3c..be8bb610e 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -74,6 +74,7 @@ async def _process_next_run(): JobModel.run_id == run_model.id, JobModel.id.not_in(job_lockset), ) + .order_by(JobModel.id) # take locks in order .with_for_update(skip_locked=True) ) job_models = res.scalars().all() diff --git a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py index b9b535a97..c32f04af1 100644 --- a/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_submitted_jobs.py @@ -195,6 +195,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): InstanceModel.total_blocks > InstanceModel.busy_blocks, ) .options(lazyload(InstanceModel.jobs)) + .order_by(InstanceModel.id) # take locks in order .with_for_update() ) pool_instances = list(res.unique().scalars().all()) @@ -319,6 +320,7 @@ async def _process_submitted_job(session: AsyncSession, job_model: JobModel): select(VolumeModel) .where(VolumeModel.id.in_(volumes_ids)) .options(selectinload(VolumeModel.user)) + .order_by(VolumeModel.id) # take locks in order .with_for_update() ) async with get_locker().lock_ctx(VolumeModel.__tablename__, volumes_ids): diff --git a/src/dstack/_internal/server/services/fleets.py b/src/dstack/_internal/server/services/fleets.py index 405b6df70..34c5e9a56 100644 --- a/src/dstack/_internal/server/services/fleets.py +++ b/src/dstack/_internal/server/services/fleets.py @@ -517,6 +517,7 @@ async def delete_fleets( .options(selectinload(FleetModel.instances)) .options(selectinload(FleetModel.runs)) .execution_options(populate_existing=True) + .order_by(FleetModel.id) # take locks in order .with_for_update() ) fleet_models = res.scalars().unique().all() diff --git a/src/dstack/_internal/server/services/gateways/__init__.py b/src/dstack/_internal/server/services/gateways/__init__.py index 151f496eb..2f192ee56 100644 --- a/src/dstack/_internal/server/services/gateways/__init__.py +++ b/src/dstack/_internal/server/services/gateways/__init__.py @@ -220,6 +220,7 @@ async def delete_gateways( ) .options(selectinload(GatewayModel.gateway_compute)) .execution_options(populate_existing=True) + .order_by(GatewayModel.id) # take locks in order .with_for_update() ) gateway_models = res.scalars().all() diff --git a/src/dstack/_internal/server/services/runs.py b/src/dstack/_internal/server/services/runs.py index 546ebbc10..b421ff267 100644 --- a/src/dstack/_internal/server/services/runs.py +++ b/src/dstack/_internal/server/services/runs.py @@ -552,6 +552,7 @@ async def stop_run(session: AsyncSession, run_model: RunModel, abort: bool): res = await session.execute( select(RunModel) .where(RunModel.id == run_model.id) + .order_by(RunModel.id) # take locks in order .with_for_update() .execution_options(populate_existing=True) ) @@ -559,6 +560,7 @@ async def stop_run(session: AsyncSession, run_model: RunModel, abort: bool): await session.execute( select(JobModel) .where(JobModel.run_id == run_model.id) + .order_by(JobModel.id) # take locks in order .with_for_update() .execution_options(populate_existing=True) ) @@ -592,7 +594,10 @@ async def delete_runs( await session.commit() async with get_locker().lock_ctx(RunModel.__tablename__, run_ids): res = await session.execute( - select(RunModel).where(RunModel.id.in_(run_ids)).with_for_update() + select(RunModel) + .where(RunModel.id.in_(run_ids)) + .order_by(RunModel.id) # take locks in order + .with_for_update() ) run_models = res.scalars().all() active_runs = [r for r in run_models if not r.status.is_finished()] diff --git a/src/dstack/_internal/server/services/volumes.py b/src/dstack/_internal/server/services/volumes.py index deea47a81..747af9c24 100644 --- a/src/dstack/_internal/server/services/volumes.py +++ b/src/dstack/_internal/server/services/volumes.py @@ -264,6 +264,7 @@ async def delete_volumes(session: AsyncSession, project: ProjectModel, names: Li .options(selectinload(VolumeModel.user)) .options(selectinload(VolumeModel.attachments)) .execution_options(populate_existing=True) + .order_by(VolumeModel.id) # take locks in order .with_for_update() ) volume_models = res.scalars().unique().all()