Skip to content

Commit

Permalink
Refactor code for assembly, localization, and calling (#294)
Browse files Browse the repository at this point in the history
Currently, kevlar optimizes the assembly, localization, and calling operations by composing several generator functions and invoking them with multiprocessing. A lot of the benefit is lost, however: the calls to BWA and fermi-lite do take advantage of multithreading, but the coordinating code is limited by GIL. Plus, there are a tremendous number of calls to BWA for localizing, which introduces a lot of overhead.

This update takes a different approach to optimizing these operations. The `kevlar assemble`, `kevlar cutout`, `kevlar call` commands now accept partitioned input files, and the `kevlar cutout` command (replacing `kevlar localize`) makes a single call to BWA rather than one call per partition. If multithreading is going to provide any benefit, it will be at the assembly stage, but that is out of scope for this update.
  • Loading branch information
standage committed Nov 10, 2018
1 parent 7f32e70 commit d61db75
Show file tree
Hide file tree
Showing 22 changed files with 488 additions and 57 deletions.
1 change: 1 addition & 0 deletions kevlar/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from kevlar import effcount
from kevlar import partition
from kevlar import localize
from kevlar import cutout
from kevlar import call
from kevlar import alac
from kevlar import simplex
Expand Down
15 changes: 7 additions & 8 deletions kevlar/alac.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ def make_call_from_reads(queue, idx, calls, refrfile, ksize=31, delta=50,
sleep(1)
continue
reads = queue.get()
ccmatch = re.search(r'kvcc=(\d+)', reads[0].name)
cc = ccmatch.group(1) if ccmatch else None
cc = kevlar.seqio.partition_id(reads[0].name)
if cc is not None and int(cc) % 1000 == 0: # pragma: no cover
message = '[kevlar::alac::make_call_from_reads'
message += ' (thread={:d})]'.format(idx)
Expand Down Expand Up @@ -72,7 +71,7 @@ def make_call_from_reads(queue, idx, calls, refrfile, ksize=31, delta=50,
queue.task_done()


def alac(pstream, refrfile, threads=1, ksize=31, bigpart=10000, delta=50,
def alac(pstream, refrfile, threads=1, ksize=31, maxreads=10000, delta=50,
seedsize=31, maxdiff=None, inclpattern=None, exclpattern=None,
match=1, mismatch=2, gapopen=5, gapextend=0, min_ikmers=None,
maskfile=None, maskmem=1e6, maskmaxfpr=0.01, logstream=sys.stderr):
Expand Down Expand Up @@ -100,14 +99,14 @@ def alac(pstream, refrfile, threads=1, ksize=31, bigpart=10000, delta=50,
worker.setDaemon(True)
worker.start()

for np, partition in enumerate(pstream, 1):
for partid, partition in pstream:
reads = list(partition)
if len(reads) > bigpart:
if len(reads) > maxreads:
message = 'skipping partition with {:d} reads'.format(len(reads))
print('[kevlar::alac] WARNING:', message, file=logstream)
continue
if np % 1000 == 0: # pragma: no cover
message = '{} partitions scheduled for processing'.format(np)
if partid is not None and int(partid) % 1000 == 0: # pragma: no cover
message = '{} partitions scheduled for processing'.format(partid)
print('[kevlar::alac]', message, file=logstream)
part_queue.put(reads)

Expand Down Expand Up @@ -137,7 +136,7 @@ def main(args):
outstream = kevlar.open(args.out, 'w')
workflow = alac(
pstream, args.refr, threads=args.threads, ksize=args.ksize,
bigpart=args.bigpart, delta=args.delta, seedsize=args.seed_size,
maxreads=args.max_reads, delta=args.delta, seedsize=args.seed_size,
maxdiff=args.max_diff, inclpattern=args.include,
exclpattern=args.exclude, match=args.match, mismatch=args.mismatch,
gapopen=args.open, gapextend=args.extend, min_ikmers=args.min_ikmers,
Expand Down
45 changes: 41 additions & 4 deletions kevlar/assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,57 @@
# -----------------------------------------------------------------------------

import kevlar
import re
import sys


def assemble_fml_asm(readstream, logstream=sys.stderr):
reads = [r for r in readstream]
def assemble_fml_asm(partition, logstream=sys.stderr):
reads = list(partition)
assembler = kevlar.assembly.fml_asm(reads)
for n, contig in enumerate(assembler, 1):
name = 'contig{:d}'.format(n)
record = kevlar.sequence.Record(name=name, sequence=contig)
yield next(kevlar.augment.augment(reads, [record]))


def assemble(partstream, maxreads=10000, logstream=sys.stderr):
n = 0
pn = 0
upint = 10
upthreshold = upint
for partid, partition in partstream:
pn += 1

# Status update intervals on a log scale :-)
if pn in [100, 1000, 10000]:
upint = pn
if pn >= upthreshold:
upthreshold += upint
message = 'processed {} read partitions'.format(pn)
print('[kevlar::assemble]', message, file=logstream)

numreads = len(partition)
if numreads > maxreads: # pragma: no cover
message = 'skipping partition with {:d} reads'.format(numreads)
print('[kevlar::assemble] WARNING:', message, file=logstream)
continue
for contig in assemble_fml_asm(partition, logstream=logstream):
n += 1
newname = 'contig{}'.format(n)
if partid is not None:
newname += ' kvcc={}'.format(partid)
contig.name = newname
yield contig


def main(args):
reads = kevlar.parse_augmented_fastx(kevlar.open(args.augfastq, 'r'))
readstream = kevlar.parse_augmented_fastx(kevlar.open(args.augfastq, 'r'))
if args.part_id:
pstream = kevlar.parse_single_partition(readstream, args.part_id)
else:
pstream = kevlar.parse_partitioned_reads(readstream)
outstream = kevlar.open(args.out, 'w')
for contig in assemble_fml_asm(reads):
assembler = assemble(pstream, maxreads=args.max_reads,
logstream=args.logfile)
for contig in assembler:
kevlar.print_augmented_fastx(contig, outstream)
2 changes: 0 additions & 2 deletions kevlar/augment.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ def augment(augseqstream, nakedseqstream, collapsemates=False, upint=10000):
assert len(record.mates) in (0, 1)
if len(record.mates) == 1:
mateseqs[record.name] = record.mates[0]
if n > 100:
print('[kevlar::augment] done loading input', file=sys.stderr)

for record in nakedseqstream:
qual = None
Expand Down
30 changes: 20 additions & 10 deletions kevlar/call.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ def align_mates(record, refrfile):
fasta += '>mateseq{:d}\n{:s}\n'.format(n, mateseq)
cmd = 'bwa mem {:s} -'.format(refrfile)
cmdargs = cmd.split()
for seqid, start, end in bwa_align(cmdargs, seqstring=fasta):
for seqid, start, end, seq in bwa_align(cmdargs, seqstring=fasta):
yield seqid, start, end


Expand Down Expand Up @@ -125,17 +125,27 @@ def call(*args, **kwargs):
def main(args):
outstream = kevlar.open(args.out, 'w')
qinstream = kevlar.parse_augmented_fastx(kevlar.open(args.queryseq, 'r'))
queryseqs = list(qinstream)
qparts = kevlar.parse_partitioned_reads(qinstream)
contigs_by_partition = dict()
for partid, contiglist in qparts:
contigs_by_partition[partid] = contiglist

tinstream = kevlar.open(args.targetseq, 'r')
targetseqs = list(kevlar.reference.load_refr_cutouts(tinstream))
caller = call(
targetseqs, queryseqs,
args.match, args.mismatch, args.open, args.extend,
args.ksize, args.refr, args.debug, 5, args.logfile
)
targetseqs = kevlar.reference.load_refr_cutouts(tinstream)
targetparts = kevlar.parse_partitioned_reads(targetseqs)

writer = kevlar.vcf.VCFWriter(
outstream, source='kevlar::call', refr=args.refr,
)
writer.write_header()
for varcall in caller:
writer.write(varcall)

for partid, gdnas in targetparts:
contigs = contigs_by_partition[partid]
caller = call(
gdnas, contigs, match=args.match, mismatch=args.mismatch,
gapopen=args.open, gapextend=args.extend, ksize=args.ksize,
refrfile=args.refr, debug=args.debug, mindist=5,
logstream=args.logfile
)
for varcall in caller:
writer.write(varcall)
3 changes: 3 additions & 0 deletions kevlar/cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from . import gentrio
from . import partition
from . import localize
from . import cutout
from . import call
from . import alac
from . import simplex
Expand All @@ -40,6 +41,7 @@
'mutate': kevlar.mutate.main,
'gentrio': kevlar.gentrio.main,
'partition': kevlar.partition.main,
'cutout': kevlar.cutout.main,
'localize': kevlar.localize.main,
'call': kevlar.call.main,
'alac': kevlar.alac.main,
Expand All @@ -61,6 +63,7 @@
'gentrio': gentrio.subparser,
'partition': partition.subparser,
'localize': localize.subparser,
'cutout': cutout.subparser,
'call': call.subparser,
'alac': alac.subparser,
'simplex': simplex.subparser,
Expand Down
9 changes: 5 additions & 4 deletions kevlar/cli/alac.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,11 @@ def subparser(subparsers):

asmbl_args = subparser.add_argument_group('Read assembly')
asmbl_args.add_argument('-p', '--part-id', type=str, metavar='ID',
help='only process partition "PID" in the input')
asmbl_args.add_argument('--bigpart', type=int, metavar='N', default=10000,
help='do not attempt to assembly any partitions '
'with more than N reads (default: 10000)')
help='only process partition "ID" in the input')
asmbl_args.add_argument('--max-reads', type=int, metavar='N',
default=10000, help='do not attempt to assemble '
'any partitions with more than N reads (default: '
'10000)')

local_args = subparser.add_argument_group('Target extraction')
local_args.add_argument('-z', '--seed-size', type=int, default=51,
Expand Down
5 changes: 5 additions & 0 deletions kevlar/cli/assemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ def subparser(subparsers):

desc = 'Assemble reads into contigs representing putative variants'
subparser = subparsers.add_parser('assemble', description=desc)
subparser.add_argument('-p', '--part-id', type=str, metavar='ID',
help='only assemble partition "ID" in the input')
subparser.add_argument('--max-reads', type=int, metavar='N', default=10000,
help='do not attempt to assemble any partitions '
'with more than N reads (default: 10000)')
subparser.add_argument('-o', '--out', metavar='FILE',
help='output file; default is terminal (stdout)')
subparser.add_argument('augfastq', help='annotated reads in augmented '
Expand Down
46 changes: 46 additions & 0 deletions kevlar/cli/cutout.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
#!/usr/bin/env python
#
# -----------------------------------------------------------------------------
# Copyright (c) 2018 The Regents of the University of California
#
# This file is part of kevlar (http://github.com/dib-lab/kevlar) and is
# licensed under the MIT license: see LICENSE.
# -----------------------------------------------------------------------------

import re


def subparser(subparsers):
"""Define the `kevlar cutout` command-line interface."""

desc = """\
Given a reference genome and a set of contigs assembled from
variant-spanning reads, retrieve the portions of the reference genome
corresponding to the variants. NOTE: this command relies on the `bwa`
program being in the PATH environmental variable.
"""

subparser = subparsers.add_parser('cutout', description=desc)
subparser.add_argument('-d', '--delta', type=int, metavar='D',
default=50, help='retrieve the genomic interval '
'from the reference by extending beyond the span '
'of all k-mer starting positions by D bp')
subparser.add_argument('-p', '--part-id', type=str, metavar='ID',
help='only localize partition "ID" in the input')
subparser.add_argument('-o', '--out', metavar='FILE', default='-',
help='output file; default is terminal (stdout)')
subparser.add_argument('-z', '--seed-size', type=int, metavar='Z',
default=51, help='seed size; default is 51')
subparser.add_argument('-x', '--max-diff', type=int, metavar='X',
default=None, help='split and report multiple '
'reference targets if the distance between two '
'seed matches is > X; by default, X is 3 times the '
'length of the longest contig')
subparser.add_argument('--include', metavar='REGEX', type=re.escape,
help='discard alignments to any chromosomes whose '
'sequence IDs do not match the given pattern')
subparser.add_argument('--exclude', metavar='REGEX', type=re.escape,
help='discard alignments to any chromosomes whose '
'sequence IDs match the given pattern')
subparser.add_argument('contigs', help='assembled reads in Fasta format')
subparser.add_argument('refr', help='BWA indexed reference genome')

0 comments on commit d61db75

Please sign in to comment.