Skip to content
This repository was archived by the owner on Oct 29, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.cloud.dataflow.sdk.values.PCollectionView;
import com.google.cloud.dataflow.sdk.values.TupleTag;
import com.google.cloud.genomics.dataflow.readers.bam.BAMIO;
import com.google.cloud.genomics.dataflow.readers.bam.BAMShard;
import com.google.cloud.genomics.dataflow.readers.bam.ReadBAMTransform;
import com.google.cloud.genomics.dataflow.readers.bam.ReaderOptions;
import com.google.cloud.genomics.dataflow.readers.bam.ShardingPolicy;
Expand All @@ -54,9 +55,11 @@
import com.google.cloud.genomics.utils.Contig;
import com.google.cloud.genomics.utils.GenomicsFactory;
import com.google.cloud.genomics.utils.ReadUtils;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

import htsjdk.samtools.BAMBlockWriter;
import htsjdk.samtools.BAMIndexer;
import htsjdk.samtools.SAMFileHeader;
import htsjdk.samtools.SAMRecord;
import htsjdk.samtools.SAMRecordIterator;
Expand All @@ -76,6 +79,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -177,7 +181,7 @@ public int compare(Contig o1, Contig o2) {
final SAMRecordIterator recordIterator = samReader.query(
firstContig.referenceName, (int)firstContig.start + 1, (int)firstContig.end + 1, false);
Contig firstShard = null;
while (recordIterator.hasNext()) {
while (recordIterator.hasNext() && result == null) {
SAMRecord record = recordIterator.next();
final int alignmentStart = record.getAlignmentStart();
if (firstShard == null && alignmentStart > firstContig.start && alignmentStart < firstContig.end) {
Expand All @@ -196,6 +200,14 @@ public int compare(Contig o1, Contig o2) {
return result;
}

// Shard the input BAM by byte size so every worker receives a roughly equal
// volume of data to read, independent of how reads are distributed across loci.
// (A loci-count policy was tried and abandoned; byte size balances better.)
private static final ShardingPolicy READ_SHARDING_POLICY = ShardingPolicy.BYTE_SIZE_POLICY;

private static PCollection<Read> getReadsFromBAMFile() throws IOException {
LOG.info("Sharded reading of "+ options.getBAMFilePath());

Expand All @@ -208,7 +220,7 @@ private static PCollection<Read> getReadsFromBAMFile() throws IOException {
contigs,
readerOptions,
options.getBAMFilePath(),
ShardingPolicy.BYTE_SIZE_POLICY);
READ_SHARDING_POLICY);
}

public static class ShardReadsTransform extends PTransform<PCollection<Read>,
Expand Down Expand Up @@ -486,7 +498,8 @@ static String combineShards(ShardedBAMWritingOptions options, String dest,
idx, endIdx);
final String intermediateCombineResultName = dest + "-" +
String.format("%02d",stageNumber) + "-" +
String.format("%02d",idx);
String.format("%02d",idx) + "- " +
String.format("%02d",endIdx);
final String combineResult = composeAndCleanupShards(storage,
combinableShards, intermediateCombineResultName);
combinedShards.add(combineResult);
Expand All @@ -498,9 +511,43 @@ static String combineShards(ShardedBAMWritingOptions options, String dest,
LOG.info("Combining a final group of " + sortedShardsNames.size() + " shards");
final String combineResult = composeAndCleanupShards(storage,
sortedShardsNames, dest);

generateIndex(options, storage, combineResult);
return combineResult;
}

/**
 * Generates a BAM index (.bai) for an already-written BAM file and stores it
 * next to the BAM in GCS.
 *
 * @param options pipeline options carrying GCS credentials/configuration
 * @param storage GCS storage client used to read the BAM and write the index
 * @param bamFilePath gs:// path of the BAM file to index; the index is written
 *     to the same path with a ".bai" suffix
 * @throws IOException if the BAM cannot be read or the index cannot be written
 */
static void generateIndex(ShardedBAMWritingOptions options,
    Storage.Objects storage, String bamFilePath) throws IOException {
  final String baiFilePath = bamFilePath + ".bai";
  final Stopwatch timer = Stopwatch.createStarted();
  LOG.info("Generating BAM index: " + baiFilePath);
  LOG.info("Reading BAM file: " + bamFilePath);
  // Open with includeFileSource=true: BAMIndexer needs each record's file
  // source to compute index offsets. try-with-resources guarantees the reader
  // is closed and the GCS output stream is flushed/closed even on failure —
  // the original leaked both.
  try (final SamReader reader =
          BAMIO.openBAM(storage, bamFilePath, ValidationStringency.LENIENT, true);
      final OutputStream outputStream =
          Channels.newOutputStream(
              new GcsUtil.GcsUtilFactory().create(options)
                  .create(GcsPath.fromUri(baiFilePath),
                      "application/octet-stream"))) {
    final BAMIndexer indexer = new BAMIndexer(outputStream, reader.getFileHeader());
    long processedReads = 0;
    // Feed every alignment to the indexer, logging progress periodically.
    for (SAMRecord rec : reader) {
      if (++processedReads % 1000000 == 0) {
        dumpStats(processedReads, timer);
      }
      indexer.processAlignment(rec);
    }
    // Writes any remaining index content for the final reference(s).
    indexer.finish();
    dumpStats(processedReads, timer);
  }
}

/**
 * Logs indexing progress: records processed, elapsed time, and throughput.
 *
 * @param processedReads number of records processed so far
 * @param timer stopwatch started when indexing began (still running)
 */
static void dumpStats(long processedReads, Stopwatch timer) {
  // Clamp the divisor to 1 ms: if this is called within the first millisecond
  // the original code divided by zero and threw ArithmeticException.
  final long elapsedMs = Math.max(1, timer.elapsed(TimeUnit.MILLISECONDS));
  LOG.info("Processed " + processedReads + " records in " + timer +
      ". Speed: " + (processedReads * 1000) / elapsedMs + " reads/sec");
}
}

static String composeAndCleanupShards(Storage.Objects storage,
Expand All @@ -515,7 +562,7 @@ static String composeAndCleanupShards(Storage.Objects storage,
ArrayList<SourceObjects> sourceObjects = new ArrayList<SourceObjects>();
for (String shard : shardNames) {
final GcsPath shardPath = GcsPath.fromUri(shard);
LOG.info("Adding shard " + shardPath);
LOG.info("Adding shard " + shardPath + " for result " + dest);
sourceObjects.add( new SourceObjects().setName(shardPath.getObject()) );
}

Expand All @@ -529,7 +576,7 @@ static String composeAndCleanupShards(Storage.Objects storage,
LOG.info("Combine result is " + combineResult);
for (SourceObjects sourceObject : sourceObjects) {
final String shardToDelete = sourceObject.getName();
LOG.info("Cleaning up shard " + shardToDelete);
LOG.info("Cleaning up shard " + shardToDelete + " for result " + dest);
storage.delete(destPath.getBucket(), shardToDelete).execute();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,18 @@ public static ReaderAndIndex openBAMAndExposeIndex(Storage.Objects storageClient
ReaderAndIndex result = new ReaderAndIndex();
result.index = openIndexForPath(storageClient, gcsStoragePath);
result.reader = openBAMReader(
openBAMFile(storageClient, gcsStoragePath,result.index), stringency);
openBAMFile(storageClient, gcsStoragePath,result.index), stringency, false);
return result;
}

public static SamReader openBAM(Storage.Objects storageClient, String gcsStoragePath, ValidationStringency stringency) throws IOException {
public static SamReader openBAM(Storage.Objects storageClient, String gcsStoragePath,
ValidationStringency stringency, boolean includeFileSource) throws IOException {
return openBAMReader(openBAMFile(storageClient, gcsStoragePath,
openIndexForPath(storageClient, gcsStoragePath)), stringency);
openIndexForPath(storageClient, gcsStoragePath)), stringency, includeFileSource);
}

/**
 * Convenience overload of {@link #openBAM(Storage.Objects, String,
 * ValidationStringency, boolean)} that does not attach file-source
 * information to the returned records.
 */
public static SamReader openBAM(Storage.Objects storageClient, String gcsStoragePath, ValidationStringency stringency) throws IOException {
  final boolean includeFileSource = false;
  return openBAM(storageClient, gcsStoragePath, stringency, includeFileSource);
}

private static SeekableStream openIndexForPath(Storage.Objects storageClient,String gcsStoragePath) {
Expand All @@ -74,9 +79,14 @@ private static SamInputResource openBAMFile(Storage.Objects storageClient, Strin
return samInputResource;
}

private static SamReader openBAMReader(SamInputResource resource, ValidationStringency stringency) {
SamReaderFactory samReaderFactory = SamReaderFactory.makeDefault().validationStringency(stringency)
private static SamReader openBAMReader(SamInputResource resource, ValidationStringency stringency, boolean includeFileSource) {
SamReaderFactory samReaderFactory = SamReaderFactory
.makeDefault()
.validationStringency(stringency)
.enable(SamReaderFactory.Option.CACHE_FILE_BASED_INDEXES);
if (includeFileSource) {
samReaderFactory.enable(SamReaderFactory.Option.INCLUDE_SOURCE_IN_RECORDS);
}
final SamReader samReader = samReaderFactory.open(resource);
return samReader;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ void openFile() throws IOException {
LOG.info("Processing unmapped");
iterator = reader.queryUnmapped();
} else if (shard.span != null) {
LOG.info("Processing span");
LOG.info("Processing span for " + shard.contig);
iterator = reader.indexing().iterator(shard.span);
} else if (shard.contig.referenceName != null && !shard.contig.referenceName.isEmpty()) {
LOG.info("Processing all bases for " + shard.contig.referenceName);
LOG.info("Processing all bases for " + shard.contig);
iterator = reader.query(shard.contig.referenceName, (int) shard.contig.start,
(int) shard.contig.end, false);
}
Expand Down Expand Up @@ -181,7 +181,7 @@ void dumpStats() {
LOG.info("Processed " + recordsProcessed +
" in " + timer +
". Speed: " + (recordsProcessed*1000)/timer.elapsed(TimeUnit.MILLISECONDS) + " reads/sec"
+ ", skipped other sequences " + mismatchedSequence
+ ", filtered out by reference and mapping " + mismatchedSequence
+ ", skippedBefore " + recordsBeforeStart
+ ", skipped after " + recordsAfterEnd);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,13 +178,15 @@ Contig desiredContigForReference(SAMSequenceRecord reference) {
}

void createShardsForReference(SAMSequenceRecord reference, Contig contig) {
LOG.info("Creating shard for: " + contig);
final BitSet overlappingBins = GenomicIndexUtil.regionToBins(
(int) contig.start, (int) contig.end);
if (overlappingBins == null) {
LOG.warning("No overlapping bins for " + contig.start + ":" + contig.end);
return;
}


BAMShard currentShard = null;
for (int binIndex = overlappingBins.nextSetBit(0); binIndex >= 0; binIndex = overlappingBins.nextSetBit(binIndex + 1)) {
final Bin bin = index.getBinData(reference.getSequenceIndex(), binIndex);
Expand Down Expand Up @@ -234,13 +236,16 @@ void createShardsForReference(SAMSequenceRecord reference, Contig contig) {
if (shardingPolicy.shardBigEnough(currentShard)) {
LOG.info("Shard size is big enough to finalize: " +
currentShard.sizeInLoci() + ", " + currentShard.approximateSizeInBytes() + " bytes");
output.output(currentShard.finalize(index, Math.min(index.getLastLocusInBin(bin), (int)contig.end)));
final BAMShard bamShard = currentShard.finalize(index, Math.min(index.getLastLocusInBin(bin), (int)contig.end));
LOG.info("Outputting shard: " + bamShard.contig);
output.output(bamShard);
currentShard = null;
}
}
if (currentShard != null) {
LOG.info("Outputting last shard of size " +
currentShard.sizeInLoci() + ", " + currentShard.approximateSizeInBytes() + " bytes");
currentShard.sizeInLoci() + ", " + currentShard.approximateSizeInBytes() + " bytes "
+ currentShard.contig);
output.output(currentShard.finalize(index, (int)contig.end));
}
}
Expand Down