Skip to content
This repository was archived by the owner on Oct 29, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ public static interface Options extends GCSOutputOptions, GCSOptions {}
private static final Logger LOG = Logger.getLogger(CombineShardsFn.class.getName());

final PCollectionView<Iterable<String>> shards;
final PCollectionView<byte[]> eofContent;
final PCollectionView<byte[]> eofContents;
Aggregator<Integer, Integer> filesToCombineAggregator;
Aggregator<Integer, Integer> combinedFilesAggregator;
Aggregator<Integer, Integer> createdFilesAggregator;
Aggregator<Integer, Integer> deletedFilesAggregator;

public CombineShardsFn(PCollectionView<Iterable<String>> shards, PCollectionView<byte[]> eofContent) {
this.shards = shards;
this.eofContent = eofContent;
this.eofContents = eofContent;
filesToCombineAggregator = createAggregator("Files to combine", new SumIntegerFn());
combinedFilesAggregator = createAggregator("Files combined", new SumIntegerFn());
createdFilesAggregator = createAggregator("Created files", new SumIntegerFn());
Expand All @@ -76,20 +76,20 @@ public void processElement(DoFn<String, String>.ProcessContext c) throws Excepti
c.getPipelineOptions().as(Options.class),
c.element(),
c.sideInput(shards),
c.sideInput(eofContent));
c.sideInput(eofContents));
c.output(result);
}

String combineShards(Options options, String dest,
Iterable<String> shards, byte[] eofContent) throws IOException {
Iterable<String> srcShards, byte[] eofContent) throws IOException {
LOG.info("Combining shards into " + dest);
final Storage.Objects storage = Transport.newStorageClient(
options
.as(GCSOptions.class))
.build()
.objects();

ArrayList<String> sortedShardsNames = Lists.newArrayList(shards);
ArrayList<String> sortedShardsNames = Lists.newArrayList(srcShards);
Collections.sort(sortedShardsNames);

// Write an EOF block (empty gzip block), and put it at the end.
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/htsjdk/samtools/BAMShardIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,15 @@ public class BAMShardIndexer {
private final BAMIndexBuilder indexBuilder;

// Index of the reference for which the index is being written
int reference;
int referenceIndex;

public BAMShardIndexer(OutputStream output, SAMFileHeader header, int reference) {
indexBuilder = new BAMIndexBuilder(header.getSequenceDictionary(), reference);
final boolean isFirstIndexShard = reference == 0;
final int numReferencesToWriteInTheHeader = isFirstIndexShard ?
header.getSequenceDictionary().size() : 0;
outputWriter = new BinaryBAMShardIndexWriter(numReferencesToWriteInTheHeader, output);
this.reference = reference;
this.referenceIndex = reference;
}

public void processAlignment(final SAMRecord rec) {
Expand All @@ -40,7 +40,7 @@ public void processAlignment(final SAMRecord rec) {
* @return count of records with no coordinates.
*/
public long finish() {
final BAMIndexContent content = indexBuilder.processReference(reference);
final BAMIndexContent content = indexBuilder.processReference(referenceIndex);
outputWriter.writeReference(content);
outputWriter.close();
return indexBuilder.getNoCoordinateRecordCount();
Expand Down