diff --git a/src/dstack/_internal/core/models/configurations.py b/src/dstack/_internal/core/models/configurations.py index 4c8f3b2b3..1ddc69ab8 100644 --- a/src/dstack/_internal/core/models/configurations.py +++ b/src/dstack/_internal/core/models/configurations.py @@ -10,7 +10,7 @@ from dstack._internal.core.models.envs import Env from dstack._internal.core.models.fleets import FleetConfiguration from dstack._internal.core.models.gateways import GatewayConfiguration -from dstack._internal.core.models.profiles import ProfileParams +from dstack._internal.core.models.profiles import ProfileParams, parse_off_duration from dstack._internal.core.models.repos.base import Repo from dstack._internal.core.models.repos.virtual import VirtualRepo from dstack._internal.core.models.resources import Range, ResourcesSpec @@ -212,6 +212,29 @@ class DevEnvironmentConfigurationParams(CoreModel): ide: Annotated[Literal["vscode"], Field(description="The IDE to run")] version: Annotated[Optional[str], Field(description="The version of the IDE")] init: Annotated[CommandsList, Field(description="The bash commands to run on startup")] = [] + inactivity_duration: Annotated[ + Optional[Union[Literal["off"], int, bool, str]], + Field( + description=( + "The maximum amount of time the dev environment can be inactive" + " (e.g., `2h`, `1d`, etc)." + " After it elapses, the dev environment is automatically stopped." + " Inactivity is defined as the absence of SSH connections to the" + " dev environment, including VS Code connections, `ssh `" + " shells, and attached `dstack apply` or `dstack attach` commands." + " Use `off` for unlimited duration. Defaults to `off`" + ) + ), + ] + + @validator("inactivity_duration", pre=True, allow_reuse=True) + def parse_inactivity_duration( + cls, v: Optional[Union[Literal["off"], int, bool, str]] + ) -> Optional[int]: + v = parse_off_duration(v) + if isinstance(v, int): + return v + return None class DevEnvironmentConfiguration( diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index ca4e3a05a..d9137b15d 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -40,15 +40,15 @@ def parse_duration(v: Optional[Union[int, str]]) -> Optional[int]: return Duration.parse(v) -def parse_max_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]: +def parse_max_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int]]: return parse_off_duration(v) -def parse_stop_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]: +def parse_stop_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int]]: return parse_off_duration(v) -def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]: +def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int]]: if v == "off" or v is False: return "off" if v is True: @@ -168,7 +168,7 @@ class ProfileParams(CoreModel): Optional[Union[Literal["off"], str, int, bool]], Field( description=( - "The maximum duration of a run gracefull stopping." + "The maximum duration of a run graceful stopping." " After it elapses, the run is automatically forced stopped." " This includes force detaching volumes used by the run." " Use `off` for unlimited duration. Defaults to `5m`" diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index e40b3c41d..f6b28bda4 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -113,6 +113,7 @@ class JobTerminationReason(str, Enum): DONE_BY_RUNNER = "done_by_runner" ABORTED_BY_USER = "aborted_by_user" TERMINATED_BY_SERVER = "terminated_by_server" + INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded" # Set by the runner CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error" PORTS_BINDING_FAILED = "ports_binding_failed" @@ -133,6 +134,7 @@ def to_status(self) -> JobStatus: self.DONE_BY_RUNNER: JobStatus.DONE, self.ABORTED_BY_USER: JobStatus.ABORTED, self.TERMINATED_BY_SERVER: JobStatus.TERMINATED, + self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED, self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED, self.PORTS_BINDING_FAILED: JobStatus.FAILED, self.CREATING_CONTAINER_ERROR: JobStatus.FAILED, diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index 69e9c5986..c190c1814 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -10,6 +10,7 @@ from dstack._internal.core.errors import GatewayError from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import NetworkMode, RegistryAuth, is_core_model_instance +from dstack._internal.core.models.configurations import DevEnvironmentConfiguration from dstack._internal.core.models.instances import InstanceStatus, RemoteConnectionInfo from dstack._internal.core.models.repos import RemoteRepoCreds from dstack._internal.core.models.runs import ( @@ -20,6 +21,7 @@ JobStatus, JobTerminationReason, Run, + RunSpec, ) from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint from dstack._internal.server.db import get_session_ctx @@ -598,6 +600,7 @@ def _process_running( runner_logs=resp.runner_logs, job_logs=resp.job_logs, ) + previous_status = job_model.status if len(resp.job_states) > 0: latest_state_event = resp.job_states[-1] latest_status = latest_state_event.state @@ -613,10 +616,39 @@ def _process_running( ) if latest_state_event.termination_message: job_model.termination_reason_message = latest_state_event.termination_message + else: + _terminate_if_inactivity_duration_exceeded(run_model, job_model, resp.no_connections_secs) + if job_model.status != previous_status: logger.info("%s: now is %s", fmt(job_model), job_model.status.name) return True +def _terminate_if_inactivity_duration_exceeded( + run_model: RunModel, job_model: JobModel, no_connections_secs: Optional[int] +) -> None: + conf = RunSpec.__response__.parse_raw(run_model.run_spec).configuration + if is_core_model_instance(conf, DevEnvironmentConfiguration) and isinstance( + conf.inactivity_duration, int + ): + logger.debug("%s: no SSH connections for %s seconds", fmt(job_model), no_connections_secs) + if no_connections_secs is None: + # TODO(0.19 or earlier): make no_connections_secs required + job_model.status = JobStatus.TERMINATING + job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY + job_model.termination_reason_message = ( + "The selected instance was created before dstack 0.18.41" + " and does not support inactivity_duration" + ) + elif no_connections_secs >= conf.inactivity_duration: + job_model.status = JobStatus.TERMINATING + # TODO(0.19 or earlier): set JobTerminationReason.INACTIVITY_DURATION_EXCEEDED + job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + job_model.termination_reason_message = ( + f"The job was inactive for {no_connections_secs} seconds," + f" exceeding the inactivity_duration of {conf.inactivity_duration} seconds" + ) + + def _get_cluster_info( jobs: List[Job], replica_num: int, diff --git a/src/dstack/_internal/server/migrations/versions/63c3f19cb184_add_jobterminationreason_inactivity_.py b/src/dstack/_internal/server/migrations/versions/63c3f19cb184_add_jobterminationreason_inactivity_.py new file mode 100644 index 000000000..659ac9e09 --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/63c3f19cb184_add_jobterminationreason_inactivity_.py @@ -0,0 +1,83 @@ +"""Add JobTerminationReason.INACTIVITY_DURATION_EXCEEDED + +Revision ID: 63c3f19cb184 +Revises: 1338b788b612 +Create Date: 2025-02-11 22:30:47.289393 + +""" + +from alembic import op +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision = "63c3f19cb184" +down_revision = "1338b788b612" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema="public", + enum_name="jobterminationreason", + new_values=[ + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "INACTIVITY_DURATION_EXCEEDED", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + "MAX_DURATION_EXCEEDED", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="jobs", column_name="termination_reason" + ) + ], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.sync_enum_values( + enum_schema="public", + enum_name="jobterminationreason", + new_values=[ + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + "MAX_DURATION_EXCEEDED", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="jobs", column_name="termination_reason" + ) + ], + enum_values_to_rename=[], + ) + # ### end Alembic commands ### diff --git a/src/dstack/_internal/server/schemas/runner.py b/src/dstack/_internal/server/schemas/runner.py index 5af8e2ad1..847fa0624 100644 --- a/src/dstack/_internal/server/schemas/runner.py +++ b/src/dstack/_internal/server/schemas/runner.py @@ -34,6 +34,7 @@ class PullResponse(CoreModel): job_logs: List[LogEvent] runner_logs: List[LogEvent] last_updated: int + no_connections_secs: Optional[int] = None # Optional for compatibility with old runners class SubmitBody(CoreModel): diff --git a/src/dstack/api/server/_runs.py b/src/dstack/api/server/_runs.py index 46cd47133..143b84b26 100644 --- a/src/dstack/api/server/_runs.py +++ b/src/dstack/api/server/_runs.py @@ -7,6 +7,7 @@ from dstack._internal.core.models.common import is_core_model_instance from dstack._internal.core.models.configurations import ( STRIP_PREFIX_DEFAULT, + DevEnvironmentConfiguration, ServiceConfiguration, ) from dstack._internal.core.models.pools import Instance @@ -179,6 +180,11 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[dict]: configuration_excludes["availability_zones"] = True if profile is not None and profile.availability_zones is None: profile_excludes.add("availability_zones") + if ( + is_core_model_instance(configuration, DevEnvironmentConfiguration) + and configuration.inactivity_duration is None + ): + configuration_excludes["inactivity_duration"] = True if configuration_excludes: spec_excludes["configuration"] = configuration_excludes diff --git a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py index 63438ae67..ca15ca679 100644 --- a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py @@ -1,5 +1,6 @@ from datetime import datetime, timezone from pathlib import Path +from typing import Optional from unittest.mock import MagicMock, Mock, patch import pytest @@ -8,11 +9,13 @@ from dstack._internal.core.errors import SSHError from dstack._internal.core.models.backends.base import BackendType from dstack._internal.core.models.common import NetworkMode +from dstack._internal.core.models.configurations import DevEnvironmentConfiguration from dstack._internal.core.models.instances import InstanceStatus from dstack._internal.core.models.runs import ( JobRuntimeData, JobStatus, JobTerminationReason, + RunStatus, ) from dstack._internal.core.models.volumes import ( InstanceMountPoint, @@ -495,3 +498,100 @@ async def test_provisioning_shim_force_stop_if_already_running_api_v1( shim_client_mock.stop.assert_called_once_with(force=True) await session.refresh(job) assert job.status == JobStatus.PROVISIONING + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @pytest.mark.parametrize( + ( + "inactivity_duration", + "no_connections_secs", + "expected_status", + "expected_termination_reason", + ), + [ + pytest.param("1h", 60 * 60 - 1, JobStatus.RUNNING, None, id="duration-not-exceeded"), + pytest.param( + "1h", + 60 * 60, + JobStatus.TERMINATING, + JobTerminationReason.TERMINATED_BY_SERVER, + id="duration-exceeded-exactly", + ), + pytest.param( + "1h", + 60 * 60 + 1, + JobStatus.TERMINATING, + JobTerminationReason.TERMINATED_BY_SERVER, + id="duration-exceeded", + ), + pytest.param("off", 60 * 60, JobStatus.RUNNING, None, id="duration-off"), + pytest.param(False, 60 * 60, JobStatus.RUNNING, None, id="duration-false"), + pytest.param(None, 60 * 60, JobStatus.RUNNING, None, id="duration-none"), + pytest.param( + "1h", + None, + JobStatus.TERMINATING, + JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY, + id="legacy-runner", + ), + pytest.param(None, None, JobStatus.RUNNING, None, id="legacy-runner-without-duration"), + ], + ) + async def test_inactivity_duration( + self, + test_db, + session: AsyncSession, + inactivity_duration, + no_connections_secs: Optional[int], + expected_status: JobStatus, + expected_termination_reason: Optional[JobTerminationReason], + ) -> None: + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo( + session=session, + project_id=project.id, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + status=RunStatus.RUNNING, + run_name="test-run", + run_spec=get_run_spec( + run_name="test-run", + repo_id=repo.name, + configuration=DevEnvironmentConfiguration( + name="test-run", + ide="vscode", + inactivity_duration=inactivity_duration, + ), + ), + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + job_provisioning_data=get_job_provisioning_data(), + ) + with ( + patch("dstack._internal.server.services.runner.ssh.SSHTunnel") as SSHTunnelMock, + patch( + "dstack._internal.server.services.runner.client.RunnerClient" + ) as RunnerClientMock, + ): + runner_client_mock = RunnerClientMock.return_value + runner_client_mock.pull.return_value = PullResponse( + job_states=[], + job_logs=[], + runner_logs=[], + last_updated=0, + no_connections_secs=no_connections_secs, + ) + await process_running_jobs() + SSHTunnelMock.assert_called_once() + runner_client_mock.pull.assert_called_once() + await session.refresh(job) + assert job.status == expected_status + assert job.termination_reason == expected_termination_reason diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 46db014ff..0f7db6872 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -85,6 +85,7 @@ def get_dev_env_run_plan_dict( "working_dir": None, "home_dir": "/root", "ide": "vscode", + "inactivity_duration": None, "version": None, "image": None, "user": None, @@ -241,6 +242,7 @@ def get_dev_env_run_dict( "home_dir": "/root", "working_dir": None, "ide": "vscode", + "inactivity_duration": None, "version": None, "image": None, "user": None,