From e0e4561c5ae2a4880d814df1bee4d14d6fc3b955 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Thu, 27 Mar 2025 16:08:10 -0400 Subject: [PATCH 01/17] Add format Makefile target. --- Makefile | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Makefile b/Makefile index 019fb9c11..76abd5add 100644 --- a/Makefile +++ b/Makefile @@ -73,6 +73,9 @@ lint: ## check style using tox and flake8 for Python 2 and Python 3 test: ## run tests with the default Python (faster than tox) $(IN_VENV) pytest $(TESTS) +format: ## format Python code with black + $(IN_VENV) black planemo tests + quick-test: ## run quickest tests with the default Python $(IN_VENV) PLANEMO_SKIP_SLOW_TESTS=1 PLANEMO_SKIP_GALAXY_TESTS=1 pytest $(TESTS) From 12039a6aa4fa1a9aafdc947d560dd631009bb3e7 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Sat, 29 Mar 2025 10:09:56 -0400 Subject: [PATCH 02/17] More consistent behavior for --no_wait on workflows. --- planemo/galaxy/activity.py | 71 ++++++++++++++++++++------------------ 1 file changed, 37 insertions(+), 34 deletions(-) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index c75ee107a..e2e2edde5 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -258,19 +258,23 @@ def invocation_to_run_response( ctx.vlog("Waiting for invocation [%s]" % invocation_id) - final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( - ctx, - invocation_id=invocation_id, - history_id=history_id, - user_gi=user_gi, - no_wait=no_wait, - polling_backoff=polling_backoff, - early_termination=early_termination, - ) - if final_invocation_state not in ("ok", "skipped", "scheduled"): - msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." 
- ctx.vlog(msg) - summarize_history(ctx, user_gi, history_id) + if not no_wait: + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id=invocation_id, + history_id=history_id, + user_gi=user_gi, + polling_backoff=polling_backoff, + early_termination=early_termination, + ) + if final_invocation_state not in ("ok", "skipped", "scheduled"): + msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." + ctx.vlog(msg) + summarize_history(ctx, user_gi, history_id) + else: + final_invocation_state = invocation["state"] + job_state = None + error_message = None return GalaxyWorkflowRunResponse( ctx, @@ -801,27 +805,26 @@ def wait_for_invocation_and_jobs( ctx.vlog(f"Final state of invocation {invocation_id} is [{final_invocation_state}]") - if not no_wait: - job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff, early_termination) - if job_state not in ("ok", "skipped"): - msg = f"Failed to run workflow, at least one job is in [{job_state}] state." - error_message = msg if not error_message else f"{error_message}. {msg}" - else: - # wait for possible subworkflow invocations - invocation = user_gi.invocations.show_invocation(invocation_id) - for step in invocation["steps"]: - if step.get("subworkflow_invocation_id") is not None: - final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( - ctx, - invocation_id=step["subworkflow_invocation_id"], - history_id=history_id, - user_gi=user_gi, - no_wait=no_wait, - polling_backoff=polling_backoff, - early_termination=early_termination, - ) - if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"): - return final_invocation_state, job_state, error_message + job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff) + if job_state not in ("ok", "skipped"): + msg = f"Failed to run workflow, at least one job is in [{job_state}] state." 
+ error_message = msg if not error_message else f"{error_message}. {msg}" + else: + # wait for possible subworkflow invocations + invocation = user_gi.invocations.show_invocation(invocation_id) + for step in invocation["steps"]: + if step.get("subworkflow_invocation_id") is not None: + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id=step["subworkflow_invocation_id"], + history_id=history_id, + user_gi=user_gi, + no_wait=no_wait, + polling_backoff=polling_backoff, + early_termination=early_termination, + ) + if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"): + return final_invocation_state, job_state, error_message ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") return final_invocation_state, job_state, error_message From 8d9719a5db2695fa92c90121479119c3194d6654 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Sat, 29 Mar 2025 10:27:50 -0400 Subject: [PATCH 03/17] Refactor retry_on_timeout for reuse. 
--- planemo/galaxy/activity.py | 35 ++++++++--------------------------- planemo/galaxy/api.py | 21 +++++++++++++++++++++ 2 files changed, 29 insertions(+), 27 deletions(-) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index e2e2edde5..bd1e105c6 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -36,12 +36,12 @@ unicodify, ) from pathvalidate import sanitize_filename -from requests.exceptions import ( - HTTPError, - RequestException, -) +from requests.exceptions import HTTPError -from planemo.galaxy.api import summarize_history +from planemo.galaxy.api import ( + retry_on_timeouts, + summarize_history, +) from planemo.io import wait_on from planemo.runnable import ( ErrorRunResponse, @@ -832,30 +832,11 @@ def wait_for_invocation_and_jobs( def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0): def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id)) + return retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id)) return _wait_on_state(state_func, polling_backoff) -def _retry_on_timeouts(ctx, gi, f): - gi.timeout = 60 - try_count = 5 - try: - for try_num in range(try_count): - start_time = time.time() - try: - return f(gi) - except RequestException: - end_time = time.time() - if end_time - start_time > 45 and (try_num + 1) < try_count: - ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.") - continue - else: - raise - finally: - gi.timeout = None - - def has_jobs_in_states(ctx, gi, history_id, states): params = {"history_id": history_id} jobs_url = gi.url + "/jobs" @@ -870,7 +851,7 @@ def _wait_for_history(ctx, gi, history_id, polling_backoff=0): # no need to wait for active jobs anymore I think. 
def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id)) + return retry_on_timeouts(ctx, gi, lambda gi: gi.histories.show_history(history_id)) return _wait_on_state(state_func, polling_backoff) @@ -883,7 +864,7 @@ def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_t ctx.log(f"waiting for invocation {invocation_id}") def state_func(): - return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id)) + return retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id)) return _wait_on_state(state_func, polling_backoff, early_termination=early_termination) diff --git a/planemo/galaxy/api.py b/planemo/galaxy/api.py index 6f4906f33..cbcea69a8 100644 --- a/planemo/galaxy/api.py +++ b/planemo/galaxy/api.py @@ -1,9 +1,11 @@ """A high-level interface to local Galaxy instances using bioblend.""" +import time from io import StringIO from typing import Optional from bioblend.galaxy import GalaxyInstance +from requests.exceptions import RequestException DEFAULT_ADMIN_API_KEY = "test_key" @@ -136,6 +138,25 @@ def _dataset_provenance(gi, history_id, id): return provenance +def retry_on_timeouts(ctx, gi, f): + gi.timeout = 60 + try_count = 5 + try: + for try_num in range(try_count): + start_time = time.time() + try: + return f(gi) + except RequestException: + end_time = time.time() + if end_time - start_time > 45 and (try_num + 1) < try_count: + ctx.vlog("Galaxy seems to have timed out, retrying to fetch status.") + continue + else: + raise + finally: + gi.timeout = None + + __all__ = ( "DEFAULT_ADMIN_API_KEY", "gi", From d8e57d4a8cac1f3f2c09c845c2ee018d9dbe9812 Mon Sep 17 00:00:00 2001 From: John Chilton Date: Sat, 29 Mar 2025 10:13:13 -0400 Subject: [PATCH 04/17] workflow progress bar. 
--- .../cmd_workflow_test_on_invocation.py | 4 +- planemo/commands/cmd_workflow_track.py | 50 +++ planemo/galaxy/activity.py | 84 +---- planemo/galaxy/invocations/__init__.py | 0 planemo/galaxy/invocations/api.py | 61 +++ planemo/galaxy/invocations/polling.py | 170 +++++++++ planemo/galaxy/invocations/progress.py | 351 ++++++++++++++++++ .../galaxy/invocations/progress_display.py | 73 ++++ planemo/galaxy/invocations/simulations.py | 227 +++++++++++ planemo/options.py | 8 + tests/test_invocation_polling.py | 142 +++++++ tests/test_workflow_progress.py | 141 +++++++ tests/test_workflow_simulation.py | 260 +++++++++++++ 13 files changed, 1499 insertions(+), 72 deletions(-) create mode 100644 planemo/commands/cmd_workflow_track.py create mode 100644 planemo/galaxy/invocations/__init__.py create mode 100644 planemo/galaxy/invocations/api.py create mode 100644 planemo/galaxy/invocations/polling.py create mode 100644 planemo/galaxy/invocations/progress.py create mode 100644 planemo/galaxy/invocations/progress_display.py create mode 100644 planemo/galaxy/invocations/simulations.py create mode 100644 tests/test_invocation_polling.py create mode 100644 tests/test_workflow_progress.py create mode 100644 tests/test_workflow_simulation.py diff --git a/planemo/commands/cmd_workflow_test_on_invocation.py b/planemo/commands/cmd_workflow_test_on_invocation.py index 8083e2a6a..9036a0825 100644 --- a/planemo/commands/cmd_workflow_test_on_invocation.py +++ b/planemo/commands/cmd_workflow_test_on_invocation.py @@ -15,9 +15,7 @@ @click.command("workflow_test_on_invocation") @options.optional_tools_arg(multiple=False, allow_uris=False, metavar="TEST.YML") -@options.required_invocation_id_arg() -@options.galaxy_url_option(required=True) -@options.galaxy_user_key_option(required=True) +@options.invocation_target_options() @options.test_index_option() @options.test_output_options() @command_function diff --git a/planemo/commands/cmd_workflow_track.py b/planemo/commands/cmd_workflow_track.py 
new file mode 100644 index 000000000..4c4a132fd --- /dev/null +++ b/planemo/commands/cmd_workflow_track.py @@ -0,0 +1,50 @@ +"""Module describing the planemo ``workflow_track`` command.""" + +import click + +from planemo import options +from planemo.cli import command_function +from planemo.engine.factory import engine_context +from planemo.galaxy.workflow_progress import WorkflowProgress + + +@click.command("workflow_track") +@options.invocation_target_options() +@command_function +def cli(ctx, invocation_id, **kwds): + """Run defined tests against existing workflow invocation.""" + with WorkflowProgress() as workflow_progress: + workflow_progress.add_bars() + import time + + time.sleep(1) + new_step = {"state": "new"} + scheduled_step = {"state": "scheduled"} + new_steps = [new_step, new_step, new_step] + one_scheduled_steps = [scheduled_step, new_step, new_step] + two_scheduled_steps = [scheduled_step, scheduled_step, new_step] + all_scheduled_steps = [scheduled_step, scheduled_step, scheduled_step] + state_pairs = [ + ({"state": "new"}, {}), + ({"state": "ready", "steps": new_steps}, {}), + ({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "new": 3}}), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "running": 1, "new": 2}}), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 3, "running": 1, "new": 1}}), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 4, "running": 1}}), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}), 
+ ] + for invocation, job_states_summary in state_pairs: + workflow_progress.handle_invocation(invocation, job_states_summary) + time.sleep(1) + + with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config: + user_gi = config.user_gi + invocation = user_gi.invocations.show_invocation(invocation_id) + # https://stackoverflow.com/questions/23113494/double-progress-bar-in-python + + ctx.exit(0) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index bd1e105c6..b2e09cce8 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -3,7 +3,6 @@ import os import sys import tempfile -import time import traceback from datetime import datetime from typing import ( @@ -42,6 +41,12 @@ retry_on_timeouts, summarize_history, ) +from planemo.galaxy.invocations.api import BioblendInvocationApi +from planemo.galaxy.invocations.polling import ( + PollingTrackerImpl, +) +from planemo.galaxy.invocations.polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs +from planemo.galaxy.invocations.progress import WorkflowProgressDisplay from planemo.io import wait_on from planemo.runnable import ( ErrorRunResponse, @@ -785,64 +790,18 @@ def wait_for_invocation_and_jobs( invocation_id: str, history_id: str, user_gi: GalaxyInstance, - no_wait: bool, polling_backoff: int, early_termination: bool, ): - ctx.vlog("Waiting for invocation [%s]" % invocation_id) - final_invocation_state = "new" - - # TODO: hook in invocation["messages"] - error_message = "" - job_state = "ok" - try: - final_invocation_state = _wait_for_invocation(ctx, user_gi, invocation_id, polling_backoff) - assert final_invocation_state == "scheduled" - except Exception as e: - ctx.vlog(f"Problem waiting on invocation: {str(e)}") - summarize_history(ctx, user_gi, history_id) - error_message = f"Final state of invocation {invocation_id} is [{final_invocation_state}]" - - ctx.vlog(f"Final state of invocation 
{invocation_id} is [{final_invocation_state}]") - - job_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff) - if job_state not in ("ok", "skipped"): - msg = f"Failed to run workflow, at least one job is in [{job_state}] state." - error_message = msg if not error_message else f"{error_message}. {msg}" - else: - # wait for possible subworkflow invocations - invocation = user_gi.invocations.show_invocation(invocation_id) - for step in invocation["steps"]: - if step.get("subworkflow_invocation_id") is not None: - final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( - ctx, - invocation_id=step["subworkflow_invocation_id"], - history_id=history_id, - user_gi=user_gi, - no_wait=no_wait, - polling_backoff=polling_backoff, - early_termination=early_termination, - ) - if final_invocation_state != "scheduled" or job_state not in ("ok", "skipped"): - return final_invocation_state, job_state, error_message - - ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") - return final_invocation_state, job_state, error_message - - -def _wait_for_invocation(ctx, gi, invocation_id, polling_backoff=0): - def state_func(): - return retry_on_timeouts(ctx, gi, lambda gi: gi.invocations.show_invocation(invocation_id)) - - return _wait_on_state(state_func, polling_backoff) - - -def has_jobs_in_states(ctx, gi, history_id, states): - params = {"history_id": history_id} - jobs_url = gi.url + "/jobs" - jobs = gi.jobs._get(url=jobs_url, params=params) - target_jobs = [j for j in jobs if j["state"] in states] - return len(target_jobs) > 0 + polling_tracker = PollingTrackerImpl(polling_backoff) + invocation_api = BioblendInvocationApi(ctx, user_gi) + with WorkflowProgressDisplay(invocation_id) as workflow_progress_display: + final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs( + ctx, invocation_id, invocation_api, polling_tracker, 
workflow_progress_display, early_termination=early_termination + ) + if error_message: + summarize_history(ctx, user_gi, history_id) + return final_invocation_state, job_state, error_message def _wait_for_history(ctx, gi, history_id, polling_backoff=0): @@ -856,19 +815,6 @@ def state_func(): return _wait_on_state(state_func, polling_backoff) -def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0, early_termination=True): - # Wait for invocation jobs to finish. Less brittle than waiting for a history to finish, - # as you could have more than one invocation in a history, or an invocation without - # steps that produce history items. - - ctx.log(f"waiting for invocation {invocation_id}") - - def state_func(): - return retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id)) - - return _wait_on_state(state_func, polling_backoff, early_termination=early_termination) - - def _wait_for_job(gi, job_id, timeout=None): def state_func(): return gi.jobs.show_job(job_id, full_details=True) diff --git a/planemo/galaxy/invocations/__init__.py b/planemo/galaxy/invocations/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/planemo/galaxy/invocations/api.py b/planemo/galaxy/invocations/api.py new file mode 100644 index 000000000..eb15e5f01 --- /dev/null +++ b/planemo/galaxy/invocations/api.py @@ -0,0 +1,61 @@ +"""API interaction for Galaxy's workflow invocation API. + +Gives a mockable surface for testing, type contract consumed by Planemo, +and builtin utilities around bioblend for working around transient request +issues that have been observed in practice. 
+""" + +from typing import ( + Dict, + List, + Optional, + Protocol, +) + +from typing_extensions import TypedDict + +from planemo.galaxy.api import retry_on_timeouts + + +class InvocationStep(TypedDict, total=False): + state: Optional[str] + subworkflow_invocation_id: Optional[str] + + +class Invocation(TypedDict, total=False): + id: str + state: str + steps: List[InvocationStep] + + +class InvocationJobsSummary(TypedDict, total=False): + states: Dict[str, int] + + +class InvocationApi(Protocol): + + def get_invocation(self, invocation_id: str) -> Invocation: ... + + def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary: ... + + +class BioblendInvocationApi(InvocationApi): + + def __init__(self, ctx, user_gi): + self._ctx = ctx + self._user_gi = user_gi + + def get_invocation(self, invocation_id: str) -> Invocation: + return retry_on_timeouts(self._ctx, self._user_gi, lambda gi: gi.invocations.show_invocation(invocation_id)) + + def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary: + return retry_on_timeouts( + self._ctx, self._user_gi, lambda gi: gi.invocations.get_invocation_summary(invocation_id) + ) + + +def invocation_state_terminal(state: str): + return state in ["scheduled", "cancelled", "failed"] + + +JOB_ERROR_STATES = ["error", "deleted", "failed", "stopped", "stop", "deleting"] diff --git a/planemo/galaxy/invocations/polling.py b/planemo/galaxy/invocations/polling.py new file mode 100644 index 000000000..28aad0525 --- /dev/null +++ b/planemo/galaxy/invocations/polling.py @@ -0,0 +1,170 @@ +import time +from typing import ( + List, + Optional, + Protocol, +) + +from .api import ( + invocation_state_terminal, + InvocationApi, + InvocationJobsSummary, + JOB_ERROR_STATES, +) +from .progress import WorkflowProgressDisplay + + +class PollingTracker(Protocol): + + def sleep(self) -> None: ... 
+ + +class PollingTrackerImpl(PollingTracker): + + def __init__(self, polling_backoff: int, timeout=None): + self.polling_backoff = polling_backoff + self.timeout = timeout + self.delta = 0.25 + self.total_wait_time = 0 + + def sleep(self): + if self.timeout is not None and self.total_wait_time > self.timeout: + message = "Timed out while polling Galaxy." + raise Exception(message) + self.total_wait_time += self.delta + time.sleep(self.delta) + self.delta += self.polling_backoff + + +def wait_for_invocation_and_jobs( + ctx, + invocation_id: str, + invocation_api: InvocationApi, + polling_tracker: PollingTracker, + workflow_progress_display: WorkflowProgressDisplay, +): + ctx.vlog("Waiting for invocation [%s]" % invocation_id) + + def summarize(invocation_id: str): + invocation = invocation_api.get_invocation(invocation_id) + assert invocation + invocation_jobs = invocation_api.get_invocation_summary(invocation_id) + return invocation, invocation_jobs + + last_invocation = None + last_invocation_jobs = None + last_subworkflow_invocation = None + last_subworkflow_invocation_jobs = None + last_exception = None + + done_polling = False + while not done_polling: + # loop over the main workflow and one subworkflow each iteration for display, + + # skip the main workflow if it is already tracked as complete - if all steps have been + # scheduled there are no new subworkflow invocations to track. 
+ if not workflow_progress_display.workflow_progress.terminal: + try: + last_invocation, last_invocation_jobs = summarize(invocation_id) + workflow_progress_display.handle_invocation(last_invocation, last_invocation_jobs) + except Exception as e: + print(e) + last_exception = e + + error_message = workflow_in_error_message( + ctx, + invocation_id, + last_exception, + last_invocation, + last_invocation_jobs, + ) + if error_message: + final_invocation_state = "new" if not last_invocation else last_invocation["state"] + job_state = summary_job_state(last_invocation_jobs) + return final_invocation_state, job_state, error_message + + assert last_invocation # if we got here... the first check has passed and we have an invocation + + # grab a subworkflow that isn't complete and check it, also register its subworkflow + # invocations so we catch all the children and children of children... + if not workflow_progress_display.all_subworkflows_complete(): + try: + a_subworkflow_invocation_id = workflow_progress_display.an_incomplete_subworkflow_id() + last_subworkflow_invocation, last_subworkflow_invocation_jobs = summarize(a_subworkflow_invocation_id) + workflow_progress_display.handle_subworkflow_invocation( + last_subworkflow_invocation, last_subworkflow_invocation_jobs + ) + except Exception as e: + last_exception = e + + error_message = workflow_in_error_message( + ctx, + invocation_id, + last_exception, + last_subworkflow_invocation, + last_subworkflow_invocation_jobs, + ) + if error_message: + final_invocation_state = ( + "new" if not last_subworkflow_invocation else last_subworkflow_invocation["state"] + ) + job_state = summary_job_state(last_subworkflow_invocation_jobs) + return final_invocation_state, job_state, error_message + + done_polling = ( + workflow_progress_display.workflow_progress.terminal + and workflow_progress_display.all_subworkflows_complete() + ) + if not done_polling: + polling_tracker.sleep() + + ctx.vlog(f"The final state of all jobs and 
subworkflow invocations for invocation [{invocation_id}] is 'ok'") + job_state = summary_job_state(last_invocation_jobs) + return last_invocation["state"], job_state, error_message + + +def workflow_in_error_message( + ctx, invocation_id, last_exception, last_invocation, last_invocation_jobs +) -> Optional[str]: + """Return an error message if workflow is in an error state.""" + + invocation_state = "new" if not last_invocation else last_invocation["state"] + job_state = summary_job_state(last_invocation_jobs) + + error_message = None + if last_exception: + ctx.vlog(f"Problem waiting on invocation: {str(last_exception)}") + error_message = f"Final state of invocation {invocation_id} is [{invocation_state}]" + + if invocation_state_terminal(invocation_state) and invocation_state != "scheduled": + msg = f"Failed to run workflow, invocation ended in [{invocation_state}] state." + ctx.vlog(msg) + error_message = msg if not error_message else f"{error_message}. {msg}" + + if job_state in JOB_ERROR_STATES: + msg = f"Failed to run workflow, at least one job is in [{job_state}] state." + ctx.vlog(msg) + error_message = msg if not error_message else f"{error_message}. {msg}" + + return error_message + + +# we're still mocking out the old history state by just picking out a random +# job state of interest. Seems like we should drop this. 
+def summary_job_state(job_states_summary: Optional[InvocationJobsSummary]): + states = (job_states_summary or {"states": {}}).get("states").copy() + states.pop("ok", None) + states.pop("skipped", None) + if states: + return next(iter(states.keys())) + else: + return "ok" + + +def subworkflow_invocation_ids(invocation_api: InvocationApi, invocation_id: str) -> List[str]: + invocation = invocation_api.get_invocation(invocation_id) + subworkflow_invocation_ids = [] + for step in invocation["steps"]: + if step.get("subworkflow_invocation_id") is not None: + subworkflow_invocation_ids.append(step["subworkflow_invocation_id"]) + return subworkflow_invocation_ids diff --git a/planemo/galaxy/invocations/progress.py b/planemo/galaxy/invocations/progress.py new file mode 100644 index 000000000..239ea54e7 --- /dev/null +++ b/planemo/galaxy/invocations/progress.py @@ -0,0 +1,351 @@ +import random +from io import StringIO +from typing import ( + Dict, + List, + Optional, + Set, +) + +from rich.console import Group +from rich.live import Live +from rich.panel import Panel +from rich.progress import ( + BarColumn, + Progress, + TaskID, + TaskProgressColumn, + TextColumn, +) +from typing_extensions import TypedDict + +from .api import ( + invocation_state_terminal, + JOB_ERROR_STATES, +) +from .progress_display import DisplayConfiguration + + +# Types for various invocation responses +class InvocationStep(TypedDict, total=False): + state: Optional[str] + subworkflow_invocation_id: Optional[str] + + +class Invocation(TypedDict, total=False): + id: str + state: str + steps: List[InvocationStep] + + +class InvocationJobsSummary(TypedDict, total=False): + states: Dict[str, int] + + +class WorkflowProgress(Progress): + invocation_state: str = "new" + step_count: Optional[int] = None + job_count: int = 0 + steps_color: str + jobs_color: str + subworkflows_color: str + step_states: Dict = {} + num_ok: int = 0 + num_new: int = 0 + num_queued: int = 0 + num_running: int = 0 + num_errors: 
int = 0 + num_paused: int = 0 + + num_subworkflows: int = 0 + num_subworkflows_complete: int = 0 + + _jobs_task: TaskID + _steps_task: TaskID + _subworkflows_task: Optional[TaskID] = None + + def __init__(self, display: DisplayConfiguration): + self.display = display + bar_column = BarColumn( + style=self.display.style_bar_back, + finished_style=self.display.style_bar_finished, + complete_style=self.display.style_bar_complete, + ) + self.jobs_color = self.display.style_initializing + self.steps_color = self.display.style_initializing + self.subworkflows_color = self.display.style_initializing + super().__init__( + TextColumn("[progress.description]{task.description}"), + TextColumn(display.divider), + bar_column, + TextColumn(display.divider), + TaskProgressColumn(f"[{self.display.style_percent}]{{task.percentage:>3.0f}}%"), + TextColumn(display.divider), + TextColumn(text_format="{task.fields[status]}"), + ) + self.add_bars() + + @property + def invocation_scheduling_terminal(self): + return invocation_state_terminal(self.invocation_state) + + @property + def jobs_terminal(self): + return self.job_count is not None and self.job_count == self.jobs_terminal_count + + @property + def terminal(self): + return self.invocation_scheduling_terminal and self.jobs_terminal + + def handle_subworkflow_counts(self, num: int, num_complete: int): + previous_count = self.num_subworkflows + self.num_subworkflows = num + self.num_subworkflows_complete = num_complete + if previous_count < 2 and num >= 2: + self._subworkflows_task = self.add_task( + f"[{self.subworkflows_color}]{self.display.label_progress_subworkflows}", status="" + ) + + if num >= 2: + self.subworkflows_color = self.display.style_ok + subworkflows_status = f"{self.num_subworkflows_complete}/{self.num_subworkflows} terminal" + self.update( + self._subworkflows_task, + total=self.num_subworkflows, + completed=self.num_subworkflows_complete, + 
description=f"[{self.subworkflows_color}]{self.display.label_progress_subworkflows}", + status=subworkflows_status, + ) + + def handle_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary): + self.invocation_state = invocation.get("state") or "new" + self.step_count = len(invocation.get("steps") or []) or None + self.step_states = step_states(invocation) + + steps_completed = None + + steps_status = "" + if self.step_count is None: + steps_status = "Loading steps." + self.steps_color = self.display.style_initializing + elif self.invocation_state == "cancelled": + steps_status = "Invocation cancelled" + self.steps_color = self.display.style_error + elif self.invocation_state == "failed": + steps_status = "Invocation failed" + self.steps_color = self.display.style_error + else: + num_scheduled = self.step_states.get("scheduled") or 0 + if num_scheduled > 0: + self.steps_color = self.display.style_ok + else: + self.steps_color = self.display.style_initializing + steps_completed = num_scheduled + steps_status = f"{num_scheduled}/{self.step_count} scheduled" + + jobs_status = "" + self.job_count = job_count(job_state_summary) + self.num_new = count_states(job_state_summary, ["new"]) + self.num_queued = count_states(job_state_summary, ["queued", "waiting"]) + self.num_running = count_states(job_state_summary, ["running"]) + self.num_errors = error_count(job_state_summary) + self.num_ok = ok_count(job_state_summary) + self.jobs_completed = self.num_ok + self.num_errors + self.num_paused = count_states(job_state_summary, ["paused"]) + self.jobs_terminal_count = self.jobs_completed + self.num_paused + jobs_total = self.job_count + if self.num_errors > 0: + self.jobs_color = self.display.style_error + elif self.job_count > 0: + self.jobs_color = self.display.style_ok + else: + self.jobs_color = self.display.style_initializing + self.jobs_completed = None + jobs_total = None + if self.job_count > 0: + jobs_status = 
f"{self.jobs_completed}/{self.job_count} terminal" + self.update( + self._steps_task, + total=self.step_count, + completed=steps_completed, + description=f"[{self.steps_color}]{self.display.label_progress_steps}", + status=steps_status, + ) + self.update( + self._jobs_task, + total=jobs_total, + completed=self.jobs_completed, + description=f"[{self.jobs_color}]{self.display.label_progress_jobs}", + status=jobs_status, + ) + + def _job_states_console_line(self): + output = StringIO() + if self.num_ok > 0: + output.write(f"{self.display.icon_state_ok} {self.num_ok} {self.display.divider} ") + if self.num_errors > 0: + output.write(f"{self.display.icon_state_errors} {self.num_errors} {self.display.divider} ") + if self.num_new > 0: + output.write(f"{self.display.icon_state_new} {self.num_new} {self.display.divider} ") + if self.num_queued > 0: + output.write(f"{self.display.icon_state_queued} {self.num_queued} {self.display.divider} ") + if self.num_running > 0: + output.write(f"{self.display.icon_state_running} {self.num_running} {self.display.divider} ") + if self.num_paused > 0: + output.write(f"{self.display.icon_state_paused} {self.num_paused} {self.display.divider} ") + + result = output.getvalue().rstrip(" {self.display.divider} ") + output.close() + # Is there an actual way to reset this? The undefined style seems to work but is a hack. 
+ return f"[{self.jobs_color}]{self.display.label_job_states_prefix} [reset]{self.display.divider} {result}" + + def add_bars(self): + self._steps_task = self.add_task(f"[{self.steps_color}]{self.display.label_progress_steps}", status="") + self._jobs_task = self.add_task(f"[{self.jobs_color}]{self.display.label_progress_jobs}", status="") + + +# converted from Galaxy TypeScript (see util.ts next to WorkflowInvocationState.vue) +def count_states(job_summary: Optional[InvocationJobsSummary], query_states: list[str]) -> int: + count = 0 + states = job_summary.get("states") if job_summary else None + if states: + for state in query_states: + count += states.get(state, 0) + return count + + +def job_count(job_summary: Optional[InvocationJobsSummary]) -> int: + states = job_summary.get("states") if job_summary else None + count = 0 + if states: + for state_count in states.values(): + if state_count: + count += state_count + return count + + +def step_states(invocation: Invocation): + step_states = {} + steps = invocation.get("steps") or [] + for step in steps: + if not step: + continue + step_state = step.get("state") or "unknown" + if step_state not in step_states: + step_states[step_state] = 0 + step_states[step_state] += 1 + + return step_states + + +def ok_count(job_summary: InvocationJobsSummary) -> int: + return count_states(job_summary, ["ok", "skipped"]) + + +def error_count(job_summary: InvocationJobsSummary) -> int: + return count_states(job_summary, JOB_ERROR_STATES) + + +def running_count(job_summary: InvocationJobsSummary) -> int: + return count_states(job_summary, ["running"]) + + +class WorkflowProgressDisplay(Live): + workflow_progress: WorkflowProgress + subworkflow_progress: Optional[WorkflowProgress] = None + subworkflow_invocation_ids_seen: Set[str] = set() + subworkflow_invocation_ids_completed: Set[str] = set() + invocation_id: str + subworkflow_invocation_id: Optional[str] = None + + def __init__( + self, + invocation_id: str, + 
display_configuration: Optional[DisplayConfiguration] = None, + ): + self.invocation_id = invocation_id + display = display_configuration or DisplayConfiguration() + self.display = display + self.workflow_progress = WorkflowProgress(display) + self.subworkflow_progress = WorkflowProgress(display) + super().__init__(self._panel()) + + def _register_subworkflow_invocation_ids_from(self, invocation: Invocation): + subworkflow_invocation_ids = [] + for step in invocation["steps"]: + if step.get("subworkflow_invocation_id") is not None: + subworkflow_invocation_ids.append(step["subworkflow_invocation_id"]) + self._register_subworkflow_invocation_ids(subworkflow_invocation_ids) + + def _register_subworkflow_invocation_ids(self, ids: List[str]): + for invocation_id in ids: + self.subworkflow_invocation_ids_seen.add(invocation_id) + + def _complete_subworkflow(self, id: str): + self.subworkflow_invocation_ids_completed.add(id) + + def an_incomplete_subworkflow_id(self): + return random.choice(tuple(self.subworkflow_invocation_ids_seen - self.subworkflow_invocation_ids_completed)) + + def all_subworkflows_complete(self): + return len(self.subworkflow_invocation_ids_seen) == len(self.subworkflow_invocation_ids_completed) + + def _panel(self): + def job_states(workflow_progress): + if self.display.include_job_state_breakdown: + return workflow_progress._job_states_console_line() + else: + return None + + title = f"[{self.display.style_header}]{self.display.label_header_prefix}<{self.invocation_id}>" + subworkflow_title = None + if self.subworkflow_invocation_id: + subworkflow_title = f"[{self.display.style_subworkflow_header}]{self.display.label_subworkflow_header_prefix}<{self.subworkflow_invocation_id}>" + + if not self.subworkflow_invocation_id or not self.display.include_nested_subworkflows: + renderable = as_group( + self.workflow_progress, + job_states(self.workflow_progress), + ) + elif not self.display.subworkflows_as_panel: + renderable = as_group( + 
self.workflow_progress, + job_states(self.workflow_progress), + subworkflow_title, + self.subworkflow_progress, + job_states(self.subworkflow_progress), + ) + else: + renderable = as_group( + self.workflow_progress, + job_states(self.workflow_progress), + Panel( + as_group(self.subworkflow_progress, job_states(self.subworkflow_progress)), + title=subworkflow_title, + ), + ) + return Panel(renderable, title=title, expand=True) + + def _update_panel(self): + self.update(self._panel()) + + def handle_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary): + self.workflow_progress.handle_invocation(invocation, job_state_summary) + self._register_subworkflow_invocation_ids_from(invocation) + self._update_panel() + + def handle_subworkflow_invocation(self, invocation: Invocation, job_state_summary: InvocationJobsSummary): + self.subworkflow_invocation_id = invocation["id"] + self.subworkflow_progress.handle_invocation(invocation, job_state_summary) + self._register_subworkflow_invocation_ids_from(invocation) + if self.subworkflow_progress.terminal: + self._complete_subworkflow(invocation["id"]) + self.workflow_progress.handle_subworkflow_counts( + len(self.subworkflow_invocation_ids_seen), + len(self.subworkflow_invocation_ids_completed), + ) + self._update_panel() + + +def as_group(*renderables): + return Group(*(r for r in renderables if r)) diff --git a/planemo/galaxy/invocations/progress_display.py b/planemo/galaxy/invocations/progress_display.py new file mode 100644 index 000000000..8eefffbe6 --- /dev/null +++ b/planemo/galaxy/invocations/progress_display.py @@ -0,0 +1,73 @@ +from pydantic import BaseModel + +# from rich.style import StyleType - doesn't work with Pydantic, keeping to string styles for now +StyleType = str + +# uses a bit more space but has better visual separation between subworkflows and workflows +DISPLAY_INCLUDE_NESTED_SUBWORKFLOWS = True +DISPLAY_SUBWORKFLOWS_AS_PANEL = True +DISPLAY_INCLUDE_JOB_STATE_BREAKDOWN = True 
+DISPLAY_DIVIDER = "β—†" +# bar.* are Rich defaults for these values. +DISPLAY_STYLE_BAR_BACK: StyleType = "bar.back" +DISPLAY_STYLE_BAR_FINISHED: StyleType = "bar.finished" +DISPLAY_STYLE_BAR_COMPLETE: StyleType = "bar.complete" + +DISPLAY_STYLE_INITIALIZING: StyleType = "cyan" +DISPLAY_STYLE_OK: StyleType = "green" +DISPLAY_STYLE_RUNNING: StyleType = "green" +DISPLAY_STYLE_ERROR: StyleType = "red" + +# Rich default style - a magenta +DISPLAY_STYLE_PERCENT: StyleType = "progress.percentage" + +DISPLAY_STYLE_HEADER: StyleType = "bold" +DISPLAY_STYLE_SUBWORKFLOW_HEADER: StyleType = "bold" + +DISPLAY_LABEL_HEADER_PREFIX = "Invocation " +DISPLAY_LABEL_SUBWORKFLOW_HEADER_PREFIX = "Subworkflow Invocation " +DISPLAY_LABEL_PROGRESS_STEPS = "Steps" +DISPLAY_LABEL_PROGRESS_JOBS = "Jobs" +DISPLAY_LABEL_PROGRESS_SUBWORKFLOWS = "SubWFs" +DISPLAY_LABEL_JOB_STATES_PREFIX = "Job States" + +DISPLAY_ICON_STATE_OK = "🟒" +DISPLAY_ICON_STATE_ERRORS = "πŸ”΄" +DISPLAY_ICON_STATE_NEW = "πŸ†•" +DISPLAY_ICON_STATE_QUEUED = "⏳" +DISPLAY_ICON_STATE_RUNNING = "πŸ‘Ÿ" +DISPLAY_ICON_STATE_PAUSED = "⏸️" + + +class DisplayConfiguration(BaseModel): + include_nested_subworkflows: bool = DISPLAY_INCLUDE_NESTED_SUBWORKFLOWS + include_job_state_breakdown: bool = DISPLAY_INCLUDE_JOB_STATE_BREAKDOWN + subworkflows_as_panel: bool = DISPLAY_SUBWORKFLOWS_AS_PANEL + divider: str = DISPLAY_DIVIDER + style_bar_back: StyleType = DISPLAY_STYLE_BAR_BACK + style_bar_complete: StyleType = DISPLAY_STYLE_BAR_COMPLETE + style_bar_finished: StyleType = DISPLAY_STYLE_BAR_FINISHED + + style_percent: StyleType = DISPLAY_STYLE_PERCENT + + style_initializing: StyleType = DISPLAY_STYLE_INITIALIZING + style_ok: StyleType = DISPLAY_STYLE_OK + style_running: StyleType = DISPLAY_STYLE_RUNNING + style_error: StyleType = DISPLAY_STYLE_ERROR + + style_header: StyleType = DISPLAY_STYLE_HEADER + style_subworkflow_header: StyleType = DISPLAY_STYLE_SUBWORKFLOW_HEADER + + label_header_prefix: str = DISPLAY_LABEL_HEADER_PREFIX + 
label_subworkflow_header_prefix: str = DISPLAY_LABEL_SUBWORKFLOW_HEADER_PREFIX + label_progress_steps: str = DISPLAY_LABEL_PROGRESS_STEPS + label_progress_jobs: str = DISPLAY_LABEL_PROGRESS_JOBS + label_progress_subworkflows: str = DISPLAY_LABEL_PROGRESS_SUBWORKFLOWS + label_job_states_prefix: str = DISPLAY_LABEL_JOB_STATES_PREFIX + + icon_state_ok: str = DISPLAY_ICON_STATE_OK + icon_state_errors: str = DISPLAY_ICON_STATE_ERRORS + icon_state_new: str = DISPLAY_ICON_STATE_NEW + icon_state_queued: str = DISPLAY_ICON_STATE_QUEUED + icon_state_running: str = DISPLAY_ICON_STATE_RUNNING + icon_state_paused: str = DISPLAY_ICON_STATE_PAUSED diff --git a/planemo/galaxy/invocations/simulations.py b/planemo/galaxy/invocations/simulations.py new file mode 100644 index 000000000..10dfeedf9 --- /dev/null +++ b/planemo/galaxy/invocations/simulations.py @@ -0,0 +1,227 @@ +"""Simulate Galaxy workflows running on a server for testing purposes.""" + +from collections import deque +from typing import ( + List, + Optional, +) +from uuid import uuid4 + +import yaml + +from .api import Invocation as InvocationResponse +from .api import ( + InvocationJobsSummary, +) + + +class Ticks: + after: int + + @property + def active(self): + return self.after <= 0 + + def tick(self) -> None: + if self.active: + self.tick_when_active() + else: + self.after -= 1 + + def tick_when_active(self) -> None: ... 
+ + +class StateWithDuration(Ticks): + + def __init__(self, state: str, duration: int): + self.after = 0 + self.state = state + self.duration = duration + + def tick_when_active(self) -> None: + self.duration -= 1 + + +class HasState(Ticks): + final: Optional[str] + + def __init__(self, after: int, states: List[StateWithDuration]): + self.after = after or 0 + self.states = deque(states) + self.final_state = None + + def tick_when_active(self) -> None: + if self.final_state is not None: + return + + next_state = self.states.popleft() + next_state.tick() + if next_state.duration == 0 and not self.states: + self.final_state = next_state.state + elif next_state.duration != 0: + self.states.appendleft(next_state) + # else: next state will be state during next tick + + @property + def state(self): + if self.final_state is not None: + return self.final_state + else: + return self.states[0].state + + +Job = HasState + + +class InvocationStep(HasState): + invocation: Optional["Invocation"] + jobs: Optional[List[Job]] + + def __init__( + self, jobs: List[Job], invocation: Optional["Invocation"], after: int, states: List[StateWithDuration] + ): + super().__init__(after, states) + self.jobs = jobs + self.invocation = invocation + + def tick_when_active(self) -> None: + super().tick_when_active() + if self.jobs: + for job in self.jobs: + job.tick() + if self.invocation: + self.invocation.tick() + + @property + def active_jobs(self) -> List[Job]: + return [j for j in (self.jobs or []) if j.active] + + +class Invocation(HasState): + + def __init__(self, steps: List[InvocationStep], after: int, states: List[StateWithDuration]): + self.id = str(uuid4())[:8] + self.steps = steps + super().__init__(after, states) + + def tick_when_active(self) -> None: + super().tick_when_active() + for step in self.steps: + step.tick() + + @property + def active_steps(self) -> List[InvocationStep]: + return [s for s in self.steps if s.active] + + def get_invocation_by_id(self, invocation_id: str) -> 
Optional["Invocation"]: + if self.id == invocation_id: + return self + for step in self.steps: + step_invocation = step.invocation + if step_invocation: + step_subworkflow_invocation_with_id = step_invocation.get_invocation_by_id(invocation_id) + if step_subworkflow_invocation_with_id is not None: + return step_subworkflow_invocation_with_id + return None + + def get_subworkflow_invocation(self, subworkflow_invocation_id: str) -> "Invocation": + for step in self.steps: + if step.invocation and step.invocation.id == subworkflow_invocation_id: + return step.invocation + raise Exception(f"Unknown subworkflow invocation id ({subworkflow_invocation_id})") + + def get_subworkflow_invocation_by_step_index(self, index: int) -> "Invocation": + return self.steps[index].invocation + + def get_api_invocation(self) -> InvocationResponse: + steps = [] + for step in self.active_steps: + api_step = { + "state": step.state, + } + if step.invocation: + api_step["subworkflow_invocation_id"] = step.invocation.id + + steps.append(api_step) + return { + "id": self.id, + "state": self.state, + "steps": steps, + } + + def get_api_jobs_summary(self) -> InvocationJobsSummary: + jobs = [] + for step in self.active_steps: + for job in step.active_jobs: + api_job = { + "state": job.state, + } + jobs.append(api_job) + by_state = {} + for job in jobs: + state = job["state"] + if state not in by_state: + by_state[state] = 0 + by_state[state] += 1 + return {"states": by_state} + + +def parse_workflow_simulation_from_string(workflow_simulation: str) -> Invocation: + return parse_workflow_simulation(yaml.safe_load(workflow_simulation)) + + +def parse_workflow_simulation(workflow_simulation: dict) -> Invocation: + return parse_workflow_simulation_invocation(workflow_simulation) + + +def parse_workflow_simulation_job(workflow_simulation_job: dict) -> Job: + states = parse_states_from(workflow_simulation_job) + after = parse_after_from(workflow_simulation_job) + return Job(after, states) + + +def 
parse_workflow_simulation_invocation_step(workflow_simulation_invocation_step: dict) -> InvocationStep: + states = parse_states_from(workflow_simulation_invocation_step) + after = parse_after_from(workflow_simulation_invocation_step) + if "invocation" in workflow_simulation_invocation_step: + invocation = parse_workflow_simulation_invocation(workflow_simulation_invocation_step["invocation"]) + else: + invocation = None + jobs = None + if "jobs" in workflow_simulation_invocation_step: + jobs = [] + for job in workflow_simulation_invocation_step.get("jobs"): + jobs.append(parse_workflow_simulation_job(job)) + return InvocationStep(jobs, invocation, after, states) + + +def parse_workflow_simulation_invocation(workflow_simulation_invocation: dict) -> Invocation: + states = parse_states_from(workflow_simulation_invocation) + after = parse_after_from(workflow_simulation_invocation) + steps = [] + for step in workflow_simulation_invocation.get("steps"): + steps.append(parse_workflow_simulation_invocation_step(step)) + + return Invocation(steps, after, states) + + +def parse_after_from(simulation_object: dict) -> int: + return simulation_object.get("after", 0) + + +def parse_states_from(simulation_object: dict) -> List[StateWithDuration]: + if "states" in simulation_object: + states = simulation_object["states"] + states_with_duration = [] + for state in states: + if ":" in state: + state, duration_str = state.split(":", 1) + duration = int(duration_str) + state_with_duration = StateWithDuration(state, duration) + else: + state_with_duration = StateWithDuration(state, 1) + states_with_duration.append(state_with_duration) + return states_with_duration + else: + state = simulation_object["state"] + return [StateWithDuration(state, 1)] diff --git a/planemo/options.py b/planemo/options.py index b61d1760e..79fc425ff 100644 --- a/planemo/options.py +++ b/planemo/options.py @@ -2220,6 +2220,14 @@ def mulled_action_option(): ) +def invocation_target_options(): + return _compose( + 
required_invocation_id_arg(), + galaxy_url_option(required=True), + galaxy_user_key_option(required=True), + ) + + def mulled_options(): return _compose( mulled_conda_option(), diff --git a/tests/test_invocation_polling.py b/tests/test_invocation_polling.py new file mode 100644 index 000000000..33aa36ef9 --- /dev/null +++ b/tests/test_invocation_polling.py @@ -0,0 +1,142 @@ +from time import sleep +from typing import Optional + +from planemo.galaxy.invocations.api import Invocation as InvocationResponse +from planemo.galaxy.invocations.api import ( + InvocationApi, + InvocationJobsSummary, +) +from planemo.galaxy.invocations.polling import ( + PollingTracker, + wait_for_invocation_and_jobs, +) +from planemo.galaxy.invocations.progress import WorkflowProgressDisplay +from planemo.galaxy.invocations.progress_display import DisplayConfiguration +from planemo.galaxy.invocations.simulations import ( + Invocation, + parse_workflow_simulation_from_string, +) +from .test_utils import create_test_context +from .test_workflow_simulation import ( + SCENARIO_1, + SCENARIO_MULTIPLE_OK_SUBWORKFLOWS, + SCENARIO_NESTED_SUBWORKFLOWS, +) + +SLEEP = 0 + + +class MockPollingTracker(PollingTracker): + + def __init__(self, simulation: Invocation): + self._simulation = simulation + + def sleep(self) -> None: + self._simulation.tick() + if SLEEP > 0: + sleep(SLEEP) + + +def test_polling_scenario_1(): + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + assert error_message + assert "failed" in error_message + + +def test_polling_scenario_three_ok_subworkflows(): + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_MULTIPLE_OK_SUBWORKFLOWS) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def test_polling_scenario_nested_subworkflows(): + final_invocation_state, job_state, error_message 
= run_workflow_simulation(SCENARIO_NESTED_SUBWORKFLOWS) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def test_polling_without_display(): + simulation = parse_workflow_simulation_from_string(SCENARIO_1) + invocation_id = simulation.id + invocation_api = SimulatedApi(simulation) + polling_tracker = MockPollingTracker(simulation) + ctx = create_test_context() + # using this without setting up the display context seems to use all the tracking + # without printing to the console. Hides a lot of bad design mixing presentation and + # tracking logic being too mixed together. + display = WorkflowProgressDisplay(invocation_id) + final_invocation_state, job_state, error_message = wait_for_invocation_and_jobs( + ctx, + invocation_id, + invocation_api, + polling_tracker, + display, + ) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + assert error_message + assert "failed" in error_message + + +def test_polling_with_compact_display(): + display_configuration = DisplayConfiguration( + include_nested_subworkflows=False, + include_job_state_breakdown=False, + ) + final_invocation_state, job_state, error_message = run_workflow_simulation( + SCENARIO_NESTED_SUBWORKFLOWS, display_configuration + ) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def test_polling_without_invocation_as_full_subpanel(): + display_configuration = DisplayConfiguration( + include_nested_subworkflows=True, + include_job_state_breakdown=True, + subworkflows_as_panel=False, + ) + final_invocation_state, job_state, error_message = run_workflow_simulation( + SCENARIO_NESTED_SUBWORKFLOWS, display_configuration + ) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def run_workflow_simulation(yaml_str: str, display_configuration: Optional[DisplayConfiguration] = None): + simulation = 
parse_workflow_simulation_from_string(yaml_str) + invocation_id = simulation.id + invocation_api = SimulatedApi(simulation) + polling_tracker = MockPollingTracker(simulation) + ctx = create_test_context() + with WorkflowProgressDisplay(invocation_id, display_configuration=display_configuration) as display: + return wait_for_invocation_and_jobs( + ctx, + invocation_id, + invocation_api, + polling_tracker, + display, + ) + + +class SimulatedApi(InvocationApi): + _simulation: Invocation + + def __init__(self, invocation: Invocation): + self._simulation = invocation + + def get_invocation(self, invocation_id: str) -> InvocationResponse: + invocation = self._simulation.get_invocation_by_id(invocation_id) + assert invocation, f"Simulation has no invocation_id {invocation_id}" + return invocation.get_api_invocation() + + def get_invocation_summary(self, invocation_id: str) -> InvocationJobsSummary: + invocation = self._simulation.get_invocation_by_id(invocation_id) + assert invocation, f"Simulation has no invocation_id {invocation_id}" + return invocation.get_api_jobs_summary() diff --git a/tests/test_workflow_progress.py b/tests/test_workflow_progress.py new file mode 100644 index 000000000..6a662baf4 --- /dev/null +++ b/tests/test_workflow_progress.py @@ -0,0 +1,141 @@ +import time + +from planemo.galaxy.invocations.progress import ( + WorkflowProgress, + WorkflowProgressDisplay, +) + +STEP_NEW = {"state": "new"} +STEP_SCHEDULED = {"state": "scheduled"} +SLEEP = 0.8 + + +def test_workflow_progress_typical(): + with WorkflowProgressDisplay("myid12345abcde") as live: + new_steps = [STEP_NEW, STEP_NEW, STEP_NEW, STEP_NEW] + one_scheduled_steps = [STEP_SCHEDULED, STEP_NEW, STEP_NEW, STEP_NEW] + two_scheduled_steps = [STEP_SCHEDULED, STEP_SCHEDULED, STEP_NEW, STEP_NEW] + three_scheduled_steps = [STEP_SCHEDULED, STEP_SCHEDULED, STEP_SCHEDULED, STEP_NEW] + all_scheduled_steps = [STEP_SCHEDULED, STEP_SCHEDULED, STEP_SCHEDULED, STEP_SCHEDULED] + + state_pairs = [ + ({"state": 
"new"}, {}, None), + ({"state": "ready", "steps": new_steps}, {}, None), + ({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}, None), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}, None), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}, None), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}, None), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}, None), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 2, "new": 3}}, None), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 2, "running": 1, "new": 2}}, + None, + ), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 3, "running": 1, "new": 1}}, + None, + ), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 4, "running": 1}}, None), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 5}}, None), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, None), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, None), + ({"state": "ready", "steps": new_steps}, {}, "abcde123456"), + ({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}, "abcde123456"), + ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}, "abcde123456"), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 2, "new": 3}}, "abcde123456"), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 2, "running": 1, "new": 2}}, + 
"abcde123456", + ), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 3, "running": 1, "new": 1}}, + "abcde123456", + ), + ( + {"state": "scheduled", "steps": three_scheduled_steps}, + {"states": {"ok": 4, "running": 1}}, + "abcde123456", + ), + ({"state": "scheduled", "steps": three_scheduled_steps}, {"states": {"ok": 5}}, "abcde123456"), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, "abcde123456"), + ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}, "abcde123456"), + ] + i = 0 + for invocation, job_states_summary, subworkflow_invocation_id in state_pairs: + i = i + 1 + if subworkflow_invocation_id is None: + live.handle_invocation(invocation, job_states_summary) + else: + invocation = invocation.copy() + invocation["id"] = subworkflow_invocation_id + live.handle_subworkflow_invocation(invocation, job_states_summary) + time.sleep(SLEEP) + + +def test_workflow_progress_scheduling_state_handling(): + with WorkflowProgress() as workflow_progress: + workflow_progress.handle_invocation({"state": "new", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "ready", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "cancelling", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation( + {"state": "requires_materialization", "steps": [STEP_NEW]}, {"states": {"new": 1}} + ) + assert not workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "scheduled", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "cancelled", "steps": [STEP_NEW]}, 
{"states": {"new": 1}}) + assert workflow_progress.invocation_scheduling_terminal + + workflow_progress.handle_invocation({"state": "failed", "steps": [STEP_NEW]}, {"states": {"new": 1}}) + assert workflow_progress.invocation_scheduling_terminal + + +# From Galaxy: +# NEW = "new" +# RESUBMITTED = "resubmitted" +# UPLOAD = "upload" +# WAITING = "waiting" +# QUEUED = "queued" +# RUNNING = "running" +# OK = "ok" +# ERROR = "error" +# FAILED = "failed" +# PAUSED = "paused" +# DELETING = "deleting" +# DELETED = "deleted" +# STOPPING = "stop" +# STOPPED = "stopped" +# SKIPPED = "skipped" +def test_workflow_progress_job_state_handling(): + scheduled_invocation = {"state": "scheduled", "steps": [STEP_SCHEDULED]} + + with WorkflowProgress() as workflow_progress: + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"new": 1}}) + assert not workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"new": 1, "ok": 2}}) + assert not workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"ok": 2}}) + assert workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"ok": 2, "paused": 1}}) + assert workflow_progress.jobs_terminal + + workflow_progress.handle_invocation(scheduled_invocation, {"states": {"ok": 2, "paused": 1, "new": 1}}) + assert not workflow_progress.jobs_terminal diff --git a/tests/test_workflow_simulation.py b/tests/test_workflow_simulation.py new file mode 100644 index 000000000..d31dacb2d --- /dev/null +++ b/tests/test_workflow_simulation.py @@ -0,0 +1,260 @@ +from planemo.galaxy.invocations.simulations import parse_workflow_simulation_from_string + +SCENARIO_1 = """ +states: [new, ready:4, scheduled] +steps: +- state: scheduled + jobs: + - states: [new, queued:2, running:2, ok] +- after: 2 + state: scheduled + jobs: + - states: [new, queued, failed] + - states: [new, queued, ok] +- after: 3 + 
state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] +""" + +SCENARIO_MULTIPLE_OK_SUBWORKFLOWS = """ +states: [new, ready:4, scheduled] +steps: +- state: scheduled + jobs: + - states: [new, queued:2, running:2, ok] +- after: 2 + state: scheduled + jobs: + - states: [new, queued, running:2, ok] + - states: [new, queued, running:4, ok] +- after: 3 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] +- after: 4 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued, running, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] +- after: 5 + state: scheduled + invocation: + states: [new, ready:2, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued:3, ok] + - states: [new, queued:1, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued:2, running:4, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, running:5, ok] +""" + +SCENARIO_NESTED_SUBWORKFLOWS = """ +states: [new, ready:4, scheduled] +steps: +- state: scheduled + jobs: + - states: [new, queued:2, running:2, ok] +- after: 2 + state: scheduled + jobs: + - states: [new, queued, running:2, ok] + - states: [new, queued, running:4, ok] +- after: 3 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] + - after: 3 + states: [new, ready, scheduled] + invocation: + states: [new, 
ready:4, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, running, ok] + - states: [new, queued:3, running, ok] +- after: 4 + state: scheduled + invocation: + states: [new, ready, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued, running, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, queued:2, ok] +- after: 5 + state: scheduled + invocation: + states: [new, ready:2, scheduled] + steps: + - state: scheduled + jobs: + - states: [new, queued:3, ok] + - states: [new, queued:1, ok] + - after: 3 + state: scheduled + jobs: + - states: [new, queued:2, running:4, ok] + - states: [new, queued:3, running:2, ok] + - after: 4 + state: scheduled + jobs: + - states: [new, queued, ok] + - states: [new, running:5, ok] +""" + + +def test_parse_scenario_1_invocation_state_evolution(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + invocation_dict = invocation.get_api_invocation() + assert invocation_dict["state"] == "new" + invocation.tick() + invocation_dict = invocation.get_api_invocation() + assert invocation_dict["state"] == "ready" + invocation.tick() + invocation.tick() + invocation.tick() + invocation.tick() + invocation_dict = invocation.get_api_invocation() + assert invocation_dict["state"] == "scheduled" + + +def test_parse_scenario_1_invocation_step_states(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + invocation_dict = invocation.get_api_invocation() + steps = invocation_dict["steps"] + assert len(steps) == 1 + + invocation.tick() + invocation.tick() + + invocation_dict = invocation.get_api_invocation() + steps = invocation_dict["steps"] + assert len(steps) == 2 
+ assert steps[0]["state"] == "scheduled" + assert steps[1]["state"] == "scheduled" + + invocation.tick() + + invocation_dict = invocation.get_api_invocation() + steps = invocation_dict["steps"] + assert len(steps) == 3 + assert steps[2]["state"] == "scheduled" + + +def test_parse_scenario_1_invocation_job_states(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 1 + assert states["new"] == 1 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 1 + assert states["queued"] == 1 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 2 + assert states["queued"] == 1 + assert states["new"] == 2 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 2 + assert states["queued"] == 2 + assert states["running"] == 1 + + invocation.tick() + + states = invocation.get_api_jobs_summary()["states"] + assert len(states) == 3 + assert states["ok"] == 1 + assert states["running"] == 1 + assert states["failed"] == 1 + + +def test_parse_scenario_1_subworkflow_invocation_state(): + invocation = parse_workflow_simulation_from_string(SCENARIO_1) + + invocation.tick() + invocation.tick() + invocation.tick() + + subworkflow_invocation = invocation.get_subworkflow_invocation_by_step_index(2) + assert subworkflow_invocation.get_api_invocation()["state"] == "new" + + invocation.tick() + + assert subworkflow_invocation.get_api_invocation()["state"] == "ready" + + invocation.tick() + + assert subworkflow_invocation.get_api_invocation()["state"] == "scheduled" + + states = subworkflow_invocation.get_api_jobs_summary()["states"] + assert len(states) == 2 + assert states["ok"] == 1 + assert states["queued"] == 1 + + invocation.tick() + + states = subworkflow_invocation.get_api_jobs_summary()["states"] + assert len(states) == 1 + assert states["ok"] == 2 From 
63664aeb93a6ce2c8e614ccb026da35cc4818083 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 16 Apr 2025 17:00:17 +0200 Subject: [PATCH 05/17] Sort imports --- planemo/galaxy/activity.py | 4 +--- planemo/galaxy/invocations/simulations.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index b2e09cce8..18d6910a1 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -42,9 +42,7 @@ summarize_history, ) from planemo.galaxy.invocations.api import BioblendInvocationApi -from planemo.galaxy.invocations.polling import ( - PollingTrackerImpl, -) +from planemo.galaxy.invocations.polling import PollingTrackerImpl from planemo.galaxy.invocations.polling import wait_for_invocation_and_jobs as polling_wait_for_invocation_and_jobs from planemo.galaxy.invocations.progress import WorkflowProgressDisplay from planemo.io import wait_on diff --git a/planemo/galaxy/invocations/simulations.py b/planemo/galaxy/invocations/simulations.py index 10dfeedf9..9c964b82b 100644 --- a/planemo/galaxy/invocations/simulations.py +++ b/planemo/galaxy/invocations/simulations.py @@ -10,9 +10,7 @@ import yaml from .api import Invocation as InvocationResponse -from .api import ( - InvocationJobsSummary, -) +from .api import InvocationJobsSummary class Ticks: From 4bb1b692ec313b176e15c7c0829260cf09838626 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 16 Apr 2025 17:06:55 +0200 Subject: [PATCH 06/17] Adjust flake8 to black --- setup.cfg | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/setup.cfg b/setup.cfg index 8ec77f3d4..e6c8fb2af 100644 --- a/setup.cfg +++ b/setup.cfg @@ -2,6 +2,7 @@ # E203 is whitespace before ':'; we follow black's formatting here. See https://black.readthedocs.io/en/stable/faq.html#why-are-flake8-s-e203-and-w503-violated # E501 is line length, managed by black # W503 is line breaks before binary operators, which has been reversed in PEP 8. 
-ignore = E203,E501,W503 +# E701,E704 are multiple statements on one line; we follow black's formatting here. See https://black.readthedocs.io/en/stable/guides/using_black_with_other_tools.html#configuration +ignore = E203,E501,W503,E701,E704 max-complexity = 14 exclude=.eggs,.git,.tox,.venv,build,docs/conf.py,docs/standards,project_templates/cwl_draft3_spec/ From eaac0fc0e1c254f98217a1417f4aa8a8a226257f Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 16 Apr 2025 17:10:19 +0200 Subject: [PATCH 07/17] Fix WorkflowProgress import --- planemo/commands/cmd_workflow_track.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/planemo/commands/cmd_workflow_track.py b/planemo/commands/cmd_workflow_track.py index 4c4a132fd..01bcc9a08 100644 --- a/planemo/commands/cmd_workflow_track.py +++ b/planemo/commands/cmd_workflow_track.py @@ -5,7 +5,7 @@ from planemo import options from planemo.cli import command_function from planemo.engine.factory import engine_context -from planemo.galaxy.workflow_progress import WorkflowProgress +from planemo.galaxy.invocations.progress import WorkflowProgress @click.command("workflow_track") From abbf00bfe716b09c592a7376bbaf71eb604bac8a Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 16 Apr 2025 18:07:19 +0200 Subject: [PATCH 08/17] Fix various typing issues Can't use classvars for a lot of the stuff, those are essentially globals. Fixes the tests. 
--- planemo/galaxy/invocations/polling.py | 9 ++-- planemo/galaxy/invocations/progress.py | 57 +++++++++++------------ planemo/galaxy/invocations/simulations.py | 24 +++++----- tests/test_workflow_progress.py | 5 +- 4 files changed, 49 insertions(+), 46 deletions(-) diff --git a/planemo/galaxy/invocations/polling.py b/planemo/galaxy/invocations/polling.py index 28aad0525..013f7c8b4 100644 --- a/planemo/galaxy/invocations/polling.py +++ b/planemo/galaxy/invocations/polling.py @@ -56,6 +56,7 @@ def summarize(invocation_id: str): last_subworkflow_invocation = None last_subworkflow_invocation_jobs = None last_exception = None + error_message: Optional[str] = None done_polling = False while not done_polling: @@ -120,6 +121,7 @@ def summarize(invocation_id: str): ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") job_state = summary_job_state(last_invocation_jobs) + assert last_invocation return last_invocation["state"], job_state, error_message @@ -152,7 +154,7 @@ def workflow_in_error_message( # we're still mocking out the old history state by just picking out a random # job state of interest. Seems like we should drop this. 
def summary_job_state(job_states_summary: Optional[InvocationJobsSummary]): - states = (job_states_summary or {"states": {}}).get("states").copy() + states = (job_states_summary or {"states": {}}).get("states", {}).copy() states.pop("ok", None) states.pop("skipped", None) if states: @@ -165,6 +167,7 @@ def subworkflow_invocation_ids(invocation_api: InvocationApi, invocation_id: str invocation = invocation_api.get_invocation(invocation_id) subworkflow_invocation_ids = [] for step in invocation["steps"]: - if step.get("subworkflow_invocation_id") is not None: - subworkflow_invocation_ids.append(step["subworkflow_invocation_id"]) + subworkflow_invocation_id = step.get("subworkflow_invocation_id") + if subworkflow_invocation_id: + subworkflow_invocation_ids.append(subworkflow_invocation_id) return subworkflow_invocation_ids diff --git a/planemo/galaxy/invocations/progress.py b/planemo/galaxy/invocations/progress.py index 239ea54e7..f1f64f584 100644 --- a/planemo/galaxy/invocations/progress.py +++ b/planemo/galaxy/invocations/progress.py @@ -43,37 +43,35 @@ class InvocationJobsSummary(TypedDict, total=False): class WorkflowProgress(Progress): - invocation_state: str = "new" - step_count: Optional[int] = None - job_count: int = 0 - steps_color: str - jobs_color: str - subworkflows_color: str - step_states: Dict = {} - num_ok: int = 0 - num_new: int = 0 - num_queued: int = 0 - num_running: int = 0 - num_errors: int = 0 - num_paused: int = 0 - - num_subworkflows: int = 0 - num_subworkflows_complete: int = 0 _jobs_task: TaskID _steps_task: TaskID _subworkflows_task: Optional[TaskID] = None def __init__(self, display: DisplayConfiguration): + self.invocation_state: str = "new" + self.step_count: Optional[int] = None + self.job_count: Optional[int] = 0 + self.jobs_completed: Optional[int] = None + self.step_states: Dict[str, int] = {} + self.num_ok: int = 0 + self.num_new: int = 0 + self.num_queued: int = 0 + self.num_running: int = 0 + self.num_errors: int = 0 + 
self.num_paused: int = 0 + + self.num_subworkflows: int = 0 + self.num_subworkflows_complete: int = 0 self.display = display bar_column = BarColumn( style=self.display.style_bar_back, finished_style=self.display.style_bar_finished, complete_style=self.display.style_bar_complete, ) - self.jobs_color = self.display.style_initializing - self.steps_color = self.display.style_initializing - self.subworkflows_color = self.display.style_initializing + self.jobs_color: str = self.display.style_initializing + self.steps_color: str = self.display.style_initializing + self.subworkflows_color: str = self.display.style_initializing super().__init__( TextColumn("[progress.description]{task.description}"), TextColumn(display.divider), @@ -153,7 +151,7 @@ def handle_invocation(self, invocation: Invocation, job_state_summary: Invocatio self.jobs_completed = self.num_ok + self.num_errors self.num_paused = count_states(job_state_summary, ["paused"]) self.jobs_terminal_count = self.jobs_completed + self.num_paused - jobs_total = self.job_count + jobs_total: Optional[int] = self.job_count if self.num_errors > 0: self.jobs_color = self.display.style_error elif self.job_count > 0: @@ -251,18 +249,15 @@ def running_count(job_summary: InvocationJobsSummary) -> int: class WorkflowProgressDisplay(Live): - workflow_progress: WorkflowProgress - subworkflow_progress: Optional[WorkflowProgress] = None - subworkflow_invocation_ids_seen: Set[str] = set() - subworkflow_invocation_ids_completed: Set[str] = set() - invocation_id: str - subworkflow_invocation_id: Optional[str] = None def __init__( self, invocation_id: str, display_configuration: Optional[DisplayConfiguration] = None, ): + self.subworkflow_invocation_ids_seen: Set[str] = set() + self.subworkflow_invocation_ids_completed: Set[str] = set() + self.subworkflow_invocation_id: Optional[str] = None self.invocation_id = invocation_id display = display_configuration or DisplayConfiguration() self.display = display @@ -271,10 +266,12 @@ def 
__init__( super().__init__(self._panel()) def _register_subworkflow_invocation_ids_from(self, invocation: Invocation): - subworkflow_invocation_ids = [] - for step in invocation["steps"]: - if step.get("subworkflow_invocation_id") is not None: - subworkflow_invocation_ids.append(step["subworkflow_invocation_id"]) + subworkflow_invocation_ids: List[str] = [] + steps = invocation.get("steps") or [] + for step in steps: + subworkflow_invocation_id = step.get("subworkflow_invocation_id") + if subworkflow_invocation_id: + subworkflow_invocation_ids.append(subworkflow_invocation_id) self._register_subworkflow_invocation_ids(subworkflow_invocation_ids) def _register_subworkflow_invocation_ids(self, ids: List[str]): diff --git a/planemo/galaxy/invocations/simulations.py b/planemo/galaxy/invocations/simulations.py index 9c964b82b..f220b49e5 100644 --- a/planemo/galaxy/invocations/simulations.py +++ b/planemo/galaxy/invocations/simulations.py @@ -2,6 +2,7 @@ from collections import deque from typing import ( + Dict, List, Optional, ) @@ -11,6 +12,7 @@ from .api import Invocation as InvocationResponse from .api import InvocationJobsSummary +from .api import InvocationStep as InvocationStepResponse class Ticks: @@ -46,7 +48,7 @@ class HasState(Ticks): def __init__(self, after: int, states: List[StateWithDuration]): self.after = after or 0 self.states = deque(states) - self.final_state = None + self.final_state: Optional[str] = None def tick_when_active(self) -> None: if self.final_state is not None: @@ -128,13 +130,13 @@ def get_subworkflow_invocation(self, subworkflow_invocation_id: str) -> "Invocat return step.invocation raise Exception(f"Unknown subworkflow invocation id ({subworkflow_invocation_id})") - def get_subworkflow_invocation_by_step_index(self, index: int) -> "Invocation": + def get_subworkflow_invocation_by_step_index(self, index: int) -> Optional["Invocation"]: return self.steps[index].invocation def get_api_invocation(self) -> InvocationResponse: - steps = [] + 
steps: List[InvocationStepResponse] = [] for step in self.active_steps: - api_step = { + api_step: InvocationStepResponse = { "state": step.state, } if step.invocation: @@ -148,16 +150,16 @@ def get_api_invocation(self) -> InvocationResponse: } def get_api_jobs_summary(self) -> InvocationJobsSummary: - jobs = [] + job_states = [] for step in self.active_steps: for job in step.active_jobs: api_job = { "state": job.state, } - jobs.append(api_job) - by_state = {} - for job in jobs: - state = job["state"] + job_states.append(api_job) + by_state: Dict[str, int] = {} + for job_state in job_states: + state = job_state["state"] if state not in by_state: by_state[state] = 0 by_state[state] += 1 @@ -188,7 +190,7 @@ def parse_workflow_simulation_invocation_step(workflow_simulation_invocation_ste jobs = None if "jobs" in workflow_simulation_invocation_step: jobs = [] - for job in workflow_simulation_invocation_step.get("jobs"): + for job in workflow_simulation_invocation_step.get("jobs") or []: jobs.append(parse_workflow_simulation_job(job)) return InvocationStep(jobs, invocation, after, states) @@ -197,7 +199,7 @@ def parse_workflow_simulation_invocation(workflow_simulation_invocation: dict) - states = parse_states_from(workflow_simulation_invocation) after = parse_after_from(workflow_simulation_invocation) steps = [] - for step in workflow_simulation_invocation.get("steps"): + for step in workflow_simulation_invocation.get("steps") or []: steps.append(parse_workflow_simulation_invocation_step(step)) return Invocation(steps, after, states) diff --git a/tests/test_workflow_progress.py b/tests/test_workflow_progress.py index 6a662baf4..1eabd0105 100644 --- a/tests/test_workflow_progress.py +++ b/tests/test_workflow_progress.py @@ -4,6 +4,7 @@ WorkflowProgress, WorkflowProgressDisplay, ) +from planemo.galaxy.invocations.progress_display import DisplayConfiguration STEP_NEW = {"state": "new"} STEP_SCHEDULED = {"state": "scheduled"} @@ -80,7 +81,7 @@ def 
test_workflow_progress_typical(): def test_workflow_progress_scheduling_state_handling(): - with WorkflowProgress() as workflow_progress: + with WorkflowProgress(DisplayConfiguration()) as workflow_progress: workflow_progress.handle_invocation({"state": "new", "steps": [STEP_NEW]}, {"states": {"new": 1}}) assert not workflow_progress.invocation_scheduling_terminal @@ -124,7 +125,7 @@ def test_workflow_progress_scheduling_state_handling(): def test_workflow_progress_job_state_handling(): scheduled_invocation = {"state": "scheduled", "steps": [STEP_SCHEDULED]} - with WorkflowProgress() as workflow_progress: + with WorkflowProgress(DisplayConfiguration()) as workflow_progress: workflow_progress.handle_invocation(scheduled_invocation, {"states": {"new": 1}}) assert not workflow_progress.jobs_terminal From 4617ad9e952466783b929fed0072124e72ff8d37 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 16 Apr 2025 19:43:47 +0200 Subject: [PATCH 09/17] WIP: Wire up workflow track command Think we still have some work to do once there are failing jobs. 
--- planemo/commands/cmd_workflow_track.py | 40 ++++++-------------------- planemo/galaxy/activity.py | 10 +++---- planemo/galaxy/invocations/api.py | 1 + 3 files changed, 13 insertions(+), 38 deletions(-) diff --git a/planemo/commands/cmd_workflow_track.py b/planemo/commands/cmd_workflow_track.py index 01bcc9a08..8161c6b46 100644 --- a/planemo/commands/cmd_workflow_track.py +++ b/planemo/commands/cmd_workflow_track.py @@ -5,7 +5,7 @@ from planemo import options from planemo.cli import command_function from planemo.engine.factory import engine_context -from planemo.galaxy.invocations.progress import WorkflowProgress +from planemo.galaxy.activity import wait_for_invocation_and_jobs @click.command("workflow_track") @@ -13,38 +13,14 @@ @command_function def cli(ctx, invocation_id, **kwds): """Run defined tests against existing workflow invocation.""" - with WorkflowProgress() as workflow_progress: - workflow_progress.add_bars() - import time - - time.sleep(1) - new_step = {"state": "new"} - scheduled_step = {"state": "scheduled"} - new_steps = [new_step, new_step, new_step] - one_scheduled_steps = [scheduled_step, new_step, new_step] - two_scheduled_steps = [scheduled_step, scheduled_step, new_step] - all_scheduled_steps = [scheduled_step, scheduled_step, scheduled_step] - state_pairs = [ - ({"state": "new"}, {}), - ({"state": "ready", "steps": new_steps}, {}), - ({"state": "ready", "steps": one_scheduled_steps}, {"states": {"new": 1}}), - ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 2}}), - ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "running": 1}}), - ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"new": 1, "ok": 1}}), - ({"state": "ready", "steps": two_scheduled_steps}, {"states": {"ok": 2}}), - ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "new": 3}}), - ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 2, "running": 1, "new": 2}}), - 
({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 3, "running": 1, "new": 1}}), - ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 4, "running": 1}}), - ({"state": "scheduled", "steps": all_scheduled_steps}, {"states": {"ok": 5}}), - ] - for invocation, job_states_summary in state_pairs: - workflow_progress.handle_invocation(invocation, job_states_summary) - time.sleep(1) - with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config: user_gi = config.user_gi - invocation = user_gi.invocations.show_invocation(invocation_id) - # https://stackoverflow.com/questions/23113494/double-progress-bar-in-python + wait_for_invocation_and_jobs( + ctx, + invocation_id, + history_id=None, + user_gi=user_gi, + polling_backoff=5, + ) ctx.exit(0) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index 18d6910a1..587f54d8c 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -784,12 +784,7 @@ def _history_id(gi, **kwds) -> str: def wait_for_invocation_and_jobs( - ctx, - invocation_id: str, - history_id: str, - user_gi: GalaxyInstance, - polling_backoff: int, - early_termination: bool, + ctx, invocation_id: str, history_id: Optional[str], user_gi: GalaxyInstance, polling_backoff: int, early_termination: bool, ): polling_tracker = PollingTrackerImpl(polling_backoff) invocation_api = BioblendInvocationApi(ctx, user_gi) @@ -798,6 +793,9 @@ def wait_for_invocation_and_jobs( ctx, invocation_id, invocation_api, polling_tracker, workflow_progress_display, early_termination=early_termination ) if error_message: + if not history_id: + invocation = invocation_api.get_invocation(invocation_id) + history_id = invocation["history_id"] summarize_history(ctx, user_gi, history_id) return final_invocation_state, job_state, error_message diff --git a/planemo/galaxy/invocations/api.py b/planemo/galaxy/invocations/api.py index eb15e5f01..1518c125e 100644 --- 
a/planemo/galaxy/invocations/api.py +++ b/planemo/galaxy/invocations/api.py @@ -26,6 +26,7 @@ class Invocation(TypedDict, total=False): id: str state: str steps: List[InvocationStep] + history_id: Optional[str] class InvocationJobsSummary(TypedDict, total=False): From bb81e42a0f7ee4083e416cc80e8c6f6dc35d0af7 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 22 May 2025 09:44:29 +0200 Subject: [PATCH 10/17] Resolve conflicts with https://github.com/galaxyproject/planemo/pull/1518 `--no-early-termination` is the current behavior. `--fail-fast` could be an option to implement. --- planemo/commands/cmd_run.py | 1 - planemo/galaxy/activity.py | 11 +++-------- planemo/options.py | 10 ---------- 3 files changed, 3 insertions(+), 19 deletions(-) diff --git a/planemo/commands/cmd_run.py b/planemo/commands/cmd_run.py index edd7c9ad2..2b3cf3630 100644 --- a/planemo/commands/cmd_run.py +++ b/planemo/commands/cmd_run.py @@ -30,7 +30,6 @@ @options.run_download_outputs_option() @options.engine_options() @options.test_options() -@options.no_early_termination_option() @command_function def cli(ctx, runnable_identifier, job_path, **kwds): """Planemo command for running tools and jobs. 
diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index 587f54d8c..24775abeb 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -218,7 +218,6 @@ def _execute( # noqa C901 no_wait=kwds.get("no_wait", False), start_datetime=start_datetime, log=log_contents_str(config), - early_termination=not kwds.get("no_early_termination", False), ) else: @@ -252,7 +251,6 @@ def invocation_to_run_response( no_wait=False, start_datetime=None, log=None, - early_termination=True, ): start_datetime = start_datetime or datetime.now() invocation_id = invocation["id"] @@ -268,7 +266,6 @@ def invocation_to_run_response( history_id=history_id, user_gi=user_gi, polling_backoff=polling_backoff, - early_termination=early_termination, ) if final_invocation_state not in ("ok", "skipped", "scheduled"): msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." @@ -784,13 +781,13 @@ def _history_id(gi, **kwds) -> str: def wait_for_invocation_and_jobs( - ctx, invocation_id: str, history_id: Optional[str], user_gi: GalaxyInstance, polling_backoff: int, early_termination: bool, + ctx, invocation_id: str, history_id: Optional[str], user_gi: GalaxyInstance, polling_backoff: int, ): polling_tracker = PollingTrackerImpl(polling_backoff) invocation_api = BioblendInvocationApi(ctx, user_gi) with WorkflowProgressDisplay(invocation_id) as workflow_progress_display: final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs( - ctx, invocation_id, invocation_api, polling_tracker, workflow_progress_display, early_termination=early_termination + ctx, invocation_id, invocation_api, polling_tracker, workflow_progress_display, ) if error_message: if not history_id: @@ -818,7 +815,7 @@ def state_func(): return _wait_on_state(state_func, timeout=timeout) -def _wait_on_state(state_func, polling_backoff=0, timeout=None, early_termination=True): +def _wait_on_state(state_func, polling_backoff=0, 
timeout=None): def get_state(): response = state_func() if not isinstance(response, list): @@ -840,8 +837,6 @@ def get_state(): "cancelled", "failed", ] - if not early_termination and current_non_terminal_states: - return None for terminal_state in hierarchical_fail_states: if terminal_state in current_states: # If we got here something has failed and we can return (early) diff --git a/planemo/options.py b/planemo/options.py index 79fc425ff..aeea02915 100644 --- a/planemo/options.py +++ b/planemo/options.py @@ -2174,16 +2174,6 @@ def tool_init_example_command_option(help=EXAMPLE_COMMAND_HELP): ) -def no_early_termination_option(): - return planemo_option( - "--no_early_termination", - is_flag=True, - default=False, - prompt=False, - help="Wait until all jobs terminate, even if some jobs have failed", - ) - - def mulled_conda_option(): return planemo_option( "--mulled_conda_version", From a410e22d475e4e06a95d27d45d291a95c5edcbb6 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 22 May 2025 09:53:16 +0200 Subject: [PATCH 11/17] Fix docstring --- planemo/commands/cmd_workflow_track.py | 2 +- planemo/galaxy/activity.py | 12 ++++++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/planemo/commands/cmd_workflow_track.py b/planemo/commands/cmd_workflow_track.py index 8161c6b46..9bf951147 100644 --- a/planemo/commands/cmd_workflow_track.py +++ b/planemo/commands/cmd_workflow_track.py @@ -12,7 +12,7 @@ @options.invocation_target_options() @command_function def cli(ctx, invocation_id, **kwds): - """Run defined tests against existing workflow invocation.""" + """Follow the progress of a workflow invocation.""" with engine_context(ctx, engine="external_galaxy", **kwds) as engine, engine.ensure_runnables_served([]) as config: user_gi = config.user_gi wait_for_invocation_and_jobs( diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index 24775abeb..95705bd54 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -781,13 
+781,21 @@ def _history_id(gi, **kwds) -> str:


 def wait_for_invocation_and_jobs(
-    ctx, invocation_id: str, history_id: Optional[str], user_gi: GalaxyInstance, polling_backoff: int,
+    ctx,
+    invocation_id: str,
+    history_id: Optional[str],
+    user_gi: GalaxyInstance,
+    polling_backoff: int,
 ):
     polling_tracker = PollingTrackerImpl(polling_backoff)
     invocation_api = BioblendInvocationApi(ctx, user_gi)
     with WorkflowProgressDisplay(invocation_id) as workflow_progress_display:
         final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs(
-            ctx, invocation_id, invocation_api, polling_tracker, workflow_progress_display,
+            ctx,
+            invocation_id,
+            invocation_api,
+            polling_tracker,
+            workflow_progress_display,
         )
     if error_message:
         if not history_id:

From bd1604dcf5fbc538121a17a57fa221225ddf30e5 Mon Sep 17 00:00:00 2001
From: mvdbeek
Date: Thu, 22 May 2025 11:13:53 +0200
Subject: [PATCH 12/17] Use resource_string function from galaxy.util

---
 planemo/reports/build_report.py | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/planemo/reports/build_report.py b/planemo/reports/build_report.py
index 8aaf0b77d..308dc9957 100644
--- a/planemo/reports/build_report.py
+++ b/planemo/reports/build_report.py
@@ -1,11 +1,11 @@
 import base64

 from galaxy.util import strip_control_characters
+from galaxy.util.resources import resource_string
 from jinja2 import (
     Environment,
     PackageLoader,
 )
-from pkg_resources import resource_string

 TITLE = "Results (powered by Planemo)"

@@ -130,14 +130,10 @@ def __inject_summary(environment):


 def __style(filename):
-    resource = __load_resource(filename)
+    resource = resource_string("planemo.reports", filename)
     return "<style>%s</style>" % resource


 def __script(short_name):
-    resource = __load_resource("%s.js" % short_name)
+    resource = resource_string("planemo.reports", "%s.js" % short_name)
     return "<script>%s</script>" % resource
-
-
-def __load_resource(name):
-    return resource_string(__name__, name).decode("UTF-8")

From 
62b3a08493421066a6c5e256247c0c03cbb1f422 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 22 May 2025 11:28:54 +0200 Subject: [PATCH 13/17] Make invocation link clickable --- planemo/galaxy/activity.py | 2 +- planemo/galaxy/invocations/progress.py | 10 +++++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index 95705bd54..e3a36fdd8 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -789,7 +789,7 @@ def wait_for_invocation_and_jobs( ): polling_tracker = PollingTrackerImpl(polling_backoff) invocation_api = BioblendInvocationApi(ctx, user_gi) - with WorkflowProgressDisplay(invocation_id) as workflow_progress_display: + with WorkflowProgressDisplay(invocation_id, galaxy_url=user_gi.base_url) as workflow_progress_display: final_invocation_state, job_state, error_message = polling_wait_for_invocation_and_jobs( ctx, invocation_id, diff --git a/planemo/galaxy/invocations/progress.py b/planemo/galaxy/invocations/progress.py index f1f64f584..0769db4a4 100644 --- a/planemo/galaxy/invocations/progress.py +++ b/planemo/galaxy/invocations/progress.py @@ -254,12 +254,14 @@ def __init__( self, invocation_id: str, display_configuration: Optional[DisplayConfiguration] = None, + galaxy_url: Optional[str] = None, ): self.subworkflow_invocation_ids_seen: Set[str] = set() self.subworkflow_invocation_ids_completed: Set[str] = set() self.subworkflow_invocation_id: Optional[str] = None self.invocation_id = invocation_id display = display_configuration or DisplayConfiguration() + self.galaxy_url = galaxy_url self.display = display self.workflow_progress = WorkflowProgress(display) self.subworkflow_progress = WorkflowProgress(display) @@ -287,6 +289,12 @@ def an_incomplete_subworkflow_id(self): def all_subworkflows_complete(self): return len(self.subworkflow_invocation_ids_seen) == len(self.subworkflow_invocation_ids_completed) + def get_invocation_ui_link(self): + if self.galaxy_url: + 
return f"{self.galaxy_url}/workflows/invocations/{self.invocation_id}" + else: + return None + def _panel(self): def job_states(workflow_progress): if self.display.include_job_state_breakdown: @@ -294,7 +302,7 @@ def job_states(workflow_progress): else: return None - title = f"[{self.display.style_header}]{self.display.label_header_prefix}<{self.invocation_id}>" + title = f"[{self.display.style_header}]{self.display.label_header_prefix}<[link={self.get_invocation_ui_link()}]{self.invocation_id}[/link]>" subworkflow_title = None if self.subworkflow_invocation_id: subworkflow_title = f"[{self.display.style_subworkflow_header}]{self.display.label_subworkflow_header_prefix}<{self.subworkflow_invocation_id}>" From fb38581254453b75b6a409a8cbf20aac9269f29e Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Thu, 22 May 2025 15:54:25 +0200 Subject: [PATCH 14/17] Implement fail_fast switch --- planemo/commands/cmd_workflow_test_on_invocation.py | 3 ++- planemo/commands/cmd_workflow_track.py | 2 ++ planemo/galaxy/activity.py | 11 ++++++++--- planemo/galaxy/invocations/polling.py | 7 +++++-- planemo/options.py | 5 +++++ 5 files changed, 22 insertions(+), 6 deletions(-) diff --git a/planemo/commands/cmd_workflow_test_on_invocation.py b/planemo/commands/cmd_workflow_test_on_invocation.py index 9036a0825..3fe621086 100644 --- a/planemo/commands/cmd_workflow_test_on_invocation.py +++ b/planemo/commands/cmd_workflow_test_on_invocation.py @@ -32,7 +32,8 @@ def cli(ctx, path, invocation_id, test_index, **kwds): len(test_cases) >= test_index ), f"Selected test case {test_index}, but only found {len(test_cases)} test case(s)." test_case = test_cases[test_index - 1] - run_response = invocation_to_run_response(ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation) + # Hardcode fail_fast, no need to expose the option to the user IMO. 
+ run_response = invocation_to_run_response(ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation, fail_fast=True) structured_data = test_case.structured_test_data(run_response) test_data = { "version": "0.1", diff --git a/planemo/commands/cmd_workflow_track.py b/planemo/commands/cmd_workflow_track.py index 9bf951147..20c2c09e9 100644 --- a/planemo/commands/cmd_workflow_track.py +++ b/planemo/commands/cmd_workflow_track.py @@ -10,6 +10,7 @@ @click.command("workflow_track") @options.invocation_target_options() +@options.fail_fast_option() @command_function def cli(ctx, invocation_id, **kwds): """Follow the progress of a workflow invocation.""" @@ -21,6 +22,7 @@ def cli(ctx, invocation_id, **kwds): history_id=None, user_gi=user_gi, polling_backoff=5, + fail_fast=kwds.get("fail_fast", False), ) ctx.exit(0) diff --git a/planemo/galaxy/activity.py b/planemo/galaxy/activity.py index e3a36fdd8..ac41b083b 100644 --- a/planemo/galaxy/activity.py +++ b/planemo/galaxy/activity.py @@ -69,12 +69,12 @@ def execute( - ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds + ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, fail_fast=False, **kwds ) -> RunResponse: """Execute a Galaxy activity.""" try: start_datetime = datetime.now() - return _execute(ctx, config, runnable, job_path, **kwds) + return _execute(ctx, config, runnable, job_path, fail_fast=fail_fast, **kwds) except Exception as e: end_datetime = datetime.now() ctx.log("Failed to execute Galaxy activity, throwing ErrorRunResponse") @@ -151,7 +151,7 @@ def _log(self, message): def _execute( # noqa C901 - ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, **kwds + ctx: "PlanemoCliContext", config: "BaseGalaxyConfig", runnable: Runnable, job_path: str, fail_fast=False, **kwds ) -> "GalaxyBaseRunResponse": user_gi = config.user_gi admin_gi = config.gi @@ -218,6 +218,7 @@ def _execute( # noqa 
C901 no_wait=kwds.get("no_wait", False), start_datetime=start_datetime, log=log_contents_str(config), + fail_fast=fail_fast, ) else: @@ -251,6 +252,7 @@ def invocation_to_run_response( no_wait=False, start_datetime=None, log=None, + fail_fast=False, ): start_datetime = start_datetime or datetime.now() invocation_id = invocation["id"] @@ -266,6 +268,7 @@ def invocation_to_run_response( history_id=history_id, user_gi=user_gi, polling_backoff=polling_backoff, + fail_fast=fail_fast, ) if final_invocation_state not in ("ok", "skipped", "scheduled"): msg = f"Failed to run workflow [{workflow_id}], at least one job is in [{final_invocation_state}] state." @@ -786,6 +789,7 @@ def wait_for_invocation_and_jobs( history_id: Optional[str], user_gi: GalaxyInstance, polling_backoff: int, + fail_fast: bool = False, ): polling_tracker = PollingTrackerImpl(polling_backoff) invocation_api = BioblendInvocationApi(ctx, user_gi) @@ -796,6 +800,7 @@ def wait_for_invocation_and_jobs( invocation_api, polling_tracker, workflow_progress_display, + fail_fast=fail_fast, ) if error_message: if not history_id: diff --git a/planemo/galaxy/invocations/polling.py b/planemo/galaxy/invocations/polling.py index 013f7c8b4..a4a4580f6 100644 --- a/planemo/galaxy/invocations/polling.py +++ b/planemo/galaxy/invocations/polling.py @@ -42,6 +42,7 @@ def wait_for_invocation_and_jobs( invocation_api: InvocationApi, polling_tracker: PollingTracker, workflow_progress_display: WorkflowProgressDisplay, + fail_fast: bool = False, ): ctx.vlog("Waiting for invocation [%s]" % invocation_id) @@ -78,6 +79,7 @@ def summarize(invocation_id: str): last_exception, last_invocation, last_invocation_jobs, + fail_fast=fail_fast, ) if error_message: final_invocation_state = "new" if not last_invocation else last_invocation["state"] @@ -104,6 +106,7 @@ def summarize(invocation_id: str): last_exception, last_subworkflow_invocation, last_subworkflow_invocation_jobs, + fail_fast=fail_fast, ) if error_message: final_invocation_state 
= ( @@ -126,7 +129,7 @@ def summarize(invocation_id: str): def workflow_in_error_message( - ctx, invocation_id, last_exception, last_invocation, last_invocation_jobs + ctx, invocation_id, last_exception, last_invocation, last_invocation_jobs, fail_fast=False, ) -> Optional[str]: """Return an error message if workflow is in an error state.""" @@ -143,7 +146,7 @@ def workflow_in_error_message( ctx.vlog(msg) error_message = msg if not error_message else f"{error_message}. {msg}" - if job_state in JOB_ERROR_STATES: + if fail_fast and job_state in JOB_ERROR_STATES: msg = f"Failed to run workflow, at least one job is in [{job_state}] state." ctx.vlog(msg) error_message = msg if not error_message else f"{error_message}. {msg}" diff --git a/planemo/options.py b/planemo/options.py index aeea02915..0eff48cf7 100644 --- a/planemo/options.py +++ b/planemo/options.py @@ -1784,6 +1784,10 @@ def test_index_option(): return planemo_option("--test_index", default=1, type=int, help="Select which test to check. 
Counting starts at 1") +def fail_fast_option(): + return planemo_option("--fail_fast", is_flag=True, help="Stop on first job failure.") + + def test_output_options(): return _compose( planemo_option( @@ -1829,6 +1833,7 @@ def test_options(): return _compose( paste_test_data_paths_option(), test_output_options(), + fail_fast_option() ) From 5d65ee2a6bfa9c96646233903b52ef41329d9097 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Mon, 16 Jun 2025 16:57:57 +0200 Subject: [PATCH 15/17] Add test for fail-fast flag --- .../cmd_workflow_test_on_invocation.py | 4 +- planemo/galaxy/invocations/polling.py | 168 ++++++++++-------- planemo/options.py | 6 +- tests/test_invocation_polling.py | 40 ++++- 4 files changed, 140 insertions(+), 78 deletions(-) diff --git a/planemo/commands/cmd_workflow_test_on_invocation.py b/planemo/commands/cmd_workflow_test_on_invocation.py index 3fe621086..fea77cd6f 100644 --- a/planemo/commands/cmd_workflow_test_on_invocation.py +++ b/planemo/commands/cmd_workflow_test_on_invocation.py @@ -33,7 +33,9 @@ def cli(ctx, path, invocation_id, test_index, **kwds): ), f"Selected test case {test_index}, but only found {len(test_cases)} test case(s)." test_case = test_cases[test_index - 1] # Hardcode fail_fast, no need to expose the option to the user IMO. 
- run_response = invocation_to_run_response(ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation, fail_fast=True) + run_response = invocation_to_run_response( + ctx, user_gi=config.user_gi, runnable=runnable, invocation=invocation, fail_fast=True + ) structured_data = test_case.structured_test_data(run_response) test_data = { "version": "0.1", diff --git a/planemo/galaxy/invocations/polling.py b/planemo/galaxy/invocations/polling.py index a4a4580f6..bc6e1669d 100644 --- a/planemo/galaxy/invocations/polling.py +++ b/planemo/galaxy/invocations/polling.py @@ -36,6 +36,75 @@ def sleep(self): self.delta += self.polling_backoff +def _summarize_invocation(invocation_api: InvocationApi, invocation_id: str): + invocation = invocation_api.get_invocation(invocation_id) + assert invocation + invocation_jobs = invocation_api.get_invocation_summary(invocation_id) + return invocation, invocation_jobs + + +def _poll_main_workflow( + ctx, + invocation_id: str, + invocation_api: InvocationApi, + workflow_progress_display: WorkflowProgressDisplay, + fail_fast: bool, +): + if workflow_progress_display.workflow_progress.terminal: + return None, None, None + + try: + invocation, invocation_jobs = _summarize_invocation(invocation_api, invocation_id) + workflow_progress_display.handle_invocation(invocation, invocation_jobs) + return invocation, invocation_jobs, None + except Exception as e: + print(e) + return None, None, e + + +def _poll_subworkflow( + ctx, + invocation_id: str, + invocation_api: InvocationApi, + workflow_progress_display: WorkflowProgressDisplay, + fail_fast: bool, +): + if workflow_progress_display.all_subworkflows_complete(): + return None, None, None + + try: + subworkflow_id = workflow_progress_display.an_incomplete_subworkflow_id() + invocation, invocation_jobs = _summarize_invocation(invocation_api, subworkflow_id) + workflow_progress_display.handle_subworkflow_invocation(invocation, invocation_jobs) + return invocation, invocation_jobs, None + 
except Exception as e: + return None, None, e + + +def _check_for_errors( + ctx, + invocation_id: str, + exception: Optional[Exception], + invocation, + invocation_jobs, + fail_fast: bool, +): + error_message = workflow_in_error_message( + ctx, invocation_id, exception, invocation, invocation_jobs, fail_fast=fail_fast + ) + if error_message: + final_state = "new" if not invocation else invocation["state"] + job_state = summary_job_state(invocation_jobs) + return final_state, job_state, error_message + return None + + +def _is_polling_complete(workflow_progress_display: WorkflowProgressDisplay) -> bool: + return ( + workflow_progress_display.workflow_progress.terminal and workflow_progress_display.all_subworkflows_complete() + ) + + def wait_for_invocation_and_jobs( ctx, invocation_id: str, @@ -46,80 +115,34 @@ def wait_for_invocation_and_jobs( ): ctx.vlog("Waiting for invocation [%s]" % invocation_id) - def summarize(invocation_id: str): - invocation = invocation_api.get_invocation(invocation_id) - assert invocation - invocation_jobs = invocation_api.get_invocation_summary(invocation_id) - return invocation, invocation_jobs - last_invocation = None last_invocation_jobs = None - last_subworkflow_invocation = None - last_subworkflow_invocation_jobs = None - last_exception = None error_message: Optional[str] = None - done_polling = False - while not done_polling: - # loop over the main workflow and one subworkflow each iteration for display, - - # skip the main workflow if it is already tracked as complete - if all steps have been - # scheduled there are no new subworkflow invocations to track. 
- if not workflow_progress_display.workflow_progress.terminal: - try: - last_invocation, last_invocation_jobs = summarize(invocation_id) - workflow_progress_display.handle_invocation(last_invocation, last_invocation_jobs) - except Exception as e: - print(e) - last_exception = e - - error_message = workflow_in_error_message( - ctx, - invocation_id, - last_exception, - last_invocation, - last_invocation_jobs, - fail_fast=fail_fast, - ) - if error_message: - final_invocation_state = "new" if not last_invocation else last_invocation["state"] - job_state = summary_job_state(last_invocation_jobs) - return final_invocation_state, job_state, error_message - - assert last_invocation # if we got here... the first check has passed and we have an invocation - - # grab a subworkflow that isn't complete and check it, also register its subworkflow - # invocations so we catch all the children and children of children... - if not workflow_progress_display.all_subworkflows_complete(): - try: - a_subworkflow_invocation_id = workflow_progress_display.an_incomplete_subworkflow_id() - last_subworkflow_invocation, last_subworkflow_invocation_jobs = summarize(a_subworkflow_invocation_id) - workflow_progress_display.handle_subworkflow_invocation( - last_subworkflow_invocation, last_subworkflow_invocation_jobs - ) - except Exception as e: - last_exception = e - - error_message = workflow_in_error_message( - ctx, - invocation_id, - last_exception, - last_subworkflow_invocation, - last_subworkflow_invocation_jobs, - fail_fast=fail_fast, - ) - if error_message: - final_invocation_state = ( - "new" if not last_subworkflow_invocation else last_subworkflow_invocation["state"] - ) - job_state = summary_job_state(last_subworkflow_invocation_jobs) - return final_invocation_state, job_state, error_message - - done_polling = ( - workflow_progress_display.workflow_progress.terminal - and workflow_progress_display.all_subworkflows_complete() + while not _is_polling_complete(workflow_progress_display): + 
# Poll main workflow + main_invocation, main_jobs, main_exception = _poll_main_workflow( + ctx, invocation_id, invocation_api, workflow_progress_display, fail_fast + ) + + if main_invocation: + last_invocation = main_invocation + last_invocation_jobs = main_jobs + + error_result = _check_for_errors(ctx, invocation_id, main_exception, main_invocation, main_jobs, fail_fast) + if error_result: + return error_result + + # Poll subworkflow + sub_invocation, sub_jobs, sub_exception = _poll_subworkflow( + ctx, invocation_id, invocation_api, workflow_progress_display, fail_fast ) - if not done_polling: + + error_result = _check_for_errors(ctx, invocation_id, sub_exception, sub_invocation, sub_jobs, fail_fast) + if error_result: + return error_result + + if not _is_polling_complete(workflow_progress_display): polling_tracker.sleep() ctx.vlog(f"The final state of all jobs and subworkflow invocations for invocation [{invocation_id}] is 'ok'") @@ -129,7 +152,12 @@ def summarize(invocation_id: str): def workflow_in_error_message( - ctx, invocation_id, last_exception, last_invocation, last_invocation_jobs, fail_fast=False, + ctx, + invocation_id, + last_exception, + last_invocation, + last_invocation_jobs, + fail_fast=False, ) -> Optional[str]: """Return an error message if workflow is in an error state.""" diff --git a/planemo/options.py b/planemo/options.py index 0eff48cf7..251a905f2 100644 --- a/planemo/options.py +++ b/planemo/options.py @@ -1830,11 +1830,7 @@ def test_output_options(): def test_options(): - return _compose( - paste_test_data_paths_option(), - test_output_options(), - fail_fast_option() - ) + return _compose(paste_test_data_paths_option(), test_output_options(), fail_fast_option()) def _compose(*functions): diff --git a/tests/test_invocation_polling.py b/tests/test_invocation_polling.py index 33aa36ef9..4327bfac9 100644 --- a/tests/test_invocation_polling.py +++ b/tests/test_invocation_polling.py @@ -38,7 +38,7 @@ def sleep(self) -> None: def 
test_polling_scenario_1(): - final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1) + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=True) assert final_invocation_state == "scheduled" assert job_state == "failed" assert error_message @@ -75,6 +75,7 @@ def test_polling_without_display(): invocation_api, polling_tracker, display, + fail_fast=True, ) assert final_invocation_state == "scheduled" assert job_state == "failed" @@ -109,7 +110,41 @@ def test_polling_without_invocation_as_full_subpanel(): assert not error_message -def run_workflow_simulation(yaml_str: str, display_configuration: Optional[DisplayConfiguration] = None): +def test_fail_fast_enabled_with_job_failure(): + """Test that fail_fast=True returns error when a job fails.""" + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=True) + # Invocation should still be scheduled (workflow scheduling succeeded) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + # fail_fast should detect the failed job and return error message + assert error_message + assert "Failed to run workflow, at least one job is in [failed] state." 
in error_message + + +def test_fail_fast_disabled_with_job_failure(): + """Test that fail_fast=False does not report job failures as errors.""" + final_invocation_state, job_state, error_message = run_workflow_simulation(SCENARIO_1, fail_fast=False) + # Invocation should be scheduled (workflow scheduling succeeded) + assert final_invocation_state == "scheduled" + assert job_state == "failed" + # Without fail_fast, job failures shouldn't cause error messages + # (unless invocation itself fails, which it doesn't in this case) + assert error_message is None + + +def test_fail_fast_enabled_with_successful_workflow(): + """Test that fail_fast=True works normally when no jobs fail.""" + final_invocation_state, job_state, error_message = run_workflow_simulation( + SCENARIO_MULTIPLE_OK_SUBWORKFLOWS, fail_fast=True + ) + assert final_invocation_state == "scheduled" + assert job_state == "ok" + assert not error_message + + +def run_workflow_simulation( + yaml_str: str, display_configuration: Optional[DisplayConfiguration] = None, fail_fast: bool = False +): simulation = parse_workflow_simulation_from_string(yaml_str) invocation_id = simulation.id invocation_api = SimulatedApi(simulation) @@ -122,6 +157,7 @@ def run_workflow_simulation(yaml_str: str, display_configuration: Optional[Displ invocation_api, polling_tracker, display, + fail_fast=fail_fast, ) From 009ee3f7e989de8198e322cedea3fa6cffe58b43 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 18 Jun 2025 14:19:41 +0200 Subject: [PATCH 16/17] Skip testing broken functionality --- tests/test_external_galaxy_commands.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/test_external_galaxy_commands.py b/tests/test_external_galaxy_commands.py index ff940f4eb..1f9130aa5 100644 --- a/tests/test_external_galaxy_commands.py +++ b/tests/test_external_galaxy_commands.py @@ -1,6 +1,7 @@ """Tests for planemo commands relating to external Galaxy instances""" import os +from unittest import skip import yaml @@ -15,6 +16,7 @@ ) 
+@skip("Configuring quay.io/bgruening/galaxy:latest is currently broken") class ExternalGalaxyCommandsTestCase(CliTestCase): @skip_if_environ("PLANEMO_SKIP_GALAXY_TESTS") From f6d0ad50bcc5bb5b54ac0559afcf27e0fd1906a6 Mon Sep 17 00:00:00 2001 From: mvdbeek Date: Wed, 18 Jun 2025 14:19:52 +0200 Subject: [PATCH 17/17] Fix tests We don't really want a failed job to result in failure of the whole workflow. --- tests/test_test_engines.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_test_engines.py b/tests/test_test_engines.py index 59d11fa94..d83555e5b 100644 --- a/tests/test_test_engines.py +++ b/tests/test_test_engines.py @@ -158,6 +158,7 @@ def test_galaxy_workflow_step_failed(): "extra_tools": ["$GALAXY_FUNCTIONAL_TEST_TOOLS"], "test_output_json": json_out.name, "galaxy_branch": target_galaxy_branch(), + "fail_fast": True, } exit_code = t_runnables(ctx, runnables, **kwds) assert exit_code == 1