Skip to content

Commit

Permalink
Cancel tower analysis (#286) (patch)
Browse files Browse the repository at this point in the history
### Changed

- `cancel` support for NF-Tower analyses via NF-tower
  • Loading branch information
fevac authored Oct 9, 2023
1 parent 96d9ca8 commit 8ae79da
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 20 deletions.
2 changes: 1 addition & 1 deletion tests/cli/test_cli_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def test_cancel_with_ongoing_analysis(
assert result.exit_code == process_exit_success

# THEN log should inform of successful cancellation
assert "all ongoing jobs cancelled successfully" in caplog.text
assert "cancelled successfully" in caplog.text
assert "Cancelling" in caplog.text

# THEN job id from squeue output will be cancelled
Expand Down
34 changes: 30 additions & 4 deletions tests/store/crud/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tests.mocks.store_mock import MockStore
from trailblazer.apps.slurm.api import get_squeue_result
from trailblazer.apps.slurm.models import SqueueResult
from trailblazer.apps.tower.api import TowerAPI
from trailblazer.constants import CharacterFormat, TrailblazerStatus
from trailblazer.exc import MissingAnalysis, TrailblazerError
from trailblazer.store.filters.user_filters import UserFilter, apply_user_filter
Expand Down Expand Up @@ -105,7 +106,7 @@ def test_update_case_analyses_as_deleted_with_non_existing_case(
assert not analyses


def test_cancel_ongoing_analysis(
def test_cancel_ongoing_slurm_analysis(
analysis_store: MockStore, caplog, mocker, ongoing_analysis_case_id: str, tower_jobs: List[dict]
):
"""Test all ongoing analysis jobs are cancelled."""
Expand All @@ -116,7 +117,6 @@ def test_cancel_ongoing_analysis(
caplog.set_level("INFO")

# GIVEN an ongoing analysis
analysis_store.update_ongoing_analyses()
analysis: Optional[Analysis] = analysis_store.get_latest_analysis_for_case(
case_id=ongoing_analysis_case_id
)
Expand All @@ -129,8 +129,34 @@ def test_cancel_ongoing_analysis(
analysis_store.cancel_ongoing_analysis(analysis_id=analysis.id)

# THEN log should inform of successful cancellation
assert "all ongoing jobs cancelled successfully" in caplog.text
assert "Cancelling" in caplog.text
assert "Cancelling job" in caplog.text
assert "cancelled successfully!" in caplog.text

# THEN comment should be added
assert "Analysis cancelled manually by" in analysis.comment

# THEN analysis status should be updated
assert TrailblazerStatus.CANCELLED == analysis.status


def test_cancel_ongoing_tower_analysis(analysis_store: MockStore, caplog, mocker, case_id: str):
    """Test that an ongoing NF-Tower analysis is cancelled through the Tower API."""
    # GIVEN TOWER cancel output
    mocker.patch.object(TowerAPI, "cancel", return_value=None)

    caplog.set_level("INFO")

    # GIVEN an ongoing analysis
    analysis: Optional[Analysis] = analysis_store.get_latest_analysis_for_case(case_id=case_id)

    # WHEN running cancel ongoing analysis
    analysis_store.cancel_ongoing_analysis(analysis_id=analysis.id)

    # THEN log should inform of successful cancellation
    assert f"Cancelling Tower workflow for {case_id}" in caplog.text
    assert "cancelled successfully!" in caplog.text

    # THEN comment should be added
    assert "Analysis cancelled manually by" in analysis.comment

    # THEN analysis status should be updated
    assert TrailblazerStatus.CANCELLED == analysis.status


def test_cancel_ongoing_analysis_when_no_analysis(
Expand Down
31 changes: 29 additions & 2 deletions trailblazer/apps/tower/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
TowerTaskResponse,
TowerWorkflowResponse,
)
from trailblazer.constants import TOWER_STATUS, TrailblazerStatus
from trailblazer.constants import TOWER_WORKFLOW_STATUS, TrailblazerStatus
from trailblazer.exc import TrailblazerError

LOG = logging.getLogger(__name__)

Expand All @@ -30,6 +31,7 @@ def __init__(self, workflow_id: str):
self.tower_api_endpoint: str = os.environ.get("TOWER_API_ENDPOINT", None)
self.workflow_endpoint: str = f"workflow/{self.workflow_id}"
self.tasks_endpoint: str = f"{self.workflow_endpoint}/tasks"
self.cancel_endpoint: str = f"{self.workflow_endpoint}/cancel"

@property
def headers(self) -> dict:
Expand Down Expand Up @@ -74,6 +76,19 @@ def send_request(self, url: str) -> dict:

return response.json()

def post_request(self, url: str, data: dict = None) -> None:
    """Send data to the given URL via a POST request.

    Args:
        url: URL the request is sent to.
        data: Payload sent as the JSON body. An empty dict is sent when omitted.

    Raises:
        TrailblazerError: If the request raises MissingSchema, HTTPError or
            ConnectionError (the latter two include the error raised by
            `raise_for_status()` for an unsuccessful response).
    """
    # Build a fresh dict per call instead of sharing one mutable default object.
    payload: dict = {} if data is None else data
    try:
        response = requests.post(
            url, headers=self.headers, params=self.request_params, json=payload
        )
        if response.status_code in {404, 400}:
            LOG.info(f"POST request failed for url {url}\n with message {str(response)}")
        response.raise_for_status()
    except (MissingSchema, HTTPError, ConnectionError) as error:
        LOG.error(f"Request failed for url {url}: Error: {error}\n")
        # Chain the original exception so the root cause stays in the traceback.
        raise TrailblazerError(f"POST request failed for url {url}") from error

@property
def meets_requirements(self) -> bool:
"""Return True if required variables are not empty."""
Expand Down Expand Up @@ -102,6 +117,12 @@ def workflow(self) -> TowerWorkflowResponse:
url = self.build_url(endpoint=self.workflow_endpoint)
return TowerWorkflowResponse(**self.send_request(url=url))

def send_cancel_request(self) -> None:
    """Post to the workflow's cancel endpoint.

    Nothing is sent when `meets_requirements` is false.
    """
    if not self.meets_requirements:
        return
    self.post_request(url=self.build_url(endpoint=self.cancel_endpoint))


class TowerAPI:
"""Class communicating with NF tower regarding a given analysis (workflow)."""
Expand Down Expand Up @@ -140,7 +161,9 @@ def tasks_response(self) -> TowerTaskResponse:
@property
def status(self) -> str:
"""Returns the status of an analysis (also called workflow in NF Tower)."""
status: str = TOWER_STATUS.get(self.response.workflow.status, TrailblazerStatus.ERROR.value)
status: str = TOWER_WORKFLOW_STATUS.get(
self.response.workflow.status, TrailblazerStatus.ERROR.value
)

# If the whole workflow (analysis) is completed set it as QC instead of COMPLETE
if status == TrailblazerStatus.COMPLETED:
Expand Down Expand Up @@ -207,3 +230,7 @@ def _get_job(self, task: TowerTask, analysis_id: int) -> dict:
started_at=task.start,
elapsed=int(task.duration / 60),
)

def cancel(self) -> None:
    """Request cancellation of the workflow through the Tower client."""
    client = self.tower_client
    client.send_cancel_request()
12 changes: 9 additions & 3 deletions trailblazer/apps/tower/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

from pydantic import BaseModel, ConfigDict, field_validator

from trailblazer.constants import TOWER_PROCESS_STATUS, TOWER_STATUS, TrailblazerStatus
from trailblazer.constants import (
TOWER_PROCESS_STATUS,
TOWER_TASK_STATUS,
TOWER_WORKFLOW_STATUS,
SlurmJobStatus,
TrailblazerStatus,
)
from trailblazer.utils.datetime import tower_datetime_converter

SCALE_TO_MILLISEC: int = 1000
Expand Down Expand Up @@ -72,7 +78,7 @@ def set_duration(cls, raw_duration: Optional[int]) -> int:
@field_validator("status")
@classmethod
def set_status(cls, raw_status) -> str:
return TOWER_STATUS.get(raw_status)
return TOWER_TASK_STATUS.get(raw_status)

@field_validator("start", "dateCreated", "lastUpdated")
@classmethod
Expand All @@ -87,7 +93,7 @@ def set_datetime(cls, raw_time) -> Optional[Union[str, datetime]]:
@property
def is_complete(cls) -> bool:
"""Returns if the process succeded."""
return cls.status == TrailblazerStatus.COMPLETED
return cls.status == SlurmJobStatus.COMPLETED


class TowerProcess(BaseModel):
Expand Down
18 changes: 16 additions & 2 deletions trailblazer/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ class TrailblazerStatusColor(str, Enum):
RUNNING: str = "blue"


TOWER_STATUS: Dict[str, str] = {
TOWER_WORKFLOW_STATUS: Dict[str, str] = {
"ABORTED": TrailblazerStatus.FAILED,
"CACHED": TrailblazerStatus.COMPLETED,
"CANCELLED": TrailblazerStatus.FAILED,
"CANCELLED": TrailblazerStatus.CANCELLED,
"COMPLETED": TrailblazerStatus.COMPLETED,
"FAILED": TrailblazerStatus.FAILED,
"NEW": TrailblazerStatus.PENDING,
Expand All @@ -134,3 +134,17 @@ class TrailblazerStatusColor(str, Enum):
"succeeded": TrailblazerStatus.COMPLETED,
"failed": TrailblazerStatus.FAILED,
}


# Maps the status string Tower reports for a task to a SlurmJobStatus member.
# The `status` field validator in trailblazer/apps/tower/models.py looks values
# up with `.get()`, so a status missing from this table becomes None.
# NOTE(review): annotated Dict[str, str] while the values are SlurmJobStatus
# members - presumably a str-based Enum; confirm.
TOWER_TASK_STATUS: Dict[str, str] = {
"ABORTED": SlurmJobStatus.FAILED,
"CACHED": SlurmJobStatus.COMPLETED,
"CANCELLED": SlurmJobStatus.CANCELLED,
"COMPLETED": SlurmJobStatus.COMPLETED,
"FAILED": SlurmJobStatus.FAILED,
"NEW": SlurmJobStatus.PENDING,
"RUNNING": SlurmJobStatus.RUNNING,
"SUBMITTED": SlurmJobStatus.PENDING,
"SUCCEEDED": SlurmJobStatus.COMPLETED,
"UNKNOWN": SlurmJobStatus.FAILED,
}
28 changes: 20 additions & 8 deletions trailblazer/store/crud/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
reformat_squeue_result_job_step,
)
from trailblazer.apps.slurm.models import SqueueResult
from trailblazer.constants import SlurmJobStatus, TrailblazerStatus
from trailblazer.constants import SlurmJobStatus, TrailblazerStatus, WorkflowManager
from trailblazer.exc import MissingAnalysis, TrailblazerError
from trailblazer.store.base import BaseHandler_2
from trailblazer.store.models import Analysis, Job, User
Expand Down Expand Up @@ -117,13 +117,11 @@ def cancel_ongoing_analysis(
raise MissingAnalysis(f"Analysis {analysis_id} does not exist")
if analysis.status not in TrailblazerStatus.ongoing_statuses():
raise TrailblazerError(f"Analysis {analysis_id} is not running")
for job in analysis.jobs:
if job.status in SlurmJobStatus.ongoing_statuses():
LOG.info(f"Cancelling job {job.slurm_id} - {job.name}")
cancel_slurm_job(analysis_host=analysis_host, slurm_id=job.slurm_id)
LOG.info(
f"Case {analysis.family} - Analysis {analysis_id}: all ongoing jobs cancelled successfully!"
)
if analysis.workflow_manager == WorkflowManager.TOWER.value:
self.cancel_tower_analysis(analysis=analysis)
else:
self.cancel_slurm_analysis(analysis=analysis, analysis_host=analysis_host)
LOG.info(f"Case {analysis.family} - Analysis {analysis.id}: cancelled successfully!")
self.update_run_status(analysis_id=analysis_id, analysis_host=analysis_host)
analysis.status = TrailblazerStatus.CANCELLED
analysis.comment = (
Expand All @@ -132,6 +130,20 @@ def cancel_ongoing_analysis(
)
self.commit()

def cancel_slurm_analysis(
    self, analysis: Analysis, analysis_host: Optional[str] = None
) -> None:
    """Cancel every ongoing SLURM job that belongs to the analysis.

    Jobs whose status is not in `SlurmJobStatus.ongoing_statuses()` are left untouched.
    """
    for slurm_job in analysis.jobs:
        if slurm_job.status not in SlurmJobStatus.ongoing_statuses():
            continue
        LOG.info(f"Cancelling job {slurm_job.slurm_id} - {slurm_job.name}")
        cancel_slurm_job(analysis_host=analysis_host, slurm_id=slurm_job.slurm_id)

def cancel_tower_analysis(self, analysis: Analysis) -> None:
    """Cancel a NF-Tower analysis; the associated jobs are cancelled by Tower itself."""
    LOG.info(f"Cancelling Tower workflow for {analysis.family}")
    tower_api = self.query_tower(config_file=analysis.config_path, case_id=analysis.family)
    tower_api.cancel()

def update_analysis_status(self, case_id: str, status: str):
"""Setting analysis status."""
status: str = status.lower()
Expand Down

0 comments on commit 8ae79da

Please sign in to comment.