Skip to content

Commit

Permalink
Merge pull request #1297 from mvdbeek/fix_workflow_test_input_optional
Browse files Browse the repository at this point in the history
Fix workflow test when input is optional but also workflow output
  • Loading branch information
mvdbeek committed Oct 25, 2022
2 parents 75cbd3a + 86b7893 commit 808f9b7
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 20 deletions.
75 changes: 56 additions & 19 deletions planemo/galaxy/activity.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ def _execute(ctx, config, runnable, job_path, **kwds): # noqa C901
ctx.vlog("Final invocation state is [%s]" % final_invocation_state)

if not kwds.get("no_wait"):
final_state = _wait_for_history(ctx, user_gi, history_id, polling_backoff)
final_state = _wait_for_invocation_jobs(ctx, user_gi, invocation_id, polling_backoff)
if final_state != "ok":
msg = "Failed to run workflow final history state is [%s]." % final_state
error_message = msg if not error_message else f"{error_message}. {msg}"
Expand Down Expand Up @@ -261,7 +261,7 @@ def stage_in(ctx, runnable, config, job_path, **kwds): # noqa C901

ctx.vlog("final state is %s" % final_state)
if final_state != "ok":
msg = "Failed to run job final job state is [%s]." % final_state
msg = "Failed to upload data, upload state is [%s]." % final_state
summarize_history(ctx, user_gi, history_id)
raise Exception(msg)
return job_dict, datasets, history_id
Expand Down Expand Up @@ -415,23 +415,23 @@ def get_dataset(dataset_details, filename=None):
if not output_id:
ctx.vlog("Workflow output identified without an ID (label), skipping")
continue
output_dict_value = None
is_cwl = self._runnable.type in [RunnableType.cwl_workflow, RunnableType.cwl_tool]
output_src = self.output_src(runnable_output)
if not output_src:
# Optional workflow output
ctx.vlog(f"Optional workflow output '{output_id}' not created, skipping")
outputs_dict[output_id] = None
continue
output_dataset_id = output_src["id"]
galaxy_output = self.to_galaxy_output(runnable_output)
try:
cwl_output = output_to_cwl_json(
galaxy_output,
self._get_metadata,
get_dataset,
self._get_extra_files,
pseduo_location=True,
)
except AssertionError:
# In galaxy-tool-util < 21.05 output_to_cwl_json will raise an AssertionError when the output state is not OK
# Remove with new galaxy-tool-util release.
continue
cwl_output = output_to_cwl_json(
galaxy_output,
self._get_metadata,
get_dataset,
self._get_extra_files,
pseduo_location=True,
)
output_dict_value = None
if is_cwl or output_src["src"] == "hda":
output_dict_value = cwl_output
else:
Expand Down Expand Up @@ -617,6 +617,8 @@ def output_src(self, output):
return invocation["outputs"][output.get_id()]
elif output_name in invocation["output_collections"]:
return invocation["output_collections"][output.get_id()]
elif output.is_optional():
return None
else:
raise Exception(f"Failed to find output [{output_name}] in invocation outputs [{invocation['outputs']}]")

Expand Down Expand Up @@ -732,6 +734,19 @@ def state_func():
return _wait_on_state(state_func, polling_backoff)


def _wait_for_invocation_jobs(ctx, gi, invocation_id, polling_backoff=0):
# Wait for invocation jobs to finish. Less brittle than waiting for a history to finish,
# as you could have more than one invocation in a history, or an invocation without
# steps that produce history items.

ctx.log(f"waiting for invocation {invocation_id}")

def state_func():
return _retry_on_timeouts(ctx, gi, lambda gi: gi.jobs.get_jobs(invocation_id=invocation_id))

return _wait_on_state(state_func, polling_backoff)


def _wait_for_job(gi, job_id, timeout=None):
def state_func():
return gi.jobs.show_job(job_id, full_details=True)
Expand All @@ -742,11 +757,33 @@ def state_func():
def _wait_on_state(state_func, polling_backoff=0, timeout=None):
def get_state():
response = state_func()
state = response["state"]
if str(state) not in ["running", "queued", "new", "ready"]:
return state
else:
if not isinstance(response, list):
response = [response]
if not response:
# invocation may not have any attached jobs, that's fine
return "ok"
non_terminal_states = {"running", "queued", "new", "ready", "resubmitted", "upload", "waiting"}
current_states = set(item["state"] for item in response)
current_non_terminal_states = non_terminal_states.intersection(current_states)
# Mix of "error"-ish terminal job, dataset, invocation terminal states, so we can use this for whatever we throw at it
hierarchical_fail_states = [
"error",
"paused",
"deleted",
"stopped",
"discarded",
"failed_metadata",
"cancelled",
"failed",
]
for terminal_state in hierarchical_fail_states:
if terminal_state in current_states:
# If we got here something has failed and we can return (early)
return terminal_state
if current_non_terminal_states:
return None
assert len(current_states) == 1, f"unexpected state(s) found: {current_states}"
return current_states.pop()

timeout = timeout or 60 * 60 * 24
final_state = wait_on(get_state, "state", timeout, polling_backoff)
Expand Down
8 changes: 7 additions & 1 deletion planemo/galaxy/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def register_tool_ids(tool_ids, workflow):
return list(tool_ids)


WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label"])
WorkflowOutput = namedtuple("WorkflowOutput", ["order_index", "output_name", "label", "optional"])


def remote_runnable_to_workflow_id(runnable):
Expand All @@ -165,12 +165,18 @@ def describe_outputs(runnable, gi=None):

outputs = []
for (order_index, step) in workflow["steps"].items():
optional = False
if not step.get("tool_id"):
# One of the parameter types ... need eliminate this guesswork on the Galaxy side
tool_state = json.loads(step.get("tool_state", "{}"))
optional = tool_state.get("optional", False)
step_outputs = step.get("workflow_outputs", [])
for step_output in step_outputs:
output = WorkflowOutput(
int(order_index),
step_output["output_name"],
step_output["label"],
optional,
)
outputs.append(output)
return outputs
Expand Down
6 changes: 6 additions & 0 deletions planemo/runnable.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,6 +538,9 @@ class RunnableOutput(metaclass=abc.ABCMeta):
def get_id(self):
"""An identifier that describes this output."""

def is_optional(self):
return False


class ToolOutput(RunnableOutput):
"""Implementation of RunnableOutput corresponding to Galaxy tool outputs."""
Expand All @@ -558,6 +561,9 @@ def __init__(self, workflow_output: WorkflowOutput) -> None:
def get_id(self) -> Optional[str]:
return self._workflow_output.label

def is_optional(self):
return self.workflow_output.optional

@property
def workflow_output(self):
return self._workflow_output
Expand Down
2 changes: 2 additions & 0 deletions tests/data/wf16_optional_input_output_label-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- doc: Test optional input workflow output
job: {}
38 changes: 38 additions & 0 deletions tests/data/wf16_optional_input_output_label.ga
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
{
"a_galaxy_workflow": "true",
"annotation": "",
"format-version": "0.1",
"name": "optional_input_label",
"steps": {
"0": {
"annotation": "",
"content_id": null,
"errors": null,
"id": 0,
"input_connections": {},
"inputs": [],
"label": null,
"name": "Input dataset",
"outputs": [],
"position": {
"left": 0,
"top": 0.0
},
"tool_id": null,
"tool_state": "{\"optional\": true, \"tag\": \"\"}",
"tool_version": null,
"type": "data_input",
"uuid": "e6301247-43c4-452d-a3d1-8e140c3945fd",
"workflow_outputs": [
{
"label": "optional_output",
"output_name": "output",
"uuid": "6b333ea9-6c5c-4823-be37-be9a36112c39"
}
]
}
},
"tags": [],
"uuid": "25f7fecd-45f1-4ece-93a9-86e085e02fc3",
"version": 1
}
9 changes: 9 additions & 0 deletions tests/test_cmd_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,12 @@ def append_profile_argument_if_needed(self, command):
command += ["--database_type", database_type]

return command

@skip_if_environ("PLANEMO_SKIP_GALAXY_TESTS")
def test_workflow_with_optional_input_output_not_provided(self):
with self._isolate():
test_artifact = os.path.join(TEST_DATA_DIR, "wf16_optional_input_output_label.ga")
test_command = self._test_command()
test_command = self.append_profile_argument_if_needed(test_command)
test_command.append(test_artifact)
self._check_exit_code(test_command, exit_code=0)

0 comments on commit 808f9b7

Please sign in to comment.