Skip to content

Commit

Permalink
Fix bug in the PerSampleDownsamplingReadsIterator that could lead to …
Browse files Browse the repository at this point in the history
…excessive memory usage at traversal startup

This is a MUST-HAVE update for GATK 2.3 users who want to try out the new
ability to use -dcov with ReadWalkers.
  • Loading branch information
droazen committed Dec 19, 2012
1 parent 28e02c2 commit d080dce
Showing 1 changed file with 41 additions and 37 deletions.
Expand Up @@ -104,37 +104,6 @@ private boolean readyToReleaseReads() {
readComparator.compare(orderedDownsampledReadsCache.peek(), earliestPendingRead) <= 0; readComparator.compare(orderedDownsampledReadsCache.peek(), earliestPendingRead) <= 0;
} }


private void updateEarliestPendingRead( ReadsDownsampler<SAMRecord> currentDownsampler ) {
// If there is no recorded earliest pending read and this downsampler has pending items,
// then this downsampler's first pending item becomes the new earliest pending read:
if ( earliestPendingRead == null && currentDownsampler.hasPendingItems() ) {
earliestPendingRead = currentDownsampler.peekPending();
earliestPendingDownsampler = currentDownsampler;
}
// In all other cases, we only need to update the earliest pending read when the downsampler
// associated with it experiences a change in its pending reads, since by assuming a sorted
// read stream we're assured that each downsampler's earliest pending read will only increase
// in genomic position over time.
//
// TODO: An occasional O(samples) linear search seems like a better option than keeping the downsamplers
// TODO: sorted by earliest pending read, which would cost at least O(total_reads * (samples + log(samples))),
// TODO: but need to verify this empirically.
else if ( currentDownsampler == earliestPendingDownsampler &&
(! currentDownsampler.hasPendingItems() || readComparator.compare(currentDownsampler.peekPending(), earliestPendingRead) != 0) ) {

earliestPendingRead = null;
earliestPendingDownsampler = null;
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
if ( perSampleDownsampler.hasPendingItems() &&
(earliestPendingRead == null || readComparator.compare(perSampleDownsampler.peekPending(), earliestPendingRead) < 0) ) {

earliestPendingRead = perSampleDownsampler.peekPending();
earliestPendingDownsampler = perSampleDownsampler;
}
}
}
}

private boolean fillDownsampledReadsCache() { private boolean fillDownsampledReadsCache() {
SAMRecord prevRead = null; SAMRecord prevRead = null;
int numPositionalChanges = 0; int numPositionalChanges = 0;
Expand All @@ -152,7 +121,7 @@ private boolean fillDownsampledReadsCache() {
} }


thisSampleDownsampler.submit(read); thisSampleDownsampler.submit(read);
updateEarliestPendingRead(thisSampleDownsampler); processFinalizedAndPendingItems(thisSampleDownsampler);


if ( prevRead != null && prevRead.getAlignmentStart() != read.getAlignmentStart() ) { if ( prevRead != null && prevRead.getAlignmentStart() != read.getAlignmentStart() ) {
numPositionalChanges++; numPositionalChanges++;
Expand All @@ -164,7 +133,7 @@ private boolean fillDownsampledReadsCache() {
if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) { if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) {
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) { for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
perSampleDownsampler.signalNoMoreReadsBefore(read); perSampleDownsampler.signalNoMoreReadsBefore(read);
updateEarliestPendingRead(perSampleDownsampler); processFinalizedAndPendingItems(perSampleDownsampler);
} }
} }


Expand All @@ -174,18 +143,53 @@ private boolean fillDownsampledReadsCache() {
if ( ! nestedSAMIterator.hasNext() ) { if ( ! nestedSAMIterator.hasNext() ) {
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) { for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
perSampleDownsampler.signalEndOfInput(); perSampleDownsampler.signalEndOfInput();
if ( perSampleDownsampler.hasFinalizedItems() ) {
orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems());
}
} }
earliestPendingRead = null; earliestPendingRead = null;
earliestPendingDownsampler = null; earliestPendingDownsampler = null;
} }


for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) { return readyToReleaseReads();
if ( perSampleDownsampler.hasFinalizedItems() ) { }
orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems());
private void updateEarliestPendingRead( ReadsDownsampler<SAMRecord> currentDownsampler ) {
// If there is no recorded earliest pending read and this downsampler has pending items,
// then this downsampler's first pending item becomes the new earliest pending read:
if ( earliestPendingRead == null && currentDownsampler.hasPendingItems() ) {
earliestPendingRead = currentDownsampler.peekPending();
earliestPendingDownsampler = currentDownsampler;
}
// In all other cases, we only need to update the earliest pending read when the downsampler
// associated with it experiences a change in its pending reads, since by assuming a sorted
// read stream we're assured that each downsampler's earliest pending read will only increase
// in genomic position over time.
//
// TODO: An occasional O(samples) linear search seems like a better option than keeping the downsamplers
// TODO: sorted by earliest pending read, which would cost at least O(total_reads * (samples + log(samples))),
// TODO: but need to verify this empirically.
else if ( currentDownsampler == earliestPendingDownsampler &&
(! currentDownsampler.hasPendingItems() || readComparator.compare(currentDownsampler.peekPending(), earliestPendingRead) != 0) ) {

earliestPendingRead = null;
earliestPendingDownsampler = null;
for ( ReadsDownsampler<SAMRecord> perSampleDownsampler : perSampleDownsamplers.values() ) {
if ( perSampleDownsampler.hasPendingItems() &&
(earliestPendingRead == null || readComparator.compare(perSampleDownsampler.peekPending(), earliestPendingRead) < 0) ) {

earliestPendingRead = perSampleDownsampler.peekPending();
earliestPendingDownsampler = perSampleDownsampler;
}
} }
} }
}


return readyToReleaseReads(); private void processFinalizedAndPendingItems( ReadsDownsampler<SAMRecord> currentDownsampler ) {
if ( currentDownsampler.hasFinalizedItems() ) {
orderedDownsampledReadsCache.addAll(currentDownsampler.consumeFinalizedItems());
}
updateEarliestPendingRead(currentDownsampler);
} }


public void remove() { public void remove() {
Expand Down

0 comments on commit d080dce

Please sign in to comment.