diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting.java index 7c207a3..75d285b 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/ShardedBAMWriting.java @@ -43,6 +43,7 @@ import com.google.cloud.dataflow.sdk.values.PCollectionView; import com.google.cloud.dataflow.sdk.values.TupleTag; import com.google.cloud.genomics.dataflow.readers.bam.BAMIO; +import com.google.cloud.genomics.dataflow.readers.bam.BAMShard; import com.google.cloud.genomics.dataflow.readers.bam.ReadBAMTransform; import com.google.cloud.genomics.dataflow.readers.bam.ReaderOptions; import com.google.cloud.genomics.dataflow.readers.bam.ShardingPolicy; @@ -54,9 +55,11 @@ import com.google.cloud.genomics.utils.Contig; import com.google.cloud.genomics.utils.GenomicsFactory; import com.google.cloud.genomics.utils.ReadUtils; +import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; import htsjdk.samtools.BAMBlockWriter; +import htsjdk.samtools.BAMIndexer; import htsjdk.samtools.SAMFileHeader; import htsjdk.samtools.SAMRecord; import htsjdk.samtools.SAMRecordIterator; @@ -76,6 +79,7 @@ import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.logging.Logger; /** @@ -177,7 +181,7 @@ public int compare(Contig o1, Contig o2) { final SAMRecordIterator recordIterator = samReader.query( firstContig.referenceName, (int)firstContig.start + 1, (int)firstContig.end + 1, false); Contig firstShard = null; - while (recordIterator.hasNext()) { + while (recordIterator.hasNext() && result == null) { SAMRecord record = recordIterator.next(); final int alignmentStart = record.getAlignmentStart(); if (firstShard == null && alignmentStart > firstContig.start && alignmentStart < firstContig.end) { @@ -196,6 +200,14 @@ public int compare(Contig o1, Contig o2) { return result; } + private static final ShardingPolicy READ_SHARDING_POLICY = ShardingPolicy.BYTE_SIZE_POLICY; + /* new ShardingPolicy() { + @Override + public boolean shardBigEnough(BAMShard shard) { + return shard.sizeInLoci() > 10000000; + } + };*/ + private static PCollection getReadsFromBAMFile() throws IOException { LOG.info("Sharded reading of "+ options.getBAMFilePath()); @@ -208,7 +220,7 @@ private static PCollection getReadsFromBAMFile() throws IOException { contigs, readerOptions, options.getBAMFilePath(), - ShardingPolicy.BYTE_SIZE_POLICY); + READ_SHARDING_POLICY); } public static class ShardReadsTransform extends PTransform, @@ -486,7 +498,8 @@ static String combineShards(ShardedBAMWritingOptions options, String dest, idx, endIdx); final String intermediateCombineResultName = dest + "-" + String.format("%02d",stageNumber) + "-" + - String.format("%02d",idx); + String.format("%02d",idx) + "- " + + String.format("%02d",endIdx); final String combineResult = composeAndCleanupShards(storage, combinableShards, intermediateCombineResultName); combinedShards.add(combineResult); @@ -498,9 +511,43 @@ static String combineShards(ShardedBAMWritingOptions options, String dest, LOG.info("Combining a final group of " + sortedShardsNames.size() + " shards"); final String combineResult = composeAndCleanupShards(storage, sortedShardsNames, dest); - + generateIndex(options, storage, combineResult); return combineResult; } + + static void generateIndex(ShardedBAMWritingOptions options, + Storage.Objects storage, String bamFilePath) throws IOException { + final String baiFilePath = bamFilePath + ".bai"; + Stopwatch timer = Stopwatch.createStarted(); + LOG.info("Generating BAM index: " + baiFilePath); + LOG.info("Reading BAM file: " + bamFilePath); + final SamReader reader = BAMIO.openBAM(storage, bamFilePath, ValidationStringency.LENIENT, true); + + final OutputStream outputStream = + Channels.newOutputStream( + new GcsUtil.GcsUtilFactory().create(options) + .create(GcsPath.fromUri(baiFilePath), + "application/octet-stream")); + BAMIndexer indexer = new BAMIndexer(outputStream, reader.getFileHeader()); + + long processedReads = 0; + + // create and write the content + for (SAMRecord rec : reader) { + if (++processedReads % 1000000 == 0) { + dumpStats(processedReads, timer); + } + indexer.processAlignment(rec); + } + indexer.finish(); + dumpStats(processedReads, timer); + } + + static void dumpStats(long processedReads, Stopwatch timer) { + LOG.info("Processed " + processedReads + " records in " + timer + + ". Speed: " + (processedReads*1000)/timer.elapsed(TimeUnit.MILLISECONDS) + " reads/sec"); + + } } static String composeAndCleanupShards(Storage.Objects storage, @@ -515,7 +562,7 @@ static String composeAndCleanupShards(Storage.Objects storage, ArrayList sourceObjects = new ArrayList(); for (String shard : shardNames) { final GcsPath shardPath = GcsPath.fromUri(shard); - LOG.info("Adding shard " + shardPath); + LOG.info("Adding shard " + shardPath + " for result " + dest); sourceObjects.add( new SourceObjects().setName(shardPath.getObject()) ); } @@ -529,7 +576,7 @@ static String composeAndCleanupShards(Storage.Objects storage, LOG.info("Combine result is " + combineResult); for (SourceObjects sourceObject : sourceObjects) { final String shardToDelete = sourceObject.getName(); - LOG.info("Cleaning up shard " + shardToDelete); + LOG.info("Cleaning up shard " + shardToDelete + " for result " + dest); storage.delete(destPath.getBucket(), shardToDelete).execute(); } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/BAMIO.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/BAMIO.java index b8edf8b..cebd15b 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/BAMIO.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/BAMIO.java @@ -43,13 +43,18 @@ public static ReaderAndIndex openBAMAndExposeIndex(Storage.Objects storageClient ReaderAndIndex result = new ReaderAndIndex(); result.index = openIndexForPath(storageClient, gcsStoragePath); result.reader = openBAMReader( - openBAMFile(storageClient, gcsStoragePath,result.index), stringency); + openBAMFile(storageClient, gcsStoragePath,result.index), stringency, false); return result; } - public static SamReader openBAM(Storage.Objects storageClient, String gcsStoragePath, ValidationStringency stringency) throws IOException { + public static SamReader openBAM(Storage.Objects storageClient, String gcsStoragePath, + ValidationStringency stringency, boolean includeFileSource) throws IOException { return openBAMReader(openBAMFile(storageClient, gcsStoragePath, - openIndexForPath(storageClient, gcsStoragePath)), stringency); + openIndexForPath(storageClient, gcsStoragePath)), stringency, includeFileSource); + } + + public static SamReader openBAM(Storage.Objects storageClient, String gcsStoragePath, ValidationStringency stringency) throws IOException { + return openBAM(storageClient, gcsStoragePath, stringency, false); } private static SeekableStream openIndexForPath(Storage.Objects storageClient,String gcsStoragePath) { @@ -74,9 +79,14 @@ private static SamInputResource openBAMFile(Storage.Objects storageClient, Strin return samInputResource; } - private static SamReader openBAMReader(SamInputResource resource, ValidationStringency stringency) { - SamReaderFactory samReaderFactory = SamReaderFactory.makeDefault().validationStringency(stringency) + private static SamReader openBAMReader(SamInputResource resource, ValidationStringency stringency, boolean includeFileSource) { + SamReaderFactory samReaderFactory = SamReaderFactory + .makeDefault() + .validationStringency(stringency) .enable(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES); + if (includeFileSource) { + samReaderFactory.enable(SamReaderFactory.Option.INCLUDE_SOURCE_IN_RECORDS); + } final SamReader samReader = samReaderFactory.open(resource); return samReader; } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Reader.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Reader.java index 81401b5..b05cfbb 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Reader.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Reader.java @@ -110,10 +110,10 @@ void openFile() throws IOException { LOG.info("Processing unmapped"); iterator = reader.queryUnmapped(); } else if (shard.span != null) { - LOG.info("Processing span"); + LOG.info("Processing span for " + shard.contig); iterator = reader.indexing().iterator(shard.span); } else if (shard.contig.referenceName != null && !shard.contig.referenceName.isEmpty()) { - LOG.info("Processing all bases for " + shard.contig.referenceName); + LOG.info("Processing all bases for " + shard.contig); iterator = reader.query(shard.contig.referenceName, (int) shard.contig.start, (int) shard.contig.end, false); } @@ -181,7 +181,7 @@ void dumpStats() { LOG.info("Processed " + recordsProcessed + " in " + timer + ". Speed: " + (recordsProcessed*1000)/timer.elapsed(TimeUnit.MILLISECONDS) + " reads/sec" - + ", skipped other sequences " + mismatchedSequence + + ", filtered out by reference and mapping " + mismatchedSequence + ", skippedBefore " + recordsBeforeStart + ", skipped after " + recordsAfterEnd); } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Sharder.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Sharder.java index f652537..c3a8f0a 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Sharder.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/bam/Sharder.java @@ -178,6 +178,7 @@ Contig desiredContigForReference(SAMSequenceRecord reference) { } void createShardsForReference(SAMSequenceRecord reference, Contig contig) { + LOG.info("Creating shard for: " + contig); final BitSet overlappingBins = GenomicIndexUtil.regionToBins( (int) contig.start, (int) contig.end); if (overlappingBins == null) { @@ -185,6 +186,7 @@ void createShardsForReference(SAMSequenceRecord reference, Contig contig) { return; } + BAMShard currentShard = null; for (int binIndex = overlappingBins.nextSetBit(0); binIndex >= 0; binIndex = overlappingBins.nextSetBit(binIndex + 1)) { final Bin bin = index.getBinData(reference.getSequenceIndex(), binIndex); @@ -234,13 +236,16 @@ void createShardsForReference(SAMSequenceRecord reference, Contig contig) { if (shardingPolicy.shardBigEnough(currentShard)) { LOG.info("Shard size is big enough to finalize: " + currentShard.sizeInLoci() + ", " + currentShard.approximateSizeInBytes() + " bytes"); - output.output(currentShard.finalize(index, Math.min(index.getLastLocusInBin(bin), (int)contig.end))); + final BAMShard bamShard = currentShard.finalize(index, Math.min(index.getLastLocusInBin(bin), (int)contig.end)); + LOG.info("Outputting shard: " + bamShard.contig); + output.output(bamShard); currentShard = null; } } if (currentShard != null) { LOG.info("Outputting last shard of size " + - currentShard.sizeInLoci() + ", " + currentShard.approximateSizeInBytes() + " bytes"); + currentShard.sizeInLoci() + ", " + currentShard.approximateSizeInBytes() + " bytes " + + currentShard.contig); output.output(currentShard.finalize(index, (int)contig.end)); } }