Skip to content

Commit

Permalink
added possibility to push taskbulks, added test
Browse files Browse the repository at this point in the history
Conflicts:

	lib/async.js
  • Loading branch information
seriousManual authored and Caolan McMahon committed Feb 13, 2012
1 parent c89db1f commit 0225f70
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ methods:
alter the concurrency on-the-fly.
* push(task, [callback]) - add a new task to the queue, the callback is called
once the worker has finished processing the task.
instead of a single task, an array of tasks can be submitted. the respective callback is used for every task in the list.
* saturated - a callback that is called when the queue length hits the concurrency and further tasks will be queued
* empty - a callback that is called when the last item from the queue is given to a worker
* drain - a callback that is called when the last item from the queue has returned from the worker
Expand Down Expand Up @@ -729,6 +730,12 @@ __Example__
console.log('finished processing bar');
});

// add some items to the queue (batch-wise)

q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function (err) {
console.log('finished processing bar');
});


---------------------------------------

Expand Down
16 changes: 13 additions & 3 deletions lib/async.js
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,19 @@
empty: null,
drain: null,
push: function (data, callback) {
q.tasks.push({data: data, callback: typeof callback === 'function' ? callback : null});
if(q.saturated && q.tasks.length == concurrency) q.saturated();
async.nextTick(q.process);
if(data.constructor !== Array) {
data = [data];
}
_forEach(data, function(task) {
q.tasks.push({
data: task,
callback: typeof callback === 'function' ? callback : null
});
if (q.saturated && q.tasks.length == concurrency) {
q.saturated();
}
async.nextTick(q.process);
});
},
process: function () {
if (workers < q.concurrency && q.tasks.length) {
Expand Down
36 changes: 36 additions & 0 deletions test/test-async.js
Original file line number Diff line number Diff line change
Expand Up @@ -1358,6 +1358,42 @@ exports['queue push without callback'] = function (test) {
}, 200);
};

exports['queue bulk task'] = function (test) {
var call_order = [],
delays = [40,20,60,20];

// worker1: --1-4
// worker2: -2---3
// order of completion: 2,1,4,3

var q = async.queue(function (task, callback) {
setTimeout(function () {
call_order.push('process ' + task);
callback('error', task);
}, delays.splice(0,1)[0]);
}, 2);

q.push( [1,2,3,4], function (err, arg) {
test.equal(err, 'error');
call_order.push('callback ' + arg);
});

test.equal(q.length(), 4);
test.equal(q.concurrency, 2);

setTimeout(function () {
test.same(call_order, [
'process 2', 'callback 2',
'process 1', 'callback 1',
'process 4', 'callback 4',
'process 3', 'callback 3'
]);
test.equal(q.concurrency, 2);
test.equal(q.length(), 0);
test.done();
}, 200);
};

exports['memoize'] = function (test) {
test.expect(4);
var call_order = [];
Expand Down

0 comments on commit 0225f70

Please sign in to comment.