Skip to content

Commit

Permalink
Merge branch 'master' of github.com:broadinstitute/viral-ngs into sy-…
Browse files Browse the repository at this point in the history
…krona-integration
  • Loading branch information
yesimon committed May 2, 2016
2 parents 2e4285b + 9d52ba9 commit 6f05343
Show file tree
Hide file tree
Showing 22 changed files with 66 additions and 56 deletions.
2 changes: 1 addition & 1 deletion assembly.py
Expand Up @@ -93,7 +93,7 @@ def trim_rmdup_subsamp_reads(inBam, clipDb, outBam, n_reads=100000):
'-out',
subsampfq[0],
subsampfq[1],]
util.misc.run_and_print(cmd)
util.misc.run_and_print(cmd, check=True)
os.unlink(purgefq[0])
os.unlink(purgefq[1])

Expand Down
5 changes: 3 additions & 2 deletions pipes/rules/hs_deplete.rules
Expand Up @@ -33,12 +33,13 @@ rule depletion:
bmtaggerDbs = expand("{dbdir}/{db}", dbdir=config["bmtagger_db_dir"], db=config["bmtagger_dbs_remove"]),
blastDbs = expand("{dbdir}/{db}", dbdir=config["blast_db_dir"], db=config["blast_db_remove"]),
revert_bam = config["tmp_dir"] +'/'+config["subdirs"]["depletion"]+'/{sample}.raw.bam',
logid="{sample}"
logid="{sample}",
threads=int(config.get("number_of_threads", 1))
run:
makedirs(expand("{dir}/{subdir}",
dir=[config["data_dir"],config["tmp_dir"]],
subdir=config["subdirs"]["depletion"]))
shell("{config[bin_dir]}/taxon_filter.py deplete_human {input} {params.revert_bam} {output} --bmtaggerDbs {params.bmtaggerDbs} --blastDbs {params.blastDbs} --JVMmemory 15g")
shell("{config[bin_dir]}/taxon_filter.py deplete_human {input} {params.revert_bam} {output} --bmtaggerDbs {params.bmtaggerDbs} --blastDbs {params.blastDbs} --threads {params.threads} --JVMmemory 15g")
os.unlink(params.revert_bam)

rule filter_to_taxon:
Expand Down
4 changes: 2 additions & 2 deletions read_utils.py
Expand Up @@ -44,7 +44,7 @@ def purge_unmated(inFastq1, inFastq2, outFastq1, outFastq2, regex=r'^@(\S+)/[1|2
mergeShuffledFastqSeqsPath = os.path.join(util.file.get_scripts_path(), 'mergeShuffledFastqSeqs.pl')
cmdline = [mergeShuffledFastqSeqsPath, '-t', '-r', regex, '-f1', inFastq1, '-f2', inFastq2, '-o', tempOutput]
log.debug(' '.join(cmdline))
util.misc.run_and_print(cmdline)
util.misc.run_and_print(cmdline, check=True)
shutil.move(tempOutput + '.1.fastq', outFastq1)
shutil.move(tempOutput + '.2.fastq', outFastq2)
return 0
Expand Down Expand Up @@ -823,7 +823,7 @@ def rmdup_prinseq_fastq(inFastq, outFastq):
inFastq, '-out_bad', 'null', '-line_width', '0', '-out_good', outFastq[:-6]
]
log.debug(' '.join(cmd))
util.misc.run_and_print(cmd)
util.misc.run_and_print(cmd, check=True)


def parser_rmdup_prinseq_fastq(parser=argparse.ArgumentParser()):
Expand Down
44 changes: 25 additions & 19 deletions taxon_filter.py
Expand Up @@ -68,6 +68,7 @@ def parser_deplete_human(parser=argparse.ArgumentParser()):
help='One reference database for last (required if --taxfiltBam is specified).',
default=None
)
parser.add_argument('--threads', type=int, default=4, help='The number of threads to use in running blastn.')
parser.add_argument(
'--JVMmemory',
default=tools.picard.FilterSamReadsTool.jvmMemDefault,
Expand All @@ -91,10 +92,11 @@ def main_deplete_human(args):
args.bmtaggerDbs,
deplete_bmtagger_bam,
args.bmtaggerBam,
threads=args.threads,
JVMmemory=args.JVMmemory
)
read_utils.rmdup_mvicuna_bam(args.bmtaggerBam, args.rmdupBam, JVMmemory=args.JVMmemory)
multi_db_deplete_bam(args.rmdupBam, args.blastDbs, deplete_blastn_bam, args.blastnBam, JVMmemory=args.JVMmemory)
multi_db_deplete_bam(args.rmdupBam, args.blastDbs, deplete_blastn_bam, args.blastnBam, threads=args.threads, JVMmemory=args.JVMmemory)
if args.taxfiltBam and args.lastDb:
filter_lastal_bam(args.blastnBam, args.lastDb, args.taxfiltBam, JVMmemory=args.JVMmemory)
return 0
Expand Down Expand Up @@ -135,7 +137,7 @@ def trimmomatic(inFastq1, inFastq2, pairedOutFastq1, pairedOutFastq2, clipFasta)
)

log.debug(' '.join(javaCmd))
util.misc.run_and_print(javaCmd)
util.misc.run_and_print(javaCmd, check=True)
os.unlink(tmpUnpaired1)
os.unlink(tmpUnpaired2)

Expand Down Expand Up @@ -375,7 +377,7 @@ def filter_lastal(
'-line_width', '0', '-out_good', outFastq[:-6]
]
log.debug(' '.join(prinseqCmd))
util.misc.run_and_print(prinseqCmd)
util.misc.run_and_print(prinseqCmd, check=True)
os.unlink(filteredFastq)


Expand All @@ -396,7 +398,7 @@ def parser_filter_lastal(parser=argparse.ArgumentParser()):
# ============================


def deplete_bmtagger_bam(inBam, db, outBam, JVMmemory=None):
def deplete_bmtagger_bam(inBam, db, outBam, threads=None, JVMmemory=None):
"""
Use bmtagger to partition the input reads into ones that match at least one
of the databases and ones that don't match any of the databases.
Expand Down Expand Up @@ -504,7 +506,7 @@ def partition_bmtagger(inFastq1, inFastq2, databases, outMatch=None, outNoMatch=
curReads2, '-o', matchesFile
]
log.debug(' '.join(cmdline))
util.misc.run_and_print(cmdline)
util.misc.run_and_print(cmdline, check=True)
prevReads1, prevReads2 = curReads1, curReads2
if count < len(databases) - 1:
curReads1, curReads2 = mkstempfname(), mkstempfname()
Expand Down Expand Up @@ -636,6 +638,7 @@ def parser_deplete_bam_bmtagger(parser=argparse.ArgumentParser()):
and db.srprism.idx, db.srprism.map, etc. by srprism mkindex.'''
)
parser.add_argument('outBam', help='Output BAM file.')
parser.add_argument('--threads', type=int, default=4, help='The number of threads to use in running blastn.')
parser.add_argument(
'--JVMmemory',
default=tools.picard.FilterSamReadsTool.jvmMemDefault,
Expand All @@ -648,17 +651,17 @@ def parser_deplete_bam_bmtagger(parser=argparse.ArgumentParser()):

def main_deplete_bam_bmtagger(args):
'''Use bmtagger to deplete input reads against several databases.'''
multi_db_deplete_bam(args.inBam, args.refDbs, deplete_bmtagger_bam, args.outBam, JVMmemory=args.JVMmemory)
multi_db_deplete_bam(args.inBam, args.refDbs, deplete_bmtagger_bam, args.outBam, threads=args.threads, JVMmemory=args.JVMmemory)


__commands__.append(('deplete_bam_bmtagger', parser_deplete_bam_bmtagger))


def multi_db_deplete_bam(inBam, refDbs, deplete_method, outBam, JVMmemory=None):
def multi_db_deplete_bam(inBam, refDbs, deplete_method, outBam, threads=1, JVMmemory=None):
tmpBamIn = inBam
for db in refDbs:
tmpBamOut = mkstempfname('.bam')
deplete_method(tmpBamIn, db, tmpBamOut, JVMmemory=JVMmemory)
deplete_method(tmpBamIn, db, tmpBamOut, threads=threads, JVMmemory=JVMmemory)
if tmpBamIn != inBam:
os.unlink(tmpBamIn)
tmpBamIn = tmpBamOut
Expand All @@ -669,7 +672,7 @@ def multi_db_deplete_bam(inBam, refDbs, deplete_method, outBam, JVMmemory=None):
# ========================


def blastn_chunked_fasta(fasta, db, chunkSize=1000000):
def blastn_chunked_fasta(fasta, db, chunkSize=1000000, threads=1):
"""
Helper function: blastn a fasta file, overcoming apparent memory leaks on
an input with many query sequences, by splitting it into multiple chunks
Expand All @@ -689,7 +692,7 @@ def blastn_chunked_fasta(fasta, db, chunkSize=1000000):

chunk_hits = mkstempfname('.hits.txt')
blastnCmd = [
blastnPath, '-db', db, '-word_size', '16', '-evalue', '1e-6', '-outfmt', '6', '-max_target_seqs', '2',
blastnPath, '-db', db, '-word_size', '16', '-num_threads', str(threads), '-evalue', '1e-6', '-outfmt', '6', '-max_target_seqs', '2',
'-query', chunk_fasta, '-out', chunk_hits
]
log.debug(' '.join(blastnCmd))
Expand Down Expand Up @@ -733,7 +736,7 @@ def no_blast_hits(blastOutCombined, inFastq, outFastq):
outf.write(line1 + line2 + line3 + line4)


def deplete_blastn(inFastq, outFastq, refDbs):
def deplete_blastn(inFastq, outFastq, refDbs, threads):
'Use blastn to remove reads that match at least one of the databases.'

# Convert to fasta
Expand All @@ -744,7 +747,7 @@ def deplete_blastn(inFastq, outFastq, refDbs):
blastOutFiles = []
for db in refDbs:
log.info("running blastn on %s against %s", inFastq, db)
blastOutFiles += blastn_chunked_fasta(inFasta, db)
blastOutFiles += blastn_chunked_fasta(inFasta, db, threads)

# Combine results from different databases
blastOutCombined = mkstempfname('.txt')
Expand All @@ -761,6 +764,7 @@ def parser_deplete_blastn(parser=argparse.ArgumentParser()):
parser.add_argument('inFastq', help='Input fastq file.')
parser.add_argument('outFastq', help='Output fastq file with matching reads removed.')
parser.add_argument('refDbs', nargs='+', help='One or more reference databases for blast.')
parser.add_argument('--threads', type=int, default=4, help='The number of threads to use in running blastn.')
util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmp_dir', None)))
util.cmd.attach_main(parser, deplete_blastn, split_args=True)
return parser
Expand All @@ -769,7 +773,7 @@ def parser_deplete_blastn(parser=argparse.ArgumentParser()):
__commands__.append(('deplete_blastn', parser_deplete_blastn))


def deplete_blastn_paired(infq1, infq2, outfq1, outfq2, refDbs):
def deplete_blastn_paired(infq1, infq2, outfq1, outfq2, refDbs, threads):
'Use blastn to remove reads that match at least one of the databases.'
tmpfq1_a = mkstempfname('.fastq')
tmpfq1_b = mkstempfname('.fastq')
Expand All @@ -781,7 +785,7 @@ def deplete_blastn_paired(infq1, infq2, outfq1, outfq2, refDbs):
# (this should significantly speed up the second run of deplete_blastn)
read_utils.purge_unmated(tmpfq1_a, infq2, tmpfq1_b, tmpfq2_b)
# deplete fq2
deplete_blastn(tmpfq2_b, tmpfq2_c, refDbs)
deplete_blastn(tmpfq2_b, tmpfq2_c, refDbs, threads)
# purge fq1 of read pairs lost in fq2
read_utils.purge_unmated(tmpfq1_b, tmpfq2_c, outfq1, outfq2)

Expand All @@ -792,6 +796,7 @@ def parser_deplete_blastn_paired(parser=argparse.ArgumentParser()):
parser.add_argument('outfq1', help='Output fastq file with matching reads removed.')
parser.add_argument('outfq2', help='Output fastq file with matching reads removed.')
parser.add_argument('refDbs', nargs='+', help='One or more reference databases for blast.')
parser.add_argument('--threads', type=int, default=4, help='The number of threads to use in running blastn.')
util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmp_dir', None)))
util.cmd.attach_main(parser, deplete_blastn_paired, split_args=True)
return parser
Expand All @@ -800,7 +805,7 @@ def parser_deplete_blastn_paired(parser=argparse.ArgumentParser()):
__commands__.append(('deplete_blastn_paired', parser_deplete_blastn_paired))


def deplete_blastn_bam(inBam, db, outBam, chunkSize=1000000, JVMmemory=None):
def deplete_blastn_bam(inBam, db, outBam, threads, chunkSize=1000000, JVMmemory=None):
'Use blastn to remove reads that match at least one of the databases.'

#blastnPath = tools.blast.BlastnTool().install_and_get_path()
Expand All @@ -819,7 +824,7 @@ def deplete_blastn_bam(inBam, db, outBam, chunkSize=1000000, JVMmemory=None):
os.unlink(fastq1)
os.unlink(fastq2)
log.info("running blastn on %s pair 1 against %s", inBam, db)
blastOutFiles = blastn_chunked_fasta(fasta, db, chunkSize)
blastOutFiles = blastn_chunked_fasta(fasta, db, chunkSize, threads)
with open(blast_hits, 'wt') as outf:
for blastOutFile in blastOutFiles:
with open(blastOutFile, 'rt') as inf:
Expand Down Expand Up @@ -864,6 +869,7 @@ def parser_deplete_blastn_bam(parser=argparse.ArgumentParser()):
parser.add_argument('inBam', help='Input BAM file.')
parser.add_argument('refDbs', nargs='+', help='One or more reference databases for blast.')
parser.add_argument('outBam', help='Output BAM file with matching reads removed.')
parser.add_argument('--threads', type=int, default=4, help='The number of threads to use in running blastn.')
parser.add_argument("--chunkSize", type=int, default=1000000, help='FASTA chunk size (default: %(default)s)')
parser.add_argument(
'--JVMmemory',
Expand All @@ -878,10 +884,10 @@ def parser_deplete_blastn_bam(parser=argparse.ArgumentParser()):
def main_deplete_blastn_bam(args):
'''Use blastn to remove reads that match at least one of the specified databases.'''

def wrapper(inBam, db, outBam, JVMmemory=None):
return deplete_blastn_bam(inBam, db, outBam, chunkSize=args.chunkSize, JVMmemory=JVMmemory)
def wrapper(inBam, db, outBam, threads, JVMmemory=None):
return deplete_blastn_bam(inBam, db, outBam, threads=threads, chunkSize=args.chunkSize, JVMmemory=JVMmemory)

multi_db_deplete_bam(args.inBam, args.refDbs, wrapper, args.outBam, JVMmemory=args.JVMmemory)
multi_db_deplete_bam(args.inBam, args.refDbs, wrapper, args.outBam, threads=args.threads, JVMmemory=args.JVMmemory)
return 0


Expand Down
4 changes: 2 additions & 2 deletions test/unit/test_assembly.py
Expand Up @@ -236,7 +236,7 @@ def test_ebov_refine1(self):
imputeFasta = util.file.mkstempfname('.imputed.fasta')
refine1Fasta = util.file.mkstempfname('.refine1.fasta')
shutil.copy(inFasta, imputeFasta)
tools.picard.CreateSequenceDictionaryTool().execute(imputeFasta)
tools.picard.CreateSequenceDictionaryTool().execute(imputeFasta, overwrite=True)
tools.novoalign.NovoalignTool().index_fasta(imputeFasta)
assembly.refine_assembly(
imputeFasta,
Expand All @@ -256,7 +256,7 @@ def test_ebov_refine2(self):
refine1Fasta = util.file.mkstempfname('.refine1.fasta')
refine2Fasta = util.file.mkstempfname('.refine2.fasta')
shutil.copy(inFasta, refine1Fasta)
tools.picard.CreateSequenceDictionaryTool().execute(refine1Fasta)
tools.picard.CreateSequenceDictionaryTool().execute(refine1Fasta, overwrite=True)
tools.novoalign.NovoalignTool().index_fasta(refine1Fasta)
assembly.refine_assembly(
refine1Fasta,
Expand Down
4 changes: 2 additions & 2 deletions test/unit/test_taxon_filter.py
Expand Up @@ -146,7 +146,7 @@ def test_deplete_blastn(self):
refDb = os.path.join(tempDir, dbname)
os.symlink(os.path.join(myInputDir, dbname), refDb)
refDbs.append(refDb)
util.misc.run_and_print([makeblastdbPath, '-dbtype', 'nucl', '-in', refDb])
util.misc.run_and_print([makeblastdbPath, '-dbtype', 'nucl', '-in', refDb], check=True)

# Run deplete_blastn
outFile = os.path.join(tempDir, 'out.fastq')
Expand All @@ -172,7 +172,7 @@ def test_deplete_blastn_bam(self):
refDb = os.path.join(tempDir, dbname)
os.symlink(os.path.join(myInputDir, dbname), refDb)
refDbs.append(refDb)
util.misc.run_and_print([makeblastdbPath, '-dbtype', 'nucl', '-in', refDb])
util.misc.run_and_print([makeblastdbPath, '-dbtype', 'nucl', '-in', refDb], check=True)

# convert the input fastq's to a bam
inFastq1 = os.path.join(myInputDir, "in1.fastq")
Expand Down
2 changes: 1 addition & 1 deletion test/unit/test_tools_picard.py
Expand Up @@ -26,7 +26,7 @@ def test_fasta_index(self):
shutil.copyfile(orig_ref, inRef)
outDict = inRef[:-len(ext)] + '.dict'

picard_index.execute(inRef)
picard_index.execute(inRef, overwrite=True)

# the dict files will not be exactly the same, just the first 3 cols
with open(outDict, 'rt') as inf:
Expand Down
14 changes: 8 additions & 6 deletions tools/__init__.py
Expand Up @@ -303,7 +303,7 @@ def _patch(self, path, patch):

@property
def _package_installed(self):
result = util.misc.run_and_print(["conda", "list", "-f", "-c", "-p", self.env_path, "--json", self.package], silent=True, env=self.conda_env)
result = util.misc.run_and_print(["conda", "list", "-f", "-c", "-p", self.env_path, "--json", self.package], silent=True, check=True, env=self.conda_env)
if result.returncode == 0:
command_output = result.stdout.decode("UTF-8")
data = json.loads(self._string_from_start_of_json(command_output))
Expand Down Expand Up @@ -337,7 +337,7 @@ def verify_install(self):
def _attempt_install(self):
try:
# check for presence of conda command
util.misc.run_and_print(["conda", "-V"], silent=True, env=self.conda_env)
util.misc.run_and_print(["conda", "-V"], silent=True, check=True, env=self.conda_env)
except:
_log.debug("conda NOT installed; using custom tool install")
self._is_attempted = True
Expand All @@ -348,10 +348,10 @@ def _attempt_install(self):
# conda-build is not needed for pre-built binaries from conda channels
# though we may will need it in the future for custom local builds
# try:
# util.misc.run_and_print(["conda", "build", "-V"], silent=True, env=self.conda_env)
# util.misc.run_and_print(["conda", "build", "-V"], silent=True, check=True, env=self.conda_env)
# except:
# _log.warning("conda-build must be installed; installing...")
# util.misc.run_and_print(["conda", "install", "-y", "conda-build"])
# util.misc.run_and_print(["conda", "install", "-y", "conda-build"], check=True)

# if the package is already installed, we need to check if the version is correct
pkg_version = self.verify_install()
Expand Down Expand Up @@ -381,7 +381,7 @@ def get_installed_version(self):
run_cmd = ["conda", "list", "-c", "--json", "-f", "-p", self.env_path, self.package]


result = util.misc.run_and_print(run_cmd, silent=True, env=self.conda_env)
result = util.misc.run_and_print(run_cmd, silent=True, check=True, env=self.conda_env)
if result.returncode == 0:
try:
command_output = result.stdout.decode("UTF-8")
Expand All @@ -407,6 +407,7 @@ def uninstall_package(self):
result = util.misc.run_and_print(
run_cmd,
silent=True,
check=True,
env=self.conda_env)

if result.returncode == 0:
Expand Down Expand Up @@ -434,7 +435,7 @@ def install_package(self):
python_version = "python=" + python_version if python_version else ""
run_cmd.extend([python_version])

result = util.misc.run_and_print(run_cmd, loglevel=logging.DEBUG, env=self.conda_env)
result = util.misc.run_and_print(run_cmd, silent=True, check=True, env=self.conda_env)
try:
command_output = result.stdout.decode("UTF-8")
data = json.loads(self._string_from_start_of_json(command_output))
Expand All @@ -453,6 +454,7 @@ def install_package(self):
self._package_str
],
silent=True,
check=True,
env=self.conda_env,
)

Expand Down
2 changes: 1 addition & 1 deletion tools/blast.py
Expand Up @@ -55,7 +55,7 @@ def __init__(self, install_methods=None):
super(BlastTools, self).__init__(install_methods=install_methods)

def execute(self, *args):
util.misc.run_and_print(self.exec_path, args)
util.misc.run_and_print(self.exec_path, args, check=True)


class BlastnTool(BlastTools):
Expand Down
2 changes: 1 addition & 1 deletion tools/bmtagger.py
Expand Up @@ -35,7 +35,7 @@ def __init__(self, install_methods=None):
tools.Tool.__init__(self, install_methods=install_methods)

def execute(self, *args):
util.misc.run_and_print(self.exec_path, args)
util.misc.run_and_print(self.exec_path, args, check=True)


class BmtaggerShTool(BmtaggerTools):
Expand Down
2 changes: 1 addition & 1 deletion tools/diamond.py
Expand Up @@ -117,4 +117,4 @@ def post_download(self):
env['CC'] = 'gcc-4.9'
env['CXX'] = 'g++-4.9'
#util.misc.run_and_print(['cmake', '..'], env=env, cwd=build_dir)
util.misc.run_and_print(['make'], env=env, cwd=build_dir)
util.misc.run_and_print(['make'], env=env, cwd=build_dir, check=True)
2 changes: 1 addition & 1 deletion tools/gatk.py
Expand Up @@ -51,7 +51,7 @@ def execute(self, command, gatkOptions=None, JVMmemory=None): # pylint: disab
'-T', command
] + list(map(str, gatkOptions))
_log.debug(' '.join(tool_cmd))
util.misc.run_and_print(tool_cmd)
util.misc.run_and_print(tool_cmd, check=True)

@staticmethod
def dict_to_gatk_opts(options):
Expand Down

0 comments on commit 6f05343

Please sign in to comment.