From f7949846b01c19e5b386eaf4f1d858cda06f40e2 Mon Sep 17 00:00:00 2001 From: Jvst Me Date: Wed, 12 Feb 2025 12:41:41 +0100 Subject: [PATCH] Fix terminating runs when job is terminated This commit fixes a bug where runs would transition to `pending` instead of `terminating` if a job in the run is already terminated, e.g. by `inactivity_duration`. --- .../server/background/tasks/process_runs.py | 14 ++----------- .../background/tasks/test_process_runs.py | 20 ++++++++++++++++--- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/src/dstack/_internal/server/background/tasks/process_runs.py b/src/dstack/_internal/server/background/tasks/process_runs.py index 40ba8d849..092988e3c 100644 --- a/src/dstack/_internal/server/background/tasks/process_runs.py +++ b/src/dstack/_internal/server/background/tasks/process_runs.py @@ -230,7 +230,8 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): # the job is submitted replica_statuses.add(RunStatus.SUBMITTED) elif job_model.status == JobStatus.FAILED or ( - job_model.status == JobStatus.TERMINATING + job_model.status + in [JobStatus.TERMINATING, JobStatus.TERMINATED, JobStatus.ABORTED] and job_model.termination_reason not in {JobTerminationReason.DONE_BY_RUNNER, JobTerminationReason.SCALED_DOWN} ): @@ -244,17 +245,6 @@ async def _process_active_run(session: AsyncSession, run_model: RunModel): run_termination_reasons.add(RunTerminationReason.RETRY_LIMIT_EXCEEDED) else: replica_needs_retry = True - elif job_model.status in { - JobStatus.TERMINATING, - JobStatus.TERMINATED, - JobStatus.ABORTED, - }: - # FIXME: This code does not expect JobStatus.TERMINATED status, - # so if a job transitions from RUNNING to TERMINATED, - # the run will transition to PENDING instead of TERMINATING. - # This may not be observed because process_runs is invoked more frequently - # than process_terminating_jobs and because most jobs usually transition to FAILED. 
- pass # unexpected, but let's ignore it else: raise ValueError(f"Unexpected job status {job_model.status}") diff --git a/src/tests/_internal/server/background/tasks/test_process_runs.py b/src/tests/_internal/server/background/tasks/test_process_runs.py index 2cccf3088..59a61df94 100644 --- a/src/tests/_internal/server/background/tasks/test_process_runs.py +++ b/src/tests/_internal/server/background/tasks/test_process_runs.py @@ -288,13 +288,27 @@ async def test_some_no_capacity_keep_running(self, test_db, session: AsyncSessio @pytest.mark.asyncio @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) - async def test_some_failed_to_terminating(self, test_db, session: AsyncSession): + @pytest.mark.parametrize( + ("job_status", "job_termination_reason"), + [ + (JobStatus.FAILED, JobTerminationReason.CONTAINER_EXITED_WITH_ERROR), + (JobStatus.TERMINATING, JobTerminationReason.TERMINATED_BY_SERVER), + (JobStatus.TERMINATED, JobTerminationReason.TERMINATED_BY_SERVER), + ], + ) + async def test_some_failed_to_terminating( + self, + test_db, + session: AsyncSession, + job_status: JobStatus, + job_termination_reason: JobTerminationReason, + ) -> None: run = await make_run(session, status=RunStatus.RUNNING, replicas=2) await create_job( session=session, run=run, - status=JobStatus.FAILED, - termination_reason=JobTerminationReason.CONTAINER_EXITED_WITH_ERROR, + status=job_status, + termination_reason=job_termination_reason, replica_num=0, ) await create_job(session=session, run=run, status=JobStatus.RUNNING, replica_num=1)