From 38cbb88424f291661ebc43a9c3a0393e00751e0c Mon Sep 17 00:00:00 2001 From: Danny Park Date: Thu, 7 Jul 2016 14:40:37 -0400 Subject: [PATCH 1/5] add BWA tool from conda, add multi-threading to samtools sort, add samtools bam2fq and idxstats --- requirements-conda.txt | 1 + tools/bwa.py | 63 ++++++++++++++++++++++++++++++++++++++++++ tools/samtools.py | 36 ++++++++++++++++-------- 3 files changed, 89 insertions(+), 11 deletions(-) create mode 100644 tools/bwa.py diff --git a/requirements-conda.txt b/requirements-conda.txt index 8c07ba11b..a33d88e64 100644 --- a/requirements-conda.txt +++ b/requirements-conda.txt @@ -1,5 +1,6 @@ blast=2.2.31 bmtagger=3.101 +bwa=0.7.15 diamond=0.7.10=boost1.60_1 kraken-all=0.10.6_eaf8fb68 last=719 diff --git a/tools/bwa.py b/tools/bwa.py new file mode 100644 index 000000000..56760c23e --- /dev/null +++ b/tools/bwa.py @@ -0,0 +1,63 @@ +''' + The BWA aligner. + +''' + +import logging +import os +import os.path +import subprocess +import tools +import tools.samtools +import util.file +import util.misc + +TOOL_NAME = 'bwa' +TOOL_VERSION = '0.7.15' + +log = logging.getLogger(__name__) + + +class Bwa(tools.Tool): + + def __init__(self, install_methods=None): + if install_methods is None: + install_methods = [tools.CondaPackage(TOOL_NAME, version=TOOL_VERSION)] + tools.Tool.__init__(self, install_methods=install_methods) + + def version(self): + return TOOL_VERSION + + def execute(self, command, args, stdout=None): # pylint: disable=W0221 + tool_cmd = [self.install_and_get_path(), command] + args + log.debug(' '.join(tool_cmd)) + if stdout: + stdout = open(stdout, 'w') + subprocess.check_call(tool_cmd, stdout=stdout) + if stdout: + stdout.close() + + def index(self, inFasta, algorithm=None): + cmd = [] + if algorithm is not None: + if algorithm not in ('is', 'bwtsw'): + raise NameError(algorithm + " is not a recognized algorithm") + cmd.extend(('-a', algorithm)) + cmd.append(inFasta) + self.execute('index', cmd) + + def mem(self, inReads, 
refDb, outAlign, threads=None): + threads = threads or util.misc.available_cpu_count() + samtools = tools.samtools.SamtoolsTool() + fq1 = util.file.mkstempfname('.1.fastq') + fq2 = util.file.mkstempfname('.2.fastq') + aln_sam = util.file.mkstempfname('.sam') + samtools.bam2fq(inReads, fq1, fq2) + self.execute('mem', ['-t', str(threads), refDb, fq1, fq2], stdout=aln_sam) + os.unlink(fq1) + os.unlink(fq2) + samtools.sort(aln_sam, outAlign) + os.unlink(aln_sam) + samtools.index(outAlign) + + diff --git a/tools/samtools.py b/tools/samtools.py index ba338ff00..23d271000 100644 --- a/tools/samtools.py +++ b/tools/samtools.py @@ -21,13 +21,13 @@ import os import os.path import subprocess +import tempfile from collections import OrderedDict +import util.misc #import pysam TOOL_NAME = 'samtools' TOOL_VERSION = '1.2' -TOOL_URL = 'https://github.com/samtools/samtools/releases/download/{ver}/samtools-{ver}.tar.bz2'.format(ver=TOOL_VERSION) -CONDA_TOOL_VERSION = '1.2' log = logging.getLogger(__name__) @@ -37,24 +37,20 @@ class SamtoolsTool(tools.Tool): def __init__(self, install_methods=None): if install_methods is None: install_methods = [] - install_methods.append(tools.CondaPackage(TOOL_NAME, version=CONDA_TOOL_VERSION)) + install_methods.append(tools.CondaPackage(TOOL_NAME, version=TOOL_VERSION)) tools.Tool.__init__(self, install_methods=install_methods) def version(self): return TOOL_VERSION - def execute(self, command, args, stdin=None, stdout=None, stderr=None): # pylint: disable=W0221 + def execute(self, command, args, stdout=None, stderr=None): # pylint: disable=W0221 tool_cmd = [self.install_and_get_path(), command] + args log.debug(' '.join(tool_cmd)) - if stdin: - stdin = open(stdin, 'r') if stdout: stdout = open(stdout, 'w') if stderr: stderr = open(stderr, 'w') - subprocess.check_call(tool_cmd, stdin=stdin, stdout=stdout, stderr=stderr) - if stdin: - stdin.close() + subprocess.check_call(tool_cmd, stdout=stdout, stderr=stderr) if stdout: stdout.close() if stderr: 
@@ -67,10 +63,24 @@ def view(self, args, inFile, outFile, regions=None):
         #opts = args + ['-o', outFile, inFile] + regions
         #pysam.view(*opts)
 
-    def sort(self, inFile, outFile, args=None):
+    def bam2fq(self, inBam, outFq1, outFq2=None):
+        if outFq2 is None:
+            self.execute('bam2fq', ['-s', outFq1, inBam])
+        else:
+            self.execute('bam2fq', ['-1', outFq1, '-2', outFq2, inBam])
+
+    def sort(self, inFile, outFile, args=None, threads=None):
+        # inFile can be .sam, .bam, .cram
+        # outFile can be .sam, .bam, .cram
         args = args or []
+        threads = threads or util.misc.available_cpu_count()
+        if '-@' not in args:
+            args.extend(('-@', str(threads)))
+        if '-T' not in args and tempfile.tempdir and os.path.isdir(tempfile.tempdir):
+            args.extend(('-T', tempfile.tempdir))
 
-        self.execute('sort', args + ['-o', outFile, inFile])
+        args.extend(('-o', outFile, inFile))
+        self.execute('sort', args)
 
     def merge(self, inFiles, outFile, options=None):
         "Merge a list of inFiles to create outFile."
@@ -82,6 +92,7 @@ def merge(self, inFiles, outFile, options=None):
         self.execute('merge', options + [outFile] + inFiles)
 
     def index(self, inBam):
+        # inBam can be .bam or .cram
         self.execute('index', [inBam])
 
     def faidx(self, inFasta, overwrite=False):
@@ -95,6 +106,9 @@ def faidx(self, inFasta, overwrite=False):
             #pysam.faidx(inFasta)
         self.execute('faidx', [inFasta])
 
+    def idxstats(self, inBam, statsFile):
+        self.execute('idxstats', [inBam], stdout=statsFile)
+
     def reheader(self, inBam, headerFile, outBam):
         self.execute('reheader', [headerFile, inBam], stdout=outBam)
 

From 08f2d900bffdb96667cd2ff270534e6536fb8001 Mon Sep 17 00:00:00 2001
From: Danny Park
Date: Thu, 7 Jul 2016 14:43:45 -0400
Subject: [PATCH 2/5] convert a number of bam->fastq conversions to samtools
 instead of Picard for speed

---
 metagenomics.py   | 2 ++
 taxon_filter.py   | 8 ++++----
 tools/kraken.py   | 2 ++
 tools/samtools.py | 3 +--
 4 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/metagenomics.py b/metagenomics.py
index 2b3c4f350..6c7aa18e6 100755
--- 
a/metagenomics.py +++ b/metagenomics.py @@ -507,6 +507,8 @@ def diamond(inBam, db, taxDb, outReport, outM8=None, outLca=None, numThreads=1): ''' tmp_fastq = util.file.mkstempfname('.fastq') tmp_fastq2 = util.file.mkstempfname('.fastq') + # do not convert this to samtools bam2fq unless we can figure out how to replicate + # the clipping functionality of Picard SamToFastq picard = tools.picard.SamToFastqTool() picard_opts = { 'CLIPPING_ATTRIBUTE': tools.picard.SamToFastqTool.illumina_clipping_attribute, diff --git a/taxon_filter.py b/taxon_filter.py index 738cf70c9..465cdd221 100755 --- a/taxon_filter.py +++ b/taxon_filter.py @@ -379,7 +379,7 @@ def filter_lastal_bam( # convert BAM to paired FASTQ inReads1 = mkstempfname('.1.fastq') inReads2 = mkstempfname('.2.fastq') - tools.picard.SamToFastqTool().execute(inBam, inReads1, inReads2) + tools.samtools.SamtoolsTool().bam2fq(inBam, inReads1, inReads2) # look for hits in inReads1 and inReads2 hitList1 = mkstempfname('.1.hits') @@ -509,7 +509,7 @@ def deplete_bmtagger_bam(inBam, db, outBam, threads=None, JVMmemory=None): inReads1 = mkstempfname('.1.fastq') inReads2 = mkstempfname('.2.fastq') - tools.picard.SamToFastqTool().execute(inBam, inReads1, inReads2) + tools.samtools.SamtoolsTool().bam2fq(inBam, inReads1, inReads2) bmtaggerConf = mkstempfname('.bmtagger.conf') with open(bmtaggerConf, 'w') as f: @@ -974,7 +974,7 @@ def deplete_blastn_bam(inBam, db, outBam, threads, chunkSize=1000000, JVMmemory= blastOutFile = mkstempfname('.hits.txt') # Initial BAM -> FASTQ pair - tools.picard.SamToFastqTool().execute(inBam, fastq1, fastq2) + tools.samtools.SamtoolsTool().bam2fq(inBam, fastq1, fastq2) # Find BLAST hits against FASTQ1 read_utils.fastq_to_fasta(fastq1, fasta) @@ -996,7 +996,7 @@ def deplete_blastn_bam(inBam, db, outBam, threads, chunkSize=1000000, JVMmemory= tools.picard.FilterSamReadsTool().execute(inBam, True, blast_hits, halfBam, JVMmemory=JVMmemory) # Depleted BAM -> FASTQ pair - 
tools.picard.SamToFastqTool().execute(halfBam, fastq1, fastq2) + tools.samtools.SamtoolsTool().bam2fq(halfBam, fastq1, fastq2) # Find BLAST hits against FASTQ2 (which is already smaller than before) read_utils.fastq_to_fasta(fastq2, fasta) diff --git a/tools/kraken.py b/tools/kraken.py index 9c0e311cb..8e151a839 100644 --- a/tools/kraken.py +++ b/tools/kraken.py @@ -69,6 +69,8 @@ def classify(self, inBam, db, outReads, numThreads=None): return tmp_fastq1 = util.file.mkstempfname('.1.fastq') tmp_fastq2 = util.file.mkstempfname('.2.fastq') + # do not convert this to samtools bam2fq unless we can figure out how to replicate + # the clipping functionality of Picard SamToFastq picard = tools.picard.SamToFastqTool() picard_opts = { 'CLIPPING_ATTRIBUTE': tools.picard.SamToFastqTool.illumina_clipping_attribute, diff --git a/tools/samtools.py b/tools/samtools.py index 23d271000..e3e02fad1 100644 --- a/tools/samtools.py +++ b/tools/samtools.py @@ -36,8 +36,7 @@ class SamtoolsTool(tools.Tool): def __init__(self, install_methods=None): if install_methods is None: - install_methods = [] - install_methods.append(tools.CondaPackage(TOOL_NAME, version=TOOL_VERSION)) + install_methods = [tools.CondaPackage(TOOL_NAME, version=TOOL_VERSION)] tools.Tool.__init__(self, install_methods=install_methods) def version(self): From a3bd41dbc1b5b266d769c2c14a6f81352bbdce7d Mon Sep 17 00:00:00 2001 From: Danny Park Date: Thu, 7 Jul 2016 16:25:01 -0400 Subject: [PATCH 3/5] update samtools to 1.3.1, replace align_and_count with bwamem_idxstats, also in snakemake rules, move spikeins db in config.yaml, add tests --- pipes/config.yaml | 2 +- pipes/rules/reports.rules | 2 +- read_utils.py | 50 ++++++++++++++---------------------- reports.py | 4 +-- requirements-conda.txt | 2 +- test/unit/test_read_utils.py | 22 ++++++++++++++++ tools/samtools.py | 2 +- 7 files changed, 47 insertions(+), 37 deletions(-) diff --git a/pipes/config.yaml b/pipes/config.yaml index deabf89c0..5dcf9629f 100644 --- 
a/pipes/config.yaml
+++ b/pipes/config.yaml
@@ -37,7 +37,7 @@ blast_db_remove: "metag_v3.ncRNA.mRNA.mitRNA.consensus"
 trim_clip_db: "/idi/sabeti-scratch/kandersen/references/depletion_databases/contaminants.fasta"
 
 # a fasta file containing sequences to report downstream as spike-ins
-spikeins_db: "/idi/sabeti-scratch/kandersen/references/other/ercc_spike-ins.fasta"
+spikeins_db: "/idi/sabeti-scratch/genomes/other/ercc_spike-ins.fasta"
 
 # Directory of kraken database for metagenomics identification
 # For pre-built hosted databases, you may download:
diff --git a/pipes/rules/reports.rules b/pipes/rules/reports.rules
index 18dfb18c7..105cbc3f9 100644
--- a/pipes/rules/reports.rules
+++ b/pipes/rules/reports.rules
@@ -56,7 +56,7 @@ if config.get("spikeins_db"):
         run:
             makedirs(os.path.join(config["reports_dir"], 'spike_count'))
             makedirs(os.path.join(config["tmp_dir"], config["subdirs"]["depletion"]))
-            shell("{config[bin_dir]}/read_utils.py align_and_count_hits {input} {params.spikeins_db} {output}")
+            shell("{config[bin_dir]}/read_utils.py bwamem_idxstats {input} {params.spikeins_db} {output}.bam {output}")
 
 rule consolidate_spike_count:
     input:  expand("{{dir}}/spike_count/{sample}.spike_count.txt", \
diff --git a/read_utils.py b/read_utils.py
index 0db8f5d9b..7639b4eb1 100755
--- a/read_utils.py
+++ b/read_utils.py
@@ -21,6 +21,7 @@ import util.file
 import util.misc
 from util.file import mkstempfname
 
+import tools.bwa
 import tools.picard
 import tools.samtools
 import tools.mvicuna
@@ -1019,48 +1020,35 @@ def parser_align_and_fix(parser=argparse.ArgumentParser()):
 
 # =========================
 
-def align_and_count_hits(inBam, refFasta, outCounts, includeZeros=False,
-                         JVMmemory=None):
-    ''' Take reads, align to reference with Novoalign and return aligned
-        read counts for each reference sequence.
+def bwamem_idxstats(inBam, refFasta, outBam=None, outStats=None):
+    ''' Take reads, align to reference with BWA-MEM and perform samtools idxstats.
'''
-
-    bam_aligned = mkstempfname('.aligned.bam')
-    tools.novoalign.NovoalignTool().execute(
-        inBam,
-        refFasta,
-        bam_aligned,
-        options=['-r', 'Random'],
-        JVMmemory=JVMmemory)
+    if outBam is None:
+        bam_aligned = mkstempfname('.aligned.bam')
+    else:
+        bam_aligned = outBam
 
     samtools = tools.samtools.SamtoolsTool()
-    seqs = list(dict(x.split(':', 1) for x in row[1:])['SN']
-        for row in samtools.getHeader(bam_aligned)
-        if row[0]=='@SQ')
+    bwa = tools.bwa.Bwa()
 
-    with util.file.open_or_gzopen(outCounts, 'w') as outf:
-        for seq in seqs:
-            n = samtools.count(bam_aligned, regions=[seq])
-            if n>0 or includeZeros:
-                outf.write("{}\t{}\n".format(seq, n))
+    bwa.mem(inBam, refFasta, bam_aligned)
 
-    os.unlink(bam_aligned)
+    if outStats is not None:
+        samtools.idxstats(bam_aligned, outStats)
+
+    if outBam is None:
+        os.unlink(bam_aligned)
 
-def parser_align_and_count_hits(parser=argparse.ArgumentParser()):
+def parser_bwamem_idxstats(parser=argparse.ArgumentParser()):
     parser.add_argument('inBam', help='Input unaligned reads, BAM format.')
     parser.add_argument('refFasta', help='Reference genome, FASTA format, pre-indexed by Picard and Novoalign.')
-    parser.add_argument('outCounts', help='Output counts file')
-    parser.add_argument("--includeZeros",
-                        help="Output lines with no hits (default: %(default)s)",
-                        default=False,
-                        action="store_true",
-                        dest="includeZeros")
-    parser.add_argument('--JVMmemory', default='4g', help='JVM virtual memory size (default: %(default)s)')
+    parser.add_argument('outBam', help='Output aligned, indexed BAM file', nargs='?', default=None)
+    parser.add_argument('outStats', help='Output idxstats file', nargs='?', default=None)
     util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmp_dir', None)))
-    util.cmd.attach_main(parser, align_and_count_hits, split_args=True)
+    util.cmd.attach_main(parser, bwamem_idxstats, split_args=True)
     return parser
-__commands__.append(('align_and_count_hits', parser_align_and_count_hits))
+__commands__.append(('bwamem_idxstats', parser_bwamem_idxstats))
 
 # =========================
 
diff --git a/reports.py b/reports.py
index 50edb4b3d..158e2114f 100755
--- a/reports.py
+++ b/reports.py
@@ -359,8 +359,8 @@ def consolidate_spike_count(inDir, outFile):
             s = s[:-len('.spike_count.txt')]
             with open(fn, 'rt') as inf:
                 for line in inf:
-                    if not line.startswith('Input bam'):
-                        spike, count = line.strip().split('\t')
+                    if not line.startswith('Input bam') and not line.startswith('*'):
+                        spike, count = line.strip().split('\t')[0:3:2]
                         outf.write('\t'.join([s, spike, count]) + '\n')
 
diff --git a/requirements-conda.txt b/requirements-conda.txt
index a33d88e64..ca146e89a 100644
--- a/requirements-conda.txt
+++ b/requirements-conda.txt
@@ -11,7 +11,7 @@ mvicuna=1.0
 novoalign=3.03.02
 picard=1.126
 prinseq=0.20.4
-samtools=1.2
+samtools=1.3.1
 snpeff=4.1l
 trimmomatic=0.36
 trinity=date.2011_11_26
diff --git a/test/unit/test_read_utils.py b/test/unit/test_read_utils.py
index 269c0ebd5..14d1c2875 100644
--- a/test/unit/test_read_utils.py
+++ b/test/unit/test_read_utils.py
@@ -11,6 +11,7 @@ import util.file
 import read_utils
 import tools
+import tools.bwa
 import tools.samtools
 from test import TestCaseWithTmp
 
@@ -59,6 +60,27 @@ def test_purge_unmated_sra(self):
         self.assertEqualContents(outFastq2, expected2Fastq)
 
 
+class TestBwamemIdxstats(TestCaseWithTmp):
+
+    def setUp(self):
+        TestCaseWithTmp.setUp(self)
+        self.tempDir = tempfile.mkdtemp()
+        self.ebolaRef = util.file.mkstempfname('.fasta', directory=self.tempDir)
+        shutil.copyfile(os.path.join(util.file.get_test_input_path(), 'G5012.3.fasta'), self.ebolaRef)
+        self.bwa = tools.bwa.Bwa()
+        self.samtools = tools.samtools.SamtoolsTool()
+        self.bwa.index(self.ebolaRef)
+
+    def test_bwamem_idxstats(self):
+        inBam = os.path.join(util.file.get_test_input_path(), 'G5012.3.testreads.bam')
+        outBam = util.file.mkstempfname('.bam', directory=self.tempDir)
+        outStats = util.file.mkstempfname('.stats.txt', directory=self.tempDir)
+        
read_utils.bwamem_idxstats(inBam, self.ebolaRef, outBam, outStats) + with open(outStats, 'rt') as inf: + actual_count = int(inf.readline().strip().split('\t')[2]) + self.assertEqual(actual_count, self.samtools.count(outBam, opts=['-F', '4'])) + self.assertEqual(actual_count, 18873) + class TestFastqToFasta(TestCaseWithTmp): def test_fastq_to_fasta(self): diff --git a/tools/samtools.py b/tools/samtools.py index e3e02fad1..a34fe59dc 100644 --- a/tools/samtools.py +++ b/tools/samtools.py @@ -27,7 +27,7 @@ #import pysam TOOL_NAME = 'samtools' -TOOL_VERSION = '1.2' +TOOL_VERSION = '1.3.1' log = logging.getLogger(__name__) From 2930e1a6e8f86d27c5e9903208cc4cf40f626e4f Mon Sep 17 00:00:00 2001 From: Danny Park Date: Thu, 7 Jul 2016 16:27:50 -0400 Subject: [PATCH 4/5] test update --- test/unit/test_read_utils.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/test/unit/test_read_utils.py b/test/unit/test_read_utils.py index 14d1c2875..3b984e012 100644 --- a/test/unit/test_read_utils.py +++ b/test/unit/test_read_utils.py @@ -3,16 +3,17 @@ __author__ = "irwin@broadinstitute.org" import unittest -import os -import tempfile import argparse import filecmp -import util -import util.file +import os import read_utils +import shutil +import tempfile import tools import tools.bwa import tools.samtools +import util +import util.file from test import TestCaseWithTmp @@ -79,7 +80,7 @@ def test_bwamem_idxstats(self): with open(outStats, 'rt') as inf: actual_count = int(inf.readline().strip().split('\t')[2]) self.assertEqual(actual_count, self.samtools.count(outBam, opts=['-F', '4'])) - self.assertEqual(actual_count, 18873) + self.assertGreater(actual_count, 18000) class TestFastqToFasta(TestCaseWithTmp): From 11627db349e240a1ad4c9eb142ca0dc667643bfd Mon Sep 17 00:00:00 2001 From: Danny Park Date: Thu, 7 Jul 2016 16:59:52 -0400 Subject: [PATCH 5/5] quick hack patch on brittle unit test --- test/unit/test_read_utils.py | 17 ++++++++++++++++- 1 file changed, 
16 insertions(+), 1 deletion(-) diff --git a/test/unit/test_read_utils.py b/test/unit/test_read_utils.py index 3b984e012..b3f1fb1ff 100644 --- a/test/unit/test_read_utils.py +++ b/test/unit/test_read_utils.py @@ -117,6 +117,7 @@ def test_fastq_bam(self): outFastq1 = util.file.mkstempfname('.fastq') outFastq2 = util.file.mkstempfname('.fastq') outHeader = util.file.mkstempfname('.txt') + outHeaderFix = util.file.mkstempfname('.fix.txt') # in1.fastq, in2.fastq -> out.bam; header params from command-line parser = read_utils.parser_fastq_to_bam(argparse.ArgumentParser()) @@ -133,6 +134,12 @@ def test_fastq_bam(self): 'SEQUENCING_CENTER=KareemAbdul-Jabbar',]) args.func_main(args) + # Note for developers: if you're fixing the tests to handle non-bugs + # (ie our testing here is too brittle), let's just replace a lot of this + # in the future with code that just reads the header, sorts it, and + # tests for equality of sorted values in the RG line (and stricter + # equality in the non-header lines). This is kind of hacky. + # samtools view for out.sam and compare to expected samtools = tools.samtools.SamtoolsTool() samtools.view(['-h'], outBamCmd, outSam) @@ -165,10 +172,18 @@ def test_fastq_bam(self): 'READ1_TRIM=1',]) args.func_main(args) + # filter out any "PG" lines from header for testing purposes + # I don't like this... let's replace later. + with open(outHeader, 'rt') as inf: + with open(outHeaderFix, 'wt') as outf: + for line in inf: + if not line.startswith('@PG'): + outf.write(line) + # compare to out1.fastq, out2.fastq, outHeader.txt to in and expected self.assertEqualContents(outFastq1, expectedFastq1) # 1 base trimmed self.assertEqualContents(outFastq2, inFastq2) - self.assertEqualContents(outHeader, inHeader) + self.assertEqualContents(outHeaderFix, inHeader) class TestSplitReads(TestCaseWithTmp):