Skip to content

Commit

Permalink
refactor auto to not need a deferral
Browse files Browse the repository at this point in the history
  • Loading branch information
aearly committed Mar 7, 2016
1 parent 4d82563 commit 7688e98
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 91 deletions.
163 changes: 88 additions & 75 deletions lib/auto.js
@@ -1,66 +1,120 @@
'use strict';

import arrayEach from 'lodash/_arrayEach';
import arrayEvery from 'lodash/_arrayEvery';
import baseHas from 'lodash/_baseHas';
import forOwn from 'lodash/forOwn';
import indexOf from 'lodash/indexOf';
import isArray from 'lodash/isArray';
import okeys from 'lodash/keys';
import noop from 'lodash/noop';
import once from 'lodash/once';
import rest from 'lodash/rest';
import onlyOnce from './internal/onlyOnce';

import setImmediate from './internal/setImmediate';
import onlyOnce from './internal/onlyOnce';

export default function (tasks, concurrency, callback) {
if (typeof arguments[1] === 'function') {
if (typeof concurrency === 'function') {
// concurrency is optional, shift the args.
callback = concurrency;
concurrency = null;
}
callback = once(callback || noop);
var keys = okeys(tasks);
var remainingTasks = keys.length;
if (!remainingTasks) {
var numTasks = keys.length;
if (!numTasks) {
return callback(null);
}
if (!concurrency) {
concurrency = remainingTasks;
concurrency = numTasks;
}

var results = {};
var runningTasks = 0;
var hasError = false;

var listeners = [];
var listeners = {};

var readyTasks = [];


function addListener(fn) {
listeners.unshift(fn);
forOwn(tasks, function (task, key) {
if (!isArray(task)) {
// no dependencies
enqueueTask(key, [task]);
return;
}

var dependencies = task.slice(0, task.length - 1);
var remainingDependencies = dependencies.length;

checkForDeadlocks();

function checkForDeadlocks() {
var len = dependencies.length;
var dep;
while (len--) {
if (!(dep = tasks[dependencies[len]])) {
throw new Error('async.auto task `' + key +
'` has non-existent dependency in ' +
dependencies.join(', '));
}
if (isArray(dep) && indexOf(dep, key) >= 0) {
throw new Error('async.auto task `' + key +
'`Has cyclic dependencies');
}
}
}

arrayEach(dependencies, function (dependencyName) {
addListener(dependencyName, function () {
remainingDependencies--;
if (remainingDependencies === 0) {
enqueueTask(key, task);
}
});
});
});

processQueue();


function enqueueTask(key, task) {
readyTasks.push(function () {
runTask(key, task);
});
}

function removeListener(fn) {
var idx = indexOf(listeners, fn);
if (idx >= 0) listeners.splice(idx, 1);
function processQueue() {
if (readyTasks.length === 0 && runningTasks === 0) {
return callback(null, results);
}
while(readyTasks.length && runningTasks < concurrency) {
var run = readyTasks.shift();
run();
}

}

function taskComplete() {
remainingTasks--;
arrayEach(listeners.slice(), function (fn) {
function addListener(taskName, fn) {
var taskListeners = listeners[taskName];
if (!taskListeners) {
taskListeners = listeners[taskName] = [];
}

taskListeners.push(fn);
}

function taskComplete(taskName) {
var taskListeners = listeners[taskName] || [];
arrayEach(taskListeners, function (fn) {
fn();
});
processQueue();
}

addListener(function () {
if (!remainingTasks) {
callback(null, results);
}
});

arrayEach(keys, function (k) {
function runTask(key, task) {
if (hasError) return;
var task = isArray(tasks[k]) ? tasks[k]: [tasks[k]];

var taskCallback = onlyOnce(rest(function(err, args) {
runningTasks--;
if (args.length <= 1) {
Expand All @@ -71,66 +125,25 @@ export default function (tasks, concurrency, callback) {
forOwn(results, function(val, rkey) {
safeResults[rkey] = val;
});
safeResults[k] = args;
safeResults[key] = args;
hasError = true;
listeners = [];

callback(err, safeResults);
}
else {
results[k] = args;
setImmediate(taskComplete);
} else {
results[key] = args;
taskComplete(key);
}
}));

var requires = task.slice(0, task.length - 1);

checkForDeadlocks();

if (ready()) {
startNext();
runningTasks++;
var taskFn = task[task.length - 1];
if (task.length > 1) {
taskFn(results, taskCallback);
} else {
addListener(listener);
}

function checkForDeadlocks() {
var len = requires.length;
var dep;
while (len--) {
if (!(dep = tasks[requires[len]])) {
throw new Error('Has non-existent dependency in ' +
requires.join(', '));
}
if (isArray(dep) && indexOf(dep, k) >= 0) {
throw new Error('Has cyclic dependencies');
}
}
}

function ready() {
return runningTasks < concurrency &&
!baseHas(results, k) &&
!hasError &&
arrayEvery(requires, function (x) {
return baseHas(results, x);
});
taskFn(taskCallback);
}
}

function startNext() {
runningTasks++;
var taskFn = task[task.length - 1];
if (requires.length > 0) {
taskFn(results, taskCallback);
} else {
taskFn(taskCallback);
}
}

function listener() {
if (ready()) {
removeListener(listener);
startNext();
}
}
});
}
41 changes: 25 additions & 16 deletions mocha_test/auto.js
Expand Up @@ -40,7 +40,7 @@ describe('auto', function () {
},
function(err){
expect(err).to.equal(null);
expect(callOrder).to.eql(['task2','task6','task3','task5','task1','task4']);
expect(callOrder).to.eql(['task2','task3','task6','task5','task1','task4']);
done();
});
});
Expand Down Expand Up @@ -214,19 +214,8 @@ describe('auto', function () {

// Issue 410 on github: https://github.com/caolan/async/issues/410
it('auto calls callback multiple times', function(done) {
if (process.browser) {
// node only test
return done();
}
var finalCallCount = 0;
var domain = require('domain').create();
domain.on('error', function (e) {
// ignore test error
if (!e._test_error) {
return done(e);
}
});
domain.run(function () {
try {
async.auto({
task1: function(callback) { callback(null); },
task2: ['task1', function(results, callback) { callback(null); }]
Expand All @@ -239,7 +228,11 @@ describe('auto', function () {
e._test_error = true;
throw e;
});
});
} catch (e) {
if (!e._test_error) {
throw e;
}
}
setTimeout(function () {
expect(finalCallCount).to.equal(1);
done();
Expand Down Expand Up @@ -293,7 +286,7 @@ describe('auto', function () {
callback(null, 'task1');
}]
});
}).to.throw;
}).to.throw();
done();
});

Expand All @@ -308,7 +301,7 @@ describe('auto', function () {
callback(null, 'task2');
}]
});
}).to.throw;
}).to.throw();
done();
});

Expand Down Expand Up @@ -356,4 +349,20 @@ describe('auto', function () {
}).to.throw();
});

it("should avoid unncecessary deferrals", function (done) {
var isSync = true;

async.auto({
step1: function (cb) { cb(null, 1); },
step2: ["step1", function (results, cb) {
cb();
}]
}, function () {
expect(isSync).to.equal(true);
done();
});

isSync = false;
});

});

0 comments on commit 7688e98

Please sign in to comment.