diff --git a/snappy_pipeline/workflows/wgs_sv_calling/Snakefile b/snappy_pipeline/workflows/wgs_sv_calling/Snakefile index 9a6fad843..a5a015ce6 100644 --- a/snappy_pipeline/workflows/wgs_sv_calling/Snakefile +++ b/snappy_pipeline/workflows/wgs_sv_calling/Snakefile @@ -141,24 +141,6 @@ rule wgs_sv_calling_delly2_merge_genotypes: wf.wrapper_path("delly2/germline/merge_genotypes") -rule wgs_sv_calling_delly2_reorder_vcf: - input: - unpack(wf.get_input_files("delly2", "reorder_vcf")), - output: - **wf.get_output_files("delly2", "reorder_vcf"), - threads: wf.get_resource("delly2", "reorder_vcf", "threads") - resources: - time=wf.get_resource("delly2", "reorder_vcf", "time"), - memory=wf.get_resource("delly2", "reorder_vcf", "memory"), - partition=wf.get_resource("delly2", "reorder_vcf", "partition"), - log: - wf.get_log_file("delly2", "reorder_vcf"), - params: - ped_members=wf.substep_getattr("delly2", "get_ped_members"), - wrapper: - wf.wrapper_path("delly2/germline/reorder_vcf") - - # SVTK Steps ------------------------------------------------------------------ diff --git a/snappy_pipeline/workflows/wgs_sv_calling/__init__.py b/snappy_pipeline/workflows/wgs_sv_calling/__init__.py index 386d1e1df..1f3cfdaff 100644 --- a/snappy_pipeline/workflows/wgs_sv_calling/__init__.py +++ b/snappy_pipeline/workflows/wgs_sv_calling/__init__.py @@ -191,15 +191,14 @@ class Delly2StepPart(BaseStepPart): name = "delly2" #: Actions in Delly 2 workflow - actions = ("call", "merge_calls", "genotype", "merge_genotypes", "reorder_vcf") + actions = ("call", "merge_calls", "genotype", "merge_genotypes") #: Directory infixes dir_infixes = { "call": r"{mapper,[^\.]+}.delly2.call.{library_name,[^\.]+}", - "merge_calls": r"{mapper,[^\.]+}.delly2.merge_calls", + "merge_calls": r"{mapper,[^\.]+}.delly2.merge_calls.{index_ngs_library,[^\.]+}", "genotype": r"{mapper,[^\.]+}.delly2.genotype.{library_name,[^\.]+}", - "merge_genotypes": r"{mapper,[^\.]+}.delly2.merge_genotypes", - "reorder_vcf": r"{mapper,[^\.]+}.delly2.{index_ngs_library,[^\.]+}", + "merge_genotypes": r"{mapper,[^\.]+}.delly2.{index_ngs_library,[^\.]+}", } #: Class resource usage dictionary. Key: action type (string); Value: resource (ResourceUsage). @@ -226,6 +225,10 @@ def __init__(self, parent): self.index_ngs_library_to_pedigree = OrderedDict() for sheet in self.parent.shortcut_sheets: self.index_ngs_library_to_pedigree.update(sheet.index_ngs_library_to_pedigree) + # Build shortcut from donor library name to pedigree + self.donor_ngs_library_to_pedigree = OrderedDict() + for sheet in self.parent.shortcut_sheets: + self.donor_ngs_library_to_pedigree.update(sheet.donor_ngs_library_to_pedigree) # Build shortcut from library name to library info self.library_name_to_library = OrderedDict() for sheet in self.parent.shortcut_sheets: @@ -244,7 +247,6 @@ def get_input_files(self, action): "merge_calls": self._get_input_files_merge_calls, "genotype": self._get_input_files_genotype, "merge_genotypes": self._get_input_files_merge_genotypes, - "reorder_vcf": self._get_input_files_reorder_vcf, } return mapping[action] @@ -256,21 +258,33 @@ def _get_input_files_call(self, wildcards): for name, ext in {"bam": ".bam", "bai": ".bam.bai"}.items(): yield name, ngs_mapping(tpl.format(ext=ext, **wildcards)) + def _get_input_files_merge_calls_or_genotype(self, wildcards, step): + assert step in ("call", "genotype") + # Create path template to per-sample call/genotype BCF + infix = self.dir_infixes[step] + infix = infix.replace(r",[^\.]+", "") + tpl = os.path.join("work", infix, "out", infix + ".bcf") + # Yield paths to pedigree's per-sample call BCF files + pedigree = self.index_ngs_library_to_pedigree[wildcards.index_ngs_library] + for donor in pedigree.donors: + if donor.dna_ngs_library: + yield tpl.format(library_name=donor.dna_ngs_library.name, **wildcards) + @listify def _get_input_files_merge_calls(self, wildcards): """Return input files for "merge_calls" action""" - infix = self.dir_infixes["call"] - infix = infix.replace(r",[^\.]+", "") - tpl = os.path.join("work", infix, "out", infix + ".bcf") - for donor in self._donors_with_dna_ngs_library(): - yield tpl.format(library_name=donor.dna_ngs_library.name, **wildcards) + yield from self._get_input_files_merge_calls_or_genotype(wildcards, "call") @dictify def _get_input_files_genotype(self, wildcards): """Return input files for "genotype" action""" - # Sites VCF file + pedigree = self.donor_ngs_library_to_pedigree[wildcards.library_name] + # Per-pedigree site BCF file infix = self.dir_infixes["merge_calls"] infix = infix.replace(r",[^\.]+", "") + infix = infix.format( + mapper=wildcards.mapper, index_ngs_library=pedigree.index.dna_ngs_library.name + ) yield "bcf", os.path.join("work", infix, "out", infix + ".bcf").format(**wildcards) # BAM files ngs_mapping = self.parent.sub_workflows["ngs_mapping"] @@ -281,19 +295,7 @@ def _get_input_files_genotype(self, wildcards): @listify def _get_input_files_merge_genotypes(self, wildcards): """Return input files for "merge_genotypes" action""" - for donor in self._donors_with_dna_ngs_library(): - infix = self.dir_infixes["genotype"] - infix = infix.replace(r",[^\.]+", "") - tpl = os.path.join("work", infix, "out", infix + ".bcf") - yield tpl.format(library_name=donor.dna_ngs_library.name, **wildcards) - - @dictify - def _get_input_files_reorder_vcf(self, wildcards): - """Return input files for "reorder_vcf" action""" - infix = self.dir_infixes["merge_genotypes"] - infix = infix.replace(r",[^\.]+", "") - tpl = os.path.join("work", infix, "out", infix + ".bcf") - yield "bcf", tpl.format(**wildcards) + yield from self._get_input_files_merge_calls_or_genotype(wildcards, "genotype") def _donors_with_dna_ngs_library(self): """Yield donors with DNA NGS library""" @@ -316,7 +318,7 @@ def get_output_files(self, action): for name, ext in zip(EXT_NAMES, EXT_VALUES): infix = self.dir_infixes[action] infix2 = infix.replace(r",[^\.]+", "") - if action != "reorder_vcf": # generate bcf files internally + if action != "merge_genotypes": # generate bcf files internally name = name.replace("vcf", "bcf") ext = ext.replace("vcf.gz", "bcf") name = name.replace("tbi", "csi") @@ -341,7 +343,7 @@ def get_resource_usage(self, action): """ # Validate action self._validate_action(action) - if action in ("merge_genotypes", "merge_calls", "reorder_vcf"): # cheap actions + if action in ("merge_genotypes", "merge_calls"): # cheap actions return self.resource_usage_dict.get("cheap_action") else: return self.resource_usage_dict.get("default") diff --git a/snappy_pipeline/workflows/wgs_sv_export/Snakefile b/snappy_pipeline/workflows/wgs_sv_export/Snakefile index b9f44764d..0b66c6d16 100644 --- a/snappy_pipeline/workflows/wgs_sv_export/Snakefile +++ b/snappy_pipeline/workflows/wgs_sv_export/Snakefile @@ -68,7 +68,7 @@ rule wgs_sv_export_write_pedigree_run: rule wgs_sv_export_varfish_annotator_annotate_svs: input: - **wf.get_input_files("varfish_annotator", "annotate"), + unpack(wf.get_input_files("varfish_annotator", "annotate")), output: **wf.get_output_files("varfish_annotator", "annotate"), threads: wf.get_resource("varfish_annotator", "annotate", "threads") diff --git a/snappy_pipeline/workflows/wgs_sv_export/__init__.py b/snappy_pipeline/workflows/wgs_sv_export/__init__.py index 10cde4f93..b7be2b352 100644 --- a/snappy_pipeline/workflows/wgs_sv_export/__init__.py +++ b/snappy_pipeline/workflows/wgs_sv_export/__init__.py @@ -2,7 +2,7 @@ """Implementation of the ``wgs_sv_export`` step. The ``wgs_sv_export`` step takes as the input the results of the ``wgs_sv_annotation`` step and -uses ``varfish-annotator-cli annotate`` commmand to create files fit for import into VarFish +uses ``varfish-annotator-cli annotate-sv`` commmand to create files fit for import into VarFish Server. ========== @@ -75,6 +75,7 @@ step_config: wgs_sv_export: path_wgs_sv_annotation: ../wgs_sv_annotation + path_wgs_sv_calling: ../wgs_sv_calling tools_ngs_mapping: null tools_wgs_sv_calling: null path_refseq_ser: REQUIRED # REQUIRED: path to RefSeq .ser file @@ -84,7 +85,7 @@ class VarfishAnnotatorAnnotateStepPart(BaseStepPart): - """Annotate VCF file using "varfish-annotator annotate".""" + """Annotate VCF file using "varfish-annotator annotate-sv".""" #: Step name name = "varfish_annotator" @@ -102,24 +103,40 @@ def __init__(self, parent): for sheet in self.parent.shortcut_sheets: self.index_ngs_library_to_pedigree.update(sheet.index_ngs_library_to_pedigree) - @dictify def get_input_files(self, action): """Return path to pedigree input file""" - # Validate action self._validate_action(action) - yield "ped", "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped" - tpl = ( - "output/{mapper}.{var_caller}.annotated.{index_ngs_library}/out/" - "{mapper}.{var_caller}.annotated.{index_ngs_library}" - ) - key_ext = {"vcf": ".vcf.gz", "tbi": ".vcf.gz.tbi"} - wgs_sv_annotation = self.parent.sub_workflows["wgs_sv_annotation"] - for key, ext in key_ext.items(): - yield key, wgs_sv_annotation(tpl + ext) + + @dictify + def input_function(wildcards): + tpl = "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped" + yield "ped", tpl.format(**wildcards) + if wildcards.var_caller == "popdel": + tpl = ( + "output/{mapper}.{var_caller}.annotated.{index_ngs_library}/out/" + "{mapper}.{var_caller}.annotated.{index_ngs_library}" + ) + subworkflow = self.parent.sub_workflows["wgs_sv_annotation"] + else: + tpl = ( + "output/{mapper}.{var_caller}.{index_ngs_library}/out/" + "{mapper}.{var_caller}.{index_ngs_library}" + ) + subworkflow = self.parent.sub_workflows["wgs_sv_calling"] + key_ext = { + "vcf": ".vcf.gz", + "vcf_md5": ".vcf.gz.md5", + "tbi": ".vcf.gz.tbi", + "tbi_md5": ".vcf.gz.tbi.md5", + } + for key, ext in key_ext.items(): + yield key, subworkflow(tpl.format(**wildcards) + ext) + + return input_function @dictify def get_output_files(self, action): - """Return output files for the filtration""" + """Return output files for the export""" # Validate action self._validate_action(action) prefix = ( @@ -211,8 +228,6 @@ def __init__(self, workflow, config, config_lookup_paths, config_paths, workdir) self.register_sub_step_classes( (WritePedigreeStepPart, VarfishAnnotatorAnnotateStepPart, LinkOutStepPart) ) - # Register sub workflows - self.register_sub_workflow("wgs_sv_annotation", self.config["path_wgs_sv_annotation"]) # Copy over "tools" setting from wgs_sv_calling/ngs_mapping if not set here if not self.config["tools_ngs_mapping"]: self.config["tools_ngs_mapping"] = self.w_config["step_config"]["ngs_mapping"]["tools"][ @@ -222,6 +237,11 @@ def __init__(self, workflow, config, config_lookup_paths, config_paths, workdir) self.config["tools_wgs_sv_calling"] = self.w_config["step_config"]["wgs_sv_calling"][ "tools" ]["dna"] + # Register sub workflows + if "popdel" in self.config["tools_wgs_sv_calling"]: + self.register_sub_workflow("wgs_sv_annotation", self.config["path_wgs_sv_annotation"]) + if "delly2" in self.config["tools_wgs_sv_calling"]: + self.register_sub_workflow("wgs_sv_calling", self.config["path_wgs_sv_calling"]) @listify def get_result_files(self): diff --git a/snappy_wrappers/wrappers/delly2/germline/merge_genotypes/wrapper.py b/snappy_wrappers/wrappers/delly2/germline/merge_genotypes/wrapper.py index cec050d87..4908e3eca 100644 --- a/snappy_wrappers/wrappers/delly2/germline/merge_genotypes/wrapper.py +++ b/snappy_wrappers/wrappers/delly2/germline/merge_genotypes/wrapper.py @@ -42,22 +42,24 @@ # If a single sample, there is no need to merge. # ``$i`` is reused from previous BCFs for-loop. if [[ $i -eq 1 ]]; then - cp $TMPDIR/cwd/1.bcf {snakemake.output.bcf} - cp $TMPDIR/cwd/1.bcf.csi {snakemake.output.bcf}".csi" + bcftools view \ + -O z \ + -o {snakemake.output.vcf} \ + $TMPDIR/cwd/1.bcf else - out=$(realpath {snakemake.output.bcf}) + out=$(realpath {snakemake.output.vcf}) pushd $TMPDIR/cwd bcftools merge \ -m id \ - -O b \ + -O z \ -o $out \ *.bcf popd - tabix -f {snakemake.output.bcf} fi + tabix -f {snakemake.output.vcf} - pushd $(dirname {snakemake.output.bcf}) - md5sum $(basename {snakemake.output.bcf}) >$(basename {snakemake.output.bcf}).md5 - md5sum $(basename {snakemake.output.bcf}).csi >$(basename {snakemake.output.bcf}).csi.md5 + pushd $(dirname {snakemake.output.vcf}) + md5sum $(basename {snakemake.output.vcf}) > $(basename {snakemake.output.vcf_md5}) + md5sum $(basename {snakemake.output.tbi}) > $(basename {snakemake.output.tbi_md5}) """ ) diff --git a/snappy_wrappers/wrappers/delly2/germline/reorder_vcf/environment.yaml b/snappy_wrappers/wrappers/delly2/germline/reorder_vcf/environment.yaml deleted file mode 120000 index 2e107ac86..000000000 --- a/snappy_wrappers/wrappers/delly2/germline/reorder_vcf/environment.yaml +++ /dev/null @@ -1 +0,0 @@ -../environment.yaml \ No newline at end of file diff --git a/snappy_wrappers/wrappers/delly2/germline/reorder_vcf/wrapper.py b/snappy_wrappers/wrappers/delly2/germline/reorder_vcf/wrapper.py deleted file mode 100644 index 9c6456fe8..000000000 --- a/snappy_wrappers/wrappers/delly2/germline/reorder_vcf/wrapper.py +++ /dev/null @@ -1,40 +0,0 @@ -# -*- coding: utf-8 -*- -"""Wrapper for running Delly2's calls tep -""" - -from snakemake.shell import shell - -__author__ = "Manuel Holtgrewe" -__email__ = "manuel.holtgrewe@bih-charite.de" - -shell( - r""" -# ----------------------------------------------------------------------------- -# Redirect stderr to log file by default and enable printing executed commands -exec &> >(tee -a "{snakemake.log}") -set -x -# ----------------------------------------------------------------------------- - -export TMPDIR=$(mktemp -d) -trap "rm -rf $TMPDIR" EXIT - -echo '{snakemake.params.ped_members}' \ -| tr ' ' '\n' \ ->$TMPDIR/samples.txt - -bcftools view \ - --samples-file $TMPDIR/samples.txt \ - --output-type u \ - {snakemake.input.bcf} \ -| bcftools view \ - --output-file {snakemake.output.vcf} \ - --output-type z \ - --include '(GT !~ "\.") && (GT ~ "1")' - -tabix -s1 -b2 -e2 -f {snakemake.output.vcf} - -pushd $(dirname {snakemake.output.vcf}) -md5sum $(basename {snakemake.output.vcf}) >$(basename {snakemake.output.vcf}).md5 -md5sum $(basename {snakemake.output.vcf}).tbi >$(basename {snakemake.output.vcf}).tbi.md5 -""" -) diff --git a/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_calling.py b/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_calling.py index 203b91243..3b7340d51 100644 --- a/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_calling.py +++ b/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_calling.py @@ -109,7 +109,7 @@ def test_delly2_step_part_get_resource_usage(wgs_sv_calling_workflow): """Tests Delly2StepPart.get_resource_usage()""" # Set actions all_actions = wgs_sv_calling_workflow.substep_getattr("delly2", "actions") - cheap_actions = ("merge_genotypes", "merge_calls", "reorder_vcf") + cheap_actions = ("merge_genotypes", "merge_calls") default_actions = [action for action in all_actions if action not in cheap_actions] # Define expected diff --git a/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_export.py b/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_export.py index f461385a1..f3e4f581b 100644 --- a/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_export.py +++ b/tests/snappy_pipeline/workflows/test_workflows_wgs_sv_export.py @@ -72,6 +72,7 @@ def wgs_sv_export_workflow( dummy_workflow.globals = { "ngs_mapping": lambda x: "NGS_MAPPING/" + x, "wgs_sv_annotation": lambda x: "WGS_SV_ANNOTATION/" + x, + "wgs_sv_calling": lambda x: "WGS_SV_CALLING/" + x, } # Construct the workflow object return WgsSvExportWorkflow(