From f03f7d0307ad7a5c4b08da89b540e19c99d983d0 Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Mon, 3 Mar 2025 15:28:11 +0000 Subject: [PATCH 1/2] Add `utilization_policy` If any GPU has GPU util below threshold in all samples in a time window, kill the job (and, consequently, the run) Closes: https://github.com/dstackai/dstack/issues/2374 --- src/dstack/_internal/core/models/profiles.py | 30 +++ src/dstack/_internal/core/models/runs.py | 2 + .../background/tasks/process_running_jobs.py | 46 +++++ ...dd_jobterminationreason_terminated_due_.py | 140 ++++++++++++++ .../_internal/server/routers/metrics.py | 23 ++- .../_internal/server/services/metrics.py | 173 +++++++++++------- src/dstack/api/server/_fleets.py | 2 + src/dstack/api/server/_runs.py | 4 + .../tasks/test_process_running_jobs.py | 125 +++++++++++++ .../_internal/server/routers/test_fleets.py | 2 + .../_internal/server/routers/test_metrics.py | 15 ++ .../_internal/server/routers/test_runs.py | 4 + .../_internal/server/services/test_metrics.py | 163 +++++++++++++++++ 13 files changed, 657 insertions(+), 72 deletions(-) create mode 100644 src/dstack/_internal/server/migrations/versions/98d1b92988bc_add_jobterminationreason_terminated_due_.py create mode 100644 src/tests/_internal/server/services/test_metrics.py diff --git a/src/dstack/_internal/core/models/profiles.py b/src/dstack/_internal/core/models/profiles.py index d9137b15d..80b64eb9a 100644 --- a/src/dstack/_internal/core/models/profiles.py +++ b/src/dstack/_internal/core/models/profiles.py @@ -112,6 +112,32 @@ def _validate_fields(cls, values): return values +class UtilizationPolicy(CoreModel): + min_gpu_utilization: Annotated[ + int, + Field( + description=( + "Minimum required GPU utilization, percent." + " If any GPU has utilization below specified value during the whole time window," + " the run is terminated" + ), + ge=0, + le=100, + ), + ] + time_window: Annotated[ + Union[int, str], + Field( + description=( + "The time window of metric samples taking into account to measure utilization" + " (e.g., `30m`, `1h`)" + ) + ), + ] + + _validate_time_window = validator("time_window", pre=True, allow_reuse=True)(parse_duration) + + class ProfileParams(CoreModel): backends: Annotated[ Optional[List[BackendType]], @@ -194,6 +220,10 @@ class ProfileParams(CoreModel): ) ), ] + utilization_policy: Annotated[ + Optional[UtilizationPolicy], + Field(description="Run termination policy based on utilization"), + ] # Deprecated: termination_policy: Annotated[ Optional[TerminationPolicy], diff --git a/src/dstack/_internal/core/models/runs.py b/src/dstack/_internal/core/models/runs.py index 0ce2670b0..9950f52da 100644 --- a/src/dstack/_internal/core/models/runs.py +++ b/src/dstack/_internal/core/models/runs.py @@ -114,6 +114,7 @@ class JobTerminationReason(str, Enum): ABORTED_BY_USER = "aborted_by_user" TERMINATED_BY_SERVER = "terminated_by_server" INACTIVITY_DURATION_EXCEEDED = "inactivity_duration_exceeded" + TERMINATED_DUE_TO_UTILIZATION_POLICY = "terminated_due_to_utilization_policy" # Set by the runner CONTAINER_EXITED_WITH_ERROR = "container_exited_with_error" PORTS_BINDING_FAILED = "ports_binding_failed" @@ -135,6 +136,7 @@ def to_status(self) -> JobStatus: self.ABORTED_BY_USER: JobStatus.ABORTED, self.TERMINATED_BY_SERVER: JobStatus.TERMINATED, self.INACTIVITY_DURATION_EXCEEDED: JobStatus.TERMINATED, + self.TERMINATED_DUE_TO_UTILIZATION_POLICY: JobStatus.TERMINATED, self.CONTAINER_EXITED_WITH_ERROR: JobStatus.FAILED, self.PORTS_BINDING_FAILED: JobStatus.FAILED, self.CREATING_CONTAINER_ERROR: JobStatus.FAILED, diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index cc7b66232..d1bcafa29 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -1,4 +1,6 @@ import asyncio +from collections.abc import Iterable +from datetime import timedelta from typing import Dict, List, Optional from sqlalchemy import select @@ -15,6 +17,7 @@ RemoteConnectionInfo, SSHConnectionParams, ) +from dstack._internal.core.models.metrics import Metric from dstack._internal.core.models.repos import RemoteRepoCreds from dstack._internal.core.models.runs import ( ClusterInfo, @@ -48,6 +51,7 @@ ) from dstack._internal.server.services.locking import get_locker from dstack._internal.server.services.logging import fmt +from dstack._internal.server.services.metrics import get_job_metrics from dstack._internal.server.services.pools import get_instance_ssh_private_keys from dstack._internal.server.services.repos import ( get_code_model, @@ -343,6 +347,9 @@ async def _process_running_job(session: AsyncSession, job_model: JobModel): job_model.status = JobStatus.TERMINATING job_model.termination_reason = JobTerminationReason.GATEWAY_ERROR + if job_model.status == JobStatus.RUNNING: + await _check_gpu_utilization(session, run_model, job_model) + job_model.last_processed_at = common_utils.get_current_datetime() await session.commit() @@ -669,6 +676,45 @@ def _terminate_if_inactivity_duration_exceeded( ) +async def _check_gpu_utilization( + session: AsyncSession, run_model: RunModel, job_model: JobModel +) -> None: + policy = RunSpec.__response__.parse_raw(run_model.run_spec).configuration.utilization_policy + if policy is None: + return + after = common_utils.get_current_datetime() - timedelta(seconds=policy.time_window) + job_metrics = await get_job_metrics(session, job_model, after=after) + gpus_util_metrics: list[Metric] = [] + for metric in job_metrics.metrics: + if metric.name.startswith("gpu_util_percent_gpu"): + gpus_util_metrics.append(metric) + if not gpus_util_metrics or gpus_util_metrics[0].timestamps[-1] > after + timedelta(minutes=1): + # Job has started recently, not enough points collected. + # Assuming that metrics collection interval less than 1 minute. + logger.debug("%s: GPU utilization check: not enough samples", fmt(job_model)) + return + if _should_terminate_due_to_low_gpu_util( + policy.min_gpu_utilization, [m.values for m in gpus_util_metrics] + ): + logger.debug("%s: GPU utilization check: terminating", fmt(job_model)) + job_model.status = JobStatus.TERMINATING + # TODO(0.19 or earlier): set JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY + job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER + job_model.termination_reason_message = ( + f"The job GPU utilization below {policy.min_gpu_utilization}%" + f" for {policy.time_window} seconds" + ) + else: + logger.debug("%s: GPU utilization check: OK", fmt(job_model)) + + +def _should_terminate_due_to_low_gpu_util(min_util: int, gpus_util: Iterable[Iterable[int]]): + for gpu_util in gpus_util: + if all(util < min_util for util in gpu_util): + return True + return False + + def _get_cluster_info( jobs: List[Job], replica_num: int, diff --git a/src/dstack/_internal/server/migrations/versions/98d1b92988bc_add_jobterminationreason_terminated_due_.py b/src/dstack/_internal/server/migrations/versions/98d1b92988bc_add_jobterminationreason_terminated_due_.py new file mode 100644 index 000000000..76543d8fa --- /dev/null +++ b/src/dstack/_internal/server/migrations/versions/98d1b92988bc_add_jobterminationreason_terminated_due_.py @@ -0,0 +1,140 @@ +"""Add JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY + +Revision ID: 98d1b92988bc +Revises: 60e444118b6d +Create Date: 2025-02-28 15:12:37.649876 + +""" + +import sqlalchemy as sa +from alembic import op +from alembic_postgresql_enum import TableReference + +# revision identifiers, used by Alembic. +revision = "98d1b92988bc" +down_revision = "60e444118b6d" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # SQLite + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.alter_column( + "termination_reason", + existing_type=sa.VARCHAR(length=34), + type_=sa.Enum( + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "INACTIVITY_DURATION_EXCEEDED", + "TERMINATED_DUE_TO_UTILIZATION_POLICY", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + "MAX_DURATION_EXCEEDED", + name="jobterminationreason", + ), + existing_nullable=True, + ) + # PostgreSQL + op.sync_enum_values( + enum_schema="public", + enum_name="jobterminationreason", + new_values=[ + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "INACTIVITY_DURATION_EXCEEDED", + "TERMINATED_DUE_TO_UTILIZATION_POLICY", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + "MAX_DURATION_EXCEEDED", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="jobs", column_name="termination_reason" + ) + ], + enum_values_to_rename=[], + ) + + +def downgrade() -> None: + # SQLite + with op.batch_alter_table("jobs", schema=None) as batch_op: + batch_op.alter_column( + "termination_reason", + existing_type=sa.Enum( + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "INACTIVITY_DURATION_EXCEEDED", + "TERMINATED_DUE_TO_UTILIZATION_POLICY", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + "MAX_DURATION_EXCEEDED", + name="jobterminationreason", + ), + type_=sa.VARCHAR(length=34), + existing_nullable=True, + ) + # PostgreSQL + op.sync_enum_values( + enum_schema="public", + enum_name="jobterminationreason", + new_values=[ + "FAILED_TO_START_DUE_TO_NO_CAPACITY", + "INTERRUPTED_BY_NO_CAPACITY", + "WAITING_INSTANCE_LIMIT_EXCEEDED", + "WAITING_RUNNER_LIMIT_EXCEEDED", + "TERMINATED_BY_USER", + "VOLUME_ERROR", + "GATEWAY_ERROR", + "SCALED_DOWN", + "DONE_BY_RUNNER", + "ABORTED_BY_USER", + "TERMINATED_BY_SERVER", + "INACTIVITY_DURATION_EXCEEDED", + "CONTAINER_EXITED_WITH_ERROR", + "PORTS_BINDING_FAILED", + "CREATING_CONTAINER_ERROR", + "EXECUTOR_ERROR", + "MAX_DURATION_EXCEEDED", + ], + affected_columns=[ + TableReference( + table_schema="public", table_name="jobs", column_name="termination_reason" + ) + ], + enum_values_to_rename=[], + ) diff --git a/src/dstack/_internal/server/routers/metrics.py b/src/dstack/_internal/server/routers/metrics.py index 0508f3608..5c1bf3588 100644 --- a/src/dstack/_internal/server/routers/metrics.py +++ b/src/dstack/_internal/server/routers/metrics.py @@ -1,13 +1,16 @@ -from typing import Tuple +from datetime import datetime +from typing import Optional, Tuple from fastapi import APIRouter, Depends from sqlalchemy.ext.asyncio import AsyncSession +from dstack._internal.core.errors import ResourceNotExistsError from dstack._internal.core.models.metrics import JobMetrics from dstack._internal.server.db import get_session from dstack._internal.server.models import ProjectModel, UserModel from dstack._internal.server.security.permissions import ProjectMember from dstack._internal.server.services import metrics +from dstack._internal.server.services.jobs import get_run_job_model from dstack._internal.server.utils.routers import get_base_api_additional_responses router = APIRouter( @@ -24,6 +27,9 @@ async def get_job_metrics( run_name: str, replica_num: int = 0, job_num: int = 0, + limit: int = 1, + after: Optional[datetime] = None, + before: Optional[datetime] = None, session: AsyncSession = Depends(get_session), user_project: Tuple[UserModel, ProjectModel] = Depends(ProjectMember()), ) -> JobMetrics: @@ -31,6 +37,8 @@ async def get_job_metrics( Returns job-level metrics such as hardware utilization given `run_name`, `replica_num`, and `job_num`. If only `run_name` is specified, returns metrics of `(replica_num=0, job_num=0)`. + By default, returns one latest sample. To control time window/number of samples, use + `limit`, `after`, `before`. Supported metrics: [ "cpu_usage_percent", @@ -42,10 +50,21 @@ async def get_job_metrics( ] """ _, project = user_project - return await metrics.get_job_metrics( + + job_model = await get_run_job_model( session=session, project=project, run_name=run_name, replica_num=replica_num, job_num=job_num, ) + if job_model is None: + raise ResourceNotExistsError("Found no job with given parameters") + + return await metrics.get_job_metrics( + session=session, + job_model=job_model, + limit=limit, + after=after, + before=before, + ) diff --git a/src/dstack/_internal/server/services/metrics.py b/src/dstack/_internal/server/services/metrics.py index 87f8ed769..f5f7ceefe 100644 --- a/src/dstack/_internal/server/services/metrics.py +++ b/src/dstack/_internal/server/services/metrics.py @@ -1,113 +1,146 @@ import json +from collections import defaultdict +from collections.abc import Sequence from datetime import datetime, timezone +from typing import Optional from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession -from dstack._internal.core.errors import ResourceNotExistsError from dstack._internal.core.models.metrics import JobMetrics, Metric -from dstack._internal.server.models import JobMetricsPoint, JobModel, ProjectModel -from dstack._internal.server.services.jobs import get_run_job_model +from dstack._internal.server.models import JobMetricsPoint, JobModel +from dstack._internal.utils.logging import get_logger - -async def get_job_metrics( - session: AsyncSession, - project: ProjectModel, - run_name: str, - replica_num: int, - job_num: int, -) -> JobMetrics: - job_model = await get_run_job_model( - session=session, - project=project, - run_name=run_name, - replica_num=replica_num, - job_num=job_num, - ) - if job_model is None: - raise ResourceNotExistsError("Found no job with given parameters") - job_metrics = await _get_job_metrics( - session=session, - job_model=job_model, - ) - return job_metrics +logger = get_logger(__name__) -async def _get_job_metrics( +async def get_job_metrics( session: AsyncSession, job_model: JobModel, + after: Optional[datetime] = None, + before: Optional[datetime] = None, + limit: Optional[int] = None, ) -> JobMetrics: - res = await session.execute( + """ + Returns metrics ordered from the latest to the earliest. + + Expected usage: + * limit=100 — get the latest 100 points + * after= — get points for the last one hour + * before=, limit=100 ­— paginate back in history + """ + stmt = ( select(JobMetricsPoint) .where(JobMetricsPoint.job_id == job_model.id) .order_by(JobMetricsPoint.timestamp_micro.desc()) - .limit(2) ) + if after is not None: + # we need +1 point for cpu_usage_percent, thus >= + stmt = stmt.where(JobMetricsPoint.timestamp_micro >= _datetime_to_unix_time_micro(after)) + if before is not None: + stmt = stmt.where(JobMetricsPoint.timestamp_micro < _datetime_to_unix_time_micro(before)) + if limit is not None: + # +1 for cpu_usage_percent + stmt = stmt.limit(limit + 1) + res = await session.execute(stmt) points = res.scalars().all() + # we need at least 2 points to calculate cpu_usage_percent if len(points) < 2: return JobMetrics(metrics=[]) - last_point = points[0] - prev_point = points[1] - return _calculate_job_metrics(last_point, prev_point) + return _calculate_job_metrics(points) + + +def _calculate_job_metrics(points: Sequence[JobMetricsPoint]) -> JobMetrics: + timestamps: list[datetime] = [] + cpu_usage_points: list[int] = [] + memory_usage_points: list[int] = [] + memory_working_set_points: list[int] = [] + gpus_memory_usage_points: defaultdict[int, list[int]] = defaultdict(list) + gpus_util_points: defaultdict[int, list[int]] = defaultdict(list) + gpus_detected_num: Optional[int] = None + gpus_detected_num_mismatch: bool = False + for point, prev_point in zip(points, points[1:]): + timestamps.append(_unix_time_micro_to_datetime(point.timestamp_micro)) + cpu_usage_points.append(_get_cpu_usage(point, prev_point)) + memory_usage_points.append(point.memory_usage_bytes) + memory_working_set_points.append(point.memory_working_set_bytes) + gpus_memory_usage = json.loads(point.gpus_memory_usage_bytes) + gpus_util = json.loads(point.gpus_util_percent) + if gpus_detected_num is None: + gpus_detected_num = len(gpus_memory_usage) + if len(gpus_memory_usage) != gpus_detected_num or len(gpus_util) != gpus_detected_num: + gpus_detected_num_mismatch = True + if not gpus_detected_num_mismatch: + for i in range(gpus_detected_num): + gpus_memory_usage_points[i].append(gpus_memory_usage[i]) + gpus_util_points[i].append(gpus_util[i]) -def _calculate_job_metrics(last_point: JobMetricsPoint, prev_point: JobMetricsPoint) -> JobMetrics: - metrics = [] - timestamp = _unix_time_micro_to_datetime(last_point.timestamp_micro) - metrics.append( + metrics: list[Metric] = [ Metric( name="cpu_usage_percent", - timestamps=[timestamp], - values=[_get_cpu_usage(last_point, prev_point)], - ) - ) - metrics.append( + timestamps=timestamps, + values=cpu_usage_points, + ), Metric( name="memory_usage_bytes", - timestamps=[timestamp], - values=[last_point.memory_usage_bytes], - ) - ) - metrics.append( + timestamps=timestamps, + values=memory_usage_points, + ), Metric( name="memory_working_set_bytes", - timestamps=[timestamp], - values=[last_point.memory_working_set_bytes], - ) - ) - - gpus_memory_usage_bytes = json.loads(last_point.gpus_memory_usage_bytes) - gpus_util_percent = json.loads(last_point.gpus_util_percent) - gpus_detected_num = len(gpus_memory_usage_bytes) - metrics.append( - Metric( - name="gpus_detected_num", - timestamps=[timestamp], - values=[gpus_detected_num], - ) - ) - for i in range(gpus_detected_num): + timestamps=timestamps, + values=memory_working_set_points, + ), + ] + if gpus_detected_num_mismatch: + # If number of GPUs changed in the time window, skip GPU metrics altogether, otherwise + # results can be unpredictable (e.g, one GPU takes place of another, as they are + # identified by an array index only). + logger.warning("gpus_detected_num mismatch, skipping GPU metrics") + else: metrics.append( + # As gpus_detected_num expected to be constant, we add only two points — the latest + # and the earliest in the batch Metric( - name=f"gpu_memory_usage_bytes_gpu{i}", - timestamps=[timestamp], - values=[gpus_memory_usage_bytes[i]], + name="gpus_detected_num", + timestamps=[timestamps[0], timestamps[-1]] + if len(timestamps) > 1 + else [timestamps[0]], + values=[gpus_detected_num, gpus_detected_num] + if len(timestamps) > 1 + else [gpus_detected_num], ) ) - metrics.append( - Metric( - name=f"gpu_util_percent_gpu{i}", - timestamps=[timestamp], - values=[gpus_util_percent[i]], + for index, gpu_memory_usage_points in gpus_memory_usage_points.items(): + metrics.append( + Metric( + name=f"gpu_memory_usage_bytes_gpu{index}", + timestamps=timestamps, + values=gpu_memory_usage_points, + ) + ) + for index, gpu_util_points in gpus_util_points.items(): + metrics.append( + Metric( + name=f"gpu_util_percent_gpu{index}", + timestamps=timestamps, + values=gpu_util_points, + ) ) - ) return JobMetrics(metrics=metrics) def _get_cpu_usage(last_point: JobMetricsPoint, prev_point: JobMetricsPoint) -> int: window = last_point.timestamp_micro - prev_point.timestamp_micro + if window == 0: + return 0 return round((last_point.cpu_usage_micro - prev_point.cpu_usage_micro) / window * 100) def _unix_time_micro_to_datetime(unix_time_ms: int) -> datetime: return datetime.fromtimestamp(unix_time_ms / 1_000_000, tz=timezone.utc) + + +def _datetime_to_unix_time_micro(dt: datetime) -> int: + return int(dt.timestamp() * 1_000_000) diff --git a/src/dstack/api/server/_fleets.py b/src/dstack/api/server/_fleets.py index 822d3f3a2..1b6e1fa11 100644 --- a/src/dstack/api/server/_fleets.py +++ b/src/dstack/api/server/_fleets.py @@ -104,6 +104,8 @@ def _get_fleet_spec_excludes(fleet_spec: FleetSpec) -> Optional[_ExcludeDict]: profile_excludes.add("availability_zones") if fleet_spec.configuration.blocks == 1: configuration_excludes["blocks"] = True + if fleet_spec.profile is not None and fleet_spec.profile.utilization_policy is None: + profile_excludes.add("utilization_policy") if ssh_hosts_excludes: ssh_config_excludes["hosts"] = {"__all__": ssh_hosts_excludes} diff --git a/src/dstack/api/server/_runs.py b/src/dstack/api/server/_runs.py index 143b84b26..5b063d2cf 100644 --- a/src/dstack/api/server/_runs.py +++ b/src/dstack/api/server/_runs.py @@ -185,6 +185,10 @@ def _get_run_spec_excludes(run_spec: RunSpec) -> Optional[dict]: and configuration.inactivity_duration is None ): configuration_excludes["inactivity_duration"] = True + if configuration.utilization_policy is None: + configuration_excludes["utilization_policy"] = True + if profile is not None and profile.utilization_policy is None: + profile_excludes.add("utilization_policy") if configuration_excludes: spec_excludes["configuration"] = configuration_excludes diff --git a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py index a652f46b0..7b74559d8 100644 --- a/src/tests/_internal/server/background/tasks/test_process_running_jobs.py +++ b/src/tests/_internal/server/background/tasks/test_process_running_jobs.py @@ -4,6 +4,7 @@ from unittest.mock import MagicMock, Mock, patch import pytest +from freezegun import freeze_time from sqlalchemy.ext.asyncio import AsyncSession from dstack._internal.core.errors import SSHError @@ -11,6 +12,7 @@ from dstack._internal.core.models.common import NetworkMode from dstack._internal.core.models.configurations import DevEnvironmentConfiguration from dstack._internal.core.models.instances import InstanceStatus +from dstack._internal.core.models.profiles import UtilizationPolicy from dstack._internal.core.models.runs import ( JobRuntimeData, JobStatus, @@ -39,6 +41,7 @@ from dstack._internal.server.testing.common import ( create_instance, create_job, + create_job_metrics_point, create_pool, create_project, create_repo, @@ -688,3 +691,125 @@ async def test_inactivity_duration( assert job.status == expected_status assert job.termination_reason == expected_termination_reason assert job.inactivity_secs == expected_inactivity_secs + + @pytest.mark.asyncio + @pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) + @pytest.mark.parametrize( + ["samples", "expected_status"], + [ + pytest.param( + [ + (datetime(2023, 1, 1, 12, 25, 20, tzinfo=timezone.utc), 30), + (datetime(2023, 1, 1, 12, 25, 30, tzinfo=timezone.utc), 30), + (datetime(2023, 1, 1, 12, 29, 50, tzinfo=timezone.utc), 40), + ], + JobStatus.RUNNING, + id="not-enough-points", + ), + pytest.param( + [ + (datetime(2023, 1, 1, 12, 20, 10, tzinfo=timezone.utc), 30), + (datetime(2023, 1, 1, 12, 20, 20, tzinfo=timezone.utc), 30), + (datetime(2023, 1, 1, 12, 29, 50, tzinfo=timezone.utc), 80), + ], + JobStatus.RUNNING, + id="any-above-min", + ), + pytest.param( + [ + (datetime(2023, 1, 1, 12, 10, 10, tzinfo=timezone.utc), 80), # outside window + (datetime(2023, 1, 1, 12, 10, 20, tzinfo=timezone.utc), 80), # outside window + (datetime(2023, 1, 1, 12, 20, 10, tzinfo=timezone.utc), 30), + (datetime(2023, 1, 1, 12, 20, 20, tzinfo=timezone.utc), 30), + (datetime(2023, 1, 1, 12, 29, 50, tzinfo=timezone.utc), 40), + ], + JobStatus.TERMINATING, + id="all-below-min", + ), + ], + ) + @freeze_time(datetime(2023, 1, 1, 12, 30, tzinfo=timezone.utc)) + async def test_gpu_utilization( + self, + test_db, + session: AsyncSession, + samples: list[tuple[datetime, int]], + expected_status: JobStatus, + ) -> None: + project = await create_project(session=session) + user = await create_user(session=session) + repo = await create_repo( + session=session, + project_id=project.id, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + status=RunStatus.RUNNING, + run_name="test-run", + run_spec=get_run_spec( + run_name="test-run", + repo_id=repo.name, + configuration=DevEnvironmentConfiguration( + name="test-run", + ide="vscode", + utilization_policy=UtilizationPolicy( + min_gpu_utilization=80, + time_window=600, + ), + ), + ), + ) + pool = await create_pool(session=session, project=project) + instance = await create_instance( + session=session, + project=project, + pool=pool, + status=InstanceStatus.BUSY, + ) + job = await create_job( + session=session, + run=run, + status=JobStatus.RUNNING, + job_provisioning_data=get_job_provisioning_data(), + instance=instance, + instance_assigned=True, + ) + for timestamp, gpu_util in samples: + # two GPUs, the second one always 100% utilized + await create_job_metrics_point( + session=session, + job_model=job, + timestamp=timestamp, + gpus_memory_usage_bytes=[1024, 1024], + gpus_util_percent=[gpu_util, 100], + ) + with ( + patch("dstack._internal.server.services.runner.ssh.SSHTunnel") as SSHTunnelMock, + patch( + "dstack._internal.server.services.runner.client.RunnerClient" + ) as RunnerClientMock, + ): + runner_client_mock = RunnerClientMock.return_value + runner_client_mock.pull.return_value = PullResponse( + job_states=[], + job_logs=[], + runner_logs=[], + last_updated=0, + no_connections_secs=0, + ) + await process_running_jobs() + SSHTunnelMock.assert_called_once() + runner_client_mock.pull.assert_called_once() + await session.refresh(job) + assert job.status == expected_status + if expected_status == JobStatus.TERMINATING: + assert job.termination_reason == JobTerminationReason.TERMINATED_BY_SERVER + assert job.termination_reason_message == ( + "The job GPU utilization below 80% for 600 seconds" + ) + else: + assert job.termination_reason is None + assert job.termination_reason_message is None diff --git a/src/tests/_internal/server/routers/test_fleets.py b/src/tests/_internal/server/routers/test_fleets.py index 2ad610603..a64093db7 100644 --- a/src/tests/_internal/server/routers/test_fleets.py +++ b/src/tests/_internal/server/routers/test_fleets.py @@ -370,6 +370,7 @@ async def test_creates_fleet(self, test_db, session: AsyncSession, client: Async "idle_duration": None, "termination_policy": None, "termination_idle_time": None, + "utilization_policy": None, "name": "", "default": False, "reservation": None, @@ -495,6 +496,7 @@ async def test_creates_ssh_fleet(self, test_db, session: AsyncSession, client: A "idle_duration": None, "termination_policy": None, "termination_idle_time": None, + "utilization_policy": None, "name": "", "default": False, "reservation": None, diff --git a/src/tests/_internal/server/routers/test_metrics.py b/src/tests/_internal/server/routers/test_metrics.py index b7a038b55..e3b06d470 100644 --- a/src/tests/_internal/server/routers/test_metrics.py +++ b/src/tests/_internal/server/routers/test_metrics.py @@ -55,11 +55,25 @@ async def test_returns_metrics(self, test_db, session: AsyncSession, client: Asy session=session, run=run, ) + await create_job_metrics_point( + session=session, + job_model=job, + timestamp=datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc), + cpu_usage_micro=2 * 1_000_000, + memory_usage_bytes=256, + memory_working_set_bytes=128, + gpus_memory_usage_bytes=[256], + gpus_util_percent=[2], + ) await create_job_metrics_point( session=session, job_model=job, timestamp=datetime(2023, 1, 2, 3, 4, 15, tzinfo=timezone.utc), cpu_usage_micro=4 * 1_000_000, + memory_usage_bytes=512, + memory_working_set_bytes=256, + gpus_memory_usage_bytes=[512], + gpus_util_percent=[6], ) await create_job_metrics_point( session=session, @@ -76,6 +90,7 @@ async def test_returns_metrics(self, test_db, session: AsyncSession, client: Asy headers=get_auth_headers(user.token), ) assert response.status_code == 200 + # Returns one last sample by default. Filtering is tested in services/test_metrics.py assert response.json() == { "metrics": [ { diff --git a/src/tests/_internal/server/routers/test_runs.py b/src/tests/_internal/server/routers/test_runs.py index 085e2315a..bc6fb7c61 100644 --- a/src/tests/_internal/server/routers/test_runs.py +++ b/src/tests/_internal/server/routers/test_runs.py @@ -126,6 +126,7 @@ def get_dev_env_run_plan_dict( "idle_duration": None, "termination_idle_time": 300, "termination_policy": None, + "utilization_policy": None, "reservation": None, }, "configuration_path": "dstack.yaml", @@ -148,6 +149,7 @@ def get_dev_env_run_plan_dict( "idle_duration": None, "termination_idle_time": 300, "termination_policy": None, + "utilization_policy": None, "reservation": None, }, "repo_code_hash": None, @@ -283,6 +285,7 @@ def get_dev_env_run_dict( "idle_duration": None, "termination_idle_time": 300, "termination_policy": None, + "utilization_policy": None, "reservation": None, }, "configuration_path": "dstack.yaml", @@ -305,6 +308,7 @@ def get_dev_env_run_dict( "idle_duration": None, "termination_idle_time": 300, "termination_policy": None, + "utilization_policy": None, "reservation": None, }, "repo_code_hash": None, diff --git a/src/tests/_internal/server/services/test_metrics.py b/src/tests/_internal/server/services/test_metrics.py new file mode 100644 index 000000000..5843a66c2 --- /dev/null +++ b/src/tests/_internal/server/services/test_metrics.py @@ -0,0 +1,163 @@ +from datetime import datetime, timedelta, timezone + +import pytest +from sqlalchemy.ext.asyncio import AsyncSession + +from dstack._internal.core.models.metrics import Metric +from dstack._internal.server.services.metrics import get_job_metrics +from dstack._internal.server.testing.common import ( + create_job, + create_job_metrics_point, + create_project, + create_repo, + create_run, + create_user, +) + + +@pytest.mark.asyncio +@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True) +@pytest.mark.usefixtures("test_db", "image_config_mock") +class TestGetMetrics: + latest_ts = datetime(2023, 1, 2, 3, 4, 25, tzinfo=timezone.utc) + ts: tuple[datetime, ...] = ( + latest_ts, # 0 + latest_ts - timedelta(seconds=10), # 1 + latest_ts - timedelta(seconds=20), # 2 + latest_ts - timedelta(seconds=30), # 3 + latest_ts - timedelta(seconds=40), # 4 + latest_ts - timedelta(seconds=50), # 5 + ) + # dt, cpu_usage_sec, memory_usage_bytes, memory_ws_bytes, gpu0_memory_usage_bytes, gpu0_util, + # gpu1_memory_usage_bytess, gpu1_util + points: tuple[tuple[datetime, int, int, int, int, int, int, int], ...] = ( + (ts[0], 110, 512, 128, 768, 15, 128, 20), + (ts[1], 104, 1024, 512, 1024, 10, 256, 10), + (ts[2], 100, 1024, 512, 1024, 20, 128, 5), + (ts[3], 90, 512, 512, 2048, 40, 512, 20), + (ts[4], 90, 1024, 1024, 1024, 0, 128, 0), + (ts[5], 80, 512, 512, 1024, 10, 256, 0), + ) + + @pytest.mark.parametrize( + ["params", "ts", "cpu", "mem", "mem_ws", "gpu0_mem", "gpu0_util", "gpu1_mem", "gpu1_util"], + [ + pytest.param( + {"limit": 1}, + [ts[0]], + [60], + [512], + [128], + [768], + [15], + [128], + [20], + id="limit-1-latest", + ), + pytest.param( + {"limit": 3}, + [ts[0], ts[1], ts[2]], + [60, 40, 100], + [512, 1024, 1024], + [128, 512, 512], + [768, 1024, 1024], + [15, 10, 20], + [128, 256, 128], + [20, 10, 5], + id="limit-3-latest", + ), + pytest.param( + {}, + [ts[0], ts[1], ts[2], ts[3], ts[4]], + [60, 40, 100, 0, 100], + [512, 1024, 1024, 512, 1024], + [128, 512, 512, 512, 1024], + [768, 1024, 1024, 2048, 1024], + [15, 10, 20, 40, 0], + [128, 256, 128, 512, 128], + [20, 10, 5, 20, 0], + id="all", + ), + pytest.param( + {"after": ts[3]}, + [ts[0], ts[1], ts[2]], + [60, 40, 100], + [512, 1024, 1024], + [128, 512, 512], + [768, 1024, 1024], + [15, 10, 20], + [128, 256, 128], + [20, 10, 5], + id="all-after", + ), + pytest.param( + {"before": ts[2]}, + [ts[3], ts[4]], + [0, 100], + [512, 1024], + [512, 1024], + [2048, 1024], + [40, 0], + [512, 128], + [20, 0], + id="all-before", + ), + ], + ) + async def test_get_metrics( + self, + session: AsyncSession, + params: dict, + ts: list[datetime], + cpu: list[int], + mem: list[int], + mem_ws: list[int], + gpu0_mem: list[int], + gpu0_util: list[int], + gpu1_mem: list[int], + gpu1_util: list[int], + ): + user = await create_user(session=session) + project = await create_project(session=session, owner=user) + repo = await create_repo( + session=session, + project_id=project.id, + ) + run = await create_run( + session=session, + project=project, + repo=repo, + user=user, + ) + job = await create_job( + session=session, + run=run, + ) + for dt, _cpu, _mem, _mem_ws, _gpu0_mem, _gpu0_util, _gpu1_mem, _gpu1_util in self.points: + await create_job_metrics_point( + session=session, + job_model=job, + timestamp=dt, + cpu_usage_micro=_cpu * 1_000_000, + memory_usage_bytes=_mem, + memory_working_set_bytes=_mem_ws, + gpus_memory_usage_bytes=[_gpu0_mem, _gpu1_mem], + gpus_util_percent=[_gpu0_util, _gpu1_util], + ) + + metrics = await get_job_metrics(session, job, **params) + + assert metrics.metrics == [ + Metric(name="cpu_usage_percent", timestamps=ts, values=cpu), + Metric(name="memory_usage_bytes", timestamps=ts, values=mem), + Metric(name="memory_working_set_bytes", timestamps=ts, values=mem_ws), + Metric( + name="gpus_detected_num", + timestamps=[ts[0], ts[-1]] if len(ts) > 1 else ts, + values=[2, 2] if len(ts) > 1 else [2], + ), + Metric(name="gpu_memory_usage_bytes_gpu0", timestamps=ts, values=gpu0_mem), + Metric(name="gpu_memory_usage_bytes_gpu1", timestamps=ts, values=gpu1_mem), + Metric(name="gpu_util_percent_gpu0", timestamps=ts, values=gpu0_util), + Metric(name="gpu_util_percent_gpu1", timestamps=ts, values=gpu1_util), + ] From 3a39af1b505778a8a138fd045c2aadbab4b6ac01 Mon Sep 17 00:00:00 2001 From: Dmitry Meyer Date: Tue, 4 Mar 2025 07:01:56 +0000 Subject: [PATCH 2/2] Increase log level --- .../_internal/server/background/tasks/process_running_jobs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dstack/_internal/server/background/tasks/process_running_jobs.py b/src/dstack/_internal/server/background/tasks/process_running_jobs.py index d1bcafa29..7403a1854 100644 --- a/src/dstack/_internal/server/background/tasks/process_running_jobs.py +++ b/src/dstack/_internal/server/background/tasks/process_running_jobs.py @@ -696,7 +696,7 @@ async def _check_gpu_utilization( if _should_terminate_due_to_low_gpu_util( policy.min_gpu_utilization, [m.values for m in gpus_util_metrics] ): - logger.debug("%s: GPU utilization check: terminating", fmt(job_model)) + logger.info("%s: GPU utilization check: terminating", fmt(job_model)) job_model.status = JobStatus.TERMINATING # TODO(0.19 or earlier): set JobTerminationReason.TERMINATED_DUE_TO_UTILIZATION_POLICY job_model.termination_reason = JobTerminationReason.TERMINATED_BY_SERVER