Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cancel tower analysis #286

Merged
merged 20 commits into from
Oct 9, 2023
2 changes: 1 addition & 1 deletion tests/cli/test_cli_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ def test_cancel_with_ongoing_analysis(
assert result.exit_code == process_exit_success

# THEN log should inform of successful cancellation
assert "all ongoing jobs cancelled successfully" in caplog.text
assert "cancelled successfully" in caplog.text
assert "Cancelling" in caplog.text

# THEN job id from squeue output will be cancelled
Expand Down
34 changes: 30 additions & 4 deletions tests/store/crud/test_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from tests.mocks.store_mock import MockStore
from trailblazer.apps.slurm.api import get_squeue_result
from trailblazer.apps.slurm.models import SqueueResult
from trailblazer.apps.tower.api import TowerAPI
from trailblazer.constants import CharacterFormat, TrailblazerStatus
from trailblazer.exc import MissingAnalysis, TrailblazerError
from trailblazer.store.filters.user_filters import UserFilter, apply_user_filter
Expand Down Expand Up @@ -105,7 +106,7 @@ def test_update_case_analyses_as_deleted_with_non_existing_case(
assert not analyses


def test_cancel_ongoing_analysis(
def test_cancel_ongoing_slurm_analysis(
analysis_store: MockStore, caplog, mocker, ongoing_analysis_case_id: str, tower_jobs: List[dict]
):
"""Test all ongoing analysis jobs are cancelled."""
Expand All @@ -116,7 +117,6 @@ def test_cancel_ongoing_analysis(
caplog.set_level("INFO")

# GIVEN an ongoing analysis
analysis_store.update_ongoing_analyses()
analysis: Optional[Analysis] = analysis_store.get_latest_analysis_for_case(
case_id=ongoing_analysis_case_id
)
Expand All @@ -129,8 +129,34 @@ def test_cancel_ongoing_analysis(
analysis_store.cancel_ongoing_analysis(analysis_id=analysis.id)

# THEN log should inform of successful cancellation
assert "all ongoing jobs cancelled successfully" in caplog.text
assert "Cancelling" in caplog.text
assert "Cancelling job" in caplog.text
assert "cancelled successfully!" in caplog.text

# THEN comment should be added
assert "Analysis cancelled manually by" in analysis.comment

# THEN analysis status should be updated
assert TrailblazerStatus.CANCELLED == analysis.status


def test_cancel_ongoing_tower_analysis(analysis_store: MockStore, caplog, mocker, case_id: str):
# GIVEN TOWER cancel output
mocker.patch.object(TowerAPI, "cancel", return_value=None)

caplog.set_level("INFO")

# GIVEN an ongoing analysis
analysis: Optional[Analysis] = analysis_store.get_latest_analysis_for_case(case_id=case_id)

# WHEN running cancel ongoing analysis
analysis_store.cancel_ongoing_analysis(analysis_id=analysis.id)

# THEN log should inform of successful cancellation
assert f"Cancelling Tower workflow for {case_id}" in caplog.text
assert "cancelled successfully!" in caplog.text

# THEN comment should be added
assert "Analysis cancelled manually by" in analysis.comment


def test_cancel_ongoing_analysis_when_no_analysis(
Expand Down
31 changes: 29 additions & 2 deletions trailblazer/apps/tower/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
TowerTaskResponse,
TowerWorkflowResponse,
)
from trailblazer.constants import TOWER_STATUS, TrailblazerStatus
from trailblazer.constants import TOWER_WORKFLOW_STATUS, TrailblazerStatus
from trailblazer.exc import TrailblazerError

LOG = logging.getLogger(__name__)

Expand All @@ -30,6 +31,7 @@ def __init__(self, workflow_id: str):
self.tower_api_endpoint: str = os.environ.get("TOWER_API_ENDPOINT", None)
self.workflow_endpoint: str = f"workflow/{self.workflow_id}"
self.tasks_endpoint: str = f"{self.workflow_endpoint}/tasks"
self.cancel_endpoint: str = f"{self.workflow_endpoint}/cancel"

@property
def headers(self) -> dict:
Expand Down Expand Up @@ -74,6 +76,19 @@ def send_request(self, url: str) -> dict:

return response.json()

def post_request(self, url: str, data: dict = {}) -> None:
"""Send data via POST request and return response."""
try:
response = requests.post(
url, headers=self.headers, params=self.request_params, json=data
)
if response.status_code in {404, 400}:
LOG.info(f"POST request failed for url {url}\n with message {str(response)}")
response.raise_for_status()
except (MissingSchema, HTTPError, ConnectionError) as error:
LOG.error(f"Request failed for url {url}: Error: {error}\n")
raise TrailblazerError

@property
def meets_requirements(self) -> bool:
"""Return True if required variables are not empty."""
Expand Down Expand Up @@ -102,6 +117,12 @@ def workflow(self) -> TowerWorkflowResponse:
url = self.build_url(endpoint=self.workflow_endpoint)
return TowerWorkflowResponse(**self.send_request(url=url))

def send_cancel_request(self) -> None:
"""Send a POST request to cancel a workflow."""
if self.meets_requirements:
fevac marked this conversation as resolved.
Show resolved Hide resolved
url: str = self.build_url(endpoint=self.cancel_endpoint)
self.post_request(url=url)


class TowerAPI:
"""Class communicating with NF tower regarding a given analysis (workflow)."""
Expand Down Expand Up @@ -140,7 +161,9 @@ def tasks_response(self) -> TowerTaskResponse:
@property
def status(self) -> str:
"""Returns the status of an analysis (also called workflow in NF Tower)."""
status: str = TOWER_STATUS.get(self.response.workflow.status, TrailblazerStatus.ERROR.value)
status: str = TOWER_WORKFLOW_STATUS.get(
self.response.workflow.status, TrailblazerStatus.ERROR.value
)

# If the whole workflow (analysis) is completed set it as QC instead of COMPLETE
if status == TrailblazerStatus.COMPLETED:
Expand Down Expand Up @@ -207,3 +230,7 @@ def _get_job(self, task: TowerTask, analysis_id: int) -> dict:
started_at=task.start,
elapsed=int(task.duration / 60),
)

def cancel(self) -> None:
"""Cancel a workflow."""
self.tower_client.send_cancel_request()
12 changes: 9 additions & 3 deletions trailblazer/apps/tower/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,13 @@

from pydantic import BaseModel, ConfigDict, field_validator

from trailblazer.constants import TOWER_PROCESS_STATUS, TOWER_STATUS, TrailblazerStatus
from trailblazer.constants import (
TOWER_PROCESS_STATUS,
TOWER_TASK_STATUS,
TOWER_WORKFLOW_STATUS,
SlurmJobStatus,
TrailblazerStatus,
)
from trailblazer.utils.datetime import tower_datetime_converter

SCALE_TO_MILLISEC: int = 1000
Expand Down Expand Up @@ -72,7 +78,7 @@ def set_duration(cls, raw_duration: Optional[int]) -> int:
@field_validator("status")
@classmethod
def set_status(cls, raw_status) -> str:
return TOWER_STATUS.get(raw_status)
return TOWER_TASK_STATUS.get(raw_status)

@field_validator("start", "dateCreated", "lastUpdated")
@classmethod
Expand All @@ -87,7 +93,7 @@ def set_datetime(cls, raw_time) -> Optional[Union[str, datetime]]:
@property
def is_complete(cls) -> bool:
"""Returns if the process succeded."""
return cls.status == TrailblazerStatus.COMPLETED
return cls.status == SlurmJobStatus.COMPLETED


class TowerProcess(BaseModel):
Expand Down
18 changes: 16 additions & 2 deletions trailblazer/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,10 @@ class TrailblazerStatusColor(str, Enum):
RUNNING: str = "blue"


TOWER_STATUS: Dict[str, str] = {
TOWER_WORKFLOW_STATUS: Dict[str, str] = {
"ABORTED": TrailblazerStatus.FAILED,
"CACHED": TrailblazerStatus.COMPLETED,
"CANCELLED": TrailblazerStatus.FAILED,
"CANCELLED": TrailblazerStatus.CANCELLED,
"COMPLETED": TrailblazerStatus.COMPLETED,
"FAILED": TrailblazerStatus.FAILED,
"NEW": TrailblazerStatus.PENDING,
Expand All @@ -134,3 +134,17 @@ class TrailblazerStatusColor(str, Enum):
"succeeded": TrailblazerStatus.COMPLETED,
"failed": TrailblazerStatus.FAILED,
}


TOWER_TASK_STATUS: Dict[str, str] = {
"ABORTED": SlurmJobStatus.FAILED,
"CACHED": SlurmJobStatus.COMPLETED,
"CANCELLED": SlurmJobStatus.CANCELLED,
"COMPLETED": SlurmJobStatus.COMPLETED,
"FAILED": SlurmJobStatus.FAILED,
"NEW": SlurmJobStatus.PENDING,
"RUNNING": SlurmJobStatus.RUNNING,
"SUBMITTED": SlurmJobStatus.PENDING,
"SUCCEEDED": SlurmJobStatus.COMPLETED,
"UNKNOWN": SlurmJobStatus.FAILED,
}
28 changes: 20 additions & 8 deletions trailblazer/store/crud/update.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
reformat_squeue_result_job_step,
)
from trailblazer.apps.slurm.models import SqueueResult
from trailblazer.constants import SlurmJobStatus, TrailblazerStatus
from trailblazer.constants import SlurmJobStatus, TrailblazerStatus, WorkflowManager
from trailblazer.exc import MissingAnalysis, TrailblazerError
from trailblazer.store.base import BaseHandler_2
from trailblazer.store.models import Analysis, Job, User
Expand Down Expand Up @@ -117,13 +117,11 @@ def cancel_ongoing_analysis(
raise MissingAnalysis(f"Analysis {analysis_id} does not exist")
if analysis.status not in TrailblazerStatus.ongoing_statuses():
raise TrailblazerError(f"Analysis {analysis_id} is not running")
for job in analysis.jobs:
if job.status in SlurmJobStatus.ongoing_statuses():
LOG.info(f"Cancelling job {job.slurm_id} - {job.name}")
cancel_slurm_job(analysis_host=analysis_host, slurm_id=job.slurm_id)
LOG.info(
f"Case {analysis.family} - Analysis {analysis_id}: all ongoing jobs cancelled successfully!"
)
if analysis.workflow_manager == WorkflowManager.TOWER.value:
self.cancel_tower_analysis(analysis=analysis)
else:
self.cancel_slurm_analysis(analysis=analysis, analysis_host=analysis_host)
LOG.info(f"Case {analysis.family} - Analysis {analysis.id}: cancelled successfully!")
self.update_run_status(analysis_id=analysis_id, analysis_host=analysis_host)
analysis.status = TrailblazerStatus.CANCELLED
analysis.comment = (
Expand All @@ -132,6 +130,20 @@ def cancel_ongoing_analysis(
)
self.commit()

def cancel_slurm_analysis(
self, analysis: Analysis, analysis_host: Optional[str] = None
) -> None:
"""Cancel SLURM analysis by cancelling all associated SLURM jobs."""
for job in analysis.jobs:
if job.status in SlurmJobStatus.ongoing_statuses():
LOG.info(f"Cancelling job {job.slurm_id} - {job.name}")
cancel_slurm_job(analysis_host=analysis_host, slurm_id=job.slurm_id)

def cancel_tower_analysis(self, analysis: Analysis) -> None:
"""Cancel a NF-Tower analysis. Associated jobs are cancelled by Tower."""
LOG.info(f"Cancelling Tower workflow for {analysis.family}")
self.query_tower(config_file=analysis.config_path, case_id=analysis.family).cancel()

def update_analysis_status(self, case_id: str, status: str):
"""Setting analysis status."""
status: str = status.lower()
Expand Down
Loading