diff --git a/src/dstack/_internal/server/background/scheduled_tasks/compute_groups.py b/src/dstack/_internal/server/background/scheduled_tasks/compute_groups.py index 58d6b2c8b..66e31a169 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/compute_groups.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/compute_groups.py @@ -49,6 +49,7 @@ async def _process_next_compute_group(): res = await session.execute( select(ComputeGroupModel) .where( + ComputeGroupModel.lock_expires_at.is_(None), ComputeGroupModel.deleted == False, ComputeGroupModel.id.not_in(lockset), ComputeGroupModel.last_processed_at diff --git a/src/dstack/_internal/server/background/scheduled_tasks/fleets.py b/src/dstack/_internal/server/background/scheduled_tasks/fleets.py index 6b1ba7667..a1cd696b6 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/fleets.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/fleets.py @@ -55,6 +55,7 @@ async def process_fleets(): res = await session.execute( select(FleetModel) .where( + FleetModel.lock_expires_at.is_(None), FleetModel.deleted == False, FleetModel.id.not_in(fleet_lockset), FleetModel.last_processed_at @@ -75,6 +76,7 @@ async def process_fleets(): res = await session.execute( select(InstanceModel) .where( + InstanceModel.lock_expires_at.is_(None), InstanceModel.id.not_in(instance_lockset), InstanceModel.fleet_id.in_(fleet_ids), InstanceModel.deleted == False, diff --git a/src/dstack/_internal/server/background/scheduled_tasks/gateways.py b/src/dstack/_internal/server/background/scheduled_tasks/gateways.py index 262f45a18..3c88d21c7 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/gateways.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/gateways.py @@ -45,6 +45,7 @@ async def process_gateways(): res = await session.execute( select(GatewayModel) .where( + GatewayModel.lock_expires_at.is_(None), GatewayModel.status.in_([GatewayStatus.SUBMITTED, GatewayStatus.PROVISIONING]), GatewayModel.id.not_in(lockset), ) diff --git a/src/dstack/_internal/server/background/scheduled_tasks/instances.py b/src/dstack/_internal/server/background/scheduled_tasks/instances.py index e60f81190..5c041dc2b 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/instances.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/instances.py @@ -176,6 +176,7 @@ async def _process_next_instance(): InstanceModel.compute_group_id.is_not(None), ) ), + InstanceModel.lock_expires_at.is_(None), InstanceModel.id.not_in(lockset), InstanceModel.last_processed_at < get_current_datetime() - MIN_PROCESSING_INTERVAL, diff --git a/src/dstack/_internal/server/background/scheduled_tasks/placement_groups.py b/src/dstack/_internal/server/background/scheduled_tasks/placement_groups.py index 1106ce491..eafd21820 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/placement_groups.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/placement_groups.py @@ -31,6 +31,7 @@ async def process_placement_groups(): res = await session.execute( select(PlacementGroupModel) .where( + PlacementGroupModel.lock_expires_at.is_(None), PlacementGroupModel.fleet_deleted == True, PlacementGroupModel.deleted == False, PlacementGroupModel.id.not_in(lockset), diff --git a/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py index 7adcabdf7..72d60b581 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/running_jobs.py @@ -121,6 +121,7 @@ async def _process_next_running_job(): select(JobModel) .join(JobModel.run) .where( + JobModel.lock_expires_at.is_(None), JobModel.status.in_( [JobStatus.PROVISIONING, JobStatus.PULLING, JobStatus.RUNNING] ), diff --git a/src/dstack/_internal/server/background/scheduled_tasks/runs.py b/src/dstack/_internal/server/background/scheduled_tasks/runs.py index 1b3313725..715c47c61 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/runs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/runs.py @@ -73,6 +73,8 @@ ROLLING_DEPLOYMENT_MAX_SURGE = 1 # at most one extra replica during rolling deployment +# NOTE: This scheduled task is going to be deprecated in favor of `RunPipeline`. +# If this logic changes before removal, keep `pipeline_tasks/runs/__init__.py` in sync. async def process_runs(batch_size: int = 1): tasks = [] for _ in range(batch_size): @@ -90,6 +92,7 @@ async def _process_next_run(): res = await session.execute( select(RunModel) .where( + RunModel.lock_expires_at.is_(None), RunModel.id.not_in(run_lockset), RunModel.last_processed_at < now - MIN_PROCESSING_INTERVAL, # Filter out runs that don't need to be processed. diff --git a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py index eaa452380..4f6207112 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/submitted_jobs.py @@ -151,6 +151,8 @@ last_processed_at: Optional[datetime] = None +# NOTE: This scheduled task is going to be deprecated in favor of `JobSubmittedPipeline`. +# If this logic changes before removal, keep `pipeline_tasks/jobs_submitted.py` in sync. async def process_submitted_jobs(batch_size: int = 1): tasks = [] effective_batch_size = _get_effective_batch_size(batch_size) @@ -177,6 +179,7 @@ async def _process_next_submitted_job(): select(JobModel) .join(JobModel.run) .where( + JobModel.lock_expires_at.is_(None), JobModel.status == JobStatus.SUBMITTED, JobModel.waiting_master_job.is_not(True), JobModel.id.not_in(lockset), @@ -634,6 +637,14 @@ async def _process_new_capacity_provisioning_path( job=context.job, ) + if context.fleet_model is not None and fleet_model is None: + await _defer_submitted_job( + session=session, + job_model=context.job_model, + log_message="cluster fleet is locked", + ) + return None + # master_job_provisioning_data is present if there is a master job. # master_instance_provisioning_data is present if there is a master instance (non empty cluster fleet). master_provisioning_data = master_job_provisioning_data or master_instance_provisioning_data @@ -1018,6 +1029,8 @@ async def _lock_fleet_and_get_master_provisioning_data( ) await sqlite_commit(session) fleet_model = await _refetch_cluster_master_fleet(session=session, fleet_model=fleet_model) + if fleet_model is None: + return None, None master_instance_provisioning_data = get_fleet_master_instance_provisioning_data( fleet_model=fleet_model, fleet_spec=fleet_spec, @@ -1034,7 +1047,7 @@ def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]: async def _refetch_cluster_master_fleet( session: AsyncSession, fleet_model: FleetModel -) -> FleetModel: +) -> Optional[FleetModel]: res = await session.execute( select(FleetModel) .where( @@ -1050,6 +1063,9 @@ async def _refetch_cluster_master_fleet( ) empty_fleet_model = res.unique().scalar() if empty_fleet_model is not None: + if empty_fleet_model.lock_expires_at is not None: + # Defer while a pipeline owns the empty cluster fleet. + return None return empty_fleet_model res = await session.execute( diff --git a/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py b/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py index ab25c2c7d..fefadcf44 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/terminating_jobs.py @@ -81,6 +81,7 @@ async def _process_next_terminating_job(): res = await session.execute( select(JobModel) .where( + JobModel.lock_expires_at.is_(None), JobModel.id.not_in(job_lockset), JobModel.status == JobStatus.TERMINATING, or_( diff --git a/src/dstack/_internal/server/background/scheduled_tasks/volumes.py b/src/dstack/_internal/server/background/scheduled_tasks/volumes.py index 11e6f3c59..2fb35ff7b 100644 --- a/src/dstack/_internal/server/background/scheduled_tasks/volumes.py +++ b/src/dstack/_internal/server/background/scheduled_tasks/volumes.py @@ -34,6 +34,7 @@ async def process_submitted_volumes(): res = await session.execute( select(VolumeModel) .where( + VolumeModel.lock_expires_at.is_(None), VolumeModel.status == VolumeStatus.SUBMITTED, VolumeModel.deleted == False, VolumeModel.id.not_in(lockset),