Skip to content

Commit

Permalink
feat(refactor): Analysis failed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
henrikstranneheim committed Aug 16, 2023
1 parent 3d1d0f8 commit 818515e
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 35 deletions.
8 changes: 4 additions & 4 deletions tests/cli/test_cli_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from typing import Dict

import pytest
import trailblazer

from click.testing import CliRunner

import trailblazer
from tests.mocks.store_mock import MockStore
from trailblazer.cli.core import (
archive_user,
add_user_to_db,
archive_user,
base,
cancel,
delete,
Expand Down Expand Up @@ -138,7 +138,7 @@ def test_cancel_ongoing(cli_runner, trailblazer_context, caplog):
)

# Analysis should have jobs that can be cancelled
assert analysis_obj.failed_jobs
assert analysis_obj.jobs

# WHEN running cancel command
result = cli_runner.invoke(cancel, [str(analysis_obj.id)], obj=trailblazer_context)
Expand Down
6 changes: 3 additions & 3 deletions tests/store/crud/test_delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,15 @@ def test_delete_analysis_jobs(analysis_store: MockStore, tower_jobs: List[dict],

# GIVEN an analysis without failed jobs
analysis: Analysis = analysis_store.get_latest_analysis(case_id=case_id)
assert not analysis.failed_jobs
assert not analysis.jobs

# WHEN jobs are updated
analysis_store.update_analysis_jobs(analysis=analysis, jobs=tower_jobs[:2])

assert analysis.failed_jobs
assert analysis.jobs

# WHEN jobs are deleted
analysis_store.delete_analysis_jobs(analysis=analysis)

# THEN analysis object should have no jobs
assert not analysis.failed_jobs
assert not analysis.jobs
12 changes: 6 additions & 6 deletions tests/store/crud/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,22 @@
from trailblazer.apps.slurm.api import get_squeue_result
from trailblazer.apps.slurm.models import SqueueResult
from trailblazer.constants import TrailblazerStatus
from trailblazer.store.filters.user_filters import apply_user_filter, UserFilter
from trailblazer.store.models import User, Analysis
from trailblazer.store.filters.user_filters import UserFilter, apply_user_filter
from trailblazer.store.models import Analysis, User


def test_update_analysis_jobs(analysis_store: MockStore, tower_jobs: List[dict], case_id: str):
"""Test jobs are successfully updated."""

# GIVEN an analysis without failed jobs
analysis: Analysis = analysis_store.get_latest_analysis(case_id=case_id)
assert not analysis.failed_jobs
assert not analysis.jobs

# WHEN jobs are updated
analysis_store.update_analysis_jobs(analysis=analysis, jobs=tower_jobs[:2])

# THEN there should be jobs
assert analysis.failed_jobs
assert analysis.jobs


def test_update_user_is_archived(user_store: MockStore, user_email: str):
Expand Down Expand Up @@ -49,7 +49,7 @@ def test_update_analysis_jobs_from_slurm_jobs(analysis_store: MockStore, squeue_
"""Test updating analysis jobs when given squeue results."""
# GIVEN an analysis and a squeue stream
analysis: Analysis = analysis_store.get_query(table=Analysis).first()
assert not analysis.failed_jobs
assert not analysis.jobs

squeue_result: SqueueResult = get_squeue_result(squeue_response=squeue_stream_jobs)

Expand All @@ -62,4 +62,4 @@ def test_update_analysis_jobs_from_slurm_jobs(analysis_store: MockStore, squeue_
)

# THEN it should update the analysis jobs
assert updated_analysis.failed_jobs
assert updated_analysis.jobs
10 changes: 5 additions & 5 deletions tests/store/test_store_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def test_update_analysis_from_slurm_run_status(analysis_store: MockStore, squeue
"""Test updating analysis jobs when given squeue results."""
# GIVEN an analysis and a squeue stream
analysis: Analysis = analysis_store.get_query(table=Analysis).first()
assert not analysis.failed_jobs
assert not analysis.jobs

# WHEN updating the analysis
analysis_store.update_analysis_from_slurm_run_status(analysis_id=analysis.id)
Expand All @@ -33,7 +33,7 @@ def test_update_analysis_from_slurm_run_status(analysis_store: MockStore, squeue
)

# THEN it should update the analysis jobs
assert updated_analysis.failed_jobs
assert updated_analysis.jobs


@pytest.mark.parametrize(
Expand Down Expand Up @@ -218,19 +218,19 @@ def test_update_tower_jobs(analysis_store: MockStore, tower_jobs: List[dict], ca

# GIVEN an analysis without failed jobs
analysis: Analysis = analysis_store.get_latest_analysis(case_id=case_id)
assert not analysis.failed_jobs
assert not analysis.jobs

# WHEN analysis jobs are deleted
analysis_store.delete_analysis_jobs(analysis=analysis)

# THEN analysis object should have no failed jobs
assert not analysis.failed_jobs
assert not analysis.jobs

# WHEN jobs are updated
analysis_store.update_analysis_jobs(analysis=analysis, jobs=tower_jobs[:2])

# THEN failed jobs should be updated
assert len(analysis.failed_jobs) == 2
assert len(analysis.jobs) == 2


@pytest.mark.parametrize(
Expand Down
14 changes: 9 additions & 5 deletions trailblazer/server/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@
import multiprocessing
import os
from http import HTTPStatus
from typing import Dict, Mapping, List, Union
from typing import Dict, List, Mapping, Union

from flask import Blueprint, Response, abort, g, jsonify, make_response, request
from google.auth import jwt

from trailblazer.constants import TrailblazerStatus, ONE_MONTH_IN_DAYS, TRAILBLAZER_TIME_STAMP
from trailblazer.constants import (
ONE_MONTH_IN_DAYS,
TRAILBLAZER_TIME_STAMP,
TrailblazerStatus,
)
from trailblazer.server.ext import store
from trailblazer.store.models import Info, User, Analysis
from trailblazer.store.models import Analysis, Info, User
from trailblazer.utils.datetime import get_date_number_of_days_ago

blueprint = Blueprint("api", __name__, url_prefix="/api/v1")
Expand Down Expand Up @@ -59,7 +63,7 @@ def analyses():
for analysis_obj in query_page.items:
analysis_data = analysis_obj.to_dict()
analysis_data["user"] = analysis_obj.user.to_dict() if analysis_obj.user else None
analysis_data["failed_jobs"] = [job_obj.to_dict() for job_obj in analysis_obj.failed_jobs]
analysis_data["failed_jobs"] = [job_obj.to_dict() for job_obj in analysis_obj.jobs]
data.append(analysis_data)

return jsonify(analyses=data)
Expand All @@ -77,7 +81,7 @@ def analysis(analysis_id):
store.commit()

data = analysis_obj.to_dict()
data["failed_jobs"] = [job_obj.to_dict() for job_obj in analysis_obj.failed_jobs]
data["failed_jobs"] = [job_obj.to_dict() for job_obj in analysis_obj.jobs]
data["user"] = analysis_obj.user.to_dict() if analysis_obj.user else None
return jsonify(**data)

Expand Down
10 changes: 7 additions & 3 deletions trailblazer/store/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
from trailblazer.apps.slurm.api import get_current_analysis_status, get_squeue_result
from trailblazer.apps.slurm.models import SqueueResult
from trailblazer.apps.tower.api import TowerAPI
from trailblazer.constants import FileFormat, SlurmJobStatus, TrailblazerStatus, WorkflowManager
from trailblazer.constants import (
FileFormat,
SlurmJobStatus,
TrailblazerStatus,
WorkflowManager,
)
from trailblazer.exc import TowerRequirementsError, TrailblazerError
from trailblazer.io.controller import ReadFile
from trailblazer.store.core import CoreHandler
from trailblazer.store.models import Analysis, Model


LOG = logging.getLogger(__name__)


Expand Down Expand Up @@ -318,7 +322,7 @@ def cancel_analysis(self, analysis_id: int, email: str = None, ssh: bool = False
if analysis.status not in TrailblazerStatus.ongoing_statuses():
raise TrailblazerError(f"Analysis {analysis_id} is not running")

for job_obj in analysis.failed_jobs:
for job_obj in analysis.jobs:
if job_obj.status in SlurmJobStatus.ongoing_statuses():
LOG.info(f"Cancelling job {job_obj.slurm_id} - {job_obj.name}")
self.cancel_slurm_job(job_obj.slurm_id, ssh=ssh)
Expand Down
2 changes: 1 addition & 1 deletion trailblazer/store/crud/delete.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ class DeleteHandler(BaseHandler_2):

def delete_analysis_jobs(self, analysis: Analysis) -> None:
"""Delete all jobs linked to the given analysis."""
for job in analysis.failed_jobs:
for job in analysis.jobs:
job.delete()
self.commit()
13 changes: 6 additions & 7 deletions trailblazer/store/crud/update.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,17 @@
from trailblazer.apps.slurm.api import reformat_squeue_result_job_step
from trailblazer.apps.slurm.models import SqueueResult
from typing import List

from trailblazer.apps.slurm.api import reformat_squeue_result_job_step
from trailblazer.apps.slurm.models import SqueueResult
from trailblazer.store.base import BaseHandler_2
from trailblazer.store.models import User, Analysis, Job
from trailblazer.store.models import Analysis, Job, User


class UpdateHandler(BaseHandler_2):
"""Class for updating items in the database."""

def update_analysis_jobs(self, analysis: Analysis, jobs: List[dict]) -> None:
"""Update jobs in the analysis."""
# failed_jobs is misnamed and actually contains all jobs irrespective of status
analysis.failed_jobs = [Job(**job) for job in jobs]
analysis.jobs = [Job(**job) for job in jobs]
self.commit()

def update_user_is_archived(self, user: User, archive: bool = True) -> None:
Expand All @@ -23,7 +22,7 @@ def update_user_is_archived(self, user: User, archive: bool = True) -> None:
def update_analysis_jobs_from_slurm_jobs(
self, analysis: Analysis, squeue_result: SqueueResult
) -> None:
"""Update analysis failed jobs from supplied squeue results."""
"""Update analysis jobs from supplied squeue results."""
if len(squeue_result.jobs) == 0:
return
for job in squeue_result.jobs:
Expand All @@ -32,7 +31,7 @@ def update_analysis_jobs_from_slurm_jobs(
)

self.delete_analysis_jobs(analysis=analysis)
analysis.failed_jobs = [
analysis.jobs = [
Job(
analysis_id=analysis.id,
slurm_id=job.id,
Expand Down
2 changes: 1 addition & 1 deletion trailblazer/store/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ class Analysis(Model):
types.Enum(*WorkflowManager.list()), default=WorkflowManager.SLURM.value
)

failed_jobs = orm.relationship("Job", cascade="all,delete", backref="analysis")
jobs = orm.relationship("Job", cascade="all,delete", backref="analysis")

@property
def has_ongoing_status(self) -> bool:
Expand Down

0 comments on commit 818515e

Please sign in to comment.