Skip to content

Commit

Permalink
Adding in support for Node streams to queue-flow.
Browse files Browse the repository at this point in the history
You can simple wrap a readable stream with `q(readable)` and you can push results into a writable stream with `.pipe(writable)`
  • Loading branch information
David Ellis committed Sep 17, 2013
1 parent 1f6f799 commit e7ab37d
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 1 deletion.
23 changes: 22 additions & 1 deletion lib/queue-flow.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ function qFunc(nameOrArray, QType) {
return this.namedQueues[nameOrArray];
} else if(nameOrArray instanceof Array) {
return new QType(nameOrArray, q);
} else if(nameOrArray instanceof Object &&
typeof(nameOrArray.pipe) === 'function' &&
typeof(nameOrArray.on) === 'function') {
var newQ = new QType(undefined, q);
nameOrArray.on('data', newQ.push.bind(newQ));
nameOrArray.on('end', newQ.close.bind(newQ));
return newQ;
} else {
return new QType(undefined, q);
}
Expand Down Expand Up @@ -709,7 +716,21 @@ Q.prototype.promise = function promise(callback, error) {
Q.prototype.promiseAsync = Q.prototype.promise;
Q.prototype.promiseSync = function() { throw "Synchronous Promises are Nonsensical!"; };

// ``drain`` is a simple callback that empties the attached queue and throws away the
// ``pipe`` pushes the queue results into the provided writeable object (and returns it so
// if it is also readable, you can continue to ``pipe`` it just as you'd expect). Throws
// an error if it can't find the ``write`` and ``end`` methods.
Q.prototype.pipe = function pipe(writable) {
if(typeof(writable.write) !== 'function' || typeof(writable.end) !== 'function') throw new Error('Not a valid writeable object!');
this.setHandler(function(value, next) {
if(!next) return writable.end();
next(function() {
writable.write(value);
});
});
return writable;
};

// ``drain`` is a simple callback that empties the attached queue and throws away the
// results. This is useful for long-running queues to eliminate references to effectively
// "dead" data without using the ``reduce`` hack to do so. No chaining is possible
// after this call (for obvious reasons).
Expand Down
25 changes: 25 additions & 0 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -956,6 +956,31 @@ exports.pushToClosedQueue = function(test) {
test.done();
};

exports.readStreams = function(test) {
bootstrap(test);
test.expect(1);
q(fs.createReadStream('./test/test.js', { encoding: 'utf8' }))
.toArray(function(arr) {
test.ok(arr.length > 0, 'got the array');
test.done();
});
};

exports.pipe = function(test) {
bootstrap(test);
test.expect(2);
var fakeWritable = {
write: function write(data) {
test.ok(!!data, 'received data');
},
end: function end() {
test.ok(true, 'stream was closed');
test.done();
}
};
q(['foo']).pipe(fakeWritable);
};

exports.complexity = function(test) {
bootstrap(test);
test.expect(1);
Expand Down

0 comments on commit e7ab37d

Please sign in to comment.