Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

increasing queue concurrency using queue.setConcurrency spins up new workers on nextTick #115

Closed
wants to merge 1 commit into from

2 participants

@emiloslavsky

without this fix, queue is still processed with the old number of workers.

@emiloslavsky

Any feedback on this little improvement?

@caolan caolan closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
Showing with 62 additions and 5 deletions.
  1. +2 −2 README.md
  2. +8 −1 lib/async.js
  3. +52 −2 test/test-async.js
View
4 README.md
@@ -700,8 +700,8 @@ methods:
* length() - a function returning the number of items waiting to be processed.
* concurrency - an integer for determining how many worker functions should be
- run in parallel. This property can be changed after a queue is created to
- alter the concurrency on-the-fly.
+ run in parallel. This property can be altered on-the-fly with setConcurrency()
+ after a queue is created.
* push(task, [callback]) - add a new task to the queue, the callback is called
once the worker has finished processing the task.
instead of a single task, an array of tasks can be submitted. the respective callback is used for every task in the list.
View
9 lib/async.js
@@ -620,7 +620,14 @@
},
running: function () {
return workers;
- }
+ },
+ setConcurrency: function (newConcurrency) {
+ var diff = newConcurrency - q.concurrency;
+ q.concurrency = newConcurrency;
+ for (var i = 0; i < diff; i += 1) {
+ async.nextTick(q.process);
+ }
+ },
};
return q;
};
View
54 test/test-async.js
@@ -1251,7 +1251,7 @@ exports['queue'] = function (test) {
}, 800);
};
-exports['queue changing concurrency'] = function (test) {
+exports['queue decreasing concurrency'] = function (test) {
var call_order = [],
delays = [40,20,60,20];
@@ -1291,7 +1291,7 @@ exports['queue changing concurrency'] = function (test) {
});
test.equal(q.length(), 4);
test.equal(q.concurrency, 2);
- q.concurrency = 1;
+ q.setConcurrency(1);
setTimeout(function () {
test.same(call_order, [
@@ -1306,6 +1306,56 @@ exports['queue changing concurrency'] = function (test) {
}, 250);
};
+exports['queue increasing concurrency'] = function (test) {
+ var call_order = [],
+ delays = [60,70,80];
+ // for the first 20, only one worker should be processing
+ // task1, after 20, 3 workers should process all 3 tasks
+ // and finish at 100.
+
+ var q = async.queue(function (task, callback) {
+ setTimeout(function () {
+ call_order.push('process ' + task);
+ callback('error', 'arg');
+ }, delays.splice(0,1)[0]);
+ }, 1);
+
+ q.push(1, function (err, arg) {
+ test.equal(err, 'error');
+ test.equal(arg, 'arg');
+ test.equal(q.length(), 0);
+ call_order.push('callback ' + 1);
+ });
+ q.push(2, function (err, arg) {
+ test.equal(err, 'error');
+ test.equal(arg, 'arg');
+ test.equal(q.length(), 0);
+ call_order.push('callback ' + 2);
+ });
+ q.push(3, function (err, arg) {
+ test.equal(err, 'error');
+ test.equal(arg, 'arg');
+ test.equal(q.length(), 0);
+ call_order.push('callback ' + 3);
+ });
+ test.equal(q.length(), 3);
+ test.equal(q.concurrency, 1);
+ setTimeout(function () {
+ q.setConcurrency(3);
+ }, 20);
+
+ setTimeout(function () {
+ test.same(call_order, [
+ 'process 1', 'callback 1',
+ 'process 2', 'callback 2',
+ 'process 3', 'callback 3',
+ ]);
+ test.equal(q.concurrency, 3);
+ test.equal(q.length(), 0);
+ test.done();
+ }, 120);
+};
+
exports['queue push without callback'] = function (test) {
var call_order = [],
delays = [160,80,240,80];
Something went wrong with that request. Please try again.