Skip to content

Commit

Permalink
Fixes for handling delayed steps within subworkflows.
Browse files Browse the repository at this point in the history
Adds a new test case verifying that this fix is needed. The previous test case, which was meant to verify collection mapping and rescheduling of subworkflows, turned out to really be testing the connections with respect to the outer workflow, since only the last step of the subworkflow required rescheduling iterations. The newer test case also verifies the connections within the subworkflow, in addition to re-testing the connections and mapping in the outer workflow.
  • Loading branch information
jmchilton committed Dec 6, 2017
1 parent 4d30b4f commit b1d6c01
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 2 deletions.
7 changes: 6 additions & 1 deletion lib/galaxy/workflow/run.py
Expand Up @@ -366,7 +366,12 @@ def replacement_for_connection(self, connection, is_data=True):
def get_replacement_workflow_output(self, workflow_output):
    """Return the recorded output that backs ``workflow_output``.

    The outputs recorded for the workflow output's step are looked up in
    ``self.outputs`` by step id, and the entry named by
    ``workflow_output.output_name`` is returned.

    Raises ``modules.DelayedWorkflowEvaluation`` when the step's outputs are
    still the ``STEP_OUTPUT_DELAYED`` sentinel, i.e. the requested output
    has not been created yet.
    """
    step = workflow_output.workflow_step
    output_name = workflow_output.output_name
    step_outputs = self.outputs[step.id]
    # The delayed sentinel must be checked before indexing by output name;
    # returning self.outputs[step.id][output_name] directly would never
    # reach this check.
    if step_outputs is STEP_OUTPUT_DELAYED:
        delayed_why = "depends on workflow output [%s] but that output has not been created yet" % output_name
        raise modules.DelayedWorkflowEvaluation(why=delayed_why)
    return step_outputs[output_name]

def set_outputs_for_input(self, invocation_step, outputs=None):
step = invocation_step.workflow_step
Expand Down
72 changes: 71 additions & 1 deletion test/api/test_workflows.py
Expand Up @@ -1165,7 +1165,11 @@ def test_workflow_run_input_mapping_with_subworkflows(self):
@skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_subworkflow_recover_mapping(self):
def test_subworkflow_recover_mapping_1(self):
# This test case tests that an outer workflow continues scheduling and handles
# collection mapping properly after the last step of a subworkflow requires delayed
# evaluation. Testing rescheduling and propagating connections within a subworkflow
# is handled by the next test case.
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
Expand Down Expand Up @@ -1211,6 +1215,72 @@ def test_subworkflow_recover_mapping(self):
input1:
$link: split#output
test_data:
outer_input:
value: 1.bed
type: File
""", history_id=history_id, wait=True)
self.assertEqual("chr16\t142908\t143003\tCCDS10397.1_cds_0_0_chr16_142909_f\t0\t+\nchr5\t131424298\t131424460\tCCDS4149.1_cds_0_0_chr5_131424299_f\t0\t+\n", self.dataset_populator.get_history_dataset_content(history_id))

@skip_without_tool("cat_list")
@skip_without_tool("random_lines1")
@skip_without_tool("split")
def test_subworkflow_recover_mapping_2(self):
# Like the above test case, this test case tests that an outer workflow continues to
# schedule and handle collection mapping properly after a subworkflow needs to be
# delayed, but this also tests recovering and handling scheduling within the subworkflow
# since the delayed step (split) isn't the last step of the subworkflow.
with self.dataset_populator.test_history() as history_id:
self._run_jobs("""
class: GalaxyWorkflow
inputs:
- id: outer_input
outputs:
- id: outer_output
source: second_cat#out_file1
steps:
- tool_id: cat1
label: first_cat
state:
input1:
$link: outer_input
- run:
class: GalaxyWorkflow
inputs:
- id: inner_input
outputs:
- id: workflow_output
source: inner_cat#out_file1
steps:
- tool_id: random_lines1
label: random_lines
state:
num_lines: 2
input:
$link: inner_input
seed_source:
seed_source_selector: set_seed
seed: asdf
- tool_id: split
label: split
state:
input1:
$link: random_lines#out_file1
- tool_id: cat1
label: inner_cat
state:
input1:
$link: split#output
label: nested_workflow
connect:
inner_input: first_cat#out_file1
- tool_id: cat_list
label: second_cat
state:
input1:
$link: nested_workflow#workflow_output
test_data:
outer_input:
value: 1.bed
Expand Down

0 comments on commit b1d6c01

Please sign in to comment.