Skip to content

Commit

Permalink
Merge 338f843 into 1edb037
Browse files Browse the repository at this point in the history
  • Loading branch information
danvk committed Apr 15, 2015
2 parents 1edb037 + 338f843 commit 3ac2b80
Show file tree
Hide file tree
Showing 10 changed files with 328 additions and 115 deletions.
10 changes: 6 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@ Run the tests in a real browser:
grunt browsertests
open tests/runner.html

To iterate on code while running the type checker:

grunt watchFlow

To continuously regenerate the combined JS, run:

grunt browserify:watchDist
Expand All @@ -40,4 +36,10 @@ To continuously regenerate the testing JS, run:

grunt browserify:watchTest

To typecheck the code, run:

flow status .

For best results, use one of the flowtype editor integrations.

[hs]: https://github.com/nodeapps/http-server
8 changes: 8 additions & 0 deletions src/ContigInterval.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ class ContigInterval<T: (number|string)> {
this.interval.containsInterval(other.interval));
}

/*
This method doesn't typecheck. See https://github.com/facebook/flow/issues/388
isAfterInterval(other: ContigInterval): boolean {
return (this.contig > other.contig ||
(this.contig === other.contig && this.start() > other.start()));
}
*/

toString(): string {
return `${this.contig}:${this.start()}-${this.stop()}`;
}
Expand Down
40 changes: 28 additions & 12 deletions src/RemoteFile.js
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ class RemoteFile {
}

getFromNetwork(start: number, stop: number): Q.Promise<ArrayBuffer> {
var length = stop - start + 1;
if (length > 50000000) {
throw `Monster request: Won't fetch ${length} bytes from ${this.url}`;
}

var xhr = new XMLHttpRequest();
xhr.open('GET', this.url);
xhr.responseType = 'arraybuffer';
Expand All @@ -110,19 +115,13 @@ class RemoteFile {
this.chunks.push(newChunk);

// Record the full file length if it's available.
var contentRange = xhr.getResponseHeader('Content-Range');
if (contentRange) {
var m = /\/(\d+)$/.exec(contentRange);
if (m) {
var size = Number(m[1]);
if (this.fileLength != -1 && this.fileLength != size) {
console.warn(`Size of remote file ${this.url} changed from ` +
`${this.fileLength} to ${size}`);
} else {
this.fileLength = size;
}
var size = this._getLengthFromContentRange(xhr);
if (size !== null && size !== undefined) {
if (this.fileLength != -1 && this.fileLength != size) {
console.warn(`Size of remote file ${this.url} changed from ` +
`${this.fileLength} to ${size}`);
} else {
console.warn(`Received improper Content-Range value for ${this.url}: ${contentRange}`);
this.fileLength = size;
}
}

Expand All @@ -149,6 +148,23 @@ class RemoteFile {
return deferred.promise;
}

// Attempting to access Content-Range directly may raise security errors.
// This ensures the access is safe before making it.
_getLengthFromContentRange(xhr: XMLHttpRequest): ?number {
if (!/Content-Range/i.exec(xhr.getAllResponseHeaders())) {
return null;
}

var contentRange = xhr.getResponseHeader('Content-Range');
var m = /\/(\d+)$/.exec(contentRange);
if (m) {
return Number(m[1]);
}
console.warn(`Received improper Content-Range value for ` +
`${this.url}: ${contentRange}`);
return null;
}

// Drop all cached byte ranges, so subsequent reads go back to the network.
// Assigns a fresh array (rather than truncating) so any outstanding
// references to the old chunk list are left untouched.
clearCache() {
  this.chunks = [];
}
Expand Down
20 changes: 18 additions & 2 deletions src/bai.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var _ = require('underscore');
function computeIndexChunks(buffer) {
var view = new jDataView(buffer, 0, buffer.byteLength, true /* little endian */);

var minBlockIndex = Infinity;
var contigStartOffsets = [];
view.getInt32(); // magic
var n_ref = view.getInt32();
Expand All @@ -35,13 +36,22 @@ function computeIndexChunks(buffer) {
view.skip(n_chunk * 16);
}
var n_intv = view.getInt32();
view.skip(n_intv * 8);
if (n_intv) {
var buf = view.getBytes(8),
jb = new jBinary(buf, bamTypes.TYPE_SET),
offset = jb.read('VirtualOffset'),
coffset = offset.coffset + (offset.uoffset ? 65536 : 0);
if (coffset) {
minBlockIndex = Math.min(coffset, minBlockIndex);
}
view.skip((n_intv - 1) * 8);
}
}
contigStartOffsets.push(view.tell());

return {
chunks: _.zip(_.initial(contigStartOffsets), _.rest(contigStartOffsets)),
minBlockIndex: 0 // TODO: compute this, it tightens the initial header request
minBlockIndex
};
}

Expand Down Expand Up @@ -165,6 +175,12 @@ class BaiFile {
return immediate.getChunksForInterval(range);
});
}

getHeaderSize(): Q.Promise<number> {
return this.immediate.then(immediate => {
return immediate.indexChunks.minBlockIndex;
});
}
}


Expand Down
191 changes: 134 additions & 57 deletions src/bam.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
'use strict';

import type * as RemoteFile from './RemoteFile';
import type * as Q from 'q';
import type * as VirtualOffset from './VirtualOffset';

var jBinary = require('jbinary'),
_ = require('underscore');
_ = require('underscore'),
Q = require('q');

var bamTypes = require('./formats/bamTypes'),
utils = require('./utils'),
BaiFile = require('./bai'),
ContigInterval = require('./ContigInterval');
ContigInterval = require('./ContigInterval'),
VirtualOffset = require('./VirtualOffset');


/**
Expand All @@ -36,22 +36,143 @@ function filterAlignments(alignments: Object[],
}
});
}


// TODO: import from src/bai.js
// A byte range within the BAM file, delimited by a pair of virtual offsets
// (compressed file offset + offset into the inflated block).
type Chunk = {
  chunk_beg: VirtualOffset;
  chunk_end: VirtualOffset;
}

// Cap (in bytes) on how much is fetched in a single network request (128k).
var kMaxFetch = 65536 * 2;


// This tracks how many bytes were read.
function readAlignmentsToEnd(buffer: ArrayBuffer) {
var jb = new jBinary(buffer, bamTypes.TYPE_SET);
var alignments = [];
var lastStartOffset = 0;
try {
while (jb.tell() < buffer.byteLength) {
var alignment = jb.read('ThinBamAlignment');
if (!alignment) break;
alignments.push(alignment.contents);
lastStartOffset = jb.tell();
}
// Code gets here if the compression block ended exactly at the end of
// an Alignment.
} catch (e) {
// Partial record
if (!(e instanceof RangeError)) {
throw e;
}
}

return {
alignments,
lastByteRead: lastStartOffset - 1
};
}

// Given an offset in a concatenated buffer, determine the offset it
// corresponds to in the original buffer.
function splitOffset(buffers: ArrayBuffer[], chunk: Chunk, lastByteRead: number): number {
for (var i = 0; i < buffers.length - 1; i++) {
lastByteRead -= buffers[i].byteLength;
}
if (lastByteRead < 0) {
throw 'Last byte read was not in last chunk';
}

if (buffers.length == 1) {
lastByteRead += chunk.chunk_beg.uoffset;
}

return lastByteRead;
}

// Fetch alignments from the remote source at the locations specified by Chunks.
// This can potentially result in many network requests.
// The returned promise is fulfilled once it can be proved that no more
// alignments need to be fetched.
function fetchAlignments(remoteFile: RemoteFile,
idxRange: ContigInterval<number>,
contained: boolean,
chunks: Chunk[],
alignments: Object[]): Q.Promise<Object[]> {
if (chunks.length === 0) {
return Q.when(alignments);
}

// Never fetch more than 128k at a time -- this reduces contention on the
// main thread and can avoid sending unnecessary bytes over the network.
var chunk = chunks[0],
chunk_beg = chunk.chunk_beg.coffset,
chunk_end = chunk.chunk_end.coffset;
var bytesToFetch = Math.min(kMaxFetch, (chunk_end + 65535) - chunk_beg);
return remoteFile.getBytes(chunk_beg, bytesToFetch).then(buffer => {
var blocks = utils.inflateConcatenatedGzip(buffer, chunk_end - chunk_beg);

// If the chunk hasn't been exhausted, resume it at an appropriate place.
// The last block needs to be re-read, since it may not have been exhausted.
var lastBlock = blocks[blocks.length - 1],
lastByte = chunk_beg + lastBlock.offset - 1,
newChunk = null;
if (lastByte < chunk_end) {
newChunk = {
chunk_beg: new VirtualOffset(lastByte + 1, 0),
chunk_end: chunk.chunk_end
};
}

var buffers = blocks.map(x => x.buffer);
buffers[0] = buffers[0].slice(chunk.chunk_beg.uoffset);
var decomp = utils.concatArrayBuffers(buffers);
var {alignments: newAlignments, lastByteRead} = readAlignmentsToEnd(decomp);
if (newChunk) {
var lastUOffset = splitOffset(buffers, chunk, lastByteRead);
newChunk.chunk_beg.uoffset = lastUOffset + 1;
}
alignments = alignments.concat(
filterAlignments(newAlignments, idxRange, contained));

// Optimization: if the last alignment started after the requested range,
// then no other chunks can possibly contain matching alignments.
var lastAlignment = newAlignments[newAlignments.length - 1],
lastStart = lastAlignment.pos,
lastRange = new ContigInterval(lastAlignment.refID, lastStart, lastStart + 1);
// TODO: use contigInterval.isAfterInterval when that's possible.
if (lastRange.contig > idxRange.contig ||
(lastRange.contig == idxRange.contig && lastRange.start() > idxRange.stop())) {
return Q.when(alignments);
} else {
return fetchAlignments(remoteFile,
idxRange,
contained,
(newChunk ? [newChunk] : []).concat(_.rest(chunks)),
alignments);
}
});
}


class Bam {
index: ?BaiFile;
header: Q.Promise<Object>;

constructor(remoteFile: RemoteFile, remoteIndexFile?: RemoteFile) {
this.remoteFile = remoteFile;
// TODO: compute 65535 from index chunks
this.header = this.remoteFile.getBytes(0, 65535).then(buf => {
var decomp = utils.inflateGzip(buf);
var jb = new jBinary(decomp, bamTypes.TYPE_SET);
return jb.read('BamHeader');
this.index = remoteIndexFile ? new BaiFile(remoteIndexFile) : null;

var sizePromise = this.index ? this.index.getHeaderSize() : Q.when(2 * 65535);
this.header = sizePromise.then(size => {
return this.remoteFile.getBytes(0, size).then(buf => {
var decomp = utils.inflateGzip(buf);
var jb = new jBinary(decomp, bamTypes.TYPE_SET);
return jb.read('BamHeader');
});
});
this.header.done();

this.index = remoteIndexFile ? new BaiFile(remoteIndexFile) : null;
}

/**
Expand All @@ -65,7 +186,7 @@ class Bam {
*/
readAll(thinReads?: boolean): Q.Promise<Object> {
return this.remoteFile.getAll().then(buf => {
var decomp = utils.concatArrayBuffers(utils.inflateConcatenatedGzip(buf));
var decomp = utils.inflateGzip(buf);
var jb = new jBinary(decomp, bamTypes.TYPE_SET);
var o = jb.read(thinReads ? 'ThinBamFile' : 'BamFile');
// Do some mild re-shaping.
Expand All @@ -74,44 +195,6 @@ class Bam {
});
}

/**
 * Read alignments for a chunk of the BAM file.
 * If stop is omitted, reads alignments to the end of the compression block.
 * start/stop are virtual offsets: coffset addresses the compressed file,
 * uoffset addresses bytes within the inflated block.
 */
readChunk(start: VirtualOffset, stop?: VirtualOffset): Q.Promise<Object[]> {
  var lastCOffset = (stop ? stop.coffset : start.coffset);
  // Blocks are no larger than 64k when compressed
  return this.remoteFile.getBytes(start.coffset,
                                  lastCOffset + 65535).then(buf => {
    // Inflate each concatenated gzip block covering [start, lastCOffset].
    var blocks = utils.inflateConcatenatedGzip(buf, lastCOffset - start.coffset);
    if (stop) {
      // Trim the final block to stop's in-block (uncompressed) offset.
      var lastBlock = blocks[blocks.length - 1];
      blocks[blocks.length - 1] = lastBlock.slice(0, stop.uoffset);
    }
    // Drop bytes before start's in-block offset, then stitch together.
    blocks[0] = blocks[0].slice(start.uoffset);
    var decomp = utils.concatArrayBuffers(blocks);

    // Parse alignments until the buffer runs out.
    var jb = new jBinary(decomp, bamTypes.TYPE_SET);
    var alignments = [];
    try {
      while (jb.tell() < decomp.byteLength) {
        var alignment = jb.read('ThinBamAlignment');
        if (!alignment) break;
        alignments.push(alignment.contents);
      }
      // Code gets here if the compression block ended exactly at the end of
      // an Alignment.
    } catch (e) {
      // If stop was specified, it must be precise.
      // Otherwise, allow partial records.
      if (stop || !(e instanceof RangeError)) {
        throw e;
      }
    }
    return alignments;
  });
}

/**
* Map a contig name to a contig index.
*/
Expand Down Expand Up @@ -142,13 +225,7 @@ class Bam {
return this.getContigIndex(range.contig).then(contigIdx => {
var idxRange = new ContigInterval(contigIdx, range.start(), range.stop());
return index.getChunksForInterval(idxRange).then(chunks => {
if (chunks.length > 1) {
throw 'Multi-chunk queries are not implemented';
}
var c = chunks[0];
return this.readChunk(c.chunk_beg, c.chunk_end).then(alignments => {
return filterAlignments(alignments, idxRange, contained);
});
return fetchAlignments(this.remoteFile, idxRange, contained, chunks, []);
});
});
}
Expand Down
2 changes: 1 addition & 1 deletion src/formats/bamTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var TYPE_SET: any = {
var u64 = this.baseRead();
return new VirtualOffset(
// offset of beginning of gzip block in the compressed file.
u64.hi * 65536 + (u64.lo >> 16),
u64.hi * 65536 + (u64.lo >>> 16),
// offset of data within the decompressed block
u64.lo & 0xffff
);
Expand Down
Loading

0 comments on commit 3ac2b80

Please sign in to comment.