Skip to content

Commit

Permalink
Refactor trailblazer scan (#423)(patch)
Browse files Browse the repository at this point in the history
## Fixed
- Extract logic for parsing slurm from data access layer
- Abstract away slurm interactions to enable going via API or CLI
  • Loading branch information
seallard committed Apr 30, 2024
1 parent 24ef98e commit 027b716
Show file tree
Hide file tree
Showing 21 changed files with 227 additions and 60 deletions.
33 changes: 0 additions & 33 deletions tests/cli/test_cli_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
get_user_from_db,
get_users_from_db,
ls_cmd,
scan,
set_analysis_completed,
set_analysis_status,
unarchive_user,
Expand Down Expand Up @@ -476,38 +475,6 @@ def test_unarchive_user(
assert f"User unarchived: {archived_user_email}" in caplog.text


def test_scan(
cli_runner: CliRunner,
analysis_store: MockStore,
caplog,
mocker,
ongoing_analysis_case_id: str,
slurm_squeue_output: dict[str, str],
):
"""Test scanning for analyses and updating analysis status."""
caplog.set_level("INFO")

# GIVEN SLURM squeue output for an analysis
mocker.patch(
FUNC_GET_SLURM_SQUEUE_OUTPUT_PATH,
return_value=subprocess.check_output(
["cat", slurm_squeue_output.get(ongoing_analysis_case_id)]
).decode(CharacterFormat.UNICODE_TRANSFORMATION_FORMAT_8),
)

# GIVEN populated Trailblazer database with pending analyses

# WHEN running trailblazer scan command
cli_runner.invoke(scan, [])

# THEN log that analyses are updated
assert "All analyses updated" in caplog.text

# THEN the status of analysis should be updated from pending
analysis = analysis_store.get_latest_analysis_for_case(ongoing_analysis_case_id)
assert analysis.status == TrailblazerStatus.RUNNING


@pytest.mark.parametrize(
"case_id, status",
[
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ def slurm_squeue_output(squeue_dir: Path) -> dict[str, str]:
}


@pytest.fixture
def slurm_job_ids() -> list[int]:
return [690993, 690994, 690992, 690988, 690989, 690990]


@pytest.fixture(scope="session")
def tower_case_config() -> dict[str, dict]:
"""Return a Tower case configs."""
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/services/test_job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ def test_update_upload_jobs(

# GIVEN that slurm says the upload job is completed
slurm_service_mock = mock.Mock()
slurm_service_mock.get_job_info.return_value = upload_job_info
slurm_service_mock.get_job.return_value = upload_job_info
job_service.slurm_service = slurm_service_mock

# WHEN updating the upload jobs
Expand Down
37 changes: 35 additions & 2 deletions tests/services/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,42 @@
from unittest.mock import Mock
import pytest

from trailblazer.services.analysis_service.analysis_service import AnalysisService
from trailblazer.services.job_service.job_service import JobService
from trailblazer.services.slurm.dtos import SlurmJobInfo
from trailblazer.services.slurm.slurm_cli_service.slurm_cli_service import SlurmCLIService
from trailblazer.services.slurm.slurm_service import SlurmService
from trailblazer.store.store import Store


@pytest.fixture
def analysis_service(analysis_store: Store) -> AnalysisService:
return AnalysisService(analysis_store)
def slurm_completed_job_info() -> SlurmJobInfo:
return SlurmJobInfo(
slurm_id=690994,
name="job",
status="completed",
elapsed=1000,
started_at="2020-10-22T11:43:33",
)


@pytest.fixture
def slurm_service(slurm_completed_job_info) -> SlurmService:
"""Slurm service reporting all jobs as completed."""
service = SlurmCLIService(client=Mock())
service.get_job = Mock(return_value=slurm_completed_job_info)
service.get_jobs = Mock(return_value=[slurm_completed_job_info])
return service


@pytest.fixture
def job_service(store: Store, slurm_service: SlurmService, slurm_job_ids, mocker) -> JobService:
mocker.patch(
"trailblazer.services.job_service.job_service.get_slurm_job_ids", return_value=slurm_job_ids
)
return JobService(store=store, slurm_service=slurm_service)


@pytest.fixture
def analysis_service(analysis_store: Store, job_service: JobService) -> AnalysisService:
return AnalysisService(store=analysis_store, job_service=job_service)
13 changes: 13 additions & 0 deletions tests/services/test_analysis_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from trailblazer.constants import TrailblazerStatus
from trailblazer.dto.update_analyses import AnalysisUpdate, UpdateAnalyses
from trailblazer.services.analysis_service.analysis_service import AnalysisService
from trailblazer.store.models import Analysis, User
Expand All @@ -15,3 +16,15 @@ def test_patch_analyses_delivered(
analysis_service.update_analyses(data=update_analyses, user=user)

assert analysis.delivered_by


def test_update_analysis_status(analysis_store: Store, analysis_service: AnalysisService):
# GIVEN a store with ongoing analyses
assert analysis_store.get_ongoing_analyses()
# GIVEN that all slurm jobs are completed

# WHEN updating the status of the analyses
analysis_service.update_ongoing_analyses()

# THEN the status of all analyses should be updated
assert not analysis_store.get_ongoing_analyses()
4 changes: 3 additions & 1 deletion tests/store/crud/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ def test_delete_analysis(analysis_store: MockStore, case_id: str):
"""Test analysis is successfully deleted."""

# GIVEN a not ongoing analysis
analysis_store.update_analysis_status(case_id=case_id, status=TrailblazerStatus.CANCELLED)
analysis_store.update_analysis_status_by_case_id(
case_id=case_id, status=TrailblazerStatus.CANCELLED
)
analysis: Analysis | None = analysis_store.get_latest_analysis_for_case(case_id=case_id)

# WHEN deleting analysis
Expand Down
4 changes: 3 additions & 1 deletion tests/store/crud/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,7 +291,9 @@ def test_update_analysis_status_with_failed(analysis_store: MockStore, case_id:
assert analysis.status != TrailblazerStatus.FAILED

# WHEN setting analysis to failed
analysis_store.update_analysis_status(case_id=analysis.case_id, status=TrailblazerStatus.FAILED)
analysis_store.update_analysis_status_by_case_id(
case_id=analysis.case_id, status=TrailblazerStatus.FAILED
)

# THEN the analysis status should be updated to failed
assert analysis.status == TrailblazerStatus.FAILED
Expand Down
2 changes: 1 addition & 1 deletion trailblazer/apps/slurm/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def cancel_slurm_job(slurm_id: int, analysis_host: str | None = None) -> None:
subprocess.Popen(scancel_commands)


def _get_slurm_queue(job_ids: str, analysis_host: str | None = None) -> SqueueResult:
def get_slurm_queue(job_ids: str, analysis_host: str | None = None) -> SqueueResult:
"""Return squeue output from ongoing analyses in SLURM."""
queue_output: str = get_slurm_queue_output(job_ids=job_ids, analysis_host=analysis_host)
return get_squeue_result(queue_output)
Expand Down
2 changes: 1 addition & 1 deletion trailblazer/cli/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ def set_analysis_status(
"""Set the status of the latest analysis for a given case id."""
trailblazer_db: Store = context.obj["trailblazer_db"]
try:
trailblazer_db.update_analysis_status(case_id=case_id, status=status)
trailblazer_db.update_analysis_status_by_case_id(case_id=case_id, status=status)
except ValueError as error:
LOG.error(error)
raise click.Abort from error
Expand Down
4 changes: 2 additions & 2 deletions trailblazer/clients/slurm_cli_client/slurm_cli_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from trailblazer.apps.slurm.api import _get_slurm_queue
from trailblazer.apps.slurm.api import get_slurm_queue
from trailblazer.apps.slurm.models import SqueueResult


Expand All @@ -7,4 +7,4 @@ def __init__(self, host: str):
self.host = host

def get_slurm_queue(self, job_ids: str) -> SqueueResult:
return _get_slurm_queue(job_ids=job_ids, analysis_host=self.host)
return get_slurm_queue(job_ids=job_ids, analysis_host=self.host)
4 changes: 4 additions & 0 deletions trailblazer/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ def statuses(cls) -> tuple:
def ongoing_statuses(cls) -> tuple:
return cls.PENDING, cls.RUNNING, cls.COMPLETING

@classmethod
def fail_statuses(cls) -> tuple:
return cls.FAILED, cls.TIME_OUT


class CharacterFormat(StrEnum):
"""Define character encoding/decoding to use."""
Expand Down
2 changes: 1 addition & 1 deletion trailblazer/containers.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class Container(containers.DeclarativeContainer):
slurm_service = providers.Singleton(SlurmCLIService, client=slurm_client)

job_service = providers.Factory(JobService, store=store, slurm_service=slurm_service)
analysis_service = providers.Factory(AnalysisService, store=store)
analysis_service = providers.Factory(AnalysisService, store=store, job_service=job_service)

encryption_service = providers.Singleton(EncryptionService, secret_key=encryption_key)

Expand Down
2 changes: 1 addition & 1 deletion trailblazer/server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ def set_analysis_status():
try:
case_id: str = put_request.get("case_id")
status: str = put_request.get("status")
store.update_analysis_status(case_id=case_id, status=status)
store.update_analysis_status_by_case_id(case_id=case_id, status=status)
return (
jsonify(f"Success! Analysis set to {put_request.get('status')} request sent"),
HTTPStatus.CREATED,
Expand Down
13 changes: 11 additions & 2 deletions trailblazer/services/analysis_service/analysis_service.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from trailblazer.constants import TrailblazerStatus
from trailblazer.constants import Workflow
from trailblazer.dto import (
AnalysesRequest,
Expand All @@ -16,13 +17,15 @@
create_summary,
create_update_analyses_response,
)
from trailblazer.services.job_service.job_service import JobService
from trailblazer.store.models import Analysis, Job, User
from trailblazer.store.store import Store


class AnalysisService:
def __init__(self, store: Store):
def __init__(self, store: Store, job_service: JobService):
self.store = store
self.job_service = job_service

def get_analyses(self, request: AnalysesRequest) -> AnalysesResponse:
analyses, total_count = self.store.get_paginated_analyses(request)
Expand Down Expand Up @@ -68,7 +71,13 @@ def create_analyses_response(
return AnalysesResponse(analyses=response_data, total_count=total_count)

def update_ongoing_analyses(self) -> None:
self.store.update_ongoing_analyses()
analyses: list[Analysis] = self.store.get_ongoing_analyses()
for analysis in analyses:
self.job_service.update_jobs(analysis.id)
status: TrailblazerStatus = self.job_service.get_analysis_status(analysis.id)
progress: float = self.job_service.get_analysis_progression(analysis.id)
self.store.update_analysis_progress(analysis_id=analysis.id, progress=progress)
self.store.update_analysis_status(analysis_id=analysis.id, status=status)

def get_summaries(self, request_data: SummariesRequest) -> SummariesResponse:
summaries: list[Summary] = []
Expand Down
37 changes: 34 additions & 3 deletions trailblazer/services/job_service/job_service.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
from datetime import datetime
import logging

from trailblazer.constants import TrailblazerStatus, WorkflowManager
from trailblazer.dto import CreateJobRequest, FailedJobsRequest, FailedJobsResponse, JobResponse
from trailblazer.services.job_service.utils import create_failed_jobs_response, create_job_response
from trailblazer.services.job_service.mappers import (
create_failed_jobs_response,
create_job_response,
slurm_info_to_job,
)
from trailblazer.services.job_service.utils import get_progress, get_slurm_job_ids, get_status
from trailblazer.services.slurm.dtos import SlurmJobInfo
from trailblazer.services.slurm.slurm_service import SlurmService
from trailblazer.store.models import Job
from trailblazer.store.models import Analysis, Job
from trailblazer.store.store import Store
from trailblazer.utils.datetime import get_date_number_of_days_ago

Expand All @@ -30,5 +36,30 @@ def update_upload_jobs(self) -> None:
jobs: list[Job] = self.store.get_ongoing_upload_jobs()
for job in jobs:
LOG.info(f"Updating upload job {job.id}")
updated_job: SlurmJobInfo = self.slurm_service.get_job_info(job.slurm_id)
updated_job: SlurmJobInfo = self.slurm_service.get_job(job.slurm_id)
self.store.update_job(job_id=job.id, job_info=updated_job)

def update_jobs(self, analysis_id: int) -> None:
analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
try:
if analysis.workflow_manager == WorkflowManager.SLURM:
self._update_slurm_jobs(analysis_id)
if analysis.workflow_manager == WorkflowManager.TOWER:
self.store.update_tower_run_status(analysis_id)
except Exception as error:
LOG.error(f"Failed to update jobs {analysis.case_id} - {analysis.id}: {error}")

def _update_slurm_jobs(self, analysis_id: int) -> None:
analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
slurm_ids: list[int] = get_slurm_job_ids(analysis.config_path)
slurm_jobs: list[SlurmJobInfo] = self.slurm_service.get_jobs(slurm_ids)
jobs: list[Job] = [slurm_info_to_job(job_info) for job_info in slurm_jobs]
self.store.replace_jobs(analysis_id=analysis_id, jobs=jobs)

def get_analysis_status(self, analysis_id: int) -> TrailblazerStatus:
analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
return get_status(analysis.jobs)

def get_analysis_progression(self, analysis_id: int) -> float:
analysis: Analysis = self.store.get_analysis_with_id(analysis_id)
return get_progress(analysis.jobs)
24 changes: 24 additions & 0 deletions trailblazer/services/job_service/mappers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
from trailblazer.dto.failed_jobs_response import FailedJobsResponse
from trailblazer.dto.job_response import JobResponse
from trailblazer.services.slurm.dtos import SlurmJobInfo
from trailblazer.store.models import Job


def create_job_response(job: Job) -> JobResponse:
return JobResponse(
slurm_id=job.slurm_id, analysis_id=job.analysis_id, status=job.status, id=job.id
)


def create_failed_jobs_response(failed_job_statistics: list[dict]) -> FailedJobsResponse:
return FailedJobsResponse(jobs=failed_job_statistics)


def slurm_info_to_job(slurm_job_info: SlurmJobInfo) -> Job:
return Job(
slurm_id=slurm_job_info.slurm_id,
name=slurm_job_info.name,
status=slurm_job_info.status,
elapsed=slurm_job_info.elapsed,
started_at=slurm_job_info.started_at,
)
60 changes: 53 additions & 7 deletions trailblazer/services/job_service/utils.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,59 @@
from trailblazer.dto.failed_jobs_response import FailedJobsResponse
from trailblazer.dto.job_response import JobResponse
from pathlib import Path
from trailblazer.constants import FileFormat, SlurmJobStatus, TrailblazerStatus
from trailblazer.io.controller import ReadFile
from trailblazer.store.models import Job


def create_job_response(job: Job) -> JobResponse:
return JobResponse(
slurm_id=job.slurm_id, analysis_id=job.analysis_id, status=job.status, id=job.id
def get_slurm_job_ids(job_id_file: str) -> list[int]:
job_id_file_path = Path(job_id_file)
content: dict = ReadFile.get_content_from_file(
file_format=FileFormat.YAML, file_path=job_id_file_path
)
job_ids: list[int] = []
for row in content.values():
[job_ids.append(job_id) for job_id in row]
return job_ids


def create_failed_jobs_response(failed_job_statistics: list[dict]) -> FailedJobsResponse:
return FailedJobsResponse(jobs=failed_job_statistics)
def get_status(jobs: list[Job]) -> TrailblazerStatus:
if has_same_status(jobs):
return get_single_status(jobs)
is_running: bool = has_running_jobs(jobs)
is_failed: bool = has_failed_jobs(jobs)
if is_failed:
return TrailblazerStatus.ERROR if is_running else TrailblazerStatus.FAILED
return TrailblazerStatus.RUNNING if is_running else TrailblazerStatus.CANCELLED


def has_same_status(jobs: list[Job]) -> bool:
if not jobs:
return False
first_status = jobs[0].status
return all(job.status == first_status for job in jobs)


def get_single_status(jobs: list[Job]) -> SlurmJobStatus:
single_status = jobs[0].status
return (
TrailblazerStatus.FAILED
if single_status == SlurmJobStatus.TIME_OUT
else TrailblazerStatus[single_status.upper()]
)


def has_running_jobs(jobs: list[Job]) -> bool:
run_statuses = SlurmJobStatus.ongoing_statuses()
return any(job.status in run_statuses for job in jobs)


def has_failed_jobs(jobs: list[Job]) -> bool:
fail_statuses = SlurmJobStatus.fail_statuses()
return any(job.status in fail_statuses for job in jobs)


def get_progress(jobs: list[Job]) -> float:
total_jobs: int = len(jobs)
if total_jobs == 0:
return 0.0
completed_jobs: int = len([job for job in jobs if job.status == SlurmJobStatus.COMPLETED])
return completed_jobs / total_jobs
Loading

0 comments on commit 027b716

Please sign in to comment.