Skip to content

Commit

Permalink
refactored queue and cargo to use the same logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Alexander Early committed May 31, 2015
1 parent f601150 commit d6e59ab
Showing 1 changed file with 27 additions and 72 deletions.
99 changes: 27 additions & 72 deletions lib/async.js
Expand Up @@ -809,8 +809,8 @@
});
};

async.queue = function (worker, concurrency) {
if (concurrency === undefined) {
function _queue(worker, concurrency, payload) {
if (concurrency == null) {
concurrency = 1;
}
else if(concurrency === 0) {
Expand Down Expand Up @@ -871,20 +871,31 @@
},
process: function () {
if (!q.paused && workers < q.concurrency && q.tasks.length) {
var task = q.tasks.shift();
// var task = q.tasks.shift();
var tasks = payload ?
q.tasks.splice(0, payload) :
q.tasks.splice(0, q.tasks.length);

var data = _map(tasks, function (task) {
return task.data;
});

if (q.empty && q.tasks.length === 0) {
q.empty();
}
workers += 1;
var cb = only_once(next);
worker(task.data, cb);
worker(data, cb);
}

function next() {
workers -= 1;
if (task.callback) {
task.callback.apply(task, arguments);
}
var args = arguments;
_arrayEach(tasks, function (task) {
if (task.callback) {
task.callback.apply(task, args);
}
});
if (q.drain && q.tasks.length + workers === 0) {
q.drain();
}
Expand Down Expand Up @@ -915,6 +926,14 @@
}
}
};
return q;
}

async.queue = function (worker, concurrency) {
var q = _queue(function (items, cb) {
worker(items[0], cb);
}, concurrency, 1);

return q;
};

Expand Down Expand Up @@ -984,71 +1003,7 @@
};

async.cargo = function (worker, payload) {
var working = false,
tasks = [];

var cargo = {
tasks: tasks,
payload: payload,
saturated: null,
empty: null,
drain: null,
drained: true,
push: function (data, callback) {
if (!_isArray(data)) {
data = [data];
}
_arrayEach(data, function(task) {
tasks.push({
data: task,
callback: typeof callback === 'function' ? callback : null
});
cargo.drained = false;
if (cargo.saturated && tasks.length === payload) {
cargo.saturated();
}
});
async.setImmediate(cargo.process);
},
process: function process() {
if (working) return;
if (tasks.length === 0) {
if(cargo.drain && !cargo.drained) cargo.drain();
cargo.drained = true;
return;
}

var ts = typeof payload === 'number' ?
tasks.splice(0, payload) :
tasks.splice(0, tasks.length);

var ds = _map(ts, function (task) {
return task.data;
});

if(cargo.empty) cargo.empty();
working = true;
worker(ds, function () {
working = false;

var args = arguments;
_arrayEach(ts, function (data) {
if (data.callback) {
data.callback.apply(null, args);
}
});

process();
});
},
length: function () {
return tasks.length;
},
running: function () {
return working;
}
};
return cargo;
return _queue(worker, 1, payload);
};

function _console_fn(name) {
Expand Down

0 comments on commit d6e59ab

Please sign in to comment.