diff --git a/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java b/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java
index 5275c471ee..b4161b06e7 100644
--- a/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java
+++ b/public/java/src/org/broadinstitute/sting/gatk/downsampling/PerSampleDownsamplingReadsIterator.java
@@ -104,37 +104,6 @@ private boolean readyToReleaseReads() {
                readComparator.compare(orderedDownsampledReadsCache.peek(), earliestPendingRead) <= 0;
     }
 
-    private void updateEarliestPendingRead( ReadsDownsampler currentDownsampler ) {
-        // If there is no recorded earliest pending read and this downsampler has pending items,
-        // then this downsampler's first pending item becomes the new earliest pending read:
-        if ( earliestPendingRead == null && currentDownsampler.hasPendingItems() ) {
-            earliestPendingRead = currentDownsampler.peekPending();
-            earliestPendingDownsampler = currentDownsampler;
-        }
-        // In all other cases, we only need to update the earliest pending read when the downsampler
-        // associated with it experiences a change in its pending reads, since by assuming a sorted
-        // read stream we're assured that each downsampler's earliest pending read will only increase
-        // in genomic position over time.
-        //
-        // TODO: An occasional O(samples) linear search seems like a better option than keeping the downsamplers
-        // TODO: sorted by earliest pending read, which would cost at least O(total_reads * (samples + log(samples))),
-        // TODO: but need to verify this empirically.
-        else if ( currentDownsampler == earliestPendingDownsampler &&
-                  (! currentDownsampler.hasPendingItems() || readComparator.compare(currentDownsampler.peekPending(), earliestPendingRead) != 0) ) {
-
-            earliestPendingRead = null;
-            earliestPendingDownsampler = null;
-            for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) {
-                if ( perSampleDownsampler.hasPendingItems() &&
-                     (earliestPendingRead == null || readComparator.compare(perSampleDownsampler.peekPending(), earliestPendingRead) < 0) ) {
-
-                    earliestPendingRead = perSampleDownsampler.peekPending();
-                    earliestPendingDownsampler = perSampleDownsampler;
-                }
-            }
-        }
-    }
-
     private boolean fillDownsampledReadsCache() {
         SAMRecord prevRead = null;
         int numPositionalChanges = 0;
@@ -152,7 +121,7 @@ private boolean fillDownsampledReadsCache() {
             }
 
             thisSampleDownsampler.submit(read);
-            updateEarliestPendingRead(thisSampleDownsampler);
+            processFinalizedAndPendingItems(thisSampleDownsampler);
 
             if ( prevRead != null && prevRead.getAlignmentStart() != read.getAlignmentStart() ) {
                 numPositionalChanges++;
@@ -164,7 +133,7 @@ private boolean fillDownsampledReadsCache() {
            if ( numPositionalChanges > 0 && numPositionalChanges % DOWNSAMPLER_POSITIONAL_UPDATE_INTERVAL == 0 ) {
                for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) {
                    perSampleDownsampler.signalNoMoreReadsBefore(read);
-                   updateEarliestPendingRead(perSampleDownsampler);
+                   processFinalizedAndPendingItems(perSampleDownsampler);
                }
            }
 
@@ -174,18 +143,53 @@ private boolean fillDownsampledReadsCache() {
         if ( ! nestedSAMIterator.hasNext() ) {
             for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) {
                 perSampleDownsampler.signalEndOfInput();
+                if ( perSampleDownsampler.hasFinalizedItems() ) {
+                    orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems());
+                }
             }
             earliestPendingRead = null;
             earliestPendingDownsampler = null;
         }
 
-        for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) {
-            if ( perSampleDownsampler.hasFinalizedItems() ) {
-                orderedDownsampledReadsCache.addAll(perSampleDownsampler.consumeFinalizedItems());
+        return readyToReleaseReads();
+    }
+
+    private void updateEarliestPendingRead( ReadsDownsampler currentDownsampler ) {
+        // If there is no recorded earliest pending read and this downsampler has pending items,
+        // then this downsampler's first pending item becomes the new earliest pending read:
+        if ( earliestPendingRead == null && currentDownsampler.hasPendingItems() ) {
+            earliestPendingRead = currentDownsampler.peekPending();
+            earliestPendingDownsampler = currentDownsampler;
+        }
+        // In all other cases, we only need to update the earliest pending read when the downsampler
+        // associated with it experiences a change in its pending reads, since by assuming a sorted
+        // read stream we're assured that each downsampler's earliest pending read will only increase
+        // in genomic position over time.
+        //
+        // TODO: An occasional O(samples) linear search seems like a better option than keeping the downsamplers
+        // TODO: sorted by earliest pending read, which would cost at least O(total_reads * (samples + log(samples))),
+        // TODO: but need to verify this empirically.
+        else if ( currentDownsampler == earliestPendingDownsampler &&
+                  (! currentDownsampler.hasPendingItems() || readComparator.compare(currentDownsampler.peekPending(), earliestPendingRead) != 0) ) {
+
+            earliestPendingRead = null;
+            earliestPendingDownsampler = null;
+            for ( ReadsDownsampler perSampleDownsampler : perSampleDownsamplers.values() ) {
+                if ( perSampleDownsampler.hasPendingItems() &&
+                     (earliestPendingRead == null || readComparator.compare(perSampleDownsampler.peekPending(), earliestPendingRead) < 0) ) {
+
+                    earliestPendingRead = perSampleDownsampler.peekPending();
+                    earliestPendingDownsampler = perSampleDownsampler;
+                }
             }
         }
+    }
 
-        return readyToReleaseReads();
+    private void processFinalizedAndPendingItems( ReadsDownsampler currentDownsampler ) {
+        if ( currentDownsampler.hasFinalizedItems() ) {
+            orderedDownsampledReadsCache.addAll(currentDownsampler.consumeFinalizedItems());
+        }
+        updateEarliestPendingRead(currentDownsampler);
     }
 
     public void remove() {