diff --git a/client/src/components/WorkflowInvocationState/InvocationMessage.vue b/client/src/components/WorkflowInvocationState/InvocationMessage.vue new file mode 100644 index 000000000000..3d3efe121ca9 --- /dev/null +++ b/client/src/components/WorkflowInvocationState/InvocationMessage.vue @@ -0,0 +1,192 @@ [new Vue single-file component; its template/script markup was stripped during extraction. The surviving template text shows it renders {{ infoString }}, a "Problem occurred at this step:" link with {{ stepDescription }}, and the lead-ins "This dataset failed:", "This dataset collection failed:", and "This job failed:".] diff --git a/client/src/components/WorkflowInvocationState/WorkflowInvocationDetails.vue b/client/src/components/WorkflowInvocationState/WorkflowInvocationDetails.vue index fcf9af8baba2..c8977d2b5c07 100644 --- a/client/src/components/WorkflowInvocationState/WorkflowInvocationDetails.vue +++ b/client/src/components/WorkflowInvocationState/WorkflowInvocationDetails.vue @@ -1,8 +1,5 @@ [hunk content stripped during extraction] diff --git a/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue b/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue index 1949dc2661b9..5853c90edd1c 100644 --- a/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue +++ b/client/src/components/WorkflowInvocationState/WorkflowInvocationSummary.vue @@ -12,18 +12,26 @@ title="Download PDF" /> [template markup stripped during extraction; the surviving script changes register the new component:] getRootFromIndexLink() + path; export default { components: { + InvocationMessage, ProgressBar, LoadingSpan, }, diff --git a/client/src/components/WorkflowInvocationState/invocationMessageModel.ts b/client/src/components/WorkflowInvocationState/invocationMessageModel.ts new file mode 100644 index 000000000000..7c86d43ad17c --- /dev/null +++ b/client/src/components/WorkflowInvocationState/invocationMessageModel.ts @@ -0,0 +1,242 @@ +/* tslint:disable */ +/* eslint-disable */ +/** +/* This file was automatically generated from pydantic models by running pydantic2ts. +/* Do not modify it by hand - just update the pydantic models and then re-run the script +*/ + +export type InvocationMessageResponseModel = + | GenericInvocationCancellationReviewFailedEncodedDatabaseIdField + | GenericInvocationCancellationHistoryDeletedEncodedDatabaseIdField + | GenericInvocationCancellationUserRequestEncodedDatabaseIdField + | GenericInvocationFailureDatasetFailedEncodedDatabaseIdField + | GenericInvocationFailureCollectionFailedEncodedDatabaseIdField + | GenericInvocationFailureJobFailedEncodedDatabaseIdField + | GenericInvocationFailureOutputNotFoundEncodedDatabaseIdField + | GenericInvocationFailureExpressionEvaluationFailedEncodedDatabaseIdField + | GenericInvocationFailureWhenNotBooleanEncodedDatabaseIdField + | GenericInvocationUnexpectedFailureEncodedDatabaseIdField + | GenericInvocationEvaluationWarningWorkflowOutputNotFoundEncodedDatabaseIdField; + +export interface GenericInvocationCancellationHistoryDeletedEncodedDatabaseIdField { + reason: "history_deleted"; + /** + * History ID of history that was deleted. + */ + history_id: string; +} +export interface GenericInvocationCancellationHistoryDeletedInt { + reason: "history_deleted"; + /** + * History ID of history that was deleted. + */ + history_id: number; +} +export interface GenericInvocationCancellationReviewFailedEncodedDatabaseIdField { + reason: "cancelled_on_review"; + /** + * Workflow step id of paused step that did not pass review. + */ + workflow_step_id: number; +} +export interface GenericInvocationCancellationReviewFailedInt { + reason: "cancelled_on_review"; + /** + * Workflow step id of paused step that did not pass review.
+ */ + workflow_step_id: number; +} +export interface GenericInvocationCancellationUserRequestEncodedDatabaseIdField { + reason: "user_request"; +} +export interface GenericInvocationCancellationUserRequestInt { + reason: "user_request"; +} +export interface GenericInvocationEvaluationWarningWorkflowOutputNotFoundEncodedDatabaseIdField { + reason: "workflow_output_not_found"; + workflow_step_id: number; + /** + * Output that was designated as workflow output but that has not been found + */ + output_name: string; +} +export interface GenericInvocationEvaluationWarningWorkflowOutputNotFoundInt { + reason: "workflow_output_not_found"; + workflow_step_id: number; + /** + * Output that was designated as workflow output but that has not been found + */ + output_name: string; +} +export interface GenericInvocationFailureCollectionFailedEncodedDatabaseIdField { + reason: "collection_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * HistoryDatasetCollectionAssociation ID that relates to failure. + */ + hdca_id?: string; + /** + * Workflow step id of step that caused failure. + */ + dependent_workflow_step_id: number; +} +export interface GenericInvocationFailureCollectionFailedInt { + reason: "collection_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * HistoryDatasetCollectionAssociation ID that relates to failure. + */ + hdca_id?: number; + /** + * Workflow step id of step that caused failure. + */ + dependent_workflow_step_id: number; +} +export interface GenericInvocationFailureDatasetFailedEncodedDatabaseIdField { + reason: "dataset_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * HistoryDatasetAssociation ID that relates to failure. + */ + hda_id: string; + /** + * Workflow step id of step that caused failure. + */ + dependent_workflow_step_id?: number; +} +export interface GenericInvocationFailureDatasetFailedInt { + reason: "dataset_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * HistoryDatasetAssociation ID that relates to failure. + */ + hda_id: number; + /** + * Workflow step id of step that caused failure. + */ + dependent_workflow_step_id?: number; +} +export interface GenericInvocationFailureExpressionEvaluationFailedEncodedDatabaseIdField { + reason: "expression_evaluation_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * May contain details to help troubleshoot this problem. + */ + details?: string; +} +export interface GenericInvocationFailureExpressionEvaluationFailedInt { + reason: "expression_evaluation_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * May contain details to help troubleshoot this problem. + */ + details?: string; +} +export interface GenericInvocationFailureJobFailedEncodedDatabaseIdField { + reason: "job_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * Job ID that relates to failure. + */ + job_id?: string; + /** + * Workflow step id of step that caused failure. + */ + dependent_workflow_step_id: number; +} +export interface GenericInvocationFailureJobFailedInt { + reason: "job_failed"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * Job ID that relates to failure. + */ + job_id?: number; + /** + * Workflow step id of step that caused failure. 
+ */ + dependent_workflow_step_id: number; +} +export interface GenericInvocationFailureOutputNotFoundEncodedDatabaseIdField { + reason: "output_not_found"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + output_name: string; + /** + * Workflow step id of step that caused failure. + */ + dependent_workflow_step_id: number; +} +export interface GenericInvocationFailureOutputNotFoundInt { + reason: "output_not_found"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + output_name: string; + /** + * Workflow step id of step that caused failure. + */ + dependent_workflow_step_id: number; +} +export interface GenericInvocationFailureWhenNotBooleanEncodedDatabaseIdField { + reason: "when_not_boolean"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * Contains details to help troubleshoot this problem. + */ + details: string; +} +export interface GenericInvocationFailureWhenNotBooleanInt { + reason: "when_not_boolean"; + /** + * Workflow step id of step that failed. + */ + workflow_step_id: number; + /** + * Contains details to help troubleshoot this problem. + */ + details: string; +} +export interface GenericInvocationUnexpectedFailureEncodedDatabaseIdField { + reason: "unexpected_failure"; + /** + * May contain details to help troubleshoot this problem. + */ + details?: string; +} +export interface GenericInvocationUnexpectedFailureInt { + reason: "unexpected_failure"; + /** + * May contain details to help troubleshoot this problem. + */ + details?: string; +} diff --git a/client/src/composables/useWorkflowInstance.ts b/client/src/composables/useWorkflowInstance.ts new file mode 100644 index 000000000000..4e1a376397bc --- /dev/null +++ b/client/src/composables/useWorkflowInstance.ts @@ -0,0 +1,25 @@ +import { useWorkflowStore } from "@/stores/workflowStore"; +import { computed, ref } from "vue"; + +export function useWorkflowInstance(workflowId: string) { + const workflowStore = useWorkflowStore(); + // computed so the returned workflow updates once the fetch populates the store + const workflow = computed(() => workflowStore.getWorkflowByInstanceId(workflowId)); + const loading = ref(false); + + async function getWorkflowInstance() { + if (!workflow.value) { + loading.value = true; + try { + await workflowStore.fetchWorkflowForInstanceId(workflowId); + } catch (e) { + console.error("unable to fetch workflow \n", e); + } finally { + loading.value = false; + } + } + } + getWorkflowInstance(); + + return { workflow, loading }; +} diff --git a/client/src/schema/schema.ts b/client/src/schema/schema.ts index 29886efe1fba..f4d5523245c4 100644 --- a/client/src/schema/schema.ts +++ b/client/src/schema/schema.ts @@ -4087,6 +4087,12 @@ export interface components { * @default 0 */ running?: number; + /** + * Skipped jobs + * @description Number of jobs that were skipped due to conditional workflow step execution. + * @default 0 + */ + skipped?: number; /** * Upload jobs + * @description Number of jobs in the `upload` state. @@ -7493,7 +7499,8 @@ export interface components { | "deleted" | "deleted_new" | "stop" - | "stopped"; + | "stopped" + | "skipped"; /** * populated_states * @description An enumeration.
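[Editor's note] The invocationMessageModel.ts interfaces above are generated from the pydantic models added later in this diff (lib/galaxy/schema/invocation.py): every message carries a literal `reason` field, and the API response type is a union discriminated on that field. As a minimal sketch of the mechanism, using simplified stand-in models rather than the real Galaxy classes (pydantic v1 API, which these models use), parsing dispatches directly to the member whose `reason` literal matches:

    # Hypothetical, trimmed-down models illustrating the discriminated-union pattern.
    from typing import Union

    from pydantic import BaseModel, Field
    from typing_extensions import Annotated, Literal

    class UserRequest(BaseModel):
        reason: Literal["user_request"]

    class HistoryDeleted(BaseModel):
        reason: Literal["history_deleted"]
        history_id: int

    # The discriminator tells pydantic to pick the model by `reason`
    # instead of trying each union member in order.
    Message = Annotated[Union[UserRequest, HistoryDeleted], Field(discriminator="reason")]

    class MessageEnvelope(BaseModel):
        __root__: Message

    parsed = MessageEnvelope.parse_obj({"reason": "history_deleted", "history_id": 42})
    assert isinstance(parsed.__root__, HistoryDeleted)

TypeScript consumers get the same narrowing for free: a `switch (message.reason)` over the generated union narrows each branch to the matching interface.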
diff --git a/client/src/stores/workflowStore.js b/client/src/stores/workflowStore.ts similarity index 65% rename from client/src/stores/workflowStore.js rename to client/src/stores/workflowStore.ts index 5ba6400e1fe7..5cf2b8b8d9d0 100644 --- a/client/src/stores/workflowStore.js +++ b/client/src/stores/workflowStore.ts @@ -1,20 +1,25 @@ import { defineStore } from "pinia"; import axios from "axios"; +import type { Steps } from "@/stores/workflowStepStore"; +import { getAppRoot } from "@/onload/loadConfig"; -import { getAppRoot } from "onload/loadConfig"; +interface Workflow { + [index: string]: any; + steps: Steps; +} export const useWorkflowStore = defineStore("workflowStore", { state: () => ({ - workflowsByInstanceId: {}, + workflowsByInstanceId: {} as { [index: string]: Workflow }, }), getters: { getWorkflowByInstanceId: (state) => { - return (workflowId) => { - state.workflowsByInstanceId[workflowId]; + return (workflowId: string) => { + return state.workflowsByInstanceId[workflowId]; }; }, getWorkflowNameByInstanceId: (state) => { - return (workflowId) => { + return (workflowId: string) => { const details = state.workflowsByInstanceId[workflowId]; if (details && details.name) { return details.name; @@ -24,19 +29,19 @@ export const useWorkflowStore = defineStore("workflowStore", { }; }, getStoredWorkflowIdByInstanceId: (state) => { - return (workflowId) => { + return (workflowId: string) => { const storedWorkflow = state.workflowsByInstanceId[workflowId]; return storedWorkflow?.id; }; }, }, actions: { - async fetchWorkflowForInstanceId(workflowId) { + async fetchWorkflowForInstanceId(workflowId: string) { console.debug("Fetching workflow details for", workflowId); const params = { instance: "true" }; const { data } = await axios.get(`${getAppRoot()}api/workflows/${workflowId}`, { params }); this.$patch((state) => { - state.workflowsByInstanceId[workflowId] = data; + state.workflowsByInstanceId[workflowId] = data as Workflow; }); }, }, diff --git a/lib/galaxy/dependencies/pinned-requirements.txt b/lib/galaxy/dependencies/pinned-requirements.txt index d369ca9eb30c..3bd0f4ea87b7 100644 --- a/lib/galaxy/dependencies/pinned-requirements.txt +++ b/lib/galaxy/dependencies/pinned-requirements.txt @@ -6,6 +6,7 @@ aiofiles==22.1.0 ; python_version >= "3.7" and python_version < "3.11" aiohttp==3.8.3 ; python_version >= "3.7" and python_version < "3.11" aiosignal==1.3.1 ; python_version >= "3.7" and python_version < "3.11" alembic==1.9.1 ; python_version >= "3.7" and python_version < "3.11" +alembic_utils==0.7.8 ; python_version >= "3.7" and python_version < "3.11" amqp==5.1.1 ; python_version >= "3.7" and python_version < "3.11" anyio==3.6.2 ; python_version >= "3.7" and python_version < "3.11" apispec==6.0.2 ; python_version >= "3.7" and python_version < "3.11" diff --git a/lib/galaxy/job_execution/actions/post.py b/lib/galaxy/job_execution/actions/post.py index c377681640a6..8b329b48ce11 100644 --- a/lib/galaxy/job_execution/actions/post.py +++ b/lib/galaxy/job_execution/actions/post.py @@ -109,6 +109,9 @@ class ChangeDatatypeAction(DefaultJobAction): @classmethod def execute(cls, app, sa_session, action, job, replacement_dict, final_job_state=None): + if job.state == job.states.SKIPPED: + # Don't change datatype, must remain expression.json + return for dataset_assoc in job.output_datasets: if action.output_name == "" or dataset_assoc.name == action.output_name: app.datatypes_registry.change_datatype(dataset_assoc.dataset, action.action_arguments["newtype"]) diff --git 
a/lib/galaxy/managers/workflows.py b/lib/galaxy/managers/workflows.py index f2d88ea5d3f0..8604cc8dc31e 100644 --- a/lib/galaxy/managers/workflows.py +++ b/lib/galaxy/managers/workflows.py @@ -10,6 +10,7 @@ NamedTuple, Optional, Tuple, + Union, ) from gxformat2 import ( @@ -57,6 +58,7 @@ text_column_filter, ) from galaxy.model.item_attrs import UsesAnnotations +from galaxy.schema.invocation import InvocationCancellationUserRequest from galaxy.schema.schema import WorkflowIndexQueryPayload from galaxy.structured_app import MinimalManagerApp from galaxy.tools.parameters import ( @@ -359,11 +361,11 @@ def check_security(self, trans, has_workflow, check_ownership=True, check_access return True - def get_invocation(self, trans, decoded_invocation_id, eager=False): - q = trans.sa_session.query(self.app.model.WorkflowInvocation) + def get_invocation(self, trans, decoded_invocation_id, eager=False) -> model.WorkflowInvocation: + q = trans.sa_session.query(model.WorkflowInvocation) if eager: q = q.options( - subqueryload(self.app.model.WorkflowInvocation.steps) + subqueryload(model.WorkflowInvocation.steps) .joinedload("implicit_collection_jobs") .joinedload("jobs") .joinedload("job") @@ -401,6 +403,7 @@ def cancel_invocation(self, trans, decoded_invocation_id): cancelled = workflow_invocation.cancel() if cancelled: + workflow_invocation.add_message(InvocationCancellationUserRequest(reason="user_request")) trans.sa_session.add(workflow_invocation) trans.sa_session.flush() else: @@ -1668,7 +1671,11 @@ def __module_from_dict( self.add_item_annotation(sa_session, trans.get_user(), step, annotation) # Stick this in the step temporarily - step.temp_input_connections = step_dict.get("input_connections", {}) + DictConnection = Dict[str, Union[int, str]] + temp_input_connections: Dict[str, Union[List[DictConnection], DictConnection]] = step_dict.get( + "input_connections", {} + ) + step.temp_input_connections = temp_input_connections # Create the model class for the step steps.append(step) @@ -1709,8 +1716,11 @@ def __module_from_dict( step_input.default_value = default step_input.default_value_set = True + if "when" in step_dict: + step.when_expression = step_dict["when"] if dry_run and step in trans.sa_session: trans.sa_session.expunge(step) + return module, step def __load_subworkflow_from_step_dict( diff --git a/lib/galaxy/model/__init__.py b/lib/galaxy/model/__init__.py index 2d74b52e3359..29af9c116d7e 100644 --- a/lib/galaxy/model/__init__.py +++ b/lib/galaxy/model/__init__.py @@ -152,6 +152,9 @@ from galaxy.util.json import safe_loads from galaxy.util.sanitize_html import sanitize_html +if TYPE_CHECKING: + from galaxy.schema.invocation import InvocationMessageUnion + log = logging.getLogger(__name__) _datatypes_registry = None @@ -1072,6 +1075,7 @@ class states(str, Enum): DELETED_NEW = "deleted_new" # now DELETING, remove after 21.0 STOPPING = "stop" STOPPED = "stopped" + SKIPPED = "skipped" terminal_states = [states.OK, states.ERROR, states.DELETED] #: job states where the job hasn't finished and the model may still change @@ -7040,6 +7044,7 @@ class WorkflowStep(Base, RepresentById): position = Column(MutableJSONType) config = Column(JSONType) order_index: int = Column(Integer) + when_expression = Column(JSONType) uuid = Column(UUIDType) label = Column(Unicode(255)) temp_input_connections: Optional[InputConnDictType] @@ -7084,10 +7089,12 @@ class WorkflowStep(Base, RepresentById): def __init__(self): self.uuid = uuid4() self._input_connections_by_name = None + self._inputs_by_name = None 
@reconstructor def init_on_load(self): self._input_connections_by_name = None + self._inputs_by_name = None @property def tool_uuid(self): @@ -7113,6 +7120,23 @@ def input_optional(self): tool_state = self.tool_inputs return tool_state.get("optional") or False + def setup_inputs_by_name(self): + # Ensure input_connections has already been set. + + # Make connection information available on each step by input name. + inputs_by_name = {} + for step_input in self.inputs: + input_name = step_input.name + assert input_name not in inputs_by_name + inputs_by_name[input_name] = step_input + self._inputs_by_name = inputs_by_name + + @property + def inputs_by_name(self): + if self._inputs_by_name is None: + self.setup_inputs_by_name() + return self._inputs_by_name + def get_input(self, input_name): for step_input in self.inputs: if step_input.name == input_name: @@ -7482,9 +7506,26 @@ class WorkflowInvocation(Base, UsesCreateAndUpdateTime, Dictifiable, Serializabl ) output_datasets = relationship("WorkflowInvocationOutputDatasetAssociation", back_populates="workflow_invocation") output_values = relationship("WorkflowInvocationOutputValue", back_populates="workflow_invocation") + messages = relationship("WorkflowInvocationMessage", back_populates="workflow_invocation") - dict_collection_visible_keys = ["id", "update_time", "create_time", "workflow_id", "history_id", "uuid", "state"] - dict_element_visible_keys = ["id", "update_time", "create_time", "workflow_id", "history_id", "uuid", "state"] + dict_collection_visible_keys = [ + "id", + "update_time", + "create_time", + "workflow_id", + "history_id", + "uuid", + "state", + ] + dict_element_visible_keys = [ + "id", + "update_time", + "create_time", + "workflow_id", + "history_id", + "uuid", + "state", + ] class states(str, Enum): NEW = "new" # Brand new workflow invocation... 
maybe this should be same as READY @@ -7560,7 +7601,7 @@ def step_invocations_by_step_id(self): step_invocations[step_id] = invocation_step return step_invocations - def step_invocation_for_step_id(self, step_id): + def step_invocation_for_step_id(self, step_id: int) -> Optional["WorkflowInvocationStep"]: target_invocation_step = None for invocation_step in self.steps: if step_id == invocation_step.workflow_step_id: @@ -7857,6 +7898,19 @@ def attach_step(request_to_content): attach_step(request_to_content) self.input_step_parameters.append(request_to_content) + def add_message(self, message: "InvocationMessageUnion"): + self.messages.append( + WorkflowInvocationMessage( + workflow_invocation_id=self.id, + **message.dict( + exclude_unset=True, + exclude={ + "history_id" + }, # history_id comes in through workflow_invocation and isn't persisted in database + ), + ) + ) + @property def resource_parameters(self): resource_type = WorkflowRequestInputParameter.types.RESOURCE_PARAMETERS @@ -7918,6 +7972,36 @@ class WorkflowInvocationToSubworkflowInvocationAssociation(Base, Dictifiable, Re dict_element_visible_keys = ["id", "workflow_step_id", "workflow_invocation_id", "subworkflow_invocation_id"] +class WorkflowInvocationMessage(Base, Dictifiable, Serializable): + __tablename__ = "workflow_invocation_message" + id = Column(Integer, primary_key=True) + workflow_invocation_id = Column(Integer, ForeignKey("workflow_invocation.id"), index=True, nullable=False) + reason = Column(String(32)) + details = Column(TrimmedString(255), nullable=True) + output_name = Column(String(255), nullable=True) + workflow_step_id = Column(Integer, ForeignKey("workflow_step.id"), nullable=True) + dependent_workflow_step_id = Column(Integer, ForeignKey("workflow_step.id"), nullable=True) + job_id = Column(Integer, ForeignKey("job.id"), nullable=True) + hda_id = Column(Integer, ForeignKey("history_dataset_association.id"), nullable=True) + hdca_id = Column(Integer, ForeignKey("history_dataset_collection_association.id"), nullable=True) + + workflow_invocation = relationship("WorkflowInvocation", back_populates="messages", lazy=True) + workflow_step = relationship("WorkflowStep", foreign_keys=workflow_step_id, lazy=True) + dependent_workflow_step = relationship("WorkflowStep", foreign_keys=dependent_workflow_step_id, lazy=True) + + @property + def workflow_step_index(self): + return self.workflow_step and self.workflow_step.order_index + + @property + def dependent_workflow_step_index(self): + return self.dependent_workflow_step and self.dependent_workflow_step.order_index + + @property + def history_id(self): + return self.workflow_invocation.history_id + + class WorkflowInvocationStep(Base, Dictifiable, Serializable): __tablename__ = "workflow_invocation_step" diff --git a/lib/galaxy/model/dataset_collections/matching.py b/lib/galaxy/model/dataset_collections/matching.py index 4b1d7efbff62..948e317ec69a 100644 --- a/lib/galaxy/model/dataset_collections/matching.py +++ b/lib/galaxy/model/dataset_collections/matching.py @@ -48,6 +48,7 @@ def __init__(self): self.collections = {} self.subcollection_types = {} self.action_tuples = {} + self.when_values = None def __attempt_add_to_linked_match(self, input_name, hdca, collection_type_description, subcollection_type): structure = get_structure(hdca, collection_type_description, leaf_subcollection_type=subcollection_type) @@ -62,6 +63,7 @@ def __attempt_add_to_linked_match(self, input_name, hdca, collection_type_descri self.subcollection_types[input_name] = subcollection_type 
def slice_collections(self): + self.linked_structure.when_values = self.when_values return self.linked_structure.walk_collections(self.collections) def subcollection_mapping_type(self, input_name): @@ -77,6 +79,7 @@ def structure(self): if linked_structure is None: linked_structure = leaf effective_structure = effective_structure.multiply(linked_structure) + effective_structure.when_values = self.when_values return None if effective_structure.is_leaf else effective_structure def map_over_action_tuples(self, input_name): diff --git a/lib/galaxy/model/dataset_collections/structure.py b/lib/galaxy/model/dataset_collections/structure.py index 79c85b76eddc..08d1946bda39 100644 --- a/lib/galaxy/model/dataset_collections/structure.py +++ b/lib/galaxy/model/dataset_collections/structure.py @@ -63,9 +63,10 @@ def __str__(self): class Tree(BaseTree): children_known = True - def __init__(self, children, collection_type_description): + def __init__(self, children, collection_type_description, when_values=None): super().__init__(collection_type_description) self.children = children + self.when_values = when_values @staticmethod def for_dataset_collection(dataset_collection, collection_type_description): @@ -94,10 +95,11 @@ def get_element(collection): return collection[index] # noqa: B023 if substructure.is_leaf: - yield dict_map(get_element, collection_dict) + yield dict_map(get_element, collection_dict), self.when_values[index] if self.when_values else None else: sub_collections = dict_map(lambda collection: get_element(collection).child_collection, collection_dict) - yield from substructure._walk_collections(sub_collections) + for element, _when_value in substructure._walk_collections(sub_collections): + yield element, self.when_values[index] if self.when_values else None @property def is_leaf(self): diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/3100452fa030_add_workflow_invocation_message_table.py b/lib/galaxy/model/migrations/alembic/versions_gxy/3100452fa030_add_workflow_invocation_message_table.py new file mode 100644 index 000000000000..263a47110cc1 --- /dev/null +++ b/lib/galaxy/model/migrations/alembic/versions_gxy/3100452fa030_add_workflow_invocation_message_table.py @@ -0,0 +1,46 @@ +"""Add workflow_invocation_message table + +Revision ID: 3100452fa030 +Revises: 518c8438a91b +Create Date: 2023-01-13 16:13:09.578391 + +""" +from alembic import op +from sqlalchemy import ( + Column, + ForeignKey, + Integer, + String, +) + +from galaxy.model.custom_types import TrimmedString + +# revision identifiers, used by Alembic.
+revision = "3100452fa030" +down_revision = "518c8438a91b" +branch_labels = None +depends_on = None + + +# database object names used in this revision +table_name = "workflow_invocation_message" + + +def upgrade(): + op.create_table( + table_name, + Column("id", Integer, primary_key=True), + Column("reason", String(32)), + Column("details", TrimmedString(255)), + Column("output_name", String(255)), + Column("workflow_invocation_id", Integer, ForeignKey("workflow_invocation.id"), index=True, nullable=False), + Column("workflow_step_id", Integer, ForeignKey("workflow_step.id"), nullable=True), + Column("dependent_workflow_step_id", Integer, ForeignKey("workflow_step.id"), nullable=True), + Column("job_id", Integer, ForeignKey("job.id"), nullable=True), + Column("hda_id", Integer, ForeignKey("history_dataset_association.id"), nullable=True), + Column("hdca_id", Integer, ForeignKey("history_dataset_collection_association.id"), nullable=True), + ) + + +def downgrade(): + op.drop_table(table_name) diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/518c8438a91b_add_when_expression_column.py b/lib/galaxy/model/migrations/alembic/versions_gxy/518c8438a91b_add_when_expression_column.py new file mode 100644 index 000000000000..879ec4e35ac1 --- /dev/null +++ b/lib/galaxy/model/migrations/alembic/versions_gxy/518c8438a91b_add_when_expression_column.py @@ -0,0 +1,34 @@ +"""Add when_expression column + +Revision ID: 518c8438a91b +Revises: 59e024ceaca1 +Create Date: 2022-10-24 16:43:39.565871 + +""" +import sqlalchemy as sa +from alembic import op + +from galaxy.model.custom_types import JSONType +from galaxy.model.migrations.util import ( + column_exists, + drop_column, +) + +# revision identifiers, used by Alembic. +revision = "518c8438a91b" +down_revision = "59e024ceaca1" +branch_labels = None +depends_on = None + +# database object names used in this revision +table_name = "workflow_step" +column_name = "when_expression" + + +def upgrade(): + if not column_exists(table_name, column_name): + op.add_column(table_name, sa.Column(column_name, JSONType)) + + +def downgrade(): + drop_column(table_name, column_name) diff --git a/lib/galaxy/model/migrations/alembic/versions_gxy/c39f1de47a04_add_skipped_state_to_collection_job_.py b/lib/galaxy/model/migrations/alembic/versions_gxy/c39f1de47a04_add_skipped_state_to_collection_job_.py new file mode 100644 index 000000000000..86fd5053e49e --- /dev/null +++ b/lib/galaxy/model/migrations/alembic/versions_gxy/c39f1de47a04_add_skipped_state_to_collection_job_.py @@ -0,0 +1,70 @@ +"""Add skipped state to collection_job_state_summary_view + +Revision ID: c39f1de47a04 +Revises: 3100452fa030 +Create Date: 2023-01-16 11:53:59.783836 + +""" +from alembic import op +from alembic_utils.pg_view import PGView + +from galaxy.model.view import HistoryDatasetCollectionJobStateSummary + +# revision identifiers, used by Alembic. 
+revision = "c39f1de47a04" +down_revision = "3100452fa030" +branch_labels = None +depends_on = None + +PREVIOUS_AGGREGATE_QUERY = """ +SELECT + hdca_id, + SUM(CASE WHEN state = 'new' THEN 1 ELSE 0 END) AS new, + SUM(CASE WHEN state = 'resubmitted' THEN 1 ELSE 0 END) AS resubmitted, + SUM(CASE WHEN state = 'waiting' THEN 1 ELSE 0 END) AS waiting, + SUM(CASE WHEN state = 'queued' THEN 1 ELSE 0 END) AS queued, + SUM(CASE WHEN state = 'running' THEN 1 ELSE 0 END) AS running, + SUM(CASE WHEN state = 'ok' THEN 1 ELSE 0 END) AS ok, + SUM(CASE WHEN state = 'error' THEN 1 ELSE 0 END) AS error, + SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) AS failed, + SUM(CASE WHEN state = 'paused' THEN 1 ELSE 0 END) AS paused, + SUM(CASE WHEN state = 'deleted' THEN 1 ELSE 0 END) AS deleted, + SUM(CASE WHEN state = 'deleted_new' THEN 1 ELSE 0 END) AS deleted_new, + SUM(CASE WHEN state = 'upload' THEN 1 ELSE 0 END) AS upload, + SUM(CASE WHEN job_id IS NOT NULL THEN 1 ELSE 0 END) AS all_jobs +FROM ( + SELECT hdca.id AS hdca_id, job.id AS job_id, job.state as state + FROM history_dataset_collection_association hdca + LEFT JOIN implicit_collection_jobs icj + ON icj.id = hdca.implicit_collection_jobs_id + LEFT JOIN implicit_collection_jobs_job_association icjja + ON icj.id = icjja.implicit_collection_jobs_id + LEFT JOIN job + ON icjja.job_id = job.id + + UNION + + SELECT hdca.id AS hdca_id, job.id AS job_id, job.state AS state + FROM history_dataset_collection_association hdca + LEFT JOIN job + ON hdca.job_id = job.id +) jobstates +GROUP BY jobstates.hdca_id +""" + + +def upgrade(): + public_collection_job_state_summary_view = PGView( + schema="public", + signature="collection_job_state_summary_view", + definition=HistoryDatasetCollectionJobStateSummary.aggregate_state_query, + ) + # op.replace_entity comes from alembic_utils plugin + op.replace_entity(public_collection_job_state_summary_view) # type: ignore[attr-defined] + + +def downgrade(): + public_collection_job_state_summary_view = PGView( + schema="public", signature="collection_job_state_summary_view", definition=PREVIOUS_AGGREGATE_QUERY + ) + op.replace_entity(public_collection_job_state_summary_view) # type: ignore[attr-defined] diff --git a/lib/galaxy/model/view/__init__.py b/lib/galaxy/model/view/__init__.py index 0bb5c2771a1d..04fd7d648709 100644 --- a/lib/galaxy/model/view/__init__.py +++ b/lib/galaxy/model/view/__init__.py @@ -26,6 +26,7 @@ class HistoryDatasetCollectionJobStateSummary(View): SUM(CASE WHEN state = 'error' THEN 1 ELSE 0 END) AS error, SUM(CASE WHEN state = 'failed' THEN 1 ELSE 0 END) AS failed, SUM(CASE WHEN state = 'paused' THEN 1 ELSE 0 END) AS paused, + SUM(CASE WHEN state = 'skipped' THEN 1 ELSE 0 END) AS skipped, SUM(CASE WHEN state = 'deleted' THEN 1 ELSE 0 END) AS deleted, SUM(CASE WHEN state = 'deleted_new' THEN 1 ELSE 0 END) AS deleted_new, SUM(CASE WHEN state = 'upload' THEN 1 ELSE 0 END) AS upload, @@ -61,6 +62,7 @@ class HistoryDatasetCollectionJobStateSummary(View): column("error", Integer), column("failed", Integer), column("paused", Integer), + column("skipped", Integer), column("deleted", Integer), column("deleted_new", Integer), column("upload", Integer), diff --git a/lib/galaxy/schema/invocation.py b/lib/galaxy/schema/invocation.py new file mode 100644 index 000000000000..6803d2183e92 --- /dev/null +++ b/lib/galaxy/schema/invocation.py @@ -0,0 +1,214 @@ +from enum import Enum +from typing import ( + Any, + Generic, + Optional, + TypeVar, + Union, +) + +from pydantic import ( + BaseModel, + Field, +) +from 
pydantic.generics import GenericModel +from pydantic.utils import GetterDict +from typing_extensions import ( + Annotated, + Literal, +) + +from galaxy.schema.fields import EncodedDatabaseIdField + + +class WarningReason(str, Enum): + workflow_output_not_found = "workflow_output_not_found" + + +class FailureReason(str, Enum): + dataset_failed = "dataset_failed" + collection_failed = "collection_failed" + job_failed = "job_failed" + output_not_found = "output_not_found" + expression_evaluation_failed = "expression_evaluation_failed" + when_not_boolean = "when_not_boolean" + unexpected_failure = "unexpected_failure" + + +class CancelReason(str, Enum): + """Possible reasons for a cancelled workflow.""" + + history_deleted = "history_deleted" + user_request = "user_request" + cancelled_on_review = "cancelled_on_review" + + +DatabaseIdT = TypeVar("DatabaseIdT") + + +class StepOrderIndexGetter(GetterDict): + def get(self, key: Any, default: Any = None) -> Any: + + # Fetch the order_index when serializing for the API, + # which makes much more sense when pointing to steps. + if key == "workflow_step_id": + return self._obj.workflow_step.order_index + elif key == "dependent_workflow_step_id": + return self._obj.dependent_workflow_step.order_index + + return super().get(key, default) + + +class InvocationMessageBase(GenericModel): + reason: Union[CancelReason, FailureReason, WarningReason] + + class Config: + orm_mode = True + getter_dict = StepOrderIndexGetter + + +class GenericInvocationCancellationReviewFailed(InvocationMessageBase, Generic[DatabaseIdT]): + reason: Literal[CancelReason.cancelled_on_review] + workflow_step_id: int = Field(..., description="Workflow step id of paused step that did not pass review.") + + +class GenericInvocationCancellationHistoryDeleted(InvocationMessageBase, Generic[DatabaseIdT]): + reason: Literal[CancelReason.history_deleted] + history_id: DatabaseIdT = Field(..., title="History ID", description="History ID of history that was deleted.") + + +class GenericInvocationCancellationUserRequest(InvocationMessageBase, Generic[DatabaseIdT]): + reason: Literal[CancelReason.user_request] + + +class InvocationFailureMessageBase(InvocationMessageBase, Generic[DatabaseIdT]): + workflow_step_id: int = Field(..., description="Workflow step id of step that failed.") + + +class GenericInvocationFailureDatasetFailed(InvocationFailureMessageBase[DatabaseIdT], Generic[DatabaseIdT]): + reason: Literal[FailureReason.dataset_failed] + hda_id: DatabaseIdT = Field( + ..., title="HistoryDatasetAssociation ID", description="HistoryDatasetAssociation ID that relates to failure." 
+ ) + dependent_workflow_step_id: Optional[int] = Field(None, description="Workflow step id of step that caused failure.") + + +class GenericInvocationFailureCollectionFailed(InvocationFailureMessageBase[DatabaseIdT], Generic[DatabaseIdT]): + reason: Literal[FailureReason.collection_failed] + hdca_id: DatabaseIdT = Field( + None, + title="HistoryDatasetCollectionAssociation ID", + description="HistoryDatasetCollectionAssociation ID that relates to failure.", + ) + dependent_workflow_step_id: int = Field(..., description="Workflow step id of step that caused failure.") + + +class GenericInvocationFailureJobFailed(InvocationFailureMessageBase[DatabaseIdT], Generic[DatabaseIdT]): + reason: Literal[FailureReason.job_failed] + job_id: DatabaseIdT = Field(None, title="Job ID", description="Job ID that relates to failure.") + dependent_workflow_step_id: int = Field(..., description="Workflow step id of step that caused failure.") + + +class GenericInvocationFailureOutputNotFound(InvocationFailureMessageBase[DatabaseIdT], Generic[DatabaseIdT]): + reason: Literal[FailureReason.output_not_found] + output_name: str = Field(..., title="Tool or module output name that was referenced but not produced") + dependent_workflow_step_id: int = Field(..., description="Workflow step id of step that caused failure.") + + +class GenericInvocationFailureExpressionEvaluationFailed( + InvocationFailureMessageBase[DatabaseIdT], Generic[DatabaseIdT] +): + reason: Literal[FailureReason.expression_evaluation_failed] + details: Optional[str] = Field(None, description="May contain details to help troubleshoot this problem.") + + +class GenericInvocationFailureWhenNotBoolean(InvocationFailureMessageBase[DatabaseIdT], Generic[DatabaseIdT]): + reason: Literal[FailureReason.when_not_boolean] + details: str = Field(..., description="Contains details to help troubleshoot this problem.") + + +class GenericInvocationUnexpectedFailure(InvocationMessageBase, Generic[DatabaseIdT]): + reason: Literal[FailureReason.unexpected_failure] + details: Optional[str] = Field(None, description="May contain details to help troubleshoot this problem.") + + +class GenericInvocationWarning(InvocationMessageBase, Generic[DatabaseIdT]): + reason: WarningReason = Field(..., title="Warning Reason", description="Reason for warning") + workflow_step_id: Optional[int] = Field(None, title="Workflow step id of step that caused a warning.") + + +class GenericInvocationEvaluationWarningWorkflowOutputNotFound( + GenericInvocationWarning[DatabaseIdT], Generic[DatabaseIdT] +): + reason: Literal[WarningReason.workflow_output_not_found] + workflow_step_id: int = Field(..., title="Workflow step id of step that caused a warning.") + output_name: str = Field( + ..., description="Output that was designated as workflow output but that has not been found" + ) + + +InvocationCancellationReviewFailed = GenericInvocationCancellationReviewFailed[int] +InvocationCancellationHistoryDeleted = GenericInvocationCancellationHistoryDeleted[int] +InvocationCancellationUserRequest = GenericInvocationCancellationUserRequest[int] +InvocationFailureDatasetFailed = GenericInvocationFailureDatasetFailed[int] +InvocationFailureCollectionFailed = GenericInvocationFailureCollectionFailed[int] +InvocationFailureJobFailed = GenericInvocationFailureJobFailed[int] +InvocationFailureOutputNotFound = GenericInvocationFailureOutputNotFound[int] +InvocationFailureExpressionEvaluationFailed = GenericInvocationFailureExpressionEvaluationFailed[int] +InvocationFailureWhenNotBoolean = 
GenericInvocationFailureWhenNotBoolean[int] +InvocationUnexpectedFailure = GenericInvocationUnexpectedFailure[int] +InvocationWarningWorkflowOutputNotFound = GenericInvocationEvaluationWarningWorkflowOutputNotFound[int] + +InvocationMessageUnion = Union[ + InvocationCancellationReviewFailed, + InvocationCancellationHistoryDeleted, + InvocationCancellationUserRequest, + InvocationFailureDatasetFailed, + InvocationFailureCollectionFailed, + InvocationFailureJobFailed, + InvocationFailureOutputNotFound, + InvocationFailureExpressionEvaluationFailed, + InvocationFailureWhenNotBoolean, + InvocationUnexpectedFailure, + InvocationWarningWorkflowOutputNotFound, +] + +InvocationCancellationReviewFailedResponseModel = GenericInvocationCancellationReviewFailed[EncodedDatabaseIdField] +InvocationCancellationHistoryDeletedResponseModel = GenericInvocationCancellationHistoryDeleted[EncodedDatabaseIdField] +InvocationCancellationUserRequestResponseModel = GenericInvocationCancellationUserRequest[EncodedDatabaseIdField] +InvocationFailureDatasetFailedResponseModel = GenericInvocationFailureDatasetFailed[EncodedDatabaseIdField] +InvocationFailureCollectionFailedResponseModel = GenericInvocationFailureCollectionFailed[EncodedDatabaseIdField] +InvocationFailureJobFailedResponseModel = GenericInvocationFailureJobFailed[EncodedDatabaseIdField] +InvocationFailureOutputNotFoundResponseModel = GenericInvocationFailureOutputNotFound[EncodedDatabaseIdField] +InvocationFailureExpressionEvaluationFailedResponseModel = GenericInvocationFailureExpressionEvaluationFailed[ + EncodedDatabaseIdField +] +InvocationFailureWhenNotBooleanResponseModel = GenericInvocationFailureWhenNotBoolean[EncodedDatabaseIdField] +InvocationUnexpectedFailureResponseModel = GenericInvocationUnexpectedFailure[EncodedDatabaseIdField] +InvocationWarningWorkflowOutputNotFoundResponseModel = GenericInvocationEvaluationWarningWorkflowOutputNotFound[ + EncodedDatabaseIdField +] + +InvocationMessageResponseUnion = Annotated[ + Union[ + InvocationCancellationReviewFailedResponseModel, + InvocationCancellationHistoryDeletedResponseModel, + InvocationCancellationUserRequestResponseModel, + InvocationFailureDatasetFailedResponseModel, + InvocationFailureCollectionFailedResponseModel, + InvocationFailureJobFailedResponseModel, + InvocationFailureOutputNotFoundResponseModel, + InvocationFailureExpressionEvaluationFailedResponseModel, + InvocationFailureWhenNotBooleanResponseModel, + InvocationUnexpectedFailureResponseModel, + InvocationWarningWorkflowOutputNotFoundResponseModel, + ], + Field(discriminator="reason"), +] + + +class InvocationMessageResponseModel(BaseModel): + __root__: InvocationMessageResponseUnion + + class Config: + orm_mode = True diff --git a/lib/galaxy/schema/schema.py b/lib/galaxy/schema/schema.py index 2c81fab87f97..b50faa5feb81 100644 --- a/lib/galaxy/schema/schema.py +++ b/lib/galaxy/schema/schema.py @@ -733,6 +733,11 @@ class HDCJobStateSummary(Model): title="Paused jobs", description="Number of jobs in the `paused` state.", ) + skipped: int = Field( + 0, + title="Skipped jobs", + description="Number of jobs that were skipped due to conditional workflow step execution.", + ) deleted_new: int = Field( 0, title="Deleted new jobs", diff --git a/lib/galaxy/tool_util/cwl/parser.py b/lib/galaxy/tool_util/cwl/parser.py index 97d79c1fc988..467c6070f66c 100644 --- a/lib/galaxy/tool_util/cwl/parser.py +++ b/lib/galaxy/tool_util/cwl/parser.py @@ -686,7 +686,6 @@ def cwl_input_to_galaxy_step(self, input, i): "input_connections": {}, # Should 
the Galaxy API really require this? - Seems to. "workflow_outputs": self.get_outputs_for_label(label), } - if input_type == "File" and "default" not in input: input_as_dict["type"] = "data_input" elif isinstance(input_type, dict) and input_type.get("type") == "array": @@ -1057,7 +1056,8 @@ def to_dict(self, input_connections): tool_state[input_name] = None outputs = self.galaxy_workflow_outputs_list() - return { + when_expression = self._step.tool.get("when") + rval = { "id": self._index, "tool_uuid": self.tool_proxy.uuid, # TODO: make sure this is respected... "label": self.label, @@ -1068,6 +1068,9 @@ def to_dict(self, input_connections): "inputs": self.inputs_to_dicts(), "workflow_outputs": outputs, } + if when_expression: + rval["when"] = when_expression + return rval class SubworkflowStepProxy(BaseStepProxy): diff --git a/lib/galaxy/tools/__init__.py b/lib/galaxy/tools/__init__.py index fba2844f7d09..a569f884a824 100644 --- a/lib/galaxy/tools/__init__.py +++ b/lib/galaxy/tools/__init__.py @@ -1899,6 +1899,7 @@ def handle_single_execution( collection_info=None, job_callback=None, flush_job=True, + skip=False, ): """ Return a pair with whether execution is successful as well as either @@ -1916,6 +1917,7 @@ def handle_single_execution( collection_info=collection_info, job_callback=job_callback, flush_job=flush_job, + skip=skip, ) job = rval[0] out_data = rval[1] diff --git a/lib/galaxy/tools/actions/__init__.py b/lib/galaxy/tools/actions/__init__.py index c4c77f5648b3..45eba238fd3e 100644 --- a/lib/galaxy/tools/actions/__init__.py +++ b/lib/galaxy/tools/actions/__init__.py @@ -23,6 +23,7 @@ ) from galaxy.model.dataset_collections.builder import CollectionBuilder from galaxy.model.none_like import NoneDataset +from galaxy.objectstore import ObjectStorePopulator from galaxy.tools.parameters import update_dataset_ids from galaxy.tools.parameters.basic import ( DataCollectionToolParameter, @@ -367,6 +368,7 @@ def execute( collection_info=None, job_callback=None, flush_job=True, + skip=False, ): """ Executes a tool, creating job and tool outputs, associating them, and @@ -506,7 +508,7 @@ def handle_output(name, output, hidden=None): ) data.copy_tags_to(preserved_tags.values()) - # This may not be neccesary with the new parent/child associations + # This may not be necessary with the new parent/child associations data.designation = name # Copy metadata from one of the inputs if requested. 
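[Editor's note] The next hunk adds the skip path to tool execution: when a conditional step's `when` expression evaluates to false, the job is still created, but in the new `SKIPPED` state, and every output is materialized as an `expression.json` dataset whose content is the JSON literal `null`. A condensed sketch of both sides of that contract, with simplified names rather than the actual Galaxy call sites (the real reader is `_hda_to_object` in wrapped_json.py later in this diff):

    import json

    def write_skipped_output(path: str) -> None:
        # What the action code does for each output dataset when skip=True:
        # the file content is exactly "null".
        with open(path, "w") as out:
            out.write(json.dumps(None))

    def read_expression_dataset(path: str):
        # Mirrors the _hda_to_object probe: peek at the first few bytes and
        # return None when the dataset holds a JSON null.
        with open(path) as inp:
            try:
                if json.loads(inp.read(5)) is None:
                    return None
            except Exception:
                pass
        return {"class": "File", "path": path}  # illustrative fallback only

    write_skipped_output("/tmp/skipped_output.json")
    assert read_expression_dataset("/tmp/skipped_output.json") is None

Writing a real `null` file, instead of leaving the outputs paused or errored, lets downstream `when` expressions and expression tools keep evaluating across skipped branches.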
@@ -638,6 +640,17 @@ def handle_output(name, output, hidden=None): job_setup_timer = ExecutionTimer() # Create the job object job, galaxy_session = self._new_job_for_session(trans, tool, history) + if skip: + job.state = job.states.SKIPPED + for output_collection in output_collections.out_collections.values(): + output_collection.mark_as_populated() + object_store_populator = ObjectStorePopulator(trans.app, trans.user) + for data in out_data.values(): + object_store_populator.set_object_store_id(data) + data.extension = "expression.json" + data.state = "ok" + with open(data.dataset.file_name, "w") as out: + out.write(json.dumps(None)) self._record_inputs(trans, tool, job, incoming, inp_data, inp_dataset_collections) self._record_outputs(job, out_data, output_collections) # execute immediate post job actions and associate post job actions that are to be executed after the job is complete @@ -694,7 +707,7 @@ def handle_output(name, output, hidden=None): trans.sa_session.flush() log.info(f"Flushed transaction for job {job.log_str()} {job_flush_timer}") - return job, out_data, history + return job, out_data, history def _remap_job_on_rerun(self, trans, galaxy_session, rerun_remap_job_id, current_job, out_data): """ diff --git a/lib/galaxy/tools/execute.py b/lib/galaxy/tools/execute.py index 507b68abaf56..a325966fb608 100644 --- a/lib/galaxy/tools/execute.py +++ b/lib/galaxy/tools/execute.py @@ -90,7 +90,7 @@ def execute( ) execution_cache = ToolExecutionCache(trans) - def execute_single_job(execution_slice, completed_job): + def execute_single_job(execution_slice, completed_job, skip=False): job_timer = tool.app.execution_timer_factory.get_timer( "internals.galaxy.tools.execute.job_single", SINGLE_EXECUTION_SUCCESS_MESSAGE ) @@ -121,6 +121,7 @@ def execute_single_job(execution_slice, completed_job): collection_info, job_callback=job_callback, flush_job=False, + skip=skip, ) if job: log.debug(job_timer.to_str(tool_id=tool.id, job_id=job.id)) @@ -161,7 +162,8 @@ def execute_single_job(execution_slice, completed_job): has_remaining_jobs = True break else: - execute_single_job(execution_slice, completed_jobs[i]) + skip = execution_slice.param_combination.pop("__when_value__", None) is False + execute_single_job(execution_slice, completed_jobs[i], skip=skip) history = execution_slice.history or history jobs_executed += 1 @@ -526,7 +528,7 @@ def record_success(self, execution_slice, job, outputs): self.outputs_by_output_name[job_output.name].append(job_output.dataset_collection) def new_collection_execution_slices(self): - for job_index, (param_combination, dataset_collection_elements) in enumerate( + for job_index, (param_combination, (dataset_collection_elements, _when_value)) in enumerate( zip(self.param_combinations, self.walk_implicit_collections()) ): completed_job = self.completed_jobs and self.completed_jobs[job_index] @@ -550,7 +552,7 @@ def record_success(self, execution_slice, job, outputs): self.invocation_step.job = job def new_collection_execution_slices(self): - for job_index, (param_combination, dataset_collection_elements) in enumerate( + for job_index, (param_combination, (dataset_collection_elements, _when_value)) in enumerate( zip(self.param_combinations, self.walk_implicit_collections()) ): completed_job = self.completed_jobs and self.completed_jobs[job_index] diff --git a/lib/galaxy/tools/parameters/wrapped_json.py b/lib/galaxy/tools/parameters/wrapped_json.py index 1cac8c2e2e5b..912ea3eb2c74 100644 --- a/lib/galaxy/tools/parameters/wrapped_json.py +++ 
b/lib/galaxy/tools/parameters/wrapped_json.py @@ -1,3 +1,4 @@ +import json import logging from typing import ( Any, @@ -168,6 +169,15 @@ def _json_wrap_input(input, value_wrapper, profile, handle_files="skip"): def _hda_to_object(hda): + if hda.extension == "expression.json": + # We may have a null data value + with open(str(hda)) as inp: + try: + rval = json.loads(inp.read(5)) + if rval is None: + return rval + except Exception: + pass hda_dict = hda.to_dict() metadata_dict = {} diff --git a/lib/galaxy/webapps/galaxy/api/workflows.py b/lib/galaxy/webapps/galaxy/api/workflows.py index d2e4d0e891d9..c4f3b0c97c04 100644 --- a/lib/galaxy/webapps/galaxy/api/workflows.py +++ b/lib/galaxy/webapps/galaxy/api/workflows.py @@ -48,6 +48,7 @@ from galaxy.model.item_attrs import UsesAnnotations from galaxy.model.store import BcoExportOptions from galaxy.schema.fields import DecodedDatabaseIdField +from galaxy.schema.invocation import InvocationMessageResponseModel from galaxy.schema.schema import ( AsyncFile, AsyncTaskResultSummary, @@ -236,6 +237,7 @@ def create(self, trans: GalaxyWebTransaction, payload=None, **kwd): """ ways_to_create = { + "archive_file", "archive_source", "from_history_id", "from_path", @@ -254,8 +256,8 @@ def create(self, trans: GalaxyWebTransaction, payload=None, **kwd): message = f"Only one parameter among - {', '.join(ways_to_create)} - must be specified" raise exceptions.RequestParameterInvalidException(message) - if "archive_source" in payload: - archive_source = payload["archive_source"] + if "archive_source" in payload or "archive_file" in payload: + archive_source = payload.get("archive_source") archive_file = payload.get("archive_file") archive_data = None if archive_source: @@ -764,12 +766,19 @@ def invoke(self, trans: GalaxyWebTransaction, workflow_id, payload, **kwd): invocations.append(workflow_invocation) trans.sa_session.flush() - invocations = [self.encode_all_ids(trans, invocation.to_dict(), recursive=True) for invocation in invocations] + encoded_invocations = [] + for invocation in invocations: + as_dict = invocation.to_dict() + as_dict = self.encode_all_ids(trans, as_dict, recursive=True) + as_dict["messages"] = [ + InvocationMessageResponseModel.parse_obj(message).__root__.dict() for message in invocation.messages + ] + encoded_invocations.append(as_dict) if is_batch: - return invocations + return encoded_invocations else: - return invocations[0] + return encoded_invocations[0] @expose_api def index_invocations(self, trans: GalaxyWebTransaction, **kwd): diff --git a/lib/galaxy/webapps/galaxy/services/invocations.py b/lib/galaxy/webapps/galaxy/services/invocations.py index 82e1bb536d21..44d0655a3fee 100644 --- a/lib/galaxy/webapps/galaxy/services/invocations.py +++ b/lib/galaxy/webapps/galaxy/services/invocations.py @@ -24,11 +24,13 @@ ) from galaxy.managers.histories import HistoryManager from galaxy.managers.workflows import WorkflowsManager +from galaxy.model import WorkflowInvocation from galaxy.model.store import ( BcoExportOptions, get_export_store_factory, ) from galaxy.schema.fields import DecodedDatabaseIdField +from galaxy.schema.invocation import InvocationMessageResponseModel from galaxy.schema.schema import ( AsyncFile, AsyncTaskResultSummary, @@ -190,7 +192,7 @@ def write_store( def serialize_workflow_invocation( self, - invocation, + invocation: WorkflowInvocation, params: InvocationSerializationParams, default_view: InvocationSerializationView = InvocationSerializationView.element, ): @@ -198,7 +200,11 @@ def 
serialize_workflow_invocation( step_details = params.step_details legacy_job_state = params.legacy_job_state as_dict = invocation.to_dict(view, step_details=step_details, legacy_job_state=legacy_job_state) - return self.security.encode_all_ids(as_dict, recursive=True) + as_dict = self.security.encode_all_ids(as_dict, recursive=True) + as_dict["messages"] = [ + InvocationMessageResponseModel.parse_obj(message).__root__.dict() for message in invocation.messages + ] + return as_dict def serialize_workflow_invocations( self, diff --git a/lib/galaxy/workflow/modules.py b/lib/galaxy/workflow/modules.py index 1a7cd41037bb..ce0445dd17bd 100644 --- a/lib/galaxy/workflow/modules.py +++ b/lib/galaxy/workflow/modules.py @@ -13,10 +13,12 @@ List, Optional, Type, + TYPE_CHECKING, Union, ) import packaging.version +from cwl_utils.expression import do_eval from typing_extensions import TypedDict from galaxy import ( @@ -36,6 +38,15 @@ WorkflowStepConnection, ) from galaxy.model.dataset_collections import matching +from galaxy.schema.invocation import ( + CancelReason, + FailureReason, + InvocationCancellationReviewFailed, + InvocationFailureDatasetFailed, + InvocationFailureExpressionEvaluationFailed, + InvocationFailureWhenNotBoolean, +) +from galaxy.tool_util.cwl.util import set_basename_and_derived_properties from galaxy.tool_util.parser.output_objects import ToolExpressionOutput from galaxy.tools import ( DatabaseOperationTool, @@ -86,6 +97,9 @@ from galaxy.util.template import fill_template from galaxy.util.tool_shed.common_util import get_tool_shed_url_from_tool_shed_registry +if TYPE_CHECKING: + from galaxy.schema.invocation import InvocationMessageUnion + log = logging.getLogger(__name__) # Key into Tool state to describe invocation-specific runtime properties. @@ -104,6 +118,126 @@ def __str__(self): NO_REPLACEMENT = NoReplacement() +class ConditionalStepWhen(BooleanToolParameter): + pass + + +def evaluate_value_from_expressions(progress, step, execution_state, extra_step_state): + when_expression = step.when_expression + value_from_expressions = {} + + if execution_state: + for key in execution_state.inputs.keys(): + step_input = step.inputs_by_name.get(key) + if step_input and step_input.value_from is not None: + value_from_expressions[key] = step_input.value_from + + if not value_from_expressions and when_expression is None: + return {} + + hda_references = [] + + def to_cwl(value): + element_identifier = None + if isinstance(value, model.DatasetCollectionElement) and value.hda: + element_identifier = value.element_identifier + value = value.hda + if isinstance(value, model.HistoryDatasetAssociation): + # I think the following two checks are needed but they may + # not be needed. + if not value.dataset.in_ready_state(): + why = "dataset [%s] is needed for valueFrom expression and is non-ready" % value.id + raise DelayedWorkflowEvaluation(why=why) + if not value.is_ok: + raise FailWorkflowEvaluation( + why=InvocationFailureDatasetFailed( + reason=FailureReason.dataset_failed, hda_id=value.id, workflow_step_id=step.id + ) + ) + if value.ext == "expression.json": + with open(value.file_name) as f: + # OUR safe_loads won't work, will not load numbers, etc... 
+ return json.load(f) + else: + hda_references.append(value) + properties = { + "class": "File", + "location": "step_input://%d" % len(hda_references), + } + set_basename_and_derived_properties( + properties, value.dataset.created_from_basename or element_identifier or value.name + ) + return properties + elif hasattr(value, "collection"): + collection = value.collection + if collection.collection_type == "list": + return [to_cwl(dce) for dce in collection.dataset_elements] + else: + # Could be record or nested lists + rval = {} + for element in collection.elements: + rval[element.element_identifier] = to_cwl(element.element_object) + return rval + else: + return value + + def from_cwl(value): + # TODO: turn actual files into HDAs here ... somehow I suppose. Things with + # file:// locations for instance. + if isinstance(value, dict) and "class" in value and "location" in value: + if value["class"] == "File": + # This is going to re-file -> HDA this each iteration I think, not a good + # implementation. + return progress.raw_to_galaxy(value) + assert value["location"].startswith("step_input://"), "Invalid location %s" % value + return hda_references[int(value["location"][len("step_input://") :]) - 1] + elif isinstance(value, dict): + raise NotImplementedError() + else: + return value + + step_state = {} + for key, value in extra_step_state.items(): + step_state[key] = to_cwl(value) + if execution_state: + for key, value in execution_state.inputs.items(): + step_state[key] = to_cwl(value) + + if when_expression is not None: + try: + as_cwl_value = do_eval( + when_expression, + step_state, + [{"class": "InlineJavascriptRequirement"}], + None, + None, + {}, + ) + except Exception: + # Exception contains script and traceback, which could be helpful for debugging workflows, + # but both could conceivably contain secrets. + # CWL has a secret hint that should cause values to be sanitized, + # but Galaxy does not, so we can't really display anything here at this point. + # In any case I believe the CWL secret hint can be bypassed if the value is passed on + # to another step input that doesn't have the secret set. + # Complicated stuff, ignore for now. + raise FailWorkflowEvaluation( + InvocationFailureExpressionEvaluationFailed( + reason=FailureReason.expression_evaluation_failed, workflow_step_id=step.id + ) + ) + when_value = from_cwl(as_cwl_value) + if not isinstance(when_value, bool): + raise FailWorkflowEvaluation( + InvocationFailureWhenNotBoolean( + reason=FailureReason.when_not_boolean, + workflow_step_id=step.id, + details=f"Type is: {when_value.__class__.__name__}", + ) + ) + return when_value + + class WorkflowModule: label: str @@ -361,19 +495,15 @@ def get_replacement_parameters(self, step): return [] def compute_collection_info(self, progress, step, all_inputs): - """Use get_all_inputs (if implemented) to determine collection mapping for execution. - - Hopefully this can be reused for Tool and Subworkflow modules. """ - + Use get_all_inputs (if implemented) to determine collection mapping for execution. + """ collections_to_match = self._find_collections_to_match(progress, step, all_inputs) # Have implicit collections... 
- if collections_to_match.has_collections(): - collection_info = self.trans.app.dataset_collection_manager.match_collections(collections_to_match) - else: - collection_info = None - - return collection_info + collection_info = self.trans.app.dataset_collection_manager.match_collections(collections_to_match) + if collection_info and progress.subworkflow_collection_info: + collection_info.when_values = progress.subworkflow_collection_info.when_values + return collection_info or progress.subworkflow_collection_info def _find_collections_to_match(self, progress, step, all_inputs): collections_to_match = matching.CollectionsToMatch() @@ -440,6 +570,20 @@ def _find_collections_to_match(self, progress, step, all_inputs): collections_to_match.add(name, data) continue + known_input_names = {input_dict["name"] for input_dict in all_inputs} + + if step.when_expression: + for step_input in step.inputs: + step_input_name = step_input.name + input_in_execution_state = step_input_name not in known_input_names + if input_in_execution_state: + maybe_collection = progress.replacement_for_connection( + step.input_connections_by_name[step_input_name][0] + ) + if hasattr(maybe_collection, "collection"): + # Is that always right? + collections_to_match.add(step_input_name, maybe_collection) + return collections_to_match @@ -596,10 +740,47 @@ def execute(self, trans, progress, invocation_step, use_cached_job=False): inputs, etc... """ step = invocation_step.workflow_step - collection_info = self.compute_collection_info(progress, step, self.get_all_inputs()) - structure = collection_info.structure if collection_info else None + all_inputs = self.get_all_inputs() + collection_info = self.compute_collection_info(progress, step, all_inputs) + + if collection_info: + iteration_elements_iter = collection_info.slice_collections() + else: + if progress.when_values: + # If there is more than one item in when_values it must have come from an expression.json + # collection, in which case we should already have a collection_info instance.
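+ # (a bare tool or subworkflow step therefore sees at most one upstream when value; mapped-over when values arrive via collection_info.when_values)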
+ assert len(progress.when_values) == 1, "Got more than 1 when value, this shouldn't be possible" + iteration_elements_iter = [(None, progress.when_values[0] if progress.when_values else None)] + + when_values = [] + if step.when_expression: + for (iteration_elements, when_value) in iteration_elements_iter: + if when_value is False: + when_values.append(when_value) + continue + extra_step_state = {} + for step_input in step.inputs: + step_input_name = step_input.name + if iteration_elements and step_input_name in iteration_elements: # noqa: B023 + value = iteration_elements[step_input_name] # noqa: B023 + else: + value = progress.replacement_for_connection(step_input.connections[0], is_data=True) + extra_step_state[step_input_name] = value + + when_values.append( + evaluate_value_from_expressions( + progress, step, execution_state={}, extra_step_state=extra_step_state + ) + ) + if collection_info: + collection_info.when_values = when_values + subworkflow_invoker = progress.subworkflow_invoker( - trans, step, use_cached_job=use_cached_job, subworkflow_structure=structure + trans, + step, + use_cached_job=use_cached_job, + subworkflow_collection_info=collection_info, + when_values=when_values, ) subworkflow_invoker.invoke() subworkflow = subworkflow_invoker.workflow @@ -1457,7 +1638,11 @@ def recover_mapping(self, invocation_step, progress): progress.set_step_outputs(invocation_step, {"output": replacement}) return elif action is False: - raise CancelWorkflowEvaluation() + raise CancelWorkflowEvaluation( + why=InvocationCancellationReviewFailed( + reason=CancelReason.cancelled_on_review, workflow_step_id=step.id + ) + ) delayed_why = "workflow paused at this step waiting for review" raise DelayedWorkflowEvaluation(why=delayed_why) @@ -1919,15 +2104,18 @@ def execute(self, trans, progress, invocation_step, use_cached_job=False): step = invocation_step.workflow_step tool = trans.app.toolbox.get_tool(step.tool_id, tool_version=step.tool_version, tool_uuid=step.tool_uuid) if not tool.is_workflow_compatible: - message = f"Specified tool [{tool.id}] in workflow is not workflow-compatible." - raise Exception(message) + # TODO: why do we even create an invocation, seems like something we could check on submit? + message = f"Specified tool [{tool.id}] in step {step.order_index + 1} is not workflow-compatible." + raise exceptions.MessageException(message) tool_state = step.state + tool_inputs = tool.inputs.copy() # Not strictly needed - but keep Tool state clean by stripping runtime # metadata parameters from it. 
if RUNTIME_STEP_META_STATE_KEY in tool_state.inputs: del tool_state.inputs[RUNTIME_STEP_META_STATE_KEY] all_inputs = self.get_all_inputs() + all_inputs_by_name = {} for input_dict in all_inputs: all_inputs_by_name[input_dict["name"]] = input_dict @@ -1937,10 +2125,12 @@ def execute(self, trans, progress, invocation_step, use_cached_job=False): if collection_info: iteration_elements_iter = collection_info.slice_collections() else: - iteration_elements_iter = [None] + if progress.when_values: + assert len(progress.when_values) == 1, "Got more than 1 when value, this shouldn't be possible" + iteration_elements_iter = [(None, progress.when_values[0] if progress.when_values else None)] resource_parameters = invocation.resource_parameters - for iteration_elements in iteration_elements_iter: + for (iteration_elements, when_value) in iteration_elements_iter: execution_state = tool_state.copy() # TODO: Move next step into copy() execution_state.inputs = make_dict_copy(execution_state.inputs) @@ -1971,22 +2161,58 @@ def callback(input, prefixed_name, **kwargs): replacement = json.load(f) found_replacement_keys.add(prefixed_name) # noqa: B023 + # The bool cast should be fine - a ConditionalStepWhen can only hold true/false. + # Still rather ugly, and it is not needed for API requests. + if isinstance(input, ConditionalStepWhen) and bool(replacement) is False: + raise SkipWorkflowStepEvaluation + return replacement try: # Replace DummyDatasets with historydatasetassociations visit_input_values( - tool.inputs, + tool_inputs, execution_state.inputs, callback, no_replacement_value=NO_REPLACEMENT, replace_optional_connections=True, ) except KeyError as k: - message_template = "Error due to input mapping of '%s' in '%s'. A common cause of this is conditional outputs that cannot be determined until runtime, please review your workflow." - message = message_template % (tool.name, unicodify(k)) + message = f"Error due to input mapping of '{unicodify(k)}' in tool '{tool.id}'. A common cause of this is conditional outputs that cannot be determined until runtime, please review workflow step {step.order_index + 1}." raise exceptions.MessageException(message) + if step.when_expression and when_value is not False: + extra_step_state = {} + for step_input in step.inputs: + step_input_name = step_input.name + input_in_execution_state = step_input_name not in execution_state.inputs + if input_in_execution_state: + if step_input_name in all_inputs_by_name: + if iteration_elements and step_input_name in iteration_elements: # noqa: B023 + value = iteration_elements[step_input_name] # noqa: B023 + else: + value = progress.replacement_for_input(step, all_inputs_by_name[step_input_name]) + # TODO: only do this for value parameters - or is everything with a default + # a field parameter this way? Probably not. + extra_step_state[step_input_name] = value + # Might be needed someday... + # elif step_input.default_value_set: + # extra_step_state[step_input_name] = step_input.default_value + else: + if iteration_elements and step_input_name in iteration_elements: # noqa: B023 + value = iteration_elements[step_input_name] # noqa: B023 + else: + value = progress.replacement_for_connection(step_input.connections[0], is_data=True) + extra_step_state[step_input_name] = value + + if when_value is not False: + when_value = evaluate_value_from_expressions( + progress, step, execution_state=execution_state, extra_step_state=extra_step_state + ) + if when_value is not None: + # TODO: track this more formally?
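+ # (after the assignment below, a skipped slice's state looks roughly like {"input1": <HDA>, ..., "__when_value__": False})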
+ execution_state.inputs["__when_value__"] = when_value + unmatched_input_connections = expected_replacement_keys - found_replacement_keys if unmatched_input_connections: log.warning(f"Failed to use input connections for inputs [{unmatched_input_connections}]") @@ -2052,8 +2278,9 @@ def callback(input, prefixed_name, **kwargs): self._handle_mapped_over_post_job_actions(step, step_inputs, step_outputs, progress.replacement_dict) if execution_tracker.execution_errors: - message = "Failed to create one or more job(s) for workflow step." - raise Exception(message) + # TODO: formalize into InvocationFailure ? + message = f"Failed to create {len(execution_tracker.execution_errors)} job(s) for workflow step {step.order_index + 1}: {str(execution_tracker.execution_errors[0])}" + raise exceptions.MessageException(message) return complete @@ -2196,6 +2423,16 @@ def __init__(self, why=None): class CancelWorkflowEvaluation(Exception): + def __init__(self, why: "InvocationMessageUnion"): + self.why = why + + +class FailWorkflowEvaluation(Exception): + def __init__(self, why: "InvocationMessageUnion"): + self.why = why + + +class SkipWorkflowStepEvaluation(Exception): pass diff --git a/lib/galaxy/workflow/run.py b/lib/galaxy/workflow/run.py index 002b93ee2115..ef447aaabb0f 100644 --- a/lib/galaxy/workflow/run.py +++ b/lib/galaxy/workflow/run.py @@ -13,10 +13,23 @@ from typing_extensions import Protocol from galaxy import model +from galaxy.exceptions import MessageException from galaxy.model import ( WorkflowInvocation, WorkflowInvocationStep, ) +from galaxy.schema.invocation import ( + CancelReason, + FailureReason, + InvocationCancellationHistoryDeleted, + InvocationFailureCollectionFailed, + InvocationFailureDatasetFailed, + InvocationFailureJobFailed, + InvocationFailureOutputNotFound, + InvocationUnexpectedFailure, + InvocationWarningWorkflowOutputNotFound, + WarningReason, +) from galaxy.util import ExecutionTimer from galaxy.workflow import modules from galaxy.workflow.run_request import ( @@ -73,21 +86,31 @@ def __invoke( workflow_invocation=workflow_invocation, ) workflow_invocation = invoker.workflow_invocation + outputs = {} try: outputs = invoker.invoke() - except modules.CancelWorkflowEvaluation: + except modules.CancelWorkflowEvaluation as e: if workflow_invocation.cancel(): - trans.sa_session.add(workflow_invocation) - outputs = {} + workflow_invocation.add_message(e.why) + except modules.FailWorkflowEvaluation as e: + workflow_invocation.fail() + workflow_invocation.add_message(e.why) + except MessageException as e: + # Convention for safe message we can show to users + workflow_invocation.fail() + failure = InvocationUnexpectedFailure(reason=FailureReason.unexpected_failure, details=str(e)) + workflow_invocation.add_message(failure) except Exception: + # Could potentially be large and/or contain raw ids or other secrets, don't add details log.exception("Failed to execute scheduled workflow.") # Running workflow invocation in background, just mark # persistent workflow invocation as failed. + failure = InvocationUnexpectedFailure(reason=FailureReason.unexpected_failure) workflow_invocation.fail() - trans.sa_session.add(workflow_invocation) - outputs = {} + workflow_invocation.add_message(failure) # Be sure to update state of workflow_invocation. 
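+ # Whichever path was taken above - cancel, fail, or unexpected error - the typed message added via add_message() is what the invocation API serializes into its "messages" list.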
+ trans.sa_session.add(workflow_invocation) trans.sa_session.flush() return outputs, workflow_invocation @@ -176,8 +199,11 @@ def invoke(self) -> Dict[int, Any]: return self.progress.outputs if workflow_invocation.history.deleted: - log.info("Cancelled workflow evaluation due to deleted history") - raise modules.CancelWorkflowEvaluation() + raise modules.CancelWorkflowEvaluation( + why=InvocationCancellationHistoryDeleted( + reason=CancelReason.history_deleted, history_id=workflow_invocation.history_id + ) + ) remaining_steps = self.progress.remaining_steps() delayed_steps = False @@ -244,9 +270,9 @@ def __check_implicitly_dependent_steps(self, step): for input_connection in step.input_connections: if input_connection.non_data_connection: output_id = input_connection.output_step.id - self.__check_implicitly_dependent_step(output_id) + self.__check_implicitly_dependent_step(output_id, step.id) - def __check_implicitly_dependent_step(self, output_id): + def __check_implicitly_dependent_step(self, output_id: int, step_id: int): step_invocation = self.workflow_invocation.step_invocation_for_step_id(output_id) # No steps created yet - have to delay evaluation. @@ -268,7 +294,14 @@ def __check_implicitly_dependent_step(self, output_id): raise modules.DelayedWorkflowEvaluation(why=delayed_why) if job.state != job.states.OK: - raise modules.CancelWorkflowEvaluation() + raise modules.FailWorkflowEvaluation( + why=InvocationFailureJobFailed( + reason=FailureReason.job_failed, + job_id=job.id, + workflow_step_id=step_id, + dependent_workflow_step_id=output_id, + ) + ) def _invoke_step(self, invocation_step: WorkflowInvocationStep) -> Optional[bool]: incomplete_or_none = invocation_step.workflow_step.module.execute( @@ -305,7 +338,8 @@ def __init__( copy_inputs_to_history: bool = False, use_cached_job: bool = False, replacement_dict: Optional[Dict[str, str]] = None, - subworkflow_structure=None, + subworkflow_collection_info=None, + when_values=None, ) -> None: self.outputs: Dict[int, Any] = {} self.module_injector = module_injector @@ -317,7 +351,9 @@ def __init__( self.copy_inputs_to_history = copy_inputs_to_history self.use_cached_job = use_cached_job self.replacement_dict = replacement_dict or {} - self.subworkflow_structure = subworkflow_structure + self.subworkflow_collection_info = subworkflow_collection_info + self.subworkflow_structure = subworkflow_collection_info.structure if subworkflow_collection_info else None + self.when_values = when_values @property def maximum_jobs_to_schedule_or_none(self) -> Optional[int]: @@ -346,9 +382,10 @@ def remaining_steps( step_args = self.param_map.get(step_id, {}) self.module_injector.compute_runtime_state(step, step_args=step_args) if step_id not in step_states: - raise Exception( - f"Workflow invocation [{self.workflow_invocation.id}] has no step state for step {step.log_str()}. States ids are {list(step_states.keys())}." - ) + # Can this ever happen? + public_message = f"Workflow invocation has no step state for step {step.order_index + 1}" + log.error(f"{public_message}. 
State is known for these step ids: {list(step_states.keys())}.") + raise MessageException(public_message) runtime_state = step_states[step_id].value assert step.module step.state = step.module.decode_runtime_state(runtime_state) @@ -390,29 +427,45 @@ def replacement_for_input(self, step: "WorkflowStep", input_dict: Dict[str, Any] def replacement_for_connection(self, connection: "WorkflowStepConnection", is_data: bool = True) -> Any: output_step_id = connection.output_step.id + output_name = connection.output_name if output_step_id not in self.outputs: - message = f"No outputs found for step id {output_step_id}, outputs are {self.outputs}" - raise Exception(message) + raise modules.FailWorkflowEvaluation( + why=InvocationFailureOutputNotFound( + reason=FailureReason.output_not_found, + workflow_step_id=connection.input_step_id, + output_name=output_name, + dependent_workflow_step_id=output_step_id, + ) + ) step_outputs = self.outputs[output_step_id] if step_outputs is STEP_OUTPUT_DELAYED: delayed_why = f"dependent step [{output_step_id}] delayed, so this step must be delayed" raise modules.DelayedWorkflowEvaluation(why=delayed_why) - output_name = connection.output_name try: replacement = step_outputs[output_name] except KeyError: - # Must resolve. - template = "Workflow evaluation problem - failed to find output_name %s in step_outputs %s" - message = template % (output_name, step_outputs) - raise Exception(message) + raise modules.FailWorkflowEvaluation( + why=InvocationFailureOutputNotFound( + reason=FailureReason.output_not_found, + workflow_step_id=connection.input_step_id, + output_name=output_name, + dependent_workflow_step_id=output_step_id, + ) + ) if isinstance(replacement, model.HistoryDatasetCollectionAssociation): if not replacement.collection.populated: if not replacement.waiting_for_elements: # If we are not waiting for elements, there was some # problem creating the collection. Collection will never # be populated. - # TODO: consider distinguish between cancelled and failed? 
- raise modules.CancelWorkflowEvaluation() + raise modules.FailWorkflowEvaluation( + why=InvocationFailureCollectionFailed( + reason=FailureReason.collection_failed, + hdca_id=replacement.id, + workflow_step_id=connection.input_step_id, + dependent_workflow_step_id=output_step_id, + ) + ) delayed_why = f"dependent collection [{replacement.id}] not yet populated with datasets" raise modules.DelayedWorkflowEvaluation(why=delayed_why) @@ -426,7 +479,14 @@ def replacement_for_connection(self, connection: "WorkflowStepConnection", is_da if replacement.is_pending: raise modules.DelayedWorkflowEvaluation() if not replacement.is_ok: - raise modules.CancelWorkflowEvaluation() + raise modules.FailWorkflowEvaluation( + why=InvocationFailureDatasetFailed( + reason=FailureReason.dataset_failed, + hda_id=replacement.id, + workflow_step_id=connection.input_step_id, + dependent_workflow_step_id=output_step_id, + ) + ) else: if not replacement.collection.populated: raise modules.DelayedWorkflowEvaluation() @@ -435,7 +495,14 @@ def replacement_for_connection(self, connection: "WorkflowStepConnection", is_da if dataset_instance.is_pending: pending = True elif not dataset_instance.is_ok: - raise modules.CancelWorkflowEvaluation() + raise modules.FailWorkflowEvaluation( + why=InvocationFailureDatasetFailed( + reason=FailureReason.dataset_failed, + hda_id=replacement.id, + workflow_step_id=connection.input_step_id, + dependent_workflow_step_id=output_step_id, + ) + ) if pending: raise modules.DelayedWorkflowEvaluation() @@ -466,7 +533,15 @@ def set_outputs_for_input( if default_value or step.input_optional: outputs["output"] = default_value else: - raise ValueError(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}") + log.error(f"{step.log_str()} not found in inputs_step_id {self.inputs_by_step_id}") + raise modules.FailWorkflowEvaluation( + why=InvocationFailureOutputNotFound( + reason=FailureReason.output_not_found, + workflow_step_id=invocation_step.workflow_step_id, + output_name="output", + dependent_workflow_step_id=invocation_step.workflow_step_id, + ) + ) elif step_id in self.inputs_by_step_id: outputs["output"] = self.inputs_by_step_id[step_id] @@ -494,13 +569,14 @@ def set_step_outputs( for workflow_output in step.workflow_outputs: output_name = workflow_output.output_name if output_name not in outputs: + invocation_step.workflow_invocation.add_message( + InvocationWarningWorkflowOutputNotFound( + reason=WarningReason.workflow_output_not_found, + workflow_step_id=step.id, + output_name=output_name, + ) + ) message = f"Failed to find expected workflow output [{output_name}] in step outputs [{outputs}]" - # raise KeyError(message) - # Pre-18.01 we would have never even detected this output wasn't configured - # and even in 18.01 we don't have a way to tell the user something bad is - # happening so I guess we just log a debug message and continue sadly for now. - # Once https://github.com/galaxyproject/galaxy/issues/5142 is complete we could - # at least tell the user what happened, give them a warning. 
log.debug(message) continue output = outputs[output_name] @@ -523,7 +599,7 @@ def _subworkflow_invocation(self, step: "WorkflowStep") -> WorkflowInvocation: workflow_invocation = self.workflow_invocation subworkflow_invocation = workflow_invocation.get_subworkflow_invocation_for_step(step) if subworkflow_invocation is None: - raise Exception(f"Failed to find persisted workflow invocation for step [{step.id}]") + raise MessageException(f"Failed to find persisted subworkflow invocation for step [{step.order_index + 1}]") return subworkflow_invocation def subworkflow_invoker( @@ -531,12 +607,17 @@ def subworkflow_invoker( trans: "WorkRequestContext", step: "WorkflowStep", use_cached_job: bool = False, - subworkflow_structure=None, + subworkflow_collection_info=None, + when_values=None, ) -> WorkflowInvoker: subworkflow_invocation = self._subworkflow_invocation(step) workflow_run_config = workflow_request_to_run_config(subworkflow_invocation, use_cached_job) subworkflow_progress = self.subworkflow_progress( - subworkflow_invocation, step, workflow_run_config.param_map, subworkflow_structure=subworkflow_structure + subworkflow_invocation, + step, + workflow_run_config.param_map, + subworkflow_collection_info=subworkflow_collection_info, + when_values=when_values, ) subworkflow_invocation = subworkflow_progress.workflow_invocation return WorkflowInvoker( @@ -551,7 +632,8 @@ def subworkflow_progress( subworkflow_invocation: WorkflowInvocation, step: "WorkflowStep", param_map: Dict, - subworkflow_structure=None, + subworkflow_collection_info=None, + when_values=None, ) -> "WorkflowProgress": subworkflow = subworkflow_invocation.workflow subworkflow_inputs = {} @@ -570,7 +652,14 @@ def subworkflow_progress( break if not connection_found and not input_subworkflow_step.input_optional: - raise Exception("Could not find connections for all subworkflow inputs.") + raise modules.FailWorkflowEvaluation( + InvocationFailureOutputNotFound( + reason=FailureReason.output_not_found, + workflow_step_id=step.id, + output_name=input_connection.output_name, + dependent_workflow_step_id=input_connection.output_step.id, + ) + ) return WorkflowProgress( subworkflow_invocation, @@ -579,7 +668,8 @@ def subworkflow_progress( param_map=param_map, use_cached_job=self.use_cached_job, replacement_dict=self.replacement_dict, - subworkflow_structure=subworkflow_structure, + subworkflow_collection_info=subworkflow_collection_info, + when_values=when_values, ) def _recover_mapping(self, step_invocation: WorkflowInvocationStep) -> None: diff --git a/lib/galaxy_test/api/test_workflows.py b/lib/galaxy_test/api/test_workflows.py index 921f4ad3dcc1..4f5e15533254 100644 --- a/lib/galaxy_test/api/test_workflows.py +++ b/lib/galaxy_test/api/test_workflows.py @@ -36,6 +36,7 @@ WorkflowPopulator, ) from galaxy_test.base.workflow_fixtures import ( + NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE, WORKFLOW_INPUTS_AS_OUTPUTS, WORKFLOW_NESTED_REPLACEMENT_PARAMETER, WORKFLOW_NESTED_RUNTIME_PARAMETER, @@ -1930,6 +1931,431 @@ def test_workflow_metadata_validation_0(self): history_id=history_id, ) + def test_run_workflow_simple_conditional_step(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """class: GalaxyWorkflow +inputs: + should_run: + type: boolean + some_file: + type: data +steps: + cat1: + tool_id: cat1 + in: + input1: some_file + should_run: should_run + when: $(inputs.should_run) +""", + test_data=""" +some_file: + value: 1.bed + type: File +should_run: 
+ value: false + type: raw +""", + history_id=history_id, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + for step in invocation_details["steps"]: + if step["workflow_step_label"] == "cat1": + assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 1 + + def test_run_workflow_invalid_when_expression(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """class: GalaxyWorkflow +inputs: + should_run: + type: boolean + some_file: + type: data +steps: + cat1: + tool_id: cat1 + in: + input1: some_file + should_run: should_run + when: $(:syntaxError:) +""", + test_data=""" +some_file: + value: 1.bed + type: File +should_run: + value: false + type: raw +""", + history_id=history_id, + wait=True, + assert_ok=False, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + assert invocation_details["state"] == "failed" + assert len(invocation_details["messages"]) == 1 + message = invocation_details["messages"][0] + assert message["reason"] == "expression_evaluation_failed" + + def test_run_workflow_fails_when_expression_not_boolean(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """class: GalaxyWorkflow +inputs: + should_run: + type: boolean + some_file: + type: data +steps: + cat1: + tool_id: cat1 + in: + input1: some_file + should_run: should_run + when: $("false") +""", + test_data=""" +some_file: + value: 1.bed + type: File +should_run: + value: false + type: raw +""", + history_id=history_id, + wait=True, + assert_ok=False, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + assert invocation_details["state"] == "failed" + assert len(invocation_details["messages"]) == 1 + message = invocation_details["messages"][0] + assert message["reason"] == "when_not_boolean" + assert message["details"] == "Type is: str" + assert message["workflow_step_id"] == 2 + + def test_run_workflow_subworkflow_conditional_step(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """class: GalaxyWorkflow +inputs: + should_run: + type: boolean + some_file: + type: data +steps: + subworkflow: + run: + class: GalaxyWorkflow + inputs: + some_file: + type: data + steps: + a_tool_step: + tool_id: cat1 + in: + input1: some_file + in: + some_file: some_file + should_run: should_run + outputs: + inner_out: a_tool_step/out_file1 + when: $(inputs.should_run) +outputs: + outer_output: + outputSource: subworkflow/inner_out +""", + test_data=""" +some_file: + value: 1.bed + type: File +should_run: + value: false + type: raw +""", + history_id=history_id, + wait=True, + assert_ok=True, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + subworkflow_invocation_id = invocation_details["steps"][-1]["subworkflow_invocation_id"] + self.workflow_populator.wait_for_invocation_and_jobs( + history_id=history_id, workflow_id="whatever", invocation_id=subworkflow_invocation_id + ) + invocation_details = self.workflow_populator.get_invocation(subworkflow_invocation_id, step_details=True) + for step in invocation_details["steps"]: + if step["workflow_step_label"] == "a_tool_step": + assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 1 + + def test_run_workflow_conditional_step_map_over_expression_tool(self): + with 
self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: + boolean_input_files: collection +steps: +- label: param_out + tool_id: param_value_from_file + in: + input1: boolean_input_files + state: + param_type: boolean +- label: consume_expression_parameter + tool_id: cat1 + in: + input1: boolean_input_files + should_run: param_out/boolean_param + out: + out_file1: + change_datatype: txt + when: $(inputs.should_run) +test_data: + boolean_input_files: + collection_type: list + elements: + - identifier: true + content: true + - identifier: false + content: false +""", + history_id=history_id, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + for step in invocation_details["steps"]: + if step["workflow_step_label"] == "consume_expression_parameter": + skipped_jobs = [j for j in step["jobs"] if j["state"] == "skipped"] + assert len(skipped_jobs) == 1 + # also assert that change_datatype was ignored for null output + job_details = self.dataset_populator.get_job_details(skipped_jobs[0]["id"], full=True).json() + skipped_hda_id = job_details["outputs"]["out_file1"]["id"] + dataset_details = self.dataset_populator.get_history_dataset_details( + history_id=history_id, content_id=skipped_hda_id + ) + assert dataset_details["file_ext"] == "expression.json", dataset_details + + def test_run_workflow_conditional_subworkflow_step_map_over_expression_tool(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: + boolean_input_files: collection +steps: + create_list_of_boolean: + tool_id: param_value_from_file + in: + input1: boolean_input_files + state: + param_type: boolean + subworkflow: + run: + class: GalaxyWorkflow + inputs: + boolean_input_file: data + steps: + consume_expression_parameter: + tool_id: cat1 + in: + input1: boolean_input_file + out: + out_file1: + change_datatype: txt + outputs: + inner_output: + outputSource: consume_expression_parameter/out_file1 + in: + boolean_input_file: boolean_input_files + should_run: create_list_of_boolean/boolean_param + when: $(inputs.should_run) +outputs: + outer_output: + outputSource: subworkflow/inner_output +test_data: + boolean_input_files: + collection_type: list + elements: + - identifier: true + content: true + - identifier: false + content: false +""", + history_id=history_id, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + assert "outer_output" in invocation_details["output_collections"] + outer_output = invocation_details["output_collections"]["outer_output"] + outer_hdca = self.dataset_populator.get_history_collection_details( + history_id, content_id=outer_output["id"] + ) + assert outer_hdca["job_state_summary"]["all_jobs"] == 2 + assert outer_hdca["job_state_summary"]["ok"] == 1 + assert outer_hdca["job_state_summary"]["skipped"] == 1 + + def test_run_workflow_conditional_subworkflow_step_map_over_expression_tool_with_extra_nesting(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE, + test_data="""boolean_input_files: + collection_type: list + elements: + - identifier: true + content: true + - identifier: false + content: false +""", + history_id=history_id, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, 
step_details=True) + outer_create_nested_id = invocation_details["output_collections"]["outer_create_nested"]["id"] + outer_create_nested = self.dataset_populator.get_history_collection_details( + history_id, content_id=outer_create_nested_id + ) + assert outer_create_nested["job_state_summary"]["all_jobs"] == 2 + assert outer_create_nested["job_state_summary"]["ok"] == 1 + assert outer_create_nested["job_state_summary"]["skipped"] == 1 + + for cat1_output in ["outer_output_1", "outer_output_2"]: + outer_output = invocation_details["output_collections"][cat1_output] + outer_hdca = self.dataset_populator.get_history_collection_details( + history_id, content_id=outer_output["id"] + ) + # You might expect 12 total jobs, 6 ok and 6 skipped, + # but because we're not actually running one branch of collection_creates_dynamic_nested + # there's no input to consume_expression_parameter. + # It's unclear if that's a problem or not ... probably not a major one, + # since we keep producing "empty" outer collections, which seems somewhat correct. + assert outer_hdca["job_state_summary"]["all_jobs"] == 6 + assert outer_hdca["job_state_summary"]["ok"] == 6 + assert outer_hdca["collection_type"] == "list:list:list" + elements = outer_hdca["elements"] + assert elements[0]["element_identifier"] == "True" + assert elements[0]["object"]["element_count"] == 3 + assert elements[1]["element_identifier"] == "False" + assert elements[1]["object"]["element_count"] == 0 + + def test_run_workflow_conditional_subworkflow_step_map_over_expression_tool_with_extra_nesting_skip_all(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE, + test_data="""boolean_input_files: + collection_type: list + elements: + - identifier: false + content: false + - identifier: also_false + content: false +""", + history_id=history_id, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + outer_create_nested_id = invocation_details["output_collections"]["outer_create_nested"]["id"] + outer_create_nested = self.dataset_populator.get_history_collection_details( + history_id, content_id=outer_create_nested_id + ) + assert outer_create_nested["job_state_summary"]["all_jobs"] == 2 + assert outer_create_nested["job_state_summary"]["skipped"] == 2 + + for cat1_output in ["outer_output_1", "outer_output_2"]: + outer_output = invocation_details["output_collections"][cat1_output] + outer_hdca = self.dataset_populator.get_history_collection_details( + history_id, content_id=outer_output["id"] + ) + assert outer_hdca["job_state_summary"]["all_jobs"] == 0 + assert outer_hdca["collection_type"] == "list:list:list" + + def test_run_workflow_conditional_step_map_over_expression_tool_pick_value(self): + with self.dataset_populator.test_history() as history_id: + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: + boolean_input_files_1: collection + boolean_input_files_2: collection +outputs: + my_output: + outputSource: pick_value/data_param +steps: +- label: param_out_1 + tool_id: param_value_from_file + in: + input1: boolean_input_files_1 + state: + param_type: boolean +- label: param_out_2 + tool_id: param_value_from_file + in: + input1: boolean_input_files_2 + state: + param_type: boolean +- label: consume_expression_parameter_1 + tool_id: cat1 + in: + input1: boolean_input_files_1 + should_run: param_out_1/boolean_param + when: $(inputs.should_run) +- 
label: consume_expression_parameter_2 + tool_id: cat1 + in: + input1: boolean_input_files_2 + should_run: param_out_2/boolean_param + when: $(inputs.should_run) +- label: pick_value + tool_id: pick_value + tool_state: + style_cond: + __current_case__: 2 + pick_style: first_or_error + type_cond: + __current_case__: 4 + param_type: data + pick_from: + - __index__: 0 + value: + __class__: RuntimeValue + - __index__: 1 + value: + __class__: RuntimeValue + in: + style_cond|type_cond|pick_from_0|value: + source: consume_expression_parameter_1/out_file1 + style_cond|type_cond|pick_from_1|value: + source: consume_expression_parameter_2/out_file1 +test_data: + boolean_input_files_1: + collection_type: list + elements: + - identifier: true + content: true + - identifier: false + content: false + boolean_input_files_2: + collection_type: list + elements: + - identifier: false + content: false + - identifier: true + content: true +""", + history_id=history_id, + ) + invocation_details = self.workflow_populator.get_invocation(summary.invocation_id, step_details=True) + output_collection_id = invocation_details["output_collections"]["my_output"]["id"] + hdca_details = self.dataset_populator.get_history_collection_details( + history_id=history_id, content_id=output_collection_id + ) + elements = hdca_details["elements"] + assert len(elements) == 2 + for element in elements: + content = self.dataset_populator.get_history_dataset_content( + history_id, content_id=element["object"]["id"] + ) + assert content == "True" + for step in invocation_details["steps"]: + if step["workflow_step_label"].startswith("consume_expression_parameter_"): + assert sum(1 for j in step["jobs"] if j["state"] == "skipped") == 1 + def test_run_subworkflow_simple(self) -> None: with self.dataset_populator.test_history() as history_id: summary = self._run_workflow( @@ -3096,6 +3522,12 @@ def test_cancel_new_workflow_when_history_deleted(self): self._delete(f"histories/{history_id}") invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, "cancelled") + workflow_details = self._invocation_details(uploaded_workflow_id, invocation_id) + assert len(workflow_details["messages"]) == 1 + message = workflow_details["messages"][0] + assert message["history_id"] == history_id + assert message["reason"] == "history_deleted" + assert invocation_cancelled, "Workflow state is not cancelled..." @skip_without_tool("cat") @@ -3117,6 +3549,11 @@ def test_cancel_ready_workflow_when_history_deleted(self): invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, "cancelled") assert invocation_cancelled, "Workflow state is not cancelled..." + workflow_details = self._invocation_details(uploaded_workflow_id, invocation_id) + assert len(workflow_details["messages"]) == 1 + message = workflow_details["messages"][0] + assert message["history_id"] == history_id + assert message["reason"] == "history_deleted" @skip_without_tool("cat") def test_workflow_pause(self): @@ -3164,6 +3601,11 @@ def test_workflow_pause_cancel(self): # Ensure the workflow eventually becomes cancelled. 
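+ # (and, with this change, check that it records why - see the cancelled_on_review message asserted below)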
invocation_cancelled = self._wait_for_invocation_state(uploaded_workflow_id, invocation_id, "cancelled") + workflow_details = self._invocation_details(uploaded_workflow_id, invocation_id) + assert len(workflow_details["messages"]) == 1 + message = workflow_details["messages"][0] + assert "workflow_step_id" in message + assert message["reason"] == "cancelled_on_review" assert invocation_cancelled, "Workflow state is not cancelled..." @skip_without_tool("head") @@ -3220,6 +3662,71 @@ def test_cancel_workflow_invocation(self): invocation = self._invocation_details(uploaded_workflow_id, invocation_id) assert invocation["state"] == "cancelled" + message = invocation["messages"][0] + assert message["reason"] == "user_request" + + def test_workflow_failed_output_not_found(self, history_id): + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: [] +steps: + create_2: + tool_id: create_2 + state: + sleep_time: 0 + outputs: + out_file1: + rename: "my new name" + out_file2: + rename: "my other new name" + first_cat1: + tool_id: cat + in: + input1: create_2/does_not_exist + """, + history_id=history_id, + assert_ok=False, + wait=True, + ) + invocation = self.workflow_populator.get_invocation(summary.invocation_id) + assert invocation["state"] == "failed" + assert len(invocation["messages"]) == 1 + message = invocation["messages"][0] + assert message["reason"] == "output_not_found" + assert message["workflow_step_id"] == 1 + assert message["dependent_workflow_step_id"] == 0 + + def test_workflow_warning_workflow_output_not_found(self, history_id): + summary = self._run_workflow( + """ +class: GalaxyWorkflow +inputs: [] +steps: + create_2: + tool_id: create_2 + state: + sleep_time: 0 + outputs: + out_file1: + rename: "my new name" + out_file2: + rename: "my other new name" +outputs: + main_out: + outputSource: create_2/does_not_exist + """, + history_id=history_id, + assert_ok=False, + wait=True, + ) + invocation = self.workflow_populator.get_invocation(summary.invocation_id) + assert invocation["state"] == "scheduled" + assert len(invocation["messages"]) == 1 + message = invocation["messages"][0] + assert message["reason"] == "workflow_output_not_found" + assert "workflow_step_id" in message + assert message["output_name"] == "does_not_exist" @skip_without_tool("identifier_multiple") def test_invocation_map_over(self, history_id): diff --git a/lib/galaxy_test/base/workflow_fixtures.py b/lib/galaxy_test/base/workflow_fixtures.py index f01a3c7a13ed..0178e935dae6 100644 --- a/lib/galaxy_test/base/workflow_fixtures.py +++ b/lib/galaxy_test/base/workflow_fixtures.py @@ -994,3 +994,59 @@ file_type: bed type: File """ + + +NESTED_WORKFLOW_WITH_CONDITIONAL_SUBWORKFLOW_AND_DISCONNECTED_MAP_OVER_SOURCE = """ +class: GalaxyWorkflow +inputs: + boolean_input_files: collection +steps: + create_list_of_boolean: + tool_id: param_value_from_file + in: + input1: boolean_input_files + state: + param_type: boolean + subworkflow: + run: + class: GalaxyWorkflow + inputs: + boolean_input_file: data + steps: + create_more_inputs: + tool_id: collection_creates_dynamic_nested + consume_expression_parameter: + tool_id: cat1 + state: + input1: + $link: create_more_inputs/list_output + queries: + - input2: + $link: boolean_input_file + out: + out_file1: + change_datatype: txt + consume_expression_parameter_2: + tool_id: cat1 + state: + input1: + $link: consume_expression_parameter/out_file1 + outputs: + inner_create_nested: + outputSource: create_more_inputs/list_output + inner_output_1: + outputSource: 
consume_expression_parameter/out_file1 + inner_output_2: + outputSource: consume_expression_parameter_2/out_file1 + in: + boolean_input_file: boolean_input_files + should_run: create_list_of_boolean/boolean_param + when: $(inputs.should_run) +outputs: + outer_create_nested: + outputSource: subworkflow/inner_create_nested + outer_output_1: + outputSource: subworkflow/inner_output_1 + outer_output_2: + outputSource: subworkflow/inner_output_2 +""" diff --git a/pyproject.toml b/pyproject.toml index b1e7c1f6e1a3..d8b3484a6e43 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ url = "https://wheels.galaxyproject.org/simple" a2wsgi = "*" aiofiles = "*" alembic = "*" +alembic_utils = "*" apispec = "*" Babel = "*" bdbag = ">=1.6.3"
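End to end, a client can now see why an invocation stopped. A hypothetical API call, where galaxy_url, api_key and invocation_id are placeholders and the response shape follows the message models above:

    import requests

    invocation = requests.get(
        f"{galaxy_url}/api/invocations/{invocation_id}",
        headers={"x-api-key": api_key},
    ).json()
    if invocation["state"] in ("failed", "cancelled"):
        for message in invocation["messages"]:
            # e.g. {"reason": "dataset_failed", "hda_id": "abc123",
            #       "workflow_step_id": 2, "dependent_workflow_step_id": 1}
            print(message["reason"])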