
Merge pull request #157 from broadinstitute/dp-demux
demux fixes
dpark01 committed Oct 15, 2015
2 parents 8ae0a4d + 4a14c9f commit d2a9033
Showing 8 changed files with 124 additions and 86 deletions.
8 changes: 1 addition & 7 deletions broad_utils.py
@@ -72,14 +72,8 @@ def main_get_run_date(args):
# *** misc ***
# ===============

-def iterate_lanes(runfile):
-    for flowcellfile in util.file.read_tabfile(runfile):
-        for lane in util.file.read_tabfile_dict(flowcellfile[0]):
-            yield lane
-
-
def iterate_wells(runfile):
-    for lane in iterate_lanes(runfile):
+    for lane in util.file.read_tabfile_dict(runfile):
        for well in util.file.read_tabfile_dict(lane['barcode_file']):
            yield (lane, well)
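For context, iterate_wells now reads one flat, tab-delimited runfile with a header row and one lane per line, instead of a file listing per-flowcell lane files. A minimal sketch of reading such a file (csv.DictReader stands in for util.file.read_tabfile_dict, and every column name other than barcode_file is an assumption based on fields referenced elsewhere in this commit):

import csv
import io

# Hypothetical flat runfile: one lane per row; only 'barcode_file' is taken
# from the code above, the other column names and values are illustrative.
runfile_text = (
    "flowcell\tlane\tbustard_dir\tbarcode_file\n"
    "HXXXXXXXX\t1\t/seq/runs/HXXXXXXXX/Data\tlane1_barcodes.txt\n"
)
for lane in csv.DictReader(io.StringIO(runfile_text), delimiter='\t'):
    print(lane['lane'], lane['barcode_file'])   # -> 1 lane1_barcodes.txt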

22 changes: 22 additions & 0 deletions illumina.py
@@ -581,6 +581,28 @@ def parser_miseq_fastq_to_bam(parser=argparse.ArgumentParser()):
__commands__.append(('miseq_fastq_to_bam', parser_miseq_fastq_to_bam))



# ==============================
# *** extract_fc_metadata ***
# ==============================
def extract_fc_metadata(flowcell, outRunInfo, outSampleSheet):
    ''' Extract RunInfo.xml and SampleSheet.csv from the provided Illumina directory
    '''
    illumina = IlluminaDirectory(flowcell)
    illumina.load()
    shutil.copy(illumina.get_RunInfo().get_fname(), outRunInfo)
    shutil.copy(illumina.get_SampleSheet().get_fname(), outSampleSheet)
    return 0
def parser_extract_fc_metadata(parser=argparse.ArgumentParser()):
    parser.add_argument('flowcell', help='Illumina directory (possibly tarball)')
    parser.add_argument('outRunInfo', help='Output RunInfo.xml file.')
    parser.add_argument('outSampleSheet', help='Output SampleSheet.csv file.')
    util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmpDir', None)))
    util.cmd.attach_main(parser, extract_fc_metadata, split_args=True)
    return parser
__commands__.append(('extract_fc_metadata', parser_extract_fc_metadata))


# =======================
def full_parser():
    return util.cmd.make_parser(__commands__, __doc__)
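A quick sketch of exercising the new extract_fc_metadata command added above; this is a hypothetical direct call, assuming the module is importable as illumina and that the flowcell path exists (the CLI equivalent would be illumina.py extract_fc_metadata <flowcell> <outRunInfo> <outSampleSheet>):

# Hypothetical call; the flowcell path is illustrative, the argument order
# matches parser_extract_fc_metadata above.
import illumina

illumina.extract_fc_metadata('/seq/runs/flowcell_dir_or_tarball',
                             'RunInfo.xml', 'SampleSheet.csv')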
4 changes: 2 additions & 2 deletions pipes/rules/demux.rules
@@ -91,7 +91,7 @@ rule illumina_demux:
        makedirs(set(map(os.path.dirname, output)))
        lane = get_one_lane_from_run(wildcards.flowcell, wildcards.lane, config['seqruns_demux'])
        opts = ''
-        for opt in ('minimum_base_quality', 'max_mismatches', 'min_mismatch_delta', 'max_no_calls', 'read_structure', 'minimum_quality'):
+        for opt in ('minimum_base_quality', 'max_mismatches', 'min_mismatch_delta', 'max_no_calls', 'read_structure', 'minimum_quality', 'run_start_date'):
            if lane.get(opt):
                opts += ' --%s=%s' % (opt, lane[opt])
        shell("{config[binDir]}/illumina.py illumina_demux {lane[bustard_dir]} {wildcards.lane} {outdir} --sampleSheet={lane[barcode_file]} --sequencing_center={params.center} --outMetrics={output[1]} --flowcell={wildcards.flowcell} {opts}")
@@ -106,7 +106,7 @@ def demux_move_bams_inputs(wildcards):
rule move_bams_demux:
    input: demux_move_bams_inputs
    output: config['dataDir']+'/'+config['subdirs']['source']+'/{sample}.l{library}.{flowcell}.{lane}.bam'
-    params: logid="{sample}.l{library}.r{run}.{flowcell}.{lane}"
+    params: logid="{sample}.l{library}.{flowcell}.{lane}"
    run:
        makedirs(os.path.join(config['dataDir'], config['subdirs']['source']))
        shutil.move(input[0], output[0])
85 changes: 13 additions & 72 deletions pipes/rules/hs_deplete.rules
@@ -1,9 +1,9 @@
"""
This is a basic framework for depleting human and other contaminant
reads from NGS data. All non-human reads should remain behind.
note: the length of runtime per step is highly variable depending on
the size of the input data. Eventually replace with something TANGO-based.
note: runtime and memory per step can be highly variable depending on
the size of the input data.
"""

__author__ = 'Kristian Andersen <andersen@broadinstitute.org>, Daniel Park <dpark@broadinstitute.org>'
@@ -20,64 +20,6 @@ rule all_deplete:
            sample=read_samples_file(config["samples_depletion"])),
    params: LSF="-N"


"""
rule revert_bam:
input: config["dataDir"]+'/'+config["subdirs"]["source"]+'/{sample}.bam'
output: config["tmpDir"]+'/'+config["subdirs"]["depletion"]+'/{sample}.raw.bam'
resources: mem=3
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00'),
logid="{sample}"
run:
makedirs(expand("{dir}/{subdir}",
dir=[config["dataDir"],config["tmpDir"]],
subdir=config["subdirs"]["depletion"]))
shell("{config[binDir]}/read_utils.py revert_bam_picard {input} {output} --picardOptions SORT_ORDER=queryname SANITIZE=true")

rule deplete_bmtagger:
''' This step depletes human reads using BMTagger
This sometimes takes just over 4 hrs for highly dense lanes
'''
input: config["tmpDir"]+'/'+config["subdirs"]["depletion"]+'/{sample}.raw.bam'
# expand("{dbdir}/{db}.{suffix}",
# dbdir=config["bmTaggerDbDir"],
# db=config["bmTaggerDbs_remove"],
# suffix=["bitmask","srprism.idx","srprism.map"])
output: config["tmpDir"]+ '/'+config["subdirs"]["depletion"]+'/{sample}.bmtagger_depleted.bam'
resources: mem=10
params: LSF=config.get('LSF_queues', {}).get('long', '-q forest'),
refDbs = expand("{dbdir}/{db}", dbdir=config["bmTaggerDbDir"], db=config["bmTaggerDbs_remove"]),
logid="{sample}"
shell: "{config[binDir]}/taxon_filter.py deplete_bam_bmtagger {input} {params.refDbs} {output}"

rule rmdup_mvicuna:
''' This step removes PCR duplicate reads using M-Vicuna (our custom
tool included here).
Unlike Picard MarkDuplicates, M-Vicuna does not require reads to be
previously aligned.
'''
input: config["tmpDir"]+ '/'+config["subdirs"]["depletion"]+'/{sample}.bmtagger_depleted.bam'
output: config["tmpDir"]+ '/'+config["subdirs"]["depletion"]+'/{sample}.rmdup.bam'
resources: mem=3
params: LSF=config.get('LSF_queues', {}).get('short', '-W 4:00'),
logid="{sample}"
shell: "{config[binDir]}/read_utils.py rmdup_mvicuna_bam {input} {output}"

rule deplete_blastn:
''' This step depletes human reads using BLASTN, which is more sensitive,
but much slower than BMTagger, so we run it last.
This sometimes takes just over 4 hrs for highly dense lanes.
'''
input: config["tmpDir"] +'/'+config["subdirs"]["depletion"]+'/{sample}.rmdup.bam'
output: config["dataDir"]+'/'+config["subdirs"]["depletion"]+'/{sample}.cleaned.bam'
resources: mem=15
params: LSF=config.get('LSF_queues', {}).get('long', '-q forest'),
refDbs = expand("{dbdir}/{db}", dbdir=config["blastDbDir"], db=config["blastDb_remove"]),
logid="{sample}"
shell: "{config[binDir]}/taxon_filter.py deplete_blastn_bam {input} {params.refDbs} {output}"
"""


rule depletion:
    ''' Runs a full human read depletion pipeline and removes PCR duplicates
    '''
@@ -96,7 +38,7 @@ rule depletion:
makedirs(expand("{dir}/{subdir}",
dir=[config["dataDir"],config["tmpDir"]],
subdir=config["subdirs"]["depletion"]))
shell("{config[binDir]}/taxon_filter.py deplete_human {input} {params.revert_bam} {output} --bmtaggerDbs {params.bmtaggerDbs} --blastDbs {params.blastDbs}")
shell("{config[binDir]}/taxon_filter.py deplete_human {input} {params.revert_bam} {output} --bmtaggerDbs {params.bmtaggerDbs} --blastDbs {params.blastDbs} --JVMmemory 15g")
os.unlink(params.revert_bam)

rule filter_to_taxon:
@@ -124,16 +66,15 @@ def merge_one_per_sample_inputs(wildcards):
    if 'seqruns_demux' not in config or not os.path.isfile(config['seqruns_demux']):
        return config["dataDir"]+'/'+config["subdirs"]["depletion"] +'/'+ wildcards.sample + '.' + wildcards.adjective + '.bam'
    runs = set()
-    for flowcell in read_samples_file(config['seqruns_demux']):
-        for lane in read_tab_file(flowcell):
-            for well in read_tab_file(lane['barcode_file']):
-                if well['sample'] == wildcards.sample:
-                    if wildcards.adjective=='raw':
-                        runs.add(os.path.join(config["dataDir"], config["subdirs"]["source"],
-                            get_run_id(well) +'.'+ lane['flowcell'] +'.'+ lane['lane'] + '.bam'))
-                    else:
-                        runs.add(os.path.join(config["dataDir"], config["subdirs"]["depletion"],
-                            get_run_id(well) +'.'+ lane['flowcell'] +'.'+ lane['lane'] +'.'+ wildcards.adjective + '.bam'))
+    for lane in read_tab_file(config['seqruns_demux']):
+        for well in read_tab_file(lane['barcode_file']):
+            if well['sample'] == wildcards.sample:
+                if wildcards.adjective=='raw':
+                    runs.add(os.path.join(config["dataDir"], config["subdirs"]["source"],
+                        get_run_id(well) +'.'+ lane['flowcell'] +'.'+ lane['lane'] + '.bam'))
+                else:
+                    runs.add(os.path.join(config["dataDir"], config["subdirs"]["depletion"],
+                        get_run_id(well) +'.'+ lane['flowcell'] +'.'+ lane['lane'] +'.'+ wildcards.adjective + '.bam'))
    return sorted(runs)
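As a rough illustration of the per-lane paths the revised function collects (directory names, run id, and lane values below are hypothetical, not taken from the repo's config):

import os

# Hypothetical config and lane values; get_run_id(well) is assumed to return
# something like 'sample1.l1'.
config = {'dataDir': 'data', 'subdirs': {'source': '00_raw', 'depletion': '02_depletion'}}
lane = {'flowcell': 'HXXXXXXXX', 'lane': '1'}
run_id = 'sample1.l1'
cleaned = os.path.join(config['dataDir'], config['subdirs']['depletion'],
                       run_id + '.' + lane['flowcell'] + '.' + lane['lane'] + '.cleaned.bam')
print(cleaned)  # data/02_depletion/sample1.l1.HXXXXXXXX.1.cleaned.bam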
rule merge_one_per_sample:
    ''' All of the above depletion steps are run once per flowcell per lane per
79 changes: 79 additions & 0 deletions read_utils.py
@@ -613,6 +613,85 @@ def parser_split_bam(parser=argparse.ArgumentParser()):

__commands__.append(('split_bam', parser_split_bam))


# =======================
# *** reheader_bam ***
# =======================


def parser_reheader_bam(parser=argparse.ArgumentParser()):
    parser.add_argument('inBam', help='Input reads, BAM format.')
    parser.add_argument('rgMap', help='Tabular file containing three columns: field, old, new.')
    parser.add_argument('outBam', help='Output reads, BAM format.')
    util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmpDir', None)))
    util.cmd.attach_main(parser, main_reheader_bam)
    return parser


def main_reheader_bam(args):
    ''' Copy a BAM file (inBam to outBam) while renaming elements of the BAM header.
        The mapping file specifies (key, old value, new value) mappings. For
        example:
            LB lib1 lib_one
            SM sample1 Sample_1
            SM sample2 Sample_2
            SM sample3 Sample_3
            CN broad BI
    '''
    # read mapping file
    mapper = dict((a+':'+b, a+':'+c) for a,b,c in util.file.read_tabfile(args.rgMap))
    # read and convert bam header
    header_file = mkstempfname('.sam')
    with open(header_file, 'wt') as outf:
        for row in tools.samtools.SamtoolsTool().getHeader(args.inBam):
            if row[0] == '@RG':
                row = [mapper.get(x, x) for x in row]
            outf.write('\t'.join(row)+'\n')
    # write new bam with new header
    tools.samtools.SamtoolsTool().reheader(args.inBam, header_file, args.outBam)
    os.unlink(header_file)
    return 0


__commands__.append(('reheader_bam', parser_reheader_bam))
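The (field, old, new) rows of the mapping file become 'TAG:old' to 'TAG:new' replacements applied to each @RG header field; a small sketch of that rewrite (the row contents and mappings are invented):

# Invented @RG row and mapping, mirroring the rename logic in main_reheader_bam above.
mapper = {'SM:sample1': 'SM:Sample_1', 'LB:lib1': 'LB:lib_one'}
row = ['@RG', 'ID:A.1', 'SM:sample1', 'LB:lib1', 'PL:ILLUMINA']
row = [mapper.get(x, x) for x in row]
print(row)  # ['@RG', 'ID:A.1', 'SM:Sample_1', 'LB:lib_one', 'PL:ILLUMINA']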


def parser_reheader_bams(parser=argparse.ArgumentParser()):
    parser.add_argument('rgMap', help='Tabular file containing three columns: field, old, new.')
    util.cmd.common_args(parser, (('loglevel', None), ('version', None), ('tmpDir', None)))
    util.cmd.attach_main(parser, main_reheader_bams)
    return parser
def main_reheader_bams(args):
    ''' Copy BAM files while renaming elements of the BAM header.
        The mapping file specifies (key, old value, new value) mappings. For
        example:
            LB lib1 lib_one
            SM sample1 Sample_1
            SM sample2 Sample_2
            SM sample3 Sample_3
            CN broad BI
            FN in1.bam out1.bam
            FN in2.bam out2.bam
    '''
    # read mapping file
    mapper = dict((a+':'+b, a+':'+c) for a,b,c in util.file.read_tabfile(args.rgMap) if a != 'FN')
    files = list((b,c) for a,b,c in util.file.read_tabfile(args.rgMap) if a == 'FN')
    header_file = mkstempfname('.sam')
    # read and convert bam headers
    for inBam, outBam in files:
        if os.path.isfile(inBam):
            with open(header_file, 'wt') as outf:
                for row in tools.samtools.SamtoolsTool().getHeader(inBam):
                    if row[0] == '@RG':
                        row = [mapper.get(x, x) for x in row]
                    outf.write('\t'.join(row)+'\n')
            # write new bam with new header
            tools.samtools.SamtoolsTool().reheader(inBam, header_file, outBam)
    os.unlink(header_file)
    return 0
__commands__.append(('reheader_bams', parser_reheader_bams))
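For the batched variant, a sketch of how an rgMap with FN rows splits into the rename table and the list of BAM pairs (all values invented; the parsing matches main_reheader_bams above):

# Invented rgMap rows, already split into (field, old, new) tuples.
rows = [
    ('SM', 'sample1', 'Sample_1'),
    ('CN', 'broad', 'BI'),
    ('FN', 'in1.bam', 'out1.bam'),
    ('FN', 'in2.bam', 'out2.bam'),
]
mapper = dict((a + ':' + b, a + ':' + c) for a, b, c in rows if a != 'FN')
files = [(b, c) for a, b, c in rows if a == 'FN']
print(mapper)  # {'SM:sample1': 'SM:Sample_1', 'CN:broad': 'CN:BI'}
print(files)   # [('in1.bam', 'out1.bam'), ('in2.bam', 'out2.bam')]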


# ============================
# *** dup_remove_mvicuna ***
# ============================
1 change: 0 additions & 1 deletion requirements-pipes.txt
@@ -1,3 +1,2 @@
snakemake==3.2.1
yappi==0.94
boltons==15.0.0
1 change: 0 additions & 1 deletion requirements.txt
@@ -3,4 +3,3 @@

biopython==1.64
pysam==0.8.1
boltons==15.0.0
10 changes: 7 additions & 3 deletions util/genbank.py
@@ -7,7 +7,6 @@

# third-party
from Bio import Entrez
import boltons.iterutils

log = logging.getLogger(__name__)

@@ -32,6 +31,12 @@ def get_feature_table_id(featureTableFile):
    return seqid


def _seq_chunks(seq, n):
    # http://stackoverflow.com/a/312464/190597 (Ned Batchelder)
    """ Yield successive n-sized chunks from seq."""
    for i in range(0, len(seq), n):
        yield seq[i:i + n]

def _fetch_from_nuccore(accessionList, destinationDir, emailAddress,
forceOverwrite=False, rettype="fasta", retmode="text",
fileExt=None, combinedFilePrefix=None, removeSeparateFiles=False,
@@ -77,8 +82,7 @@ def _fetch_from_nuccore(accessionList, destinationDir, emailAddress,
log.info("Fetching %s entries from GenBank: %s\n", str(len(accessionList)), ", ".join(accessionList[:10]))
outputFiles = []

for chunkNum, chunk in enumerate(boltons.iterutils.chunked_iter(accessionList, chunkSize)):
# for i,acc in enumerate(chunk):
for chunkNum, chunk in enumerate(_seq_chunks(accessionList, chunkSize)):

accString = ",".join(chunk)

