Skip to content
This repository has been archived by the owner on Mar 22, 2024. It is now read-only.

Commit

Permalink
Prevent unhandled error events from the first stream of utils.safePipe
Browse files Browse the repository at this point in the history
We have a utility function called `utils.safePipe()` that accepts an
array of streams and pipes them all together attaching error handlers at
every step.

We didn't recognise a subtle flaw that would prevent the error handler
(and any event listener, really) to be attached to the first stream on
the pipe chain: we check if `current` exists, and if not, set the
current stream as `current` however we `return` right away, which means
the error handler and event handlers defined just below never get called
on the first stream (which is the only case where `current` is not
defined).

As a bonus, this commit adds some unit tests for `utils.safePipe()`.

See: balena-io/etcher#981
Signed-off-by: Juan Cruz Viotti <jviotti@openmailbox.org>
  • Loading branch information
Juan Cruz Viotti committed Jan 5, 2017
1 parent d21a5e3 commit f70f4fd
Show file tree
Hide file tree
Showing 3 changed files with 218 additions and 6 deletions.
19 changes: 13 additions & 6 deletions lib/utils.js
Expand Up @@ -51,19 +51,26 @@ const _ = require('lodash');
exports.safePipe = (streams, callback) => {
let current = null;

// Make sure all event handlers are
// attached before piping the streams.
_.each(streams, (stream) => {
if (!current) {
current = stream.stream;
return;
}

stream.stream.on('error', callback);

_.each(stream.events || {}, (handler, event) => {
stream.stream.on(event, handler);
});
});

current = current.pipe(stream.stream);
_.last(streams).stream
.on('done', callback)
.on('finish', callback);

_.each(streams, (stream) => {
if (current) {
current = current.pipe(stream.stream);
} else {
current = stream.stream;
}
});

_.last(streams).stream
Expand Down
82 changes: 82 additions & 0 deletions tests/mock-stream.js
@@ -0,0 +1,82 @@
/*
* Copyright 2016 Resin.io
*
* Licensed under the Apache License, Version 2.0 (the 'License');
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an 'AS IS' BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

'use strict';

const stream = require('stream');
const _ = require('lodash');

/**
* @summary Create a mock read stream
* @class
* @public
*/
exports.Readable = class MockReadStream extends stream.Readable {

/**
* @summary Create a mock read stream
* @public
*
* @param {Array} data - data
*
* @example
* const stream = new mockStream.Readable([ 1, 2, 3 ]);
*/
constructor(data) {
super({
objectMode: true,
read: () => {
_.each(data, (item) => {
this.push(item);
});

this.push(null);
}
});
}

};

/**
* @summary Create a collector mock writable stream
* @class
* @public
*/
exports.Writable = class CollectorWriteStream extends stream.Writable {

/**
* @summary Create a collector mock writable stream
* @public
*
* @example
* const stream = new mockStream.Writable();
*
* stream.on('finish', () => {
* console.log(stream.data);
* });
*/
constructor() {
super({
objectMode: true,
write: (chunk, encoding, callback) => {
this.data = this.data || [];
this.data.push(chunk);
return callback();
}
});
}

};
123 changes: 123 additions & 0 deletions tests/utils.spec.js
Expand Up @@ -17,6 +17,9 @@
'use strict';

const m = require('mochainon');
const through2 = require('through2');
const stream = require('stream');
const mockStream = require('./mock-stream');
const utils = require('../lib/utils');

describe('Utils', function() {
Expand Down Expand Up @@ -46,4 +49,124 @@ describe('Utils', function() {

});

describe('.safePipe()', function() {

it('should create a stream chain', function(done) {
const collector = new mockStream.Writable();

utils.safePipe([
{
stream: new mockStream.Readable([ 'foo', 'bar', 'baz' ])
},
{
stream: through2.obj((chunk, encoding, callback) => {
return callback(null, chunk.toUpperCase());
})
},
{
stream: collector
}
], (error) => {
if (error) {
return done(error);
}

m.chai.expect(collector.data).to.deep.equal([
'FOO',
'BAR',
'BAZ'
]);
done();
});
});

it('should catch an error from the first stream', function(done) {
const input = new stream.Readable({
read() {
return;
}
});

setTimeout(() => {
input.emit('error', new Error('Input error'));
}, 1);

utils.safePipe([
{
stream: input
},
{
stream: through2.obj((chunk, encoding, callback) => {
return callback(null, chunk.toUpperCase());
})
},
{
stream: new mockStream.Writable()
}
], (error) => {
m.chai.expect(error).to.be.an.instanceof(Error);
m.chai.expect(error.message).to.equal('Input error');
done();
});
});

it('should be able to handle custom event handlers', function(done) {
const input = new stream.Readable({
objectMode: true,
read() {
input.emit('foo', 'FOO');
input.emit('bar', 'BAR');
this.push('foo');
this.push(null);
return;
}
});

const transform = through2.obj((chunk, encoding, callback) => {
transform.emit('baz', 'BAZ');
return callback(null, chunk.toUpperCase());
});

const fooInputSpy = m.sinon.spy();
const barInputSpy = m.sinon.spy();
const bazTransformSpy = m.sinon.spy();
const quxTransformSpy = m.sinon.spy();

utils.safePipe([
{
stream: input,
events: {
foo: fooInputSpy,
bar: barInputSpy
}
},
{
stream: transform,
events: {
baz: bazTransformSpy
}
},
{
stream: new mockStream.Writable()
}
], (error) => {
if (error) {
return done(error);
}

m.chai.expect(fooInputSpy).to.have.been.calledOnce;
m.chai.expect(barInputSpy).to.have.been.calledOnce;
m.chai.expect(bazTransformSpy).to.have.been.calledOnce;
m.chai.expect(quxTransformSpy).to.not.have.been.calledOnce;

m.chai.expect(fooInputSpy).to.have.been.calledWith('FOO');
m.chai.expect(barInputSpy).to.have.been.calledWith('BAR');
m.chai.expect(bazTransformSpy).to.have.been.calledWith('BAZ');

done();
});
});

});

});

0 comments on commit f70f4fd

Please sign in to comment.