Skip to content

Commit

Permalink
Add simple conditional support. Needs to be improved
Browse files Browse the repository at this point in the history
  • Loading branch information
michael-kotliar committed Jan 20, 2021
1 parent 063a1a0 commit 260a6d9
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cwl_airflow/extensions/operators/cwljobgatherer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def __init__(
task_id,
*args, **kwargs
):
super().__init__(task_id=task_id, *args, **kwargs)
super().__init__(task_id=task_id, trigger_rule="none_failed", *args, **kwargs) # change default trigger_rule as the upstream can be skipped


def execute(self, context):
Expand Down
10 changes: 8 additions & 2 deletions cwl_airflow/extensions/operators/cwlstepoperator.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#! /usr/bin/env python3
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.exceptions import AirflowSkipException
from airflow.models.xcom import XCOM_RETURN_KEY

from cwl_airflow.utilities.cwl import (
execute_workflow_step,
Expand All @@ -21,7 +23,7 @@ def __init__(
task_id,
*args, **kwargs
):
super().__init__(task_id=task_id, *args, **kwargs)
super().__init__(task_id=task_id, trigger_rule="none_failed", *args, **kwargs) # change default trigger_rule as the upstream can be skipped


def execute(self, context):
Expand All @@ -35,13 +37,17 @@ def execute(self, context):
post_status(context)

self.job_data = collect_reports(context) # we need it also in "on_kill"
_, step_report = execute_workflow_step(
_, step_report, skipped = execute_workflow_step(
workflow=context["dag"].workflow,
task_id=self.task_id,
job_data=self.job_data,
cwl_args=context["dag"].default_args["cwl"]
)

if skipped:
self.xcom_push(context, XCOM_RETURN_KEY, step_report) # need to save empty report before raising exception
raise AirflowSkipException("Skip workflow step execution") # to mark it as skipped for Airflow

return step_report


Expand Down
75 changes: 55 additions & 20 deletions cwl_airflow/utilities/cwl.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
RuntimeContext
)
from cwltool.process import relocateOutputs
from cwltool.expression import do_eval
from cwltool.load_tool import (
load_tool,
jobloaderctx
Expand Down Expand Up @@ -531,6 +532,28 @@ def kill_containers(containers):
logging.error(f"Failed to kill container. \n {err}")


def need_to_run(workflow_data, job_data, task_id):
    """
    Decide whether the workflow step selected by task_id should be executed.

    If the step has a "when" field (CWL v1.2 conditional execution), evaluate
    that expression with "job_data" as the expression's input object and
    return the result (truthy means run the step). Steps without "when" are
    unconditional, so return True.

    Raises ValueError if task_id doesn't correspond to any workflow step.
    """

    try:
        # get_items yields (id, item) pairs; we only need the first match,
        # so avoid materializing the whole list as before
        _, selected_step = next(iter(get_items(workflow_data.tool["steps"], task_id)))
    except StopIteration:
        raise ValueError(f"Step '{task_id}' was not found in the workflow")

    if "when" not in selected_step:
        return True

    return do_eval(
        ex=selected_step["when"],
        jobinput=job_data,
        requirements=workflow_data.requirements,
        outdir=None,
        tmpdir=None,
        resources={},
        timeout=120  # hardcoded to 120, because the default 30 sec might not be enough when running from Docker
    )


def execute_workflow_step(
workflow,
task_id,
Expand All @@ -544,7 +567,9 @@ def execute_workflow_step(
and "task_id". "cwl_args" can be used to update default parameters
used for loading and runtime contexts. Exports json file with the
execution results. Removes temporary data if workflow step has failed,
unless "remove_tmp_folder" is set to False.
unless "remove_tmp_folder" is set to False. If the step was evaluated
as the one that need to be skipped, the output "skipped" will set to True
and the step_report file will include "nulls"
"""

cwl_args = {} if cwl_args is None else cwl_args
Expand Down Expand Up @@ -578,30 +603,37 @@ def execute_workflow_step(
location=workflow_step_path
)

_stderr = sys.stderr # to trick the logger
sys.stderr = sys.__stderr__
step_outputs, step_status = executor(
slow_cwl_load(
workflow=workflow_step_path,
cwl_args=default_cwl_args),
job_data,
RuntimeContext(default_cwl_args)
workflow_data = slow_cwl_load(
workflow=workflow_step_path,
cwl_args=default_cwl_args
)
sys.stderr = _stderr

skipped = True
step_outputs = {output_id: None for output_id, _ in get_items(workflow_data.tool["outputs"])}
if need_to_run(workflow_data, job_data, task_id):
skipped = False
_stderr = sys.stderr # to trick the logger
sys.stderr = sys.__stderr__
step_outputs, step_status = executor(
workflow_data,
job_data,
RuntimeContext(default_cwl_args)
)
sys.stderr = _stderr

if step_status != "success":
if remove_tmp_folder:
logging.info(f"Removing workflow step temporary data:\n - {step_cache_folder}\n - {step_outputs_folder}")
shutil.rmtree(step_cache_folder, ignore_errors=False)
shutil.rmtree(step_outputs_folder, ignore_errors=False)
raise ValueError("Failed to run workflow step")
if step_status != "success":
if remove_tmp_folder:
logging.info(f"Removing workflow step temporary data:\n - {step_cache_folder}\n - {step_outputs_folder}")
shutil.rmtree(step_cache_folder, ignore_errors=False)
shutil.rmtree(step_outputs_folder, ignore_errors=False)
raise ValueError("Failed to run workflow step")

# To remove "http://commonwl.org/cwltool#generation": 0 (copied from cwltool)
visit_class(step_outputs, ("File",), MutationManager().unset_generation)
# To remove "http://commonwl.org/cwltool#generation": 0 (copied from cwltool)
visit_class(step_outputs, ("File",), MutationManager().unset_generation)

dump_json(step_outputs, step_report)

return step_outputs, step_report
return step_outputs, step_report, skipped


def get_temp_folders(task_id, job_data):
Expand Down Expand Up @@ -803,9 +835,12 @@ def fast_cwl_step_load(workflow, target_id, cwl_args=None, location=None):
step_in_source_with_step_id = step_in_source.replace("/", "_") # to include both step name and id

# Check if it should be assumed optional (default field is present)
# Also if we see "pickValue" we should add "null" too, but it might
# be not the best solution, as we are not sure whether all of the
# items from "source" should be optional or only some of them.
# NOTE: consider also checking if upstream step had scatter, so the
# output type should become array based on the scatter parameters
if "default" in step_in:
if "default" in step_in or "pickValue" in step_in:
upstream_step_output_type = ["null", upstream_step_output["type"]]
else:
upstream_step_output_type = upstream_step_output["type"]
Expand Down
7 changes: 7 additions & 0 deletions tests/data/jobs/sort-bedgraph-step-conditional-false.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"bam_to_bedgraph_genome_coverage_file": {
"class": "File",
"location": "../inputs/chr4_100_mapped_reads.bedGraph"
},
"sort_bedgraph": false
}
7 changes: 7 additions & 0 deletions tests/data/jobs/sort-bedgraph-step-conditional-true.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"bam_to_bedgraph_genome_coverage_file": {
"class": "File",
"location": "../inputs/chr4_100_mapped_reads.bedGraph"
},
"sort_bedgraph": true
}
181 changes: 181 additions & 0 deletions tests/data/workflows/bam-bedgraph-bigwig-conditional.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
cwlVersion: v1.2.0-dev4
class: Workflow


requirements:
- class: StepInputExpressionRequirement
- class: InlineJavascriptRequirement


inputs:

bam_file:
type: File
doc: "Input BAM file, sorted by coordinates"

chrom_length_file:
type: File
doc: "Tab delimited chromosome length file: <chromName><TAB><chromSize>"

scale:
type: float?
doc: "Coefficient to scale the genome coverage by a constant factor"

mapped_reads_number:
type: int?
doc: |
Parameter to calculate scale as 1000000/mapped_reads_number. Ignored by bedtools-genomecov.cwl in
bam_to_bedgraph step if scale is provided

sort_bedgraph:
type: boolean?
doc: "Sort bedGraph file"

pairchip:
type: boolean?
doc: "Enable paired-end genome coverage calculation"

fragment_size:
type: int?
doc: "Set fixed fragment size for genome coverage calculation"

strand:
type: string?
doc: "Calculate genome coverage of intervals from a specific strand"

bigwig_filename:
type: string?
doc: "Output filename for generated bigWig"

bedgraph_filename:
type: string?
doc: "Output filename for generated bedGraph"

split:
type: boolean?
doc: "Calculate genome coverage for each part of the read split by 'N' and 'D'"

dutp:
type: boolean?
doc: "Change strand of the mate read, so both reads come from the same strand"


outputs:

bigwig_file:
type: File
outputSource: sorted_bedgraph_to_bigwig/bigwig_file
doc: "Output bigWig file"

bedgraph_file:
type: File
outputSource: sort_bedgraph/sorted_file
label: "bedGraph output file"
doc: "Output bedGraph file"


steps:

bam_to_bedgraph:
run: ../tools/bedtools-genomecov.cwl
in:
input_file: bam_file
depth:
default: "-bg"
split:
source: split
valueFrom: |
${
if (self == null){
return true;
} else {
return self;
}
}
output_filename: bedgraph_filename
pairchip: pairchip
fragment_size: fragment_size
scale: scale
mapped_reads_number: mapped_reads_number
strand: strand
du: dutp
out: [genome_coverage_file]

sort_bedgraph:
run: ../tools/linux-sort.cwl
in:
unsorted_file: bam_to_bedgraph/genome_coverage_file
sort_bedgraph: sort_bedgraph
key:
default: ["1,1","2,2n"]
when: $(inputs.sort_bedgraph)
out: [sorted_file]

sorted_bedgraph_to_bigwig:
run: ../tools/ucsc-bedgraphtobigwig.cwl
in:
bedgraph_file: sort_bedgraph/sorted_file
chrom_length_file: chrom_length_file
output_filename: bigwig_filename
out: [bigwig_file]


$namespaces:
s: http://schema.org/

$schemas:
- https://schema.org/version/latest/schema.rdf

s:name: "bam-bedgraph-bigwig"
s:downloadUrl: https://raw.githubusercontent.com/Barski-lab/workflows/master/tools/bam-bedgraph-bigwig.cwl
s:codeRepository: https://github.com/Barski-lab/workflows
s:license: http://www.apache.org/licenses/LICENSE-2.0

s:isPartOf:
class: s:CreativeWork
s:name: Common Workflow Language
s:url: http://commonwl.org/

s:creator:
- class: s:Organization
s:legalName: "Cincinnati Children's Hospital Medical Center"
s:location:
- class: s:PostalAddress
s:addressCountry: "USA"
s:addressLocality: "Cincinnati"
s:addressRegion: "OH"
s:postalCode: "45229"
s:streetAddress: "3333 Burnet Ave"
s:telephone: "+1(513)636-4200"
s:logo: "https://www.cincinnatichildrens.org/-/media/cincinnati%20childrens/global%20shared/childrens-logo-new.png"
s:department:
- class: s:Organization
s:legalName: "Allergy and Immunology"
s:department:
- class: s:Organization
s:legalName: "Barski Research Lab"
s:member:
- class: s:Person
s:name: Michael Kotliar
s:email: mailto:misha.kotliar@gmail.com
s:sameAs:
- id: http://orcid.org/0000-0002-6486-3898


doc: |
Workflow converts input BAM file into bigWig and bedGraph files.

Input BAM file should be sorted by coordinates (required by `bam_to_bedgraph` step).

If `split` input is not provided use true by default. Default logic is implemented in `valueFrom` field of `split`
input inside `bam_to_bedgraph` step to avoid possible bug in cwltool with setting default values for workflow inputs.

`scale` has higher priority over the `mapped_reads_number`. The last one is used to calculate `-scale` parameter for
`bedtools genomecov` (step `bam_to_bedgraph`) only in a case when input `scale` is not provided. All logic is
implemented inside `bedtools-genomecov.cwl`.

`bigwig_filename` defines the output name only for the generated bigWig file. `bedgraph_filename` defines the output name
for the generated bedGraph file and can influence the generated bigWig filename when `bigwig_filename` is not provided.

All workflow inputs and outputs don't have `format` field to avoid format incompatibility errors when workflow is used
as subworkflow.

0 comments on commit 260a6d9

Please sign in to comment.