Skip to content

Commit

Permalink
Make delly2 SV calls per-family only (#188)
Browse files Browse the repository at this point in the history
  • Loading branch information
holtgrewe committed Sep 6, 2022
1 parent dd4b8b2 commit 62d6e24
Show file tree
Hide file tree
Showing 11 changed files with 171 additions and 158 deletions.
18 changes: 0 additions & 18 deletions snappy_pipeline/workflows/wgs_sv_calling/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -141,24 +141,6 @@ rule wgs_sv_calling_delly2_merge_genotypes:
wf.wrapper_path("delly2/germline/merge_genotypes")


rule wgs_sv_calling_delly2_reorder_vcf:
input:
unpack(wf.get_input_files("delly2", "reorder_vcf")),
output:
**wf.get_output_files("delly2", "reorder_vcf"),
threads: wf.get_resource("delly2", "reorder_vcf", "threads")
resources:
time=wf.get_resource("delly2", "reorder_vcf", "time"),
memory=wf.get_resource("delly2", "reorder_vcf", "memory"),
partition=wf.get_resource("delly2", "reorder_vcf", "partition"),
log:
wf.get_log_file("delly2", "reorder_vcf"),
params:
ped_members=wf.substep_getattr("delly2", "get_ped_members"),
wrapper:
wf.wrapper_path("delly2/germline/reorder_vcf")


# SVTK Steps ------------------------------------------------------------------


Expand Down
54 changes: 28 additions & 26 deletions snappy_pipeline/workflows/wgs_sv_calling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,15 +191,14 @@ class Delly2StepPart(BaseStepPart):
name = "delly2"

#: Actions in Delly 2 workflow
actions = ("call", "merge_calls", "genotype", "merge_genotypes", "reorder_vcf")
actions = ("call", "merge_calls", "genotype", "merge_genotypes")

#: Directory infixes
dir_infixes = {
"call": r"{mapper,[^\.]+}.delly2.call.{library_name,[^\.]+}",
"merge_calls": r"{mapper,[^\.]+}.delly2.merge_calls",
"merge_calls": r"{mapper,[^\.]+}.delly2.merge_calls.{index_ngs_library,[^\.]+}",
"genotype": r"{mapper,[^\.]+}.delly2.genotype.{library_name,[^\.]+}",
"merge_genotypes": r"{mapper,[^\.]+}.delly2.merge_genotypes",
"reorder_vcf": r"{mapper,[^\.]+}.delly2.{index_ngs_library,[^\.]+}",
"merge_genotypes": r"{mapper,[^\.]+}.delly2.{index_ngs_library,[^\.]+}",
}

#: Class resource usage dictionary. Key: action type (string); Value: resource (ResourceUsage).
Expand All @@ -226,6 +225,10 @@ def __init__(self, parent):
self.index_ngs_library_to_pedigree = OrderedDict()
for sheet in self.parent.shortcut_sheets:
self.index_ngs_library_to_pedigree.update(sheet.index_ngs_library_to_pedigree)
# Build shortcut from donor library name to pedigree
self.donor_ngs_library_to_pedigree = OrderedDict()
for sheet in self.parent.shortcut_sheets:
self.donor_ngs_library_to_pedigree.update(sheet.donor_ngs_library_to_pedigree)
# Build shortcut from library name to library info
self.library_name_to_library = OrderedDict()
for sheet in self.parent.shortcut_sheets:
Expand All @@ -244,7 +247,6 @@ def get_input_files(self, action):
"merge_calls": self._get_input_files_merge_calls,
"genotype": self._get_input_files_genotype,
"merge_genotypes": self._get_input_files_merge_genotypes,
"reorder_vcf": self._get_input_files_reorder_vcf,
}
return mapping[action]

Expand All @@ -256,21 +258,33 @@ def _get_input_files_call(self, wildcards):
for name, ext in {"bam": ".bam", "bai": ".bam.bai"}.items():
yield name, ngs_mapping(tpl.format(ext=ext, **wildcards))

def _get_input_files_merge_calls_or_genotype(self, wildcards, step):
assert step in ("call", "genotype")
# Create path template to per-sample call/genotype BCF
infix = self.dir_infixes[step]
infix = infix.replace(r",[^\.]+", "")
tpl = os.path.join("work", infix, "out", infix + ".bcf")
# Yield paths to pedigree's per-sample call BCF files
pedigree = self.index_ngs_library_to_pedigree[wildcards.index_ngs_library]
for donor in pedigree.donors:
if donor.dna_ngs_library:
yield tpl.format(library_name=donor.dna_ngs_library.name, **wildcards)

@listify
def _get_input_files_merge_calls(self, wildcards):
"""Return input files for "merge_calls" action"""
infix = self.dir_infixes["call"]
infix = infix.replace(r",[^\.]+", "")
tpl = os.path.join("work", infix, "out", infix + ".bcf")
for donor in self._donors_with_dna_ngs_library():
yield tpl.format(library_name=donor.dna_ngs_library.name, **wildcards)
yield from self._get_input_files_merge_calls_or_genotype(wildcards, "call")

@dictify
def _get_input_files_genotype(self, wildcards):
"""Return input files for "genotype" action"""
# Sites VCF file
pedigree = self.donor_ngs_library_to_pedigree[wildcards.library_name]
# Per-pedigree site BCF file
infix = self.dir_infixes["merge_calls"]
infix = infix.replace(r",[^\.]+", "")
infix = infix.format(
mapper=wildcards.mapper, index_ngs_library=pedigree.index.dna_ngs_library.name
)
yield "bcf", os.path.join("work", infix, "out", infix + ".bcf").format(**wildcards)
# BAM files
ngs_mapping = self.parent.sub_workflows["ngs_mapping"]
Expand All @@ -281,19 +295,7 @@ def _get_input_files_genotype(self, wildcards):
@listify
def _get_input_files_merge_genotypes(self, wildcards):
"""Return input files for "merge_genotypes" action"""
for donor in self._donors_with_dna_ngs_library():
infix = self.dir_infixes["genotype"]
infix = infix.replace(r",[^\.]+", "")
tpl = os.path.join("work", infix, "out", infix + ".bcf")
yield tpl.format(library_name=donor.dna_ngs_library.name, **wildcards)

@dictify
def _get_input_files_reorder_vcf(self, wildcards):
"""Return input files for "reorder_vcf" action"""
infix = self.dir_infixes["merge_genotypes"]
infix = infix.replace(r",[^\.]+", "")
tpl = os.path.join("work", infix, "out", infix + ".bcf")
yield "bcf", tpl.format(**wildcards)
yield from self._get_input_files_merge_calls_or_genotype(wildcards, "genotype")

def _donors_with_dna_ngs_library(self):
"""Yield donors with DNA NGS library"""
Expand All @@ -316,7 +318,7 @@ def get_output_files(self, action):
for name, ext in zip(EXT_NAMES, EXT_VALUES):
infix = self.dir_infixes[action]
infix2 = infix.replace(r",[^\.]+", "")
if action != "reorder_vcf": # generate bcf files internally
if action != "merge_genotypes": # generate bcf files internally
name = name.replace("vcf", "bcf")
ext = ext.replace("vcf.gz", "bcf")
name = name.replace("tbi", "csi")
Expand All @@ -341,7 +343,7 @@ def get_resource_usage(self, action):
"""
# Validate action
self._validate_action(action)
if action in ("merge_genotypes", "merge_calls", "reorder_vcf"): # cheap actions
if action in ("merge_genotypes", "merge_calls"): # cheap actions
return self.resource_usage_dict.get("cheap_action")
else:
return self.resource_usage_dict.get("default")
Expand Down
2 changes: 1 addition & 1 deletion snappy_pipeline/workflows/wgs_sv_export/Snakefile
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ rule wgs_sv_export_write_pedigree_run:

rule wgs_sv_export_varfish_annotator_annotate_svs:
input:
**wf.get_input_files("varfish_annotator", "annotate"),
unpack(wf.get_input_files("varfish_annotator", "annotate")),
output:
**wf.get_output_files("varfish_annotator", "annotate"),
threads: wf.get_resource("varfish_annotator", "annotate", "threads")
Expand Down
52 changes: 36 additions & 16 deletions snappy_pipeline/workflows/wgs_sv_export/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"""Implementation of the ``wgs_sv_export`` step.
The ``wgs_sv_export`` step takes as the input the results of the ``wgs_sv_annotation`` step and
uses ``varfish-annotator-cli annotate`` command to create files fit for import into VarFish
uses ``varfish-annotator-cli annotate-sv`` command to create files fit for import into VarFish
Server.
==========
Expand Down Expand Up @@ -75,6 +75,7 @@
step_config:
wgs_sv_export:
path_wgs_sv_annotation: ../wgs_sv_annotation
path_wgs_sv_calling: ../wgs_sv_calling
tools_ngs_mapping: null
tools_wgs_sv_calling: null
path_refseq_ser: REQUIRED # REQUIRED: path to RefSeq .ser file
Expand All @@ -84,7 +85,7 @@


class VarfishAnnotatorAnnotateStepPart(BaseStepPart):
"""Annotate VCF file using "varfish-annotator annotate"."""
"""Annotate VCF file using "varfish-annotator annotate-sv"."""

#: Step name
name = "varfish_annotator"
Expand All @@ -102,24 +103,40 @@ def __init__(self, parent):
for sheet in self.parent.shortcut_sheets:
self.index_ngs_library_to_pedigree.update(sheet.index_ngs_library_to_pedigree)

@dictify
def get_input_files(self, action):
"""Return path to pedigree input file"""
# Validate action
self._validate_action(action)
yield "ped", "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped"
tpl = (
"output/{mapper}.{var_caller}.annotated.{index_ngs_library}/out/"
"{mapper}.{var_caller}.annotated.{index_ngs_library}"
)
key_ext = {"vcf": ".vcf.gz", "tbi": ".vcf.gz.tbi"}
wgs_sv_annotation = self.parent.sub_workflows["wgs_sv_annotation"]
for key, ext in key_ext.items():
yield key, wgs_sv_annotation(tpl + ext)

@dictify
def input_function(wildcards):
tpl = "work/write_pedigree.{index_ngs_library}/out/{index_ngs_library}.ped"
yield "ped", tpl.format(**wildcards)
if wildcards.var_caller == "popdel":
tpl = (
"output/{mapper}.{var_caller}.annotated.{index_ngs_library}/out/"
"{mapper}.{var_caller}.annotated.{index_ngs_library}"
)
subworkflow = self.parent.sub_workflows["wgs_sv_annotation"]
else:
tpl = (
"output/{mapper}.{var_caller}.{index_ngs_library}/out/"
"{mapper}.{var_caller}.{index_ngs_library}"
)
subworkflow = self.parent.sub_workflows["wgs_sv_calling"]
key_ext = {
"vcf": ".vcf.gz",
"vcf_md5": ".vcf.gz.md5",
"tbi": ".vcf.gz.tbi",
"tbi_md5": ".vcf.gz.tbi.md5",
}
for key, ext in key_ext.items():
yield key, subworkflow(tpl.format(**wildcards) + ext)

return input_function

@dictify
def get_output_files(self, action):
"""Return output files for the filtration"""
"""Return output files for the export"""
# Validate action
self._validate_action(action)
prefix = (
Expand Down Expand Up @@ -211,8 +228,6 @@ def __init__(self, workflow, config, config_lookup_paths, config_paths, workdir)
self.register_sub_step_classes(
(WritePedigreeStepPart, VarfishAnnotatorAnnotateStepPart, LinkOutStepPart)
)
# Register sub workflows
self.register_sub_workflow("wgs_sv_annotation", self.config["path_wgs_sv_annotation"])
# Copy over "tools" setting from wgs_sv_calling/ngs_mapping if not set here
if not self.config["tools_ngs_mapping"]:
self.config["tools_ngs_mapping"] = self.w_config["step_config"]["ngs_mapping"]["tools"][
Expand All @@ -222,6 +237,11 @@ def __init__(self, workflow, config, config_lookup_paths, config_paths, workdir)
self.config["tools_wgs_sv_calling"] = self.w_config["step_config"]["wgs_sv_calling"][
"tools"
]["dna"]
# Register sub workflows
if "popdel" in self.config["tools_wgs_sv_calling"]:
self.register_sub_workflow("wgs_sv_annotation", self.config["path_wgs_sv_annotation"])
if "delly2" in self.config["tools_wgs_sv_calling"]:
self.register_sub_workflow("wgs_sv_calling", self.config["path_wgs_sv_calling"])

@listify
def get_result_files(self):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,22 +42,24 @@
# If a single sample, there is no need to merge.
# ``$i`` is reused from previous BCFs for-loop.
if [[ $i -eq 1 ]]; then
cp $TMPDIR/cwd/1.bcf {snakemake.output.bcf}
cp $TMPDIR/cwd/1.bcf.csi {snakemake.output.bcf}".csi"
bcftools view \
-O z \
-o {snakemake.output.vcf} \
$TMPDIR/cwd/1.bcf
else
out=$(realpath {snakemake.output.bcf})
out=$(realpath {snakemake.output.vcf})
pushd $TMPDIR/cwd
bcftools merge \
-m id \
-O b \
-O z \
-o $out \
*.bcf
popd
tabix -f {snakemake.output.bcf}
fi
tabix -f {snakemake.output.vcf}
pushd $(dirname {snakemake.output.bcf})
md5sum $(basename {snakemake.output.bcf}) >$(basename {snakemake.output.bcf}).md5
md5sum $(basename {snakemake.output.bcf}).csi >$(basename {snakemake.output.bcf}).csi.md5
pushd $(dirname {snakemake.output.vcf})
md5sum $(basename {snakemake.output.vcf}) > $(basename {snakemake.output.vcf_md5})
md5sum $(basename {snakemake.output.tbi}) > $(basename {snakemake.output.tbi_md5})
"""
)

This file was deleted.

40 changes: 0 additions & 40 deletions snappy_wrappers/wrappers/delly2/germline/reorder_vcf/wrapper.py

This file was deleted.

10 changes: 9 additions & 1 deletion tests/snappy_pipeline/workflows/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

from collections import namedtuple
import io
import os
import random
import textwrap
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

from biomedsheets.io_tsv import read_germline_tsv_sheet
from biomedsheets.shortcuts import GenericSampleSheet, GermlineCaseSheet
Expand All @@ -16,6 +17,13 @@
from snappy_pipeline.workflows.abstract import BaseStep


@pytest.fixture(autouse=True)
def mock_settings_env_vars():
"""For tests, we want to have fake use medium partition."""
with patch.dict(os.environ, {"SNAPPY_PIPELINE_PARTITION": "medium"}):
yield


@pytest.fixture
def dummy_config():
"""Return dummy configuration OrderedDicts"""
Expand Down

0 comments on commit 62d6e24

Please sign in to comment.