Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions src/dstack/_internal/server/background/tasks/process_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,8 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel):
# the job is submitted
replica_statuses.add(RunStatus.SUBMITTED)
elif job_model.status == JobStatus.FAILED or (
job_model.status == JobStatus.TERMINATING
job_model.status
in [JobStatus.TERMINATING, JobStatus.TERMINATED, JobStatus.ABORTED]
and job_model.termination_reason
not in {JobTerminationReason.DONE_BY_RUNNER, JobTerminationReason.SCALED_DOWN}
):
Expand All @@ -244,17 +245,6 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel):
run_termination_reasons.add(RunTerminationReason.RETRY_LIMIT_EXCEEDED)
else:
replica_needs_retry = True
elif job_model.status in {
JobStatus.TERMINATING,
JobStatus.TERMINATED,
JobStatus.ABORTED,
}:
# FIXME: This code does not expect JobStatus.TERMINATED status,
# so if a job transitions from RUNNING to TERMINATED,
# the run will transition to PENDING instead of TERMINATING.
# This may not be observed because process_runs is invoked more frequently
# than process_terminating_jobs and because most jobs usually transition to FAILED.
pass # unexpected, but let's ignore it
else:
raise ValueError(f"Unexpected job status {job_model.status}")

Expand Down
20 changes: 17 additions & 3 deletions src/tests/_internal/server/background/tasks/test_process_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,13 +288,27 @@ async def test_some_no_capacity_keep_running(self, test_db, session: AsyncSessio

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_some_failed_to_terminating(self, test_db, session: AsyncSession):
@pytest.mark.parametrize(
("job_status", "job_termination_reason"),
[
(JobStatus.FAILED, JobTerminationReason.CONTAINER_EXITED_WITH_ERROR),
(JobStatus.TERMINATING, JobTerminationReason.TERMINATED_BY_SERVER),
(JobStatus.TERMINATED, JobTerminationReason.TERMINATED_BY_SERVER),
],
)
async def test_some_failed_to_terminating(
self,
test_db,
session: AsyncSession,
job_status: JobStatus,
job_termination_reason: JobTerminationReason,
) -> None:
run = await make_run(session, status=RunStatus.RUNNING, replicas=2)
await create_job(
session=session,
run=run,
status=JobStatus.FAILED,
termination_reason=JobTerminationReason.CONTAINER_EXITED_WITH_ERROR,
status=job_status,
termination_reason=job_termination_reason,
replica_num=0,
)
await create_job(session=session, run=run, status=JobStatus.RUNNING, replica_num=1)
Expand Down