Skip to content

Commit

Permalink
Reversing the default behavior of queue-flow queues. They drain *by d…
Browse files Browse the repository at this point in the history
…efault* unless otherwise 'plug'ged up. This will solve accidental memory 'leak' issues.
  • Loading branch information
David Ellis committed Oct 8, 2013
1 parent f726dd0 commit e22257b
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
29 changes: 13 additions & 16 deletions lib/queue-flow.js
Expand Up @@ -69,23 +69,21 @@ function Q(nameOrArray, options, namespace) {
}
}
this.openQueue = new Infiniqueue();
this.handler = function() {};
this.handler = function(self, value, struct) {
self.handlerCallback(struct, function() {});
};
this.ender = function() {};
this.handlerSet = false;
this.handlerSet = true;
this.handlerRuns = 0;

// Privileged methods

// For node 0.6 and 0.8 compatibility, listen for the `pipe` event and register
// an event handler to call `push`
this.on('pipe', function(piper) {
piper.on('data', this.push.bind(this));
}.bind(this));

// Start processing the queue after the next JS event loop cycle and return the queue
// object to the remaining code.
this.enqueueParallelism();
if(this.queue.length > 0) this.on('empty', this.close.bind(this));
nextTick(this.enqueueParallelism.bind(this));
return this;
}

Expand Down Expand Up @@ -728,15 +726,14 @@ Q.prototype.pipe = function pipe(writable) {
return writable;
};

// ``drain`` is a simple callback that empties the attached queue and throws away the
// results. This is useful for long-running queues to eliminate references to effectively
// "dead" data without using the ``reduce`` hack to do so. No chaining is possible
// after this call (for obvious reasons).
Q.prototype.drain = function drain() {
this.setHandlers(function(self, value, struct) {
self.handlerCallback(struct, function() {});
});
return undefined;
// ``plug`` is a simple callback that "plugs up" the queue if nothing has been defined to
// process the incoming data. Useful if you don't (yet) know what method you'll run on it,
// though this should be a very rare occasion.
Q.prototype.plug = function plug() {
this.handlerSet = false;
this.handler = function() {};
this.ender = function() {};
return this;
};

function ns() {
Expand Down
27 changes: 17 additions & 10 deletions test/test.js
Expand Up @@ -866,17 +866,24 @@ exports.promise = function(test) {
});
};

exports.drain = function(test) {
exports.autoDrain = function(test) {
bootstrap(test);
test.expect(2);
var drainOutput = q([1])
.each(function(value) {
test.expect(1, value, 'the values are being passed on to the drain method');
})
.on('close', function() {
test.done();
}).drain();
test.equal(undefined, drainOutput, 'drain returns nothing');
test.expect(1);
var drainOutput = q([1, 2, 3]);
setTimeout(function() {
test.equal(undefined, drainOutput.queue, 'all of the queued elements automatically drained');
test.done();
}, 100);
};

exports.plug = function(test) {
bootstrap(test);
test.expect(1);
var pluggedOutput = q([1, 2, 3]).plug();
setTimeout(function() {
test.equal(3, pluggedOutput.queue.length, 'all of the queued elements have remained');
test.done();
}, 100);
};

exports.nodeShortCircuit = function(test) {
Expand Down

0 comments on commit e22257b

Please sign in to comment.