Skip to content

Commit

Permalink
Better handling of pipes. Closes #16
Browse files Browse the repository at this point in the history
  • Loading branch information
hueniverse committed Oct 12, 2019
1 parent f4b7511 commit cf8b049
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 5 deletions.
27 changes: 22 additions & 5 deletions lib/index.js
Expand Up @@ -36,8 +36,7 @@ exports.header = function (header, length) {

// Handle headers with multiple ranges

for (let i = 0; i < ranges.length; ++i) {
let range = ranges[i];
for (let range of ranges) {
if (range.length === 1) { // '-'
return null;
}
Expand Down Expand Up @@ -125,6 +124,10 @@ exports.Stream = internals.Stream = class extends Stream.Transform {

this._range = range;
this._next = 0;

this._pipes = new Set();
this.on('pipe', (pipe) => this._pipes.add(pipe));
this.on('unpipe', (pipe) => this._pipes.delete(pipe));
}

processChunk(chunk) {
Expand All @@ -134,13 +137,21 @@ exports.Stream = internals.Stream = class extends Stream.Transform {
const pos = this._next;
this._next = this._next + chunk.length;

if (this._next <= this._range.from || // Before range
pos > this._range.to) { // After range
if (this._next <= this._range.from) { // Before range
return;
}

if (pos > this._range.to) { // After range
for (const pipe of this._pipes) {
pipe.unpipe(this);
}

this._pipes.clear();
this.end();
return;
}

// Calc bounds of chunk to read
// Calculate bounds of chunk to read

const from = Math.max(0, this._range.from - pos);
const to = Math.min(chunk.length, this._range.to - pos + 1);
Expand All @@ -159,4 +170,10 @@ exports.Stream = internals.Stream = class extends Stream.Transform {

return done();
}

_flush(done) {

this._pipes.clear();
done();
}
};
41 changes: 41 additions & 0 deletions test/index.js
Expand Up @@ -152,4 +152,45 @@ describe('Stream', () => {

await expect(Wreck.read(source.pipe(stream))).to.reject(Error);
});

it('ends stream when the requested bytes are delivered regardless of upstream status', async () => {

const TestStream = class extends Stream.Readable {

constructor() {

super();
this._count = -1;
}

_read() {

this._count++;

if (this._count < 10) {
this.push(this._count.toString());
return;
}

this.emit('error', new Error('Failed'));
this.push(null);
}
};

const range = Ammo.header('bytes=2-9', 10);
const stream = new Ammo.Stream(range[0]);

const source = new TestStream();

let errored = false;
source.on('error', () => {

errored = true;
});

const buffer = await Wreck.read(source.pipe(stream));
expect(buffer.toString()).to.equal('23456789');
expect(source._count).to.equal(10);
expect(errored).to.be.true();
});
});

0 comments on commit cf8b049

Please sign in to comment.