Skip to content

Commit

Permalink
Quick fix to support scatter
Browse files Browse the repository at this point in the history
Doesn't run jobs in parallel. Based on internal
cwltool functionality
  • Loading branch information
michael-kotliar committed Oct 13, 2020
1 parent 26d636c commit c39befa
Showing 1 changed file with 12 additions and 1 deletion.
13 changes: 12 additions & 1 deletion cwl_airflow/utilities/cwl.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,8 @@ def fast_cwl_step_load(workflow, target_id, cwl_args=None, location=None):
selected by "target_id" from the parsed "workflow". Other steps
are removed. Workflow inputs and outputs are updated based on
source fields of "in" and "out" from the selected workflow step.
If selected step includes "scatter" field all output types will
be transformed to the array of items of the same type.
IDs of updated workflow inputs and outputs as well as IDs of
correspondent "source" fields also include step id separated by
underscore. All other fields remain unchanged.
Expand Down Expand Up @@ -670,9 +672,18 @@ def fast_cwl_step_load(workflow, target_id, cwl_args=None, location=None):
get_short_id(step_out, only_id=True)
))[0][1]
step_out_with_step_id = step_out.replace("/", "_") # to include both step name and id

if "scatter" in selected_step: # in case of scatter, wrap all outputs to arrays
selected_step_output_type = {
"type": "array",
"items": selected_step_output["type"]
}
else:
selected_step_output_type = selected_step_output["type"]

workflow_outputs.append({
"id": step_out_with_step_id,
"type": selected_step_output["type"],
"type": selected_step_output_type,
"outputSource": step_out
})

Expand Down

0 comments on commit c39befa

Please sign in to comment.