Skip to content

Commit

Permalink
Removed DRMAA from Jannovar annotate_somatic_vcf parallel execution.
Browse files Browse the repository at this point in the history
  • Loading branch information
eudesbarbosa committed Jun 2, 2022
1 parent c67e49a commit 7dee973
Show file tree
Hide file tree
Showing 5 changed files with 245 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,9 @@
# Default configuration variant_annotation
step_config:
somatic_variant_annotation:
drmaa_snippet: '' # value to pass in as additional DRMAA arguments
window_length: 50000000 # split input into windows of this size, each triggers a job
num_jobs: 100 # number of windows to process in parallel
use_drmaa: true # use DRMAA for parallel processing
use_profile: true # use Snakemake profile for parallel processing
restart_times: 5 # number of times to re-launch jobs in case of failure
max_jobs_per_second: 10 # throttling of job creation
max_status_checks_per_second: 10 # throttling of status checks
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
"""Definition for Jannovar somatic annotation in parallel, genome is split into windows
isort:skip_file
"""

import os
import sys

# A hack is required for being able to import snappy_wrappers modules when in development mode.
# TODO: is there a more elegant way?
base_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
sys.path.insert(0, base_dir)

from snappy_wrappers.resource_usage import ResourceUsage
from snappy_wrappers.wrapper_parallel import (
ParallelSomaticVariantAnnotationBaseWrapper,
gib_to_string,
hours,
)


class ParallelJannovarAnnotateSomaticVcfWrapper(ParallelSomaticVariantAnnotationBaseWrapper):
    """Parallel execution of somatic ``jannovar annotate-vcf``.

    The genome is split into windows (see module docstring); each window is
    annotated by the inner wrapper as a separate job and the per-window
    results are merged afterwards by the base class machinery.
    """

    #: Path of the inner (single-window) wrapper that each job executes.
    inner_wrapper = "jannovar/annotate_somatic_vcf"
    #: Name of the pipeline step this wrapper belongs to.
    step_name = "somatic_variant_annotation"
    tool_name = None  # tool config is on top level

    def __init__(self, snakemake):
        """Initialize wrapper and define per-job and merge resource usage.

        :param snakemake: The ``snakemake`` proxy object passed into the
            wrapper by Snakemake.
        """
        super().__init__(snakemake)
        # Resources for each per-window annotation job; the multipliers are
        # read from the step configuration (job_mult_memory / job_mult_time).
        self.job_resources = ResourceUsage(
            threads=2,
            memory=gib_to_string(10.0 * self.get_job_mult_memory()),
            time=hours(4 * self.get_job_mult_time()),
            partition=os.getenv("SNAPPY_PIPELINE_PARTITION"),
        )
        # Resources for the final step that merges the per-window VCFs.
        self.merge_resources = ResourceUsage(
            threads=2,
            memory=gib_to_string(2.0 * self.get_merge_mult_memory()),
            time=hours(4 * self.get_merge_mult_time()),
            partition=os.getenv("SNAPPY_PIPELINE_PARTITION"),
        )
Original file line number Diff line number Diff line change
@@ -1,47 +1,9 @@
# -*- coding: utf-8 -*-
"""Wrapper for Jannovar somatic annotation in parallel, genome is split into windows
isort:skip_file
"""

import os
import sys
"""Wrapper for Jannovar somatic annotation in parallel"""

from snakemake import shell

# A hack is required for being able to import snappy_wrappers modules when in development mode.
# TODO: is there a more elegant way?
base_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), "..", "..", "..", ".."))
sys.path.insert(0, base_dir)

from snappy_wrappers.wrapper_parallel import (
ParallelSomaticVariantAnnotationBaseWrapper,
ResourceUsage,
gib,
hours,
)


class ParallelJannovarAnnotateSomaticVcfWrapper(ParallelSomaticVariantAnnotationBaseWrapper):
    """Parallel execution of somatic ``jannovar annotate-vcf``.

    NOTE(review): this is the pre-refactoring version using the older
    ``ResourceUsage`` API (``cores``/``gib``/``duration``).
    """

    #: Path of the inner (single-window) wrapper that each job executes.
    inner_wrapper = "jannovar/annotate_somatic_vcf"
    #: Name of the pipeline step this wrapper belongs to.
    step_name = "somatic_variant_annotation"
    tool_name = None  # tool config is on top level

    def __init__(self, snakemake):
        """Initialize wrapper and define per-job and merge resource usage."""
        super().__init__(snakemake)
        # Resources for each per-window annotation job; the multipliers are
        # read from the step configuration (job_mult_memory / job_mult_time).
        self.job_resources = ResourceUsage(
            cores=2,
            memory=gib(10.0 * self.get_job_mult_memory()),
            duration=hours(4 * self.get_job_mult_time()),
        )
        # Resources for the final step that merges the per-window VCFs.
        self.merge_resources = ResourceUsage(
            cores=2,
            memory=gib(2.0 * self.get_merge_mult_memory()),
            duration=hours(4 * self.get_merge_mult_time()),
        )

from parallel_annotate_somatic_vcf import ParallelJannovarAnnotateSomaticVcfWrapper

# Kick off execution using the wrapper class defined above.  The ``snakemake``
# name is injected into this script's namespace by Snakemake when the wrapper
# is executed; ``run()`` performs the parallel annotation and merging.
ParallelJannovarAnnotateSomaticVcfWrapper(snakemake).run().shutdown_logging()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
rule merge_all:
input: ['job_out.0.d/out/tmp_0.vcf.gz', 'job_out.1.d/out/tmp_1.vcf.gz', 'job_out.2.d/out/tmp_2.vcf.gz', 'job_out.3.d/out/tmp_3.vcf.gz', 'job_out.4.d/out/tmp_4.vcf.gz', 'job_out.5.d/out/tmp_5.vcf.gz', 'job_out.6.d/out/tmp_6.vcf.gz', 'job_out.7.d/out/tmp_7.vcf.gz', 'job_out.8.d/out/tmp_8.vcf.gz', 'job_out.9.d/out/tmp_9.vcf.gz', 'job_out.10.d/out/tmp_10.vcf.gz', 'job_out.11.d/out/tmp_11.vcf.gz', 'job_out.12.d/out/tmp_12.vcf.gz', 'job_out.13.d/out/tmp_13.vcf.gz', 'job_out.14.d/out/tmp_14.vcf.gz', 'job_out.15.d/out/tmp_15.vcf.gz', 'job_out.16.d/out/tmp_16.vcf.gz', 'job_out.17.d/out/tmp_17.vcf.gz', 'job_out.18.d/out/tmp_18.vcf.gz', 'job_out.19.d/out/tmp_19.vcf.gz', 'job_out.20.d/out/tmp_20.vcf.gz', 'job_out.21.d/out/tmp_21.vcf.gz', 'job_out.22.d/out/tmp_22.vcf.gz', 'job_out.23.d/out/tmp_23.vcf.gz', 'job_out.24.d/out/tmp_24.vcf.gz', 'job_out.25.d/out/tmp_25.vcf.gz', 'job_out.26.d/out/tmp_26.vcf.gz', 'job_out.27.d/out/tmp_27.vcf.gz', 'job_out.28.d/out/tmp_28.vcf.gz', 'job_out.29.d/out/tmp_29.vcf.gz', 'job_out.30.d/out/tmp_30.vcf.gz', 'job_out.31.d/out/tmp_31.vcf.gz', 'job_out.32.d/out/tmp_32.vcf.gz', 'job_out.33.d/out/tmp_33.vcf.gz', 'job_out.34.d/out/tmp_34.vcf.gz', 'job_out.35.d/out/tmp_35.vcf.gz', 'job_out.36.d/out/tmp_36.vcf.gz', 'job_out.37.d/out/tmp_37.vcf.gz', 'job_out.38.d/out/tmp_38.vcf.gz', 'job_out.39.d/out/tmp_39.vcf.gz', 'job_out.40.d/out/tmp_40.vcf.gz', 'job_out.41.d/out/tmp_41.vcf.gz', 'job_out.42.d/out/tmp_42.vcf.gz', 'job_out.43.d/out/tmp_43.vcf.gz', 'job_out.44.d/out/tmp_44.vcf.gz', 'job_out.45.d/out/tmp_45.vcf.gz', 'job_out.46.d/out/tmp_46.vcf.gz', 'job_out.47.d/out/tmp_47.vcf.gz', 'job_out.48.d/out/tmp_48.vcf.gz', 'job_out.49.d/out/tmp_49.vcf.gz', 'job_out.50.d/out/tmp_50.vcf.gz', 'job_out.51.d/out/tmp_51.vcf.gz', 'job_out.52.d/out/tmp_52.vcf.gz', 'job_out.53.d/out/tmp_53.vcf.gz', 'job_out.54.d/out/tmp_54.vcf.gz', 'job_out.55.d/out/tmp_55.vcf.gz', 'job_out.56.d/out/tmp_56.vcf.gz', 'job_out.57.d/out/tmp_57.vcf.gz', 'job_out.58.d/out/tmp_58.vcf.gz', 
'job_out.59.d/out/tmp_59.vcf.gz', 'job_out.60.d/out/tmp_60.vcf.gz', 'job_out.61.d/out/tmp_61.vcf.gz', 'job_out.62.d/out/tmp_62.vcf.gz', 'job_out.63.d/out/tmp_63.vcf.gz', 'job_out.64.d/out/tmp_64.vcf.gz', 'job_out.65.d/out/tmp_65.vcf.gz', 'job_out.66.d/out/tmp_66.vcf.gz', 'job_out.67.d/out/tmp_67.vcf.gz', 'job_out.68.d/out/tmp_68.vcf.gz', 'job_out.69.d/out/tmp_69.vcf.gz', 'job_out.70.d/out/tmp_70.vcf.gz', 'job_out.71.d/out/tmp_71.vcf.gz']
output: **{'vcf': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/out/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.vcf.gz', 'vcf_md5': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/out/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.vcf.gz.md5', 'tbi': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/out/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.vcf.gz.tbi', 'tbi_md5': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/out/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.vcf.gz.tbi.md5'}
threads: resource_merge_threads
resources:
time=resource_merge_time,
memory=resource_merge_memory,
partition=resource_merge_partition,
log: **{'conda_info': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/log/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.conda_info.txt', 'conda_info_md5': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/log/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.conda_info.txt.md5', 'conda_list': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/log/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.conda_list.txt', 'conda_list_md5': '/work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/log/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1.conda_list.txt.md5'}
shell:
r'''
set -euo pipefail # Unofficial Bash strict mode

# Initialize output directory -----------------------------------------

outdir=$(basename {output.vcf})

mkdir -p output

# Concatenate files ---------------------------------------------------
bcftools concat \
--allow-overlaps \
-d none \
-o output/out.vcf.gz \
-O z \
{input}

tabix -f output/out.vcf.gz

pushd output
for f in *; do md5sum $f >$f.md5; done
popd

# Move to output directory --------------------------------------------
mkdir -p $(dirname {output.vcf})
mv output/out.vcf.gz {output.vcf}
mv output/out.vcf.gz.md5 {output.vcf_md5}
mv output/out.vcf.gz.tbi {output.tbi}
mv output/out.vcf.gz.tbi.md5 {output.tbi_md5}

# Write out information about conda installation.
conda list >{log.conda_list}
conda info >{log.conda_info}

pushd $(dirname {log.conda_list})
md5sum $(basename {log.conda_list}) >$(basename {log.conda_list}).md5
md5sum $(basename {log.conda_info}) >$(basename {log.conda_info}).md5
popd
'''
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# -*- coding: utf-8 -*-
"""Code for testing gatk_hc_par/run wrapper"""
from pathlib import Path
import textwrap

import pytest
from ruamel import yaml as yaml_ruamel
from snakemake.io import InputFiles, Log, OutputFiles, Params, Resources, Wildcards
from snakemake.script import Snakemake

from snappy_wrappers.wrappers.jannovar_par.annotate_somatic_vcf.parallel_annotate_somatic_vcf import (
ParallelJannovarAnnotateSomaticVcfWrapper,
)

from .conftest import patch_module_fs


@pytest.fixture(scope="module")  # otherwise: performance issues
def minimal_config():
    """Return YAML parsing result for configuration.

    Round-trip-loaded minimal pipeline configuration covering the
    ``ngs_mapping`` and ``somatic_variant_annotation`` steps plus one data
    set, as consumed by the parallel wrapper under test.  The YAML text is
    part of the test contract — do not reformat it casually.
    """
    return yaml_ruamel.round_trip_load(
        textwrap.dedent(
            r"""
            static_data_config:
              reference:
                path: /path/to/ref.fa
            step_config:
              ngs_mapping:
                tools:
                  dna: ['bwa']
                compute_coverage_bed: true
                path_target_regions: /path/to/regions.bed
                bwa:
                  path_index: /path/to/bwa/index.fa
              somatic_variant_annotation:
                num_cores: 2 # number of cores to use locally
                window_length: 3500000 # split input into windows, each triggers a job
                num_jobs: 500 # number of windows to process in parallel
                restart_times: 5 # number of times to re-launch jobs in case of failure
                max_jobs_per_second: 2 # throttling of job creation
                max_status_checks_per_second: 10 # throttling of status checks
                debug_trunc_tokens: 0 # truncation to first N tokens (0 for none)
                keep_tmpdir: never # keep temporary directory, {always, never, onerror}
                job_mult_memory: 1 # memory multiplier
                job_mult_time: 1 # running time multiplier
                merge_mult_memory: 1 # memory multiplier for merging
                merge_mult_time: 1 # running time multiplier for merging
                ignore_chroms: # patterns of chromosome names to ignore
                - NC_007605 # herpes virus
                - hs37d5 # GRCh37 decoy
                - chrEBV # Eppstein-Barr Virus
                - '*_decoy' # decoy contig
                - 'HLA-*' # HLA genes
                - 'GL000220.*' # Contig with problematic, repetitive DNA in GRCh37
            data_sets:
              first_batch:
                file: sheet.tsv
                search_patterns:
                - {'left': '*/*/*_R1.fastq.gz', 'right': '*/*/*_R2.fastq.gz'}
                search_paths: ['/path']
                type: germline_variants
                naming_scheme: only_secondary_id
            """
        ).lstrip()
    )


@pytest.fixture
def snakemake_obj(minimal_config):
    """Return a ``Snakemake`` object wired up for the wrapper under test."""
    # Base paths shared by the input/output/log file groups below.
    in_base = (
        "SOMATIC_VARIANT_CALLING/output/bwa.P001-T1-DNA1-WGS1/out/bwa.P001-T1-DNA1-WGS1"
    )
    out_base = (
        "work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/out/"
        "bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1"
    )
    log_base = (
        "work/bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1/log/"
        "bwa.gatk_hc.jannovar_annotate_somatic_vcf.P001-T1-DNA1-WGS1"
    )

    # Input: somatic variant calling results plus the pedigree file.
    inputs = {
        "vcf": in_base + ".vcf.gz",
        "tbi": in_base + ".vcf.gz.tbi",
        "ped": "work/write_pedigree.P001-T1-DNA1-WGS1/out/P001-T1-DNA1-WGS1.ped",
    }
    # Output/log files are the base path plus a fixed suffix each; key order
    # is significant for the snapshot comparison and is kept as-is.
    outputs = {
        key: out_base + suffix
        for key, suffix in (
            ("tbi", ".vcf.gz.tbi"),
            ("tbi_md5", ".vcf.gz.tbi.md5"),
            ("vcf", ".vcf.gz"),
            ("vcf_md5", ".vcf.gz.md5"),
        )
    }
    logs = {
        key: log_base + suffix
        for key, suffix in (
            ("conda_info", ".conda_info.txt"),
            ("conda_info_md5", ".conda_info.txt.md5"),
            ("conda_list", ".conda_list.txt"),
            ("conda_list_md5", ".conda_list.txt.md5"),
            ("log", ".log"),
            ("log_md5", ".log.md5"),
        )
    }

    # Assemble the Snakemake object from the pieces above.
    return Snakemake(
        rulename="somatic_variant_annotation_jannovar_annotate_somatic_vcf",
        threads=2,
        bench_iteration=2,
        input_=InputFiles(fromdict=inputs),
        output=OutputFiles(fromdict=outputs),
        log=Log(fromdict=logs),
        params=Params(
            fromdict={
                "tumor_library": "P001-T1-DNA1-WGS1",
                "normal_library": "P001-N1-DNA1-WGS1",
            }
        ),
        wildcards=Wildcards(
            fromdict={"mapper": "bwa", "index_library_name": "P001-N1-DNA1-WGS1"}
        ),
        config=minimal_config,
        scriptdir="/work",
        resources=Resources(fromdict={}),
    )


def test_jannovar_wrapper_annotate_somatic_vcf_construct_merge_rule(
    snakemake_obj, variant_caller_fake_fs, mocker
):
    """Tests ParallelJannovarAnnotateSomaticVcfWrapper.construct_merge_rule()"""
    # Replace the real file system seen by the parallel-wrapper module.
    patch_module_fs("snappy_wrappers.wrapper_parallel", variant_caller_fake_fs, mocker)
    # Load the expected merge-rule text from the snapshot fixture.
    fixture_path = (
        Path(__file__).parent / "data/jannovar_par_annotate_somatic_vcf.snakemake"
    ).resolve()
    expected = fixture_path.read_text(encoding="utf8")
    # Build the wrapper and compare its generated merge rule with the snapshot.
    wrapper_par = ParallelJannovarAnnotateSomaticVcfWrapper(snakemake=snakemake_obj)
    assert wrapper_par.construct_merge_rule() == expected

0 comments on commit 7dee973

Please sign in to comment.