Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changes to fetch real BAMs #69

Merged
merged 5 commits into from
Apr 15, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ Run the tests in a real browser:
grunt browsertests
open tests/runner.html

To iterate on code while running the type checker:

grunt watchFlow

To continuously regenerate the combined JS, run:

grunt browserify:watchDist
Expand All @@ -40,4 +36,10 @@ To continuously regenerate the testing JS, run:

grunt browserify:watchTest

To typecheck the code, run

flow status .

For best results, use one of the flowtype editor integrations.

[hs]: https://github.com/nodeapps/http-server
8 changes: 8 additions & 0 deletions src/ContigInterval.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ class ContigInterval<T: (number|string)> {
this.interval.containsInterval(other.interval));
}

/*
This method doesn't typecheck. See https://github.com/facebook/flow/issues/388
isAfterInterval(other: ContigInterval): boolean {
return (this.contig > other.contig ||
(this.contig === other.contig && this.start() > other.start()));
}
*/

toString(): string {
return `${this.contig}:${this.start()}-${this.stop()}`;
}
Expand Down
40 changes: 28 additions & 12 deletions src/RemoteFile.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ class RemoteFile {
}

getFromNetwork(start: number, stop: number): Q.Promise<ArrayBuffer> {
var length = stop - start + 1;
if (length > 50000000) {
throw `Monster request: Won't fetch ${length} bytes from ${this.url}`;
}

var xhr = new XMLHttpRequest();
xhr.open('GET', this.url);
xhr.responseType = 'arraybuffer';
Expand All @@ -110,19 +115,13 @@ class RemoteFile {
this.chunks.push(newChunk);

// Record the full file length if it's available.
var contentRange = xhr.getResponseHeader('Content-Range');
if (contentRange) {
var m = /\/(\d+)$/.exec(contentRange);
if (m) {
var size = Number(m[1]);
if (this.fileLength != -1 && this.fileLength != size) {
console.warn(`Size of remote file ${this.url} changed from ` +
`${this.fileLength} to ${size}`);
} else {
this.fileLength = size;
}
var size = this._getLengthFromContentRange(xhr);
if (size !== null && size !== undefined) {
if (this.fileLength != -1 && this.fileLength != size) {
console.warn(`Size of remote file ${this.url} changed from ` +
`${this.fileLength} to ${size}`);
} else {
console.warn(`Received improper Content-Range value for ${this.url}: ${contentRange}`);
this.fileLength = size;
}
}

Expand All @@ -149,6 +148,23 @@ class RemoteFile {
return deferred.promise;
}

// Attempting to access Content-Range directly may raise security errors.
// This ensures the access is safe before making it.
_getLengthFromContentRange(xhr: XMLHttpRequest): ?number {
if (!/Content-Range/i.exec(xhr.getAllResponseHeaders())) {
return null;
}

var contentRange = xhr.getResponseHeader('Content-Range');
var m = /\/(\d+)$/.exec(contentRange);
if (m) {
return Number(m[1]);
}
console.warn(`Received improper Content-Range value for ` +
`${this.url}: ${contentRange}`);
return null;
}

clearCache() {
this.chunks = [];
}
Expand Down
20 changes: 18 additions & 2 deletions src/bai.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var _ = require('underscore');
function computeIndexChunks(buffer) {
var view = new jDataView(buffer, 0, buffer.byteLength, true /* little endian */);

var minBlockIndex = Infinity;
var contigStartOffsets = [];
view.getInt32(); // magic
var n_ref = view.getInt32();
Expand All @@ -35,13 +36,22 @@ function computeIndexChunks(buffer) {
view.skip(n_chunk * 16);
}
var n_intv = view.getInt32();
view.skip(n_intv * 8);
if (n_intv) {
var buf = view.getBytes(8),
jb = new jBinary(buf, bamTypes.TYPE_SET),
offset = jb.read('VirtualOffset'),
coffset = offset.coffset + (offset.uoffset ? 65536 : 0);
if (coffset) {
minBlockIndex = Math.min(coffset, minBlockIndex);
}
view.skip((n_intv - 1) * 8);
}
}
contigStartOffsets.push(view.tell());

return {
chunks: _.zip(_.initial(contigStartOffsets), _.rest(contigStartOffsets)),
minBlockIndex: 0 // TODO: compute this, it tightens the initial header request
minBlockIndex
};
}

Expand Down Expand Up @@ -165,6 +175,12 @@ class BaiFile {
return immediate.getChunksForInterval(range);
});
}

getHeaderSize(): Q.Promise<number> {
return this.immediate.then(immediate => {
return immediate.indexChunks.minBlockIndex;
});
}
}


Expand Down
191 changes: 134 additions & 57 deletions src/bam.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
'use strict';

import type * as RemoteFile from './RemoteFile';
import type * as Q from 'q';
import type * as VirtualOffset from './VirtualOffset';

var jBinary = require('jbinary'),
_ = require('underscore');
_ = require('underscore'),
Q = require('q');

var bamTypes = require('./formats/bamTypes'),
utils = require('./utils'),
BaiFile = require('./bai'),
ContigInterval = require('./ContigInterval');
ContigInterval = require('./ContigInterval'),
VirtualOffset = require('./VirtualOffset');


/**
Expand All @@ -36,22 +36,143 @@ function filterAlignments(alignments: Object[],
}
});
}


// TODO: import from src/bai.js
type Chunk = {
chunk_beg: VirtualOffset;
chunk_end: VirtualOffset;
}

var kMaxFetch = 65536 * 2;


// This tracks how many bytes were read.
function readAlignmentsToEnd(buffer: ArrayBuffer) {
var jb = new jBinary(buffer, bamTypes.TYPE_SET);
var alignments = [];
var lastStartOffset = 0;
try {
while (jb.tell() < buffer.byteLength) {
var alignment = jb.read('ThinBamAlignment');
if (!alignment) break;
alignments.push(alignment.contents);
lastStartOffset = jb.tell();
}
// Code gets here if the compression block ended exactly at the end of
// an Alignment.
} catch (e) {
// Partial record
if (!(e instanceof RangeError)) {
throw e;
}
}

return {
alignments,
lastByteRead: lastStartOffset - 1
};
}

// Given an offset in a concatenated buffer, determine the offset it
// corresponds to in the original buffer.
function splitOffset(buffers: ArrayBuffer[], chunk: Chunk, lastByteRead: number): number {
for (var i = 0; i < buffers.length - 1; i++) {
lastByteRead -= buffers[i].byteLength;
}
if (lastByteRead < 0) {
throw 'Last byte read was not in last chunk';
}

if (buffers.length == 1) {
lastByteRead += chunk.chunk_beg.uoffset;
}

return lastByteRead;
}

// Fetch alignments from the remote source at the locations specified by Chunks.
// This can potentially result in many network requests.
// The returned promise is fulfilled once it can be proved that no more
// alignments need to be fetched.
function fetchAlignments(remoteFile: RemoteFile,
idxRange: ContigInterval<number>,
contained: boolean,
chunks: Chunk[],
alignments: Object[]): Q.Promise<Object[]> {
if (chunks.length === 0) {
return Q.when(alignments);
}

// Never fetch more than 128k at a time -- this reduces contention on the
// main thread and can avoid sending unnecessary bytes over the network.
var chunk = chunks[0],
chunk_beg = chunk.chunk_beg.coffset,
chunk_end = chunk.chunk_end.coffset;
var bytesToFetch = Math.min(kMaxFetch, (chunk_end + 65535) - chunk_beg);
return remoteFile.getBytes(chunk_beg, bytesToFetch).then(buffer => {
var blocks = utils.inflateConcatenatedGzip(buffer, chunk_end - chunk_beg);

// If the chunk hasn't been exhausted, resume it at an appropriate place.
// The last block needs to be re-read, since it may not have been exhausted.
var lastBlock = blocks[blocks.length - 1],
lastByte = chunk_beg + lastBlock.offset - 1,
newChunk = null;
if (lastByte < chunk_end) {
newChunk = {
chunk_beg: new VirtualOffset(lastByte + 1, 0),
chunk_end: chunk.chunk_end
};
}

var buffers = blocks.map(x => x.buffer);
buffers[0] = buffers[0].slice(chunk.chunk_beg.uoffset);
var decomp = utils.concatArrayBuffers(buffers);
var {alignments: newAlignments, lastByteRead} = readAlignmentsToEnd(decomp);
if (newChunk) {
var lastUOffset = splitOffset(buffers, chunk, lastByteRead);
newChunk.chunk_beg.uoffset = lastUOffset + 1;
}
alignments = alignments.concat(
filterAlignments(newAlignments, idxRange, contained));

// Optimization: if the last alignment started after the requested range,
// then no other chunks can possibly contain matching alignments.
var lastAlignment = newAlignments[newAlignments.length - 1],
lastStart = lastAlignment.pos,
lastRange = new ContigInterval(lastAlignment.refID, lastStart, lastStart + 1);
// TODO: use contigInterval.isAfterInterval when that's possible.
if (lastRange.contig > idxRange.contig ||
(lastRange.contig == idxRange.contig && lastRange.start() > idxRange.stop())) {
return Q.when(alignments);
} else {
return fetchAlignments(remoteFile,
idxRange,
contained,
(newChunk ? [newChunk] : []).concat(_.rest(chunks)),
alignments);
}
});
}


class Bam {
index: ?BaiFile;
header: Q.Promise<Object>;

constructor(remoteFile: RemoteFile, remoteIndexFile?: RemoteFile) {
this.remoteFile = remoteFile;
// TODO: compute 65535 from index chunks
this.header = this.remoteFile.getBytes(0, 65535).then(buf => {
var decomp = utils.inflateGzip(buf);
var jb = new jBinary(decomp, bamTypes.TYPE_SET);
return jb.read('BamHeader');
this.index = remoteIndexFile ? new BaiFile(remoteIndexFile) : null;

var sizePromise = this.index ? this.index.getHeaderSize() : Q.when(2 * 65535);
this.header = sizePromise.then(size => {
return this.remoteFile.getBytes(0, size).then(buf => {
var decomp = utils.inflateGzip(buf);
var jb = new jBinary(decomp, bamTypes.TYPE_SET);
return jb.read('BamHeader');
});
});
this.header.done();

this.index = remoteIndexFile ? new BaiFile(remoteIndexFile) : null;
}

/**
Expand All @@ -65,7 +186,7 @@ class Bam {
*/
readAll(thinReads?: boolean): Q.Promise<Object> {
return this.remoteFile.getAll().then(buf => {
var decomp = utils.concatArrayBuffers(utils.inflateConcatenatedGzip(buf));
var decomp = utils.inflateGzip(buf);
var jb = new jBinary(decomp, bamTypes.TYPE_SET);
var o = jb.read(thinReads ? 'ThinBamFile' : 'BamFile');
// Do some mild re-shaping.
Expand All @@ -74,44 +195,6 @@ class Bam {
});
}

/**
* Read alignments for a chunk of the BAM file.
* If stop is omitted, reads alignments to the end of the compression block.
*/
readChunk(start: VirtualOffset, stop?: VirtualOffset): Q.Promise<Object[]> {
var lastCOffset = (stop ? stop.coffset : start.coffset);
// Blocks are no larger than 64k when compressed
return this.remoteFile.getBytes(start.coffset,
lastCOffset + 65535).then(buf => {
var blocks = utils.inflateConcatenatedGzip(buf, lastCOffset - start.coffset);
if (stop) {
var lastBlock = blocks[blocks.length - 1];
blocks[blocks.length - 1] = lastBlock.slice(0, stop.uoffset);
}
blocks[0] = blocks[0].slice(start.uoffset);
var decomp = utils.concatArrayBuffers(blocks);

var jb = new jBinary(decomp, bamTypes.TYPE_SET);
var alignments = [];
try {
while (jb.tell() < decomp.byteLength) {
var alignment = jb.read('ThinBamAlignment');
if (!alignment) break;
alignments.push(alignment.contents);
}
// Code gets here if the compression block ended exactly at the end of
// an Alignment.
} catch (e) {
// If stop was specified, it must be precise.
// Otherwise, allow partial records.
if (stop || !(e instanceof RangeError)) {
throw e;
}
}
return alignments;
});
}

/**
* Map a contig name to a contig index.
*/
Expand Down Expand Up @@ -142,13 +225,7 @@ class Bam {
return this.getContigIndex(range.contig).then(contigIdx => {
var idxRange = new ContigInterval(contigIdx, range.start(), range.stop());
return index.getChunksForInterval(idxRange).then(chunks => {
if (chunks.length > 1) {
throw 'Multi-chunk queries are not implemented';
}
var c = chunks[0];
return this.readChunk(c.chunk_beg, c.chunk_end).then(alignments => {
return filterAlignments(alignments, idxRange, contained);
});
return fetchAlignments(this.remoteFile, idxRange, contained, chunks, []);
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/formats/bamTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var TYPE_SET: any = {
var u64 = this.baseRead();
return new VirtualOffset(
// offset of beginning of gzip block in the compressed file.
u64.hi * 65536 + (u64.lo >> 16),
u64.hi * 65536 + (u64.lo >>> 16),
// offset of data within the decompressed block
u64.lo & 0xffff
);
Expand Down
Loading