Commit

WIP: implementing case mode...
eudesbarbosa committed Nov 3, 2021
1 parent 7257f85 commit 8910805
Showing 5 changed files with 384 additions and 36 deletions.
147 changes: 142 additions & 5 deletions snappy_pipeline/workflows/targeted_seq_cnv_calling/__init__.py
@@ -41,8 +41,7 @@
- ``xhmm``
"""

-from collections import OrderedDict
+from collections import OrderedDict, defaultdict
import os
import re

@@ -76,6 +75,10 @@
# Set the max number of samples used in the CNV calling model.
# If None is set, it will use all samples in the model. Recommended value: XXX
model_max_size: null
# Set the offset for the max number of samples used in the CNV calling model.
# Example: if max is 100 and offset is 50, then as the number of available samples grows,
# the max in the models will be: 100, 150, 200, 250, ...
model_max_size_offset: null
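# Example (hypothetical values): together, the two settings above would cap the
# model at 100 samples and rebuild it in steps of 50 as the cohort grows:
#   model_max_size: 100
#   model_max_size_offset: 50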
# Path to the ngs_mapping step.
path_ngs_mapping: ../ngs_mapping
@@ -597,16 +600,40 @@ def _get_input_files_annotate_gc(wildcards):

@dictify
def _get_input_files_filter_intervals(self, wildcards):
"""Yield input files for rule ``targeted_seq_cnv_calling_gcnv_filter_intervals``.
Coverage files need to be generated for all samples for both CASE and COHORT mode.
Nevertheless, if the max number of samples per model is set, the number of coverage files
will be capped per library kit.
:param wildcards: Snakemake wildcards associated with rule.
:type wildcards: snakemake.io.Wildcards
"""
# Initialise variables
library_counter_dict = defaultdict(int)
max_model_size = self.get_adjusted_max_samples_in_model()

# Yield annotate gc files - from rule `targeted_seq_cnv_calling_gcnv_annotate_gc`
yield from self._get_input_files_annotate_gc(wildcards).items()
name_pattern = "gcnv_annotate_gc.{wildcards.library_kit}".format(wildcards=wildcards)
ext = "tsv"
yield ext, "work/{name_pattern}/out/{name_pattern}.{ext}".format(
name_pattern=name_pattern, ext=ext
)
# Yield coverage files - from rule `targeted_seq_cnv_calling_gcnv_coverage`
key = "covs"
covs = []
for lib in sorted(self.index_ngs_library_to_donor):
if self.ngs_library_to_kit.get(lib) == wildcards.library_kit:

# Update counter
library_counter_dict[wildcards.library_kit] += 1

# Skip library if the max number of samples per model has been reached
if max_model_size and library_counter_dict[wildcards.library_kit] > max_model_size:
    continue

# Prepare and append name patterns
name_pattern = "{mapper}.gcnv_coverage.{library_name}".format(
mapper=wildcards.mapper, library_name=lib
)
@@ -664,6 +691,11 @@ def _get_input_files_coverage(self, wildcards):

@dictify
def _get_input_files_call_cnvs_cohort_mode(self, wildcards):
"""Yield input files for rule `targeted_seq_cnv_calling_gcnv_call_cnvs` in COHORT mode
:param wildcards: Snakemake wildcards associated with rule.
:type wildcards: snakemake.io.Wildcards
"""
path_pattern = (
"work/{name_pattern}/out/{name_pattern}/temp_{{shard}}/scattered.interval_list"
)
@@ -691,6 +723,40 @@ def _get_input_files_call_cnvs_cohort_mode(self, wildcards):
name_pattern=path_pattern, ext="tsv"
)

@dictify
def _get_input_files_call_cnvs_case_mode(self, wildcards):
"""Yield input files for rule `targeted_seq_cnv_calling_gcnv_call_cnvs` in CASE mode
:param wildcards: Snakemake wildcards associated with rule.
:type wildcards: snakemake.io.Wildcards
"""
# Initialise variables
tsv_ext = "tsv"
tsv_path_pattern = "{mapper}.gcnv_coverage.{library_name}"
ploidy_ext = "ploidy"
ploidy_path_pattern = "{mapper}.gcnv_contig_ploidy.{library_kit}"
scatter_path_pattern = (
"work/{name_pattern}/out/{name_pattern}/temp_{{shard}}/scattered.interval_list"
)

# Yield coverage tsv files for all libraries associated with the kit; collect them
# into a list, as repeated yields of the same key would overwrite each other in ``dictify``
tsvs = []
for lib in sorted(self.index_ngs_library_to_donor):
    if self.ngs_library_to_kit.get(lib) == wildcards.library_kit:
        path_pattern = tsv_path_pattern.format(mapper=wildcards.mapper, library_name=lib)
        tsvs.append(
            "work/{name_pattern}/out/{name_pattern}.{ext}".format(
                name_pattern=path_pattern, ext=tsv_ext
            )
        )
yield tsv_ext, tsvs

# Yield ploidy files
path_pattern = ploidy_path_pattern.format(**wildcards)
yield ploidy_ext, "work/{name_pattern}/out/{name_pattern}/.done".format(
name_pattern=path_pattern
)

# Yield scatter files - resolve mapper and library kit from the wildcards,
# consistent with the ploidy files above
name_pattern = "{mapper}.gcnv_scatter_intervals.{library_kit}".format(**wildcards)
yield "interval_list_shard", scatter_path_pattern.format(name_pattern=name_pattern)

@dictify
def _get_input_files_post_germline_calls(self, wildcards, checkpoints):
checkpoint = checkpoints.targeted_seq_cnv_calling_gcnv_scatter_intervals
@@ -814,6 +880,17 @@ def _get_output_files_call_cnvs_cohort_mode():
)
)

@staticmethod
@dictify
def _get_output_files_call_cnvs_case_mode():
ext = "done"
name_pattern_calls = "{mapper}.gcnv_call_cnvs.{library_kit}.{shard}"
yield ext, touch(
"work/{name_pattern}/out/{name_pattern}/{{library_name}}/.{ext}".format(
name_pattern=name_pattern_calls, ext=ext
)
)
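# With hypothetical wildcard values (mapper=bwa, library_kit=agilent_v6, shard=0001,
# library_name=P001-N1-DNA1-WES1) the sentinel above resolves to:
#   work/bwa.gcnv_call_cnvs.agilent_v6.0001/out/bwa.gcnv_call_cnvs.agilent_v6.0001/P001-N1-DNA1-WES1/.done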

@staticmethod
@dictify
def _get_output_files_post_germline_calls():
@@ -870,10 +947,13 @@ def get_log_file(self, action):
name_pattern = "{{mapper}}.gcnv_{action}.{{library_kit}}".format(action=action)
return "work/{name_pattern}/log/{name_pattern}.log".format(name_pattern=name_pattern)
elif action == "call_cnvs_cohort_mode":
name_pattern = "{{mapper}}.gcnv_call_cnvs.{{library_kit}}.{{shard}}".format(
action=action
)
name_pattern = "{mapper}.gcnv_call_cnvs.{library_kit}.{shard}"
return "work/{name_pattern}/log/{name_pattern}.log".format(name_pattern=name_pattern)
elif action == "call_cnvs_case_mode":
name_pattern = "{mapper}.gcnv_call_cnvs.{library_kit}.{shard}"
return "work/{name_pattern}/log/{name_pattern}_{{library_name}}.log".format(
name_pattern=name_pattern
)
else:
name_pattern = "{{mapper}}.gcnv_{action}.{{library_name}}".format(action=action)
return "work/{name_pattern}/log/{name_pattern}.log".format(name_pattern=name_pattern)
@@ -911,6 +991,7 @@ def get_cnv_model_result_files(self, _unused):

def get_run_mode(self, wildcards):
"""Get run mode
:param wildcards: Snakemake wildcards associated with rule.
:type wildcards: snakemake.io.Wildcards
@@ -927,6 +1008,40 @@ def get_run_mode(self, wildcards):
else:
return "COHORT"

def get_adjusted_max_samples_in_model(self):
    """Get max number of samples used in the model, adjusted by the offset.

    Scenarios:

    * If ``model_max_size`` is an integer and the offset is null: the model will contain
      at most ``model_max_size`` samples.
    * If ``model_max_size`` and the offset are both integers: the model will be rebuilt
      whenever the number of samples in the cohort reaches
      ``model_max_size + n_adjust * offset``, where ``n_adjust`` is a non-negative
      integer:

        n_adjust = floor( (total samples - model_max_size) / offset )

    :return: Returns max number of samples used to build the gCNV model, adjusted by
    the offset; ``None`` if ``model_max_size`` is not set.
    """
    # Return None if no max size is defined
    max_size = self.config["model_max_size"]
    if not max_size:
        return None

    # Initialise variables
    n_adjust = 0
    offset = self.config["model_max_size_offset"] or 0
    _, _, kit_counts = self.parent.pick_kits_and_donors()  # number of libraries per kit
    max_kit_count = max(kit_counts.values())

    # Define adjustment factor - clamped at zero so that small cohorts never reduce
    # the max below ``model_max_size``
    if offset:
        n_adjust = max(0, (int(max_kit_count) - int(max_size)) // int(offset))

    # Calculate adjusted max
    return int(max_size) + n_adjust * int(offset)
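# Worked illustration (hypothetical numbers): with model_max_size=100,
# model_max_size_offset=50 and 230 libraries for the largest kit:
#   n_adjust = max(0, (230 - 100) // 50) = 2
#   adjusted max = 100 + 2 * 50 = 200
# i.e. as the cohort grows, the model size steps through 100, 150, 200, ...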

def update_cluster_config(self, cluster_config):
"""Update cluster configuration for gCNV CNV calling"""
for action in self.actions:
@@ -1066,7 +1181,29 @@ def _yield_result_files(self, tpl, donors, **kwargs):

def check_config(self):
"""Check that the necessary configuration is available for the step"""
# Initialise variable
message = "Argument for {par} must be an integer, received: '{value}'."

# Check if path to BAM files is available
self.ensure_w_config(
config_keys=("step_config", "targeted_seq_cnv_calling", "path_ngs_mapping"),
msg="Path to NGS mapping not configured but required for targeted seq. CNV calling",
)

# Check model max size value
model_max_size = self.w_config["step_config"]["targeted_seq_cnv_calling"]["model_max_size"]
if model_max_size:
try:
int(model_max_size)
except ValueError:
message = message.format(par="model max size", value=model_max_size)
raise ValueError(message)

# Check model max size offset value
m_offset = self.w_config["step_config"]["targeted_seq_cnv_calling"]["model_max_size_offset"]
if m_offset:
try:
int(m_offset)
except ValueError:
message = message.format(par="model max size offset", value=m_offset)
raise ValueError(message)
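To illustrate the intended validation behaviour, a minimal standalone sketch (the helper name and values are hypothetical, not part of the workflow): unset or integer-like values pass, anything else raises:

def _check_int(par, value):
    """Raise ValueError unless ``value`` is unset or integer-like."""
    if value:
        try:
            int(value)
        except (TypeError, ValueError):
            raise ValueError(
                "Argument for {par} must be an integer, received: '{value}'.".format(
                    par=par, value=value
                )
            )


_check_int("model max size", None)   # OK - not set
_check_int("model max size", "100")  # OK - integer-like string
# _check_int("model max size", "ten")  # would raise ValueError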
@@ -2,5 +2,67 @@
# gCNV needs many rules, thus they are in their own file.


# Include common rule
include: "gcnv_common.rules"
def targeted_seq_cnv_calling_gcnv_post_germline_calls_input(wildcards):
# Poor man's currying: bind the Snakefile-level checkpoints object into the input function.
return wf.get_input_files("gcnv", "post_germline_calls")(wildcards, checkpoints)


def get_gcnv_scatter_intervals_input():
return wf.substep_getattr("gcnv", "get_cnv_model_result_files")(None)


checkpoint targeted_seq_cnv_calling_gcnv_scatter_intervals:
input:
gcnv_model_wf(get_gcnv_scatter_intervals_input()),
output:
directory(wf.get_output_files("gcnv", "scatter_intervals")),
log:
wf.get_log_file("gcnv", "scatter_intervals"),
wrapper:
wf.wrapper_path("gcnv/scatter_intervals")


rule targeted_seq_cnv_calling_gcnv_call_cnvs:
input:
unpack(wf.get_input_files("gcnv", "call_cnvs_case_mode")),
output:
**wf. get_output_files("gcnv","call_cnvs_case_mode"),
log:
wf.get_log_file("gcnv", "call_cnvs_case_mode"),
wrapper:
wf.wrapper_path("gcnv/call_cnvs_case_mode")


rule targeted_seq_cnv_calling_gcnv_post_germline_calls:
input:
unpack(targeted_seq_cnv_calling_gcnv_post_germline_calls_input),
output:
**wf. get_output_files("gcnv","post_germline_calls"),
log:
wf.get_log_file("gcnv", "post_germline_calls"),
wrapper:
wf.wrapper_path("gcnv/post_germline_calls")


rule targeted_seq_cnv_calling_gcnv_merge_cohort_vcfs:
input:
wf.get_input_files("gcnv", "merge_cohort_vcfs"),
output:
**wf. get_output_files("gcnv","merge_cohort_vcfs"),
log:
wf.get_log_file("gcnv", "merge_cohort_vcfs"),
wrapper:
wf.wrapper_path("gcnv/merge_cohort_vcfs")


rule targeted_seq_cnv_calling_gcnv_extract_ped:
input:
unpack(wf.get_input_files("gcnv", "extract_ped")),
output:
**( wf. get_output_files("gcnv","extract_ped")),
log:
wf.get_log_file("gcnv", "extract_ped"),
params:
ped_members=wf.substep_getattr("gcnv", "get_ped_members"),
wrapper:
wf.wrapper_path("gcnv/extract_ped")
27 changes: 27 additions & 0 deletions snappy_wrappers/wrappers/gcnv/call_cnvs_case_mode/wrapper.py
@@ -0,0 +1,27 @@
# -*- coding: utf-8 -*-

from snakemake.shell import shell

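# Debug output (WIP): print the resolved input paths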
print("snakemake.input =", vars(snakemake.input))

shell(
r"""
set -x
set -euo pipefail
export TMPDIR=$(mktemp -d)
trap "rm -rf $TMPDIR" ERR EXIT
export MKL_NUM_THREADS=16
export OMP_NUM_THREADS=16
export THEANO_FLAGS="base_compiledir=$TMPDIR/theano_compile_dir"
gatk GermlineCNVCaller \
--run-mode CASE \
--input {snakemake.input.tsv} \
--contig-ploidy-calls $(dirname {snakemake.input.ploidy})/ploidy-calls \
--model $(dirname {snakemake.input.interval_list_shard})/ploidy-model \
--output $(dirname {snakemake.output.done}) \
--output-prefix cnv_calls
"""
)
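For orientation, a minimal sketch (paths hypothetical, mirroring the inputs yielded by _get_input_files_call_cnvs_case_mode) of how the $(dirname ...) expansions above resolve:

import os

ploidy = "work/bwa.gcnv_contig_ploidy.agilent_v6/out/bwa.gcnv_contig_ploidy.agilent_v6/.done"
shard = (
    "work/bwa.gcnv_scatter_intervals.agilent_v6/out/"
    "bwa.gcnv_scatter_intervals.agilent_v6/temp_0001/scattered.interval_list"
)

# --contig-ploidy-calls: the ploidy-calls directory is expected next to the .done sentinel
print(os.path.join(os.path.dirname(ploidy), "ploidy-calls"))
# --model: resolved the same way, relative to the scattered interval list
print(os.path.join(os.path.dirname(shard), "ploidy-model"))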