Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion src/dstack/_internal/core/models/configurations.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from dstack._internal.core.models.envs import Env
from dstack._internal.core.models.fleets import FleetConfiguration
from dstack._internal.core.models.gateways import GatewayConfiguration
from dstack._internal.core.models.profiles import ProfileParams
from dstack._internal.core.models.profiles import ProfileParams, parse_off_duration
from dstack._internal.core.models.repos.base import Repo
from dstack._internal.core.models.repos.virtual import VirtualRepo
from dstack._internal.core.models.resources import Range, ResourcesSpec
Expand Down Expand Up @@ -212,6 +212,29 @@ class DevEnvironmentConfigurationParams(CoreModel):
ide: Annotated[Literal["vscode"], Field(description="The IDE to run")]
version: Annotated[Optional[str], Field(description="The version of the IDE")]
init: Annotated[CommandsList, Field(description="The bash commands to run on startup")] = []
inactivity_duration: Annotated[
Optional[Union[Literal["off"], int, bool, str]],
Field(
description=(
"The maximum amount of time the dev environment can be inactive"
" (e.g., `2h`, `1d`, etc)."
" After it elapses, the dev environment is automatically stopped."
" Inactivity is defined as the absence of SSH connections to the"
" dev environment, including VS Code connections, `ssh <run name>`"
" shells, and attached `dstack apply` or `dstack attach` commands."
" Use `off` for unlimited duration. Defaults to `off`"
)
),
]

@validator("inactivity_duration", pre=True, allow_reuse=True)
def parse_inactivity_duration(
cls, v: Optional[Union[Literal["off"], int, bool, str]]
) -> Optional[int]:
v = parse_off_duration(v)
if isinstance(v, int):
return v
return None


class DevEnvironmentConfiguration(
Expand Down
8 changes: 4 additions & 4 deletions src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,15 @@ def parse_duration(v: Optional[Union[int, str]]) -> Optional[int]:
return Duration.parse(v)


def parse_max_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
def parse_max_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int]]:
return parse_off_duration(v)


def parse_stop_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
def parse_stop_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int]]:
return parse_off_duration(v)


def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int, bool]]:
def parse_off_duration(v: Optional[Union[int, str, bool]]) -> Optional[Union[str, int]]:
if v == "off" or v is False:
return "off"
if v is True:
Expand Down Expand Up @@ -168,7 +168,7 @@ class ProfileParams(CoreModel):
Optional[Union[Literal["off"], str, int, bool]],
Field(
description=(
"The maximum duration of a run gracefull stopping."
"The maximum duration of a run graceful stopping."
" After it elapses, the run is automatically forced stopped."
" This includes force detaching volumes used by the run."
" Use `off` for unlimited duration. Defaults to `5m`"
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ class JobTerminationReason(str, Enum):
DONE_BY_RUNNER = "done_by_runner"
ABORTED_BY_USER = "aborted_by_user"
TERMINATED_BY_SERVER = "terminated_by_server"
INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded"
# Set by the runner
CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error"
PORTS_BINDING_FAILED = "ports_binding_failed"
Expand All @@ -133,6 +134,7 @@ def to_status(self) -> JobStatus:
self.DONE_BY_RUNNER: JobStatus.DONE,
self.ABORTED_BY_USER: JobStatus.ABORTED,
self.TERMINATED_BY_SERVER: JobStatus.TERMINATED,
self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED,
self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED,
self.PORTS_BINDING_FAILED: JobStatus.FAILED,
self.CREATING_CONTAINER_ERROR: JobStatus.FAILED,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from dstack._internal.core.errors import GatewayError
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.common import NetworkMode, RegistryAuth, is_core_model_instance
from dstack._internal.core.models.configurations import DevEnvironmentConfiguration
from dstack._internal.core.models.instances import InstanceStatus, RemoteConnectionInfo
from dstack._internal.core.models.repos import RemoteRepoCreds
from dstack._internal.core.models.runs import (
Expand All @@ -20,6 +21,7 @@
JobStatus,
JobTerminationReason,
Run,
RunSpec,
)
from dstack._internal.core.models.volumes import InstanceMountPoint, Volume, VolumeMountPoint
from dstack._internal.server.db import get_session_ctx
Expand Down Expand Up @@ -598,6 +600,7 @@ def _process_running(
runner_logs=resp.runner_logs,
job_logs=resp.job_logs,
)
previous_status = job_model.status
if len(resp.job_states) > 0:
latest_state_event = resp.job_states[-1]
latest_status = latest_state_event.state
Expand All @@ -613,10 +616,39 @@ def _process_running(
)
if latest_state_event.termination_message:
job_model.termination_reason_message = latest_state_event.termination_message
else:
_terminate_if_inactivity_duration_exceeded(run_model, job_model, resp.no_connections_secs)
if job_model.status != previous_status:
logger.info("%s: now is %s", fmt(job_model), job_model.status.name)
return True


def _terminate_if_inactivity_duration_exceeded(
run_model: RunModel, job_model: JobModel, no_connections_secs: Optional[int]
) -> None:
conf = RunSpec.__response__.parse_raw(run_model.run_spec).configuration
if is_core_model_instance(conf, DevEnvironmentConfiguration) and isinstance(
conf.inactivity_duration, int
):
logger.debug("%s: no SSH connections for %s seconds", fmt(job_model), no_connections_secs)
if no_connections_secs is None:
# TODO(0.19 or earlier): make no_connections_secs required
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY
job_model.termination_reason_message = (
"The selected instance was created before dstack 0.18.41"
" and does not support inactivity_duration"
)
elif no_connections_secs >= conf.inactivity_duration:
job_model.status = JobStatus.TERMINATING
# TODO(0.19 or earlier): set JobTerminationReason.INACTIVITY_DURATION_EXCEEDED
job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
job_model.termination_reason_message = (
f"The job was inactive for {no_connections_secs} seconds,"
f" exceeding the inactivity_duration of {conf.inactivity_duration} seconds"
)


def _get_cluster_info(
jobs: List[Job],
replica_num: int,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
"""Add JobTerminationReason.INACTIVITY_DURATION_EXCEEDED

Revision ID: 63c3f19cb184
Revises: 1338b788b612
Create Date: 2025-02-11 22:30:47.289393

"""

from alembic import op
from alembic_postgresql_enum import TableReference

# revision identifiers, used by Alembic.
revision = "63c3f19cb184"
down_revision = "1338b788b612"
branch_labels = None
depends_on = None


def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="jobterminationreason",
new_values=[
"FAILED_TO_START_DUE_TO_NO_CAPACITY",
"INTERRUPTED_BY_NO_CAPACITY",
"WAITING_INSTANCE_LIMIT_EXCEEDED",
"WAITING_RUNNER_LIMIT_EXCEEDED",
"TERMINATED_BY_USER",
"VOLUME_ERROR",
"GATEWAY_ERROR",
"SCALED_DOWN",
"DONE_BY_RUNNER",
"ABORTED_BY_USER",
"TERMINATED_BY_SERVER",
"INACTIVITY_DURATION_EXCEEDED",
"CONTAINER_EXITED_WITH_ERROR",
"PORTS_BINDING_FAILED",
"CREATING_CONTAINER_ERROR",
"EXECUTOR_ERROR",
"MAX_DURATION_EXCEEDED",
],
affected_columns=[
TableReference(
table_schema="public", table_name="jobs", column_name="termination_reason"
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.sync_enum_values(
enum_schema="public",
enum_name="jobterminationreason",
new_values=[
"FAILED_TO_START_DUE_TO_NO_CAPACITY",
"INTERRUPTED_BY_NO_CAPACITY",
"WAITING_INSTANCE_LIMIT_EXCEEDED",
"WAITING_RUNNER_LIMIT_EXCEEDED",
"TERMINATED_BY_USER",
"VOLUME_ERROR",
"GATEWAY_ERROR",
"SCALED_DOWN",
"DONE_BY_RUNNER",
"ABORTED_BY_USER",
"TERMINATED_BY_SERVER",
"CONTAINER_EXITED_WITH_ERROR",
"PORTS_BINDING_FAILED",
"CREATING_CONTAINER_ERROR",
"EXECUTOR_ERROR",
"MAX_DURATION_EXCEEDED",
],
affected_columns=[
TableReference(
table_schema="public", table_name="jobs", column_name="termination_reason"
)
],
enum_values_to_rename=[],
)
# ### end Alembic commands ###
1 change: 1 addition & 0 deletions src/dstack/_internal/server/schemas/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class PullResponse(CoreModel):
job_logs: List[LogEvent]
runner_logs: List[LogEvent]
last_updated: int
no_connections_secs: Optional[int] = None # Optional for compatibility with old runners


class SubmitBody(CoreModel):
Expand Down
6 changes: 6 additions & 0 deletions src/dstack/api/server/_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from dstack._internal.core.models.common import is_core_model_instance
from dstack._internal.core.models.configurations import (
STRIP_PREFIX_DEFAULT,
DevEnvironmentConfiguration,
ServiceConfiguration,
)
from dstack._internal.core.models.pools import Instance
Expand Down Expand Up @@ -179,6 +180,11 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[dict]:
configuration_excludes["availability_zones"] = True
if profile is not None and profile.availability_zones is None:
profile_excludes.add("availability_zones")
if (
is_core_model_instance(configuration, DevEnvironmentConfiguration)
and configuration.inactivity_duration is None
):
configuration_excludes["inactivity_duration"] = True

if configuration_excludes:
spec_excludes["configuration"] = configuration_excludes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
from unittest.mock import MagicMock, Mock, patch

import pytest
Expand All @@ -8,11 +9,13 @@
from dstack._internal.core.errors import SSHError
from dstack._internal.core.models.backends.base import BackendType
from dstack._internal.core.models.common import NetworkMode
from dstack._internal.core.models.configurations import DevEnvironmentConfiguration
from dstack._internal.core.models.instances import InstanceStatus
from dstack._internal.core.models.runs import (
JobRuntimeData,
JobStatus,
JobTerminationReason,
RunStatus,
)
from dstack._internal.core.models.volumes import (
InstanceMountPoint,
Expand Down Expand Up @@ -495,3 +498,100 @@ async def test_provisioning_shim_force_stop_if_already_running_api_v1(
shim_client_mock.stop.assert_called_once_with(force=True)
await session.refresh(job)
assert job.status == JobStatus.PROVISIONING

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
@pytest.mark.parametrize(
(
"inactivity_duration",
"no_connections_secs",
"expected_status",
"expected_termination_reason",
),
[
pytest.param("1h", 60 * 60 - 1, JobStatus.RUNNING, None, id="duration-not-exceeded"),
pytest.param(
"1h",
60 * 60,
JobStatus.TERMINATING,
JobTerminationReason.TERMINATED_BY_SERVER,
id="duration-exceeded-exactly",
),
pytest.param(
"1h",
60 * 60 + 1,
JobStatus.TERMINATING,
JobTerminationReason.TERMINATED_BY_SERVER,
id="duration-exceeded",
),
pytest.param("off", 60 * 60, JobStatus.RUNNING, None, id="duration-off"),
pytest.param(False, 60 * 60, JobStatus.RUNNING, None, id="duration-false"),
pytest.param(None, 60 * 60, JobStatus.RUNNING, None, id="duration-none"),
pytest.param(
"1h",
None,
JobStatus.TERMINATING,
JobTerminationReason.INTERRUPTED_BY_NO_CAPACITY,
id="legacy-runner",
),
pytest.param(None, None, JobStatus.RUNNING, None, id="legacy-runner-without-duration"),
],
)
async def test_inactivity_duration(
self,
test_db,
session: AsyncSession,
inactivity_duration,
no_connections_secs: Optional[int],
expected_status: JobStatus,
expected_termination_reason: Optional[JobTerminationReason],
) -> None:
project = await create_project(session=session)
user = await create_user(session=session)
repo = await create_repo(
session=session,
project_id=project.id,
)
run = await create_run(
session=session,
project=project,
repo=repo,
user=user,
status=RunStatus.RUNNING,
run_name="test-run",
run_spec=get_run_spec(
run_name="test-run",
repo_id=repo.name,
configuration=DevEnvironmentConfiguration(
name="test-run",
ide="vscode",
inactivity_duration=inactivity_duration,
),
),
)
job = await create_job(
session=session,
run=run,
status=JobStatus.RUNNING,
job_provisioning_data=get_job_provisioning_data(),
)
with (
patch("dstack._internal.server.services.runner.ssh.SSHTunnel") as SSHTunnelMock,
patch(
"dstack._internal.server.services.runner.client.RunnerClient"
) as RunnerClientMock,
):
runner_client_mock = RunnerClientMock.return_value
runner_client_mock.pull.return_value = PullResponse(
job_states=[],
job_logs=[],
runner_logs=[],
last_updated=0,
no_connections_secs=no_connections_secs,
)
await process_running_jobs()
SSHTunnelMock.assert_called_once()
runner_client_mock.pull.assert_called_once()
await session.refresh(job)
assert job.status == expected_status
assert job.termination_reason == expected_termination_reason
2 changes: 2 additions & 0 deletions src/tests/_internal/server/routers/test_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def get_dev_env_run_plan_dict(
"working_dir": None,
"home_dir": "/root",
"ide": "vscode",
"inactivity_duration": None,
"version": None,
"image": None,
"user": None,
Expand Down Expand Up @@ -241,6 +242,7 @@ def get_dev_env_run_dict(
"home_dir": "/root",
"working_dir": None,
"ide": "vscode",
"inactivity_duration": None,
"version": None,
"image": None,
"user": None,
Expand Down