Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ async def _process_next_compute_group():
res = await session.execute(
select(ComputeGroupModel)
.where(
ComputeGroupModel.lock_expires_at.is_(None),
ComputeGroupModel.deleted == False,
ComputeGroupModel.id.not_in(lockset),
ComputeGroupModel.last_processed_at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ async def process_fleets():
res = await session.execute(
select(FleetModel)
.where(
FleetModel.lock_expires_at.is_(None),
FleetModel.deleted == False,
FleetModel.id.not_in(fleet_lockset),
FleetModel.last_processed_at
Expand All @@ -75,6 +76,7 @@ async def process_fleets():
res = await session.execute(
select(InstanceModel)
.where(
InstanceModel.lock_expires_at.is_(None),
InstanceModel.id.not_in(instance_lockset),
InstanceModel.fleet_id.in_(fleet_ids),
InstanceModel.deleted == False,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ async def process_gateways():
res = await session.execute(
select(GatewayModel)
.where(
GatewayModel.lock_expires_at.is_(None),
GatewayModel.status.in_([GatewayStatus.SUBMITTED, GatewayStatus.PROVISIONING]),
GatewayModel.id.not_in(lockset),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ async def _process_next_instance():
InstanceModel.compute_group_id.is_not(None),
)
),
InstanceModel.lock_expires_at.is_(None),
InstanceModel.id.not_in(lockset),
InstanceModel.last_processed_at
< get_current_datetime() - MIN_PROCESSING_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ async def process_placement_groups():
res = await session.execute(
select(PlacementGroupModel)
.where(
PlacementGroupModel.lock_expires_at.is_(None),
PlacementGroupModel.fleet_deleted == True,
PlacementGroupModel.deleted == False,
PlacementGroupModel.id.not_in(lockset),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ async def _process_next_running_job():
select(JobModel)
.join(JobModel.run)
.where(
JobModel.lock_expires_at.is_(None),
JobModel.status.in_(
[JobStatus.PROVISIONING, JobStatus.PULLING, JobStatus.RUNNING]
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@
ROLLING_DEPLOYMENT_MAX_SURGE = 1 # at most one extra replica during rolling deployment


# NOTE: This scheduled task is going to be deprecated in favor of `RunPipeline`.
# If this logic changes before removal, keep `pipeline_tasks/runs/__init__.py` in sync.
async def process_runs(batch_size: int = 1):
tasks = []
for _ in range(batch_size):
Expand All @@ -90,6 +92,7 @@ async def _process_next_run():
res = await session.execute(
select(RunModel)
.where(
RunModel.lock_expires_at.is_(None),
RunModel.id.not_in(run_lockset),
RunModel.last_processed_at < now - MIN_PROCESSING_INTERVAL,
# Filter out runs that don't need to be processed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@
last_processed_at: Optional[datetime] = None


# NOTE: This scheduled task is going to be deprecated in favor of `JobSubmittedPipeline`.
# If this logic changes before removal, keep `pipeline_tasks/jobs_submitted.py` in sync.
async def process_submitted_jobs(batch_size: int = 1):
tasks = []
effective_batch_size = _get_effective_batch_size(batch_size)
Expand All @@ -177,6 +179,7 @@ async def _process_next_submitted_job():
select(JobModel)
.join(JobModel.run)
.where(
JobModel.lock_expires_at.is_(None),
JobModel.status == JobStatus.SUBMITTED,
JobModel.waiting_master_job.is_not(True),
JobModel.id.not_in(lockset),
Expand Down Expand Up @@ -634,6 +637,14 @@ async def _process_new_capacity_provisioning_path(
job=context.job,
)

if context.fleet_model is not None and fleet_model is None:
await _defer_submitted_job(
session=session,
job_model=context.job_model,
log_message="cluster fleet is locked",
)
return None

# master_job_provisioning_data is present if there is a master job.
# master_instance_provisioning_data is present if there is a master instance (non empty cluster fleet).
master_provisioning_data = master_job_provisioning_data or master_instance_provisioning_data
Expand Down Expand Up @@ -1018,6 +1029,8 @@ async def _lock_fleet_and_get_master_provisioning_data(
)
await sqlite_commit(session)
fleet_model = await _refetch_cluster_master_fleet(session=session, fleet_model=fleet_model)
if fleet_model is None:
return None, None
master_instance_provisioning_data = get_fleet_master_instance_provisioning_data(
fleet_model=fleet_model,
fleet_spec=fleet_spec,
Expand All @@ -1034,7 +1047,7 @@ def _get_cluster_fleet_spec(fleet_model: FleetModel) -> Optional[FleetSpec]:

async def _refetch_cluster_master_fleet(
session: AsyncSession, fleet_model: FleetModel
) -> FleetModel:
) -> Optional[FleetModel]:
res = await session.execute(
select(FleetModel)
.where(
Expand All @@ -1050,6 +1063,9 @@ async def _refetch_cluster_master_fleet(
)
empty_fleet_model = res.unique().scalar()
if empty_fleet_model is not None:
if empty_fleet_model.lock_expires_at is not None:
# Defer while a pipeline owns the empty cluster fleet.
return None
return empty_fleet_model

res = await session.execute(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ async def _process_next_terminating_job():
res = await session.execute(
select(JobModel)
.where(
JobModel.lock_expires_at.is_(None),
JobModel.id.not_in(job_lockset),
JobModel.status == JobStatus.TERMINATING,
or_(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ async def process_submitted_volumes():
res = await session.execute(
select(VolumeModel)
.where(
VolumeModel.lock_expires_at.is_(None),
VolumeModel.status == VolumeStatus.SUBMITTED,
VolumeModel.deleted == False,
VolumeModel.id.not_in(lockset),
Expand Down
Loading