Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions src/dstack/_internal/server/services/jobs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,13 @@
RunSpec,
)
from dstack._internal.core.services.ssh.tunnel import SSHTunnel, ports_to_forwarded_sockets
from dstack._internal.server.models import InstanceModel, JobModel, ProjectModel, VolumeModel
from dstack._internal.server.models import (
InstanceModel,
JobModel,
ProjectModel,
RunModel,
VolumeModel,
)
from dstack._internal.server.services.backends import get_project_backend_by_type
from dstack._internal.server.services.jobs.configurators.base import JobConfigurator
from dstack._internal.server.services.jobs.configurators.dev import DevEnvironmentJobConfigurator
Expand Down Expand Up @@ -63,20 +69,24 @@ def find_job(jobs: List[Job], replica_num: int, job_num: int) -> Job:
)


async def get_run_job_model(
    session: AsyncSession, project: ProjectModel, run_name: str, replica_num: int, job_num: int
) -> Optional[JobModel]:
    """Return the latest submission of one job of a run, or None if not found.

    The job is identified by (run_name, replica_num, job_num) within the given
    project. Only jobs belonging to non-deleted runs are considered, so a job
    from an old, deleted run with the same name is never returned.

    Args:
        session: Async DB session used to execute the query.
        project: Project the run belongs to.
        run_name: Name of the run (assumed unique among non-deleted runs).
        replica_num: Replica number of the job within the run.
        job_num: Job number of the job within the replica.

    Returns:
        The JobModel with the highest submission_num matching the criteria,
        or None when no such job exists.
    """
    res = await session.execute(
        select(JobModel)
        .join(JobModel.run)
        .where(
            RunModel.project_id == project.id,
            # assuming run_name is unique for non-deleted runs
            RunModel.run_name == run_name,
            RunModel.deleted == False,
            JobModel.replica_num == replica_num,
            JobModel.job_num == job_num,
        )
        # A job may be resubmitted; the latest submission is the relevant one.
        .order_by(JobModel.submission_num.desc())
        .limit(1)
    )
    return res.scalar_one_or_none()


def job_model_to_job_submission(job_model: JobModel) -> JobSubmission:
Expand Down
7 changes: 3 additions & 4 deletions src/dstack/_internal/server/services/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from dstack._internal.core.errors import ResourceNotExistsError
from dstack._internal.core.models.metrics import JobMetrics, Metric
from dstack._internal.server.models import JobMetricsPoint, JobModel, ProjectModel
from dstack._internal.server.services.jobs import list_run_job_models
from dstack._internal.server.services.jobs import get_run_job_model


async def get_job_metrics(
Expand All @@ -17,16 +17,15 @@ async def get_job_metrics(
replica_num: int,
job_num: int,
) -> JobMetrics:
job_models = await list_run_job_models(
job_model = await get_run_job_model(
session=session,
project=project,
run_name=run_name,
replica_num=replica_num,
job_num=job_num,
)
if len(job_models) == 0:
if job_model is None:
raise ResourceNotExistsError("Found no job with given parameters")
job_model = job_models[-1]
job_metrics = await _get_job_metrics(
session=session,
job_model=job_model,
Expand Down
2 changes: 2 additions & 0 deletions src/dstack/_internal/server/testing/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ async def create_run(
submitted_at: datetime = datetime(2023, 1, 2, 3, 4, tzinfo=timezone.utc),
run_spec: Optional[RunSpec] = None,
run_id: Optional[UUID] = None,
deleted: bool = False,
) -> RunModel:
if run_spec is None:
run_spec = get_run_spec(
Expand All @@ -250,6 +251,7 @@ async def create_run(
run_id = uuid.uuid4()
run = RunModel(
id=run_id,
deleted=deleted,
project_id=project.id,
repo_id=repo.id,
user_id=user.id,
Expand Down
44 changes: 44 additions & 0 deletions src/tests/_internal/server/routers/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,47 @@ async def test_returns_metrics(self, test_db, session: AsyncSession, client: Asy
},
]
}

@pytest.mark.asyncio
@pytest.mark.parametrize("test_db", ["sqlite", "postgres"], indirect=True)
async def test_ignores_deleted_runs(self, test_db, session: AsyncSession, client: AsyncClient):
    """Job metrics must resolve jobs against the non-deleted run with the given name."""
    user = await create_user(session=session, global_role=GlobalRole.USER)
    project = await create_project(session=session, owner=user)
    await add_project_member(
        session=session, project=project, user=user, project_role=ProjectRole.USER
    )
    repo = await create_repo(session=session, project_id=project.id)
    # Two runs share the name "test-run"; the first is marked deleted.
    run_deleted = await create_run(
        session=session,
        project=project,
        repo=repo,
        user=user,
        run_name="test-run",
        deleted=True,
    )
    run_active = await create_run(
        session=session,
        project=project,
        repo=repo,
        user=user,
        run_name="test-run",
    )
    # The deleted run has jobs 0 and 1; the active run has only job 0.
    await create_job(session=session, run=run_deleted, job_num=0)
    await create_job(session=session, run=run_deleted, job_num=1)
    await create_job(session=session, run=run_active, job_num=0)

    async def fetch_metrics(job_num):
        # One metrics request for the shared run name, varying only job_num.
        return await client.get(
            f"/api/project/{project.name}/metrics/job/test-run",
            params={"job_num": job_num},
            headers=get_auth_headers(user.token),
        )

    resp_job_0 = await fetch_metrics(0)
    resp_job_1 = await fetch_metrics(1)
    # job_num=1 exists only on the deleted run, so the lookup must fail.
    assert resp_job_1.status_code == 400
    assert resp_job_1.json()["detail"][0]["code"] == "resource_not_exists"
    # job_num=0 resolves to the active run's job.
    assert resp_job_0.status_code == 200
    assert resp_job_0.json() == {"metrics": []}