Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions src/dstack/_internal/core/models/profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,32 @@ def _validate_fields(cls, values):
return values


class UtilizationPolicy(CoreModel):
    """Policy for terminating a run based on low GPU utilization.

    The run is terminated when at least one GPU stays below
    `min_gpu_utilization` for the entire `time_window`.
    """

    # Threshold in percent; validated to the inclusive range [0, 100].
    min_gpu_utilization: Annotated[
        int,
        Field(
            description=(
                "Minimum required GPU utilization, percent."
                " If any GPU has utilization below specified value during the whole time window,"
                " the run is terminated"
            ),
            ge=0,
            le=100,
        ),
    ]
    # Accepts either a number of seconds (int) or a duration string such as "30m".
    time_window: Annotated[
        Union[int, str],
        Field(
            description=(
                "The time window of metric samples taking into account to measure utilization"
                " (e.g., `30m`, `1h`)"
            )
        ),
    ]

    # Pre-validator: normalizes duration strings (e.g. "30m", "1h") into seconds,
    # so consumers can always treat time_window as an int number of seconds.
    _validate_time_window = validator("time_window", pre=True, allow_reuse=True)(parse_duration)


class ProfileParams(CoreModel):
backends: Annotated[
Optional[List[BackendType]],
Expand Down Expand Up @@ -194,6 +220,10 @@ class ProfileParams(CoreModel):
)
),
]
utilization_policy: Annotated[
Optional[UtilizationPolicy],
Field(description="Run termination policy based on utilization"),
]
# Deprecated:
termination_policy: Annotated[
Optional[TerminationPolicy],
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/core/models/runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ class JobTerminationReason(str, Enum):
ABORTED_BY_USER = "aborted_by_user"
TERMINATED_BY_SERVER = "terminated_by_server"
INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded"
TERMINATED_DUE_TO_UTILIZATION_POLICY = "terminated_due_to_utilization_policy"
# Set by the runner
CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error"
PORTS_BINDING_FAILED = "ports_binding_failed"
Expand All @@ -135,6 +136,7 @@ def to_status(self) -> JobStatus:
self.ABORTED_BY_USER: JobStatus.ABORTED,
self.TERMINATED_BY_SERVER: JobStatus.TERMINATED,
self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED,
self.TERMINATED_DUE_TO_UTILIZATION_POLICY: JobStatus.TERMINATED,
self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED,
self.PORTS_BINDING_FAILED: JobStatus.FAILED,
self.CREATING_CONTAINER_ERROR: JobStatus.FAILED,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import asyncio
from collections.abc import Iterable
from datetime import timedelta
from typing import Dict, List, Optional

from sqlalchemy import select
Expand All @@ -15,6 +17,7 @@
RemoteConnectionInfo,
SSHConnectionParams,
)
from dstack._internal.core.models.metrics import Metric
from dstack._internal.core.models.repos import RemoteRepoCreds
from dstack._internal.core.models.runs import (
ClusterInfo,
Expand Down Expand Up @@ -48,6 +51,7 @@
)
from dstack._internal.server.services.locking import get_locker
from dstack._internal.server.services.logging import fmt
from dstack._internal.server.services.metrics import get_job_metrics
from dstack._internal.server.services.pools import get_instance_ssh_private_keys
from dstack._internal.server.services.repos import (
get_code_model,
Expand Down Expand Up @@ -343,6 +347,9 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel):
job_model.status = JobStatus.TERMINATING
job_model.termination_reason = JobTerminationReason.GATEWAY_ERROR

if job_model.status == JobStatus.RUNNING:
await _check_gpu_utilization(session, run_model, job_model)

job_model.last_processed_at = common_utils.get_current_datetime()
await session.commit()

Expand Down Expand Up @@ -669,6 +676,45 @@ def _terminate_if_inactivity_duration_exceeded(
)


async def _check_gpu_utilization(
    session: AsyncSession, run_model: RunModel, job_model: JobModel
) -> None:
    """Terminate the job if the run's utilization policy is violated.

    Reads the utilization policy from the run spec. If a policy is set,
    fetches per-GPU utilization metrics over the policy's time window and
    marks the job as terminating when at least one GPU stayed below the
    configured threshold for every collected sample.
    """
    run_spec = RunSpec.__response__.parse_raw(run_model.run_spec)
    policy = run_spec.configuration.utilization_policy
    if policy is None:
        return

    window_start = common_utils.get_current_datetime() - timedelta(seconds=policy.time_window)
    job_metrics = await get_job_metrics(session, job_model, after=window_start)
    gpu_metrics: list[Metric] = [
        metric
        for metric in job_metrics.metrics
        if metric.name.startswith("gpu_util_percent_gpu")
    ]
    # The window must be fully covered by samples before we judge utilization.
    # If the boundary sample is younger than window_start + 1 minute, the job
    # started too recently (metric collection interval is assumed < 1 minute).
    if not gpu_metrics or gpu_metrics[0].timestamps[-1] > window_start + timedelta(minutes=1):
        logger.debug("%s: GPU utilization check: not enough samples", fmt(job_model))
        return

    per_gpu_samples = [metric.values for metric in gpu_metrics]
    if not _should_terminate_due_to_low_gpu_util(policy.min_gpu_utilization, per_gpu_samples):
        logger.debug("%s: GPU utilization check: OK", fmt(job_model))
        return

    logger.info("%s: GPU utilization check: terminating", fmt(job_model))
    job_model.status = JobStatus.TERMINATING
    # TODO(0.19 or earlier): set JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY
    job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER
    job_model.termination_reason_message = (
        f"The job GPU utilization below {policy.min_gpu_utilization}%"
        f" for {policy.time_window} seconds"
    )


def _should_terminate_due_to_low_gpu_util(min_util: int, gpus_util: Iterable[Iterable[int]]):
for gpu_util in gpus_util:
if all(util < min_util for util in gpu_util):
return True
return False


def _get_cluster_info(
jobs: List[Job],
replica_num: int,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Add JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY

Revision ID: 98d1b92988bc
Revises: 60e444118b6d
Create Date: 2025-02-28 15:12:37.649876

"""

import sqlalchemy as sa
from alembic import op
from alembic_postgresql_enum import TableReference

# revision identifiers, used by Alembic.
revision = "98d1b92988bc"  # unique id of this migration
down_revision = "60e444118b6d"  # parent revision this migration applies on top of
branch_labels = None  # not part of a named migration branch
depends_on = None  # no cross-branch dependencies


def upgrade() -> None:
    """Add TERMINATED_DUE_TO_UTILIZATION_POLICY to the jobterminationreason enum."""
    # Full value list after the migration (includes the new member).
    values = [
        "FAILED_TO_START_DUE_TO_NO_CAPACITY",
        "INTERRUPTED_BY_NO_CAPACITY",
        "WAITING_INSTANCE_LIMIT_EXCEEDED",
        "WAITING_RUNNER_LIMIT_EXCEEDED",
        "TERMINATED_BY_USER",
        "VOLUME_ERROR",
        "GATEWAY_ERROR",
        "SCALED_DOWN",
        "DONE_BY_RUNNER",
        "ABORTED_BY_USER",
        "TERMINATED_BY_SERVER",
        "INACTIVITY_DURATION_EXCEEDED",
        "TERMINATED_DUE_TO_UTILIZATION_POLICY",
        "CONTAINER_EXITED_WITH_ERROR",
        "PORTS_BINDING_FAILED",
        "CREATING_CONTAINER_ERROR",
        "EXECUTOR_ERROR",
        "MAX_DURATION_EXCEEDED",
    ]
    # SQLite: the column is stored as VARCHAR; update its declared enum type.
    with op.batch_alter_table("jobs", schema=None) as batch_op:
        batch_op.alter_column(
            "termination_reason",
            existing_type=sa.VARCHAR(length=34),
            type_=sa.Enum(*values, name="jobterminationreason"),
            existing_nullable=True,
        )
    # PostgreSQL: the native enum type must be synced in place.
    op.sync_enum_values(
        enum_schema="public",
        enum_name="jobterminationreason",
        new_values=values,
        affected_columns=[
            TableReference(
                table_schema="public", table_name="jobs", column_name="termination_reason"
            )
        ],
        enum_values_to_rename=[],
    )


def downgrade() -> None:
    """Remove TERMINATED_DUE_TO_UTILIZATION_POLICY from the jobterminationreason enum."""
    # Full value list as of the upgraded schema (includes the member being removed).
    current_values = [
        "FAILED_TO_START_DUE_TO_NO_CAPACITY",
        "INTERRUPTED_BY_NO_CAPACITY",
        "WAITING_INSTANCE_LIMIT_EXCEEDED",
        "WAITING_RUNNER_LIMIT_EXCEEDED",
        "TERMINATED_BY_USER",
        "VOLUME_ERROR",
        "GATEWAY_ERROR",
        "SCALED_DOWN",
        "DONE_BY_RUNNER",
        "ABORTED_BY_USER",
        "TERMINATED_BY_SERVER",
        "INACTIVITY_DURATION_EXCEEDED",
        "TERMINATED_DUE_TO_UTILIZATION_POLICY",
        "CONTAINER_EXITED_WITH_ERROR",
        "PORTS_BINDING_FAILED",
        "CREATING_CONTAINER_ERROR",
        "EXECUTOR_ERROR",
        "MAX_DURATION_EXCEEDED",
    ]
    # Value list restored by the downgrade (order preserved, new member dropped).
    previous_values = [
        v for v in current_values if v != "TERMINATED_DUE_TO_UTILIZATION_POLICY"
    ]
    # SQLite: revert the column's declared type back to plain VARCHAR.
    with op.batch_alter_table("jobs", schema=None) as batch_op:
        batch_op.alter_column(
            "termination_reason",
            existing_type=sa.Enum(*current_values, name="jobterminationreason"),
            type_=sa.VARCHAR(length=34),
            existing_nullable=True,
        )
    # PostgreSQL: shrink the native enum back to the previous value set.
    op.sync_enum_values(
        enum_schema="public",
        enum_name="jobterminationreason",
        new_values=previous_values,
        affected_columns=[
            TableReference(
                table_schema="public", table_name="jobs", column_name="termination_reason"
            )
        ],
        enum_values_to_rename=[],
    )
23 changes: 21 additions & 2 deletions src/dstack/_internal/server/routers/metrics.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
from typing import Tuple
from datetime import datetime
from typing import Optional, Tuple

from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession

from dstack._internal.core.errors import ResourceNotExistsError
from dstack._internal.core.models.metrics import JobMetrics
from dstack._internal.server.db import get_session
from dstack._internal.server.models import ProjectModel, UserModel
from dstack._internal.server.security.permissions import ProjectMember
from dstack._internal.server.services import metrics
from dstack._internal.server.services.jobs import get_run_job_model
from dstack._internal.server.utils.routers import get_base_api_additional_responses

router = APIRouter(
Expand All @@ -24,13 +27,18 @@ async def get_job_metrics(
run_name: str,
replica_num: int = 0,
job_num: int = 0,
limit: int = 1,
after: Optional[datetime] = None,
before: Optional[datetime] = None,
session: AsyncSession = Depends(get_session),
user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()),
) -> JobMetrics:
"""
Returns job-level metrics such as hardware utilization
given `run_name`, `replica_num`, and `job_num`.
If only `run_name` is specified, returns metrics of `(replica_num=0, job_num=0)`.
By default, returns one latest sample. To control time window/number of samples, use
`limit`, `after`, `before`.

Supported metrics: [
"cpu_usage_percent",
Expand All @@ -42,10 +50,21 @@ async def get_job_metrics(
]
"""
_, project = user_project
return await metrics.get_job_metrics(

job_model = await get_run_job_model(
session=session,
project=project,
run_name=run_name,
replica_num=replica_num,
job_num=job_num,
)
if job_model is None:
raise ResourceNotExistsError("Found no job with given parameters")

return await metrics.get_job_metrics(
session=session,
job_model=job_model,
limit=limit,
after=after,
before=before,
)
Loading