Skip to content
This repository has been archived by the owner on Jan 31, 2021. It is now read-only.

Commit

Permalink
flatMap() for multiple arguments
Browse files Browse the repository at this point in the history
  • Loading branch information
ash2k committed Apr 4, 2015
1 parent 63df487 commit 44712ad
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 4 deletions.
46 changes: 42 additions & 4 deletions EventEmitterEx.js
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,52 @@
return eex;
};

EventEmitterEx.prototype.flatMap = function flatMap (f) {
assertIsFunction(f);
EventEmitterEx.prototype.flatMap = function flatMap (/* arguments */) {
var eex = new EventEmitterEx(),
funcs = slice(arguments);

var eex = new EventEmitterEx();
funcs.forEach(assertIsFunction);

eex.pipeExcept(this, 'end');
this.on('end', function (/* arguments */) {
eex.pipeExcept(f.apply(eex, arguments));
var result = [], firstError, len = funcs.length, lenLoop = len;

for (var i = 0; i < lenLoop; i++) {
var e = funcs[i].apply(eex, arguments);
eex.pipeExcept(e, 'end', 'error');
e.on('end', endListener.bind(null, i));
e.on('error', errorListener.bind(null, i));
}

function checkUsage (position) {
assert(! Array.isArray(result[position]),
'end/error (or both) event emitted more than once by emitter at position ' + position + ' (0-based)');
}

function endListener (position/* arguments */) {
checkUsage(position);
result[position] = slice(arguments, 1);
maybeNext();
}

function errorListener (position, err) {
checkUsage(position);
firstError = firstError || err;
result[position] = [];
maybeNext();
}

function maybeNext () {
len--;
if (! len) {
if (firstError) {
eex.emit('error', firstError);
} else {
// flatten the array
eex.emit.apply(eex, [].concat.apply(['end'], result));
}
}
}
});

return eex;
Expand Down
87 changes: 87 additions & 0 deletions test/EventEmitterEx.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,93 @@
emitter.emit('end');
});

it('should throw if emitter emit end more than once', function () {
var e = new EEX();
emitter
.flatMap(function () {
return e;
});
emitter.emit('end');
e.emit('end');
expect(function () {
e.emit('end');
}).to.throw(Error, 'end/error (or both) event emitted more than once by emitter at position 0 (0-based)');
});

it('should throw if emitter emit error more than once', function () {
var e = new EEX();
emitter
.flatMap(function () {
return e;
})
.on('error', function () {
// ignore
});
emitter.emit('end');
e.emit('error', new Error('123'));
expect(function () {
e.emit('error', new Error('234'));
}).to.throw(Error, 'end/error (or both) event emitted more than once by emitter at position 0 (0-based)');
});

it('should throw if emitter emit end and error', function () {
var e = new EEX();
emitter
.flatMap(function () {
return e;
});
emitter.emit('end');
e.emit('end');
expect(function () {
e.emit('error', new Error('234'));
}).to.throw(Error, 'end/error (or both) event emitted more than once by emitter at position 0 (0-based)');
});

it('should collect results and emit all together in order', function (done) {
var A = 1, B = 40, C = 2, e = new EEX();
emitter
.flatMap(
function () {
return e;
},
function () {
return new EEX()
.startPipeline(C)
.on('end', function () { e.startPipeline(A, B); } );
})
.on('end', function (r1, r2, r3, r4) {
r1.should.be.equal(A);
r2.should.be.equal(B);
r3.should.be.equal(C);
expect(r4).to.be.undefined;
done();
})
.on('error', done);
emitter.emit('end');
});

it('should emit error after all emitters finished', function (done) {
var e = new EEX(), error = new Error('error!');
emitter
.flatMap(
function () {
return e;
},
function () {
return new EEX()
.startPipeline()
.on('end', e.emit.bind(e, 'error', error));
})
.on('end', function () {
fail('end emitted', 'end emitted');
})
.on('error', function (err) {
err.should.be.equal(error);
done();
});
emitter.emit('end');
});

it('should throw exception on non-function arguments', function () {
expect(function () {
emitter.flatMap(42);
Expand Down

0 comments on commit 44712ad

Please sign in to comment.