Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Canceling flows #1542

Merged
merged 14 commits into from
Jul 2, 2018
34 changes: 34 additions & 0 deletions intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,40 @@ async.map([1, 2, 3], AsyncSquaringLibrary.square.bind(AsyncSquaringLibrary), fun
});
```

### Subtle Memory Leaks

There are cases where you might want to exit early from async flow, when calling an Async method inside another async function:

```javascript
function myFunction (args, outerCallback) {
async.waterfall([
//...
function (arg, next) {
if (someImportantCondition()) {
return outerCallback(null)
}
},
function (arg, next) {/*...*/}
], function done (err) {
//...
})
}
```

Something happened in a waterfall where you want to skip the rest of the execution, so you call an outer callack. However, Async will still wait for that inner `next` callback to be called, leaving some closure scope allocated.

As of version 3.0, you can call any Async callback with `false` as the `error` argument, and the rest of the execution of the Async method will be stopped or ignored.

```javascript
function (arg, next) {
if (someImportantCondition()) {
outerCallback(null)
return next(false) // ← signal that you called an outer callback
}
},
```


## Download

The source is available for download from
Expand Down
8 changes: 7 additions & 1 deletion lib/auto.js
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ export default function (tasks, concurrency, callback) {

var results = {};
var runningTasks = 0;
var canceled = false;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typo should be cancelled

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either spelling is correct. I chose one 'l' for consistency.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

damn americans

var hasError = false;

var listeners = Object.create(null);
Expand Down Expand Up @@ -156,6 +157,7 @@ export default function (tasks, concurrency, callback) {
}

function processQueue() {
if (canceled) return
if (readyTasks.length === 0 && runningTasks === 0) {
return callback(null, results);
}
Expand Down Expand Up @@ -189,6 +191,10 @@ export default function (tasks, concurrency, callback) {

var taskCallback = onlyOnce(function(err, result) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Higher up in the runTask function, we should change if (hasError) return; to if (hasError || canceled) return; so other tasks, not dependent on this one, won't continue running.

runningTasks--;
if (err === false) {
canceled = true
return
}
if (arguments.length > 2) {
result = slice(arguments, 1);
}
Expand All @@ -200,7 +206,7 @@ export default function (tasks, concurrency, callback) {
safeResults[key] = result;
hasError = true;
listeners = Object.create(null);

if (canceled) return
callback(err, safeResults);
} else {
results[key] = result;
Expand Down
2 changes: 2 additions & 0 deletions lib/doDuring.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@ export default function doDuring(fn, test, callback) {

function next(err/*, ...args*/) {
if (err) return callback(err);
if (err === false) return;
var args = slice(arguments, 1);
args.push(check);
_test.apply(this, args);
};

function check(err, truth) {
if (err) return callback(err);
if (err === false) return;
if (!truth) return callback(null);
_fn(next);
}
Expand Down
1 change: 1 addition & 0 deletions lib/doWhilst.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export default function doWhilst(iteratee, test, callback) {
var _iteratee = wrapAsync(iteratee);
var next = function(err/*, ...args*/) {
if (err) return callback(err);
if (err === false) return;
var args = slice(arguments, 1);
if (test.apply(this, args)) return _iteratee(next);
callback.apply(null, [null].concat(args));
Expand Down
2 changes: 2 additions & 0 deletions lib/during.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,13 @@ export default function during(test, fn, callback) {

function next(err) {
if (err) return callback(err);
if (err === false) return;
_test(check);
}

function check(err, truth) {
if (err) return callback(err);
if (err === false) return;
if (!truth) return callback(null);
_fn(next);
}
Expand Down
7 changes: 6 additions & 1 deletion lib/eachOf.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,17 @@ function eachOfArrayLike(coll, iteratee, callback) {
callback = once(callback || noop);
var index = 0,
completed = 0,
length = coll.length;
length = coll.length,
canceled = false;
if (length === 0) {
callback(null);
}

function iteratorCallback(err, value) {
if (err === false) {
canceled = true
}
if (canceled === true) return
if (err) {
callback(err);
} else if ((++completed === length) || value === breakLoop) {
Expand Down
1 change: 1 addition & 0 deletions lib/forever.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ export default function forever(fn, errback) {

function next(err) {
if (err) return done(err);
if (err === false) return;
task(next);
}
next();
Expand Down
6 changes: 6 additions & 0 deletions lib/internal/eachOfLimit.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ export default function _eachOfLimit(limit) {
}
var nextElem = iterator(obj);
var done = false;
var canceled = false;
var running = 0;
var looping = false;

function iterateeCallback(err, value) {
if (canceled) return
running -= 1;
if (err) {
done = true;
callback(err);
}
else if (err === false) {
done = true;
canceled = true;
}
else if (value === breakLoop || (done && running <= 0)) {
done = true;
return callback(null);
Expand Down
1 change: 1 addition & 0 deletions lib/retry.js
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ export default function retry(opts, task, callback) {
var attempt = 1;
function retryAttempt() {
_task(function(err) {
if (err === false) return
if (err && attempt++ < options.times &&
(typeof options.errorFilter != 'function' ||
options.errorFilter(err))) {
Expand Down
2 changes: 1 addition & 1 deletion lib/tryEach.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export default function tryEach(tasks, callback) {
result = res;
}
error = err;
callback(!err);
callback(err ? null : {});
});
}, function () {
callback(error, result);
Expand Down
1 change: 1 addition & 0 deletions lib/waterfall.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ export default function(tasks, callback) {
}

function next(err/*, ...args*/) {
if (err === false) return
if (err || taskIndex === tasks.length) {
return callback.apply(null, arguments);
}
Expand Down
1 change: 1 addition & 0 deletions lib/whilst.js
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export default function whilst(test, iteratee, callback) {
if (!test()) return callback(null);
var next = function(err/*, ...args*/) {
if (err) return callback(err);
if (err === false) return;
if (test()) return _iteratee(next);
var args = slice(arguments, 1);
callback.apply(null, [null].concat(args));
Expand Down
59 changes: 58 additions & 1 deletion test/auto.js
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,63 @@ describe('auto', function () {
setTimeout(done, 100);
});

it('auto canceled', function(done){
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should also add a test for ensuring other functions don't get started when one is canceled. Something along the lines of

it('does not start other tasks when it has been canceled', function(done) {
  const call_order = []
  async.auto({
    task1: function(callback) {
      call_order.push(1);
      // defer calling task2, so task3 has time to stop execution
      async.setImmediate(callback);
    },
    task2: ['task1', function(/*results, callback*/) {
      call_order.push(2);
      throw new Error('task2 should not be called');
    }],
    task3: function(callback) {
      call_order.push(3);
      callback(false);
    },
    task4: ['task3', function(/*results, callback*/) {
      call_order.push(4);
      throw new Error('task4 should not be called');
    }]
  },
  function() {
    throw new Error('should not get here')
  });

  setTimeout(() => {
    expect(call_order).to.eql([1,3])
    done()
  }, 25)
});

const call_order = []
async.auto({
task1: function(callback){
call_order.push(1)
callback(false);
},
task2: ['task1', function(/*results, callback*/){
call_order.push(2)
throw new Error('task2 should not be called');
}],
task3: function(callback){
call_order.push(3)
callback('testerror2');
}
},
function(){
throw new Error('should not get here')
});
setTimeout(() => {
expect(call_order).to.eql([1, 3])
done()
}, 10);
});

it('does not start other tasks when it has been canceled', function(done) {
const call_order = []
debugger
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misc debugger statement

async.auto({
task1: function(callback) {
call_order.push(1);
// defer calling task2, so task3 has time to stop execution
async.setImmediate(callback);
},
task2: ['task1', function( /*results, callback*/ ) {
call_order.push(2);
throw new Error('task2 should not be called');
}],
task3: function(callback) {
call_order.push(3);
callback(false);
},
task4: ['task3', function( /*results, callback*/ ) {
call_order.push(4);
throw new Error('task4 should not be called');
}]
},
function() {
throw new Error('should not get here')
});

setTimeout(() => {
expect(call_order).to.eql([1, 3])
done()
}, 25)
});

it('auto no callback', function(done){
async.auto({
task1: function(callback){callback();},
Expand All @@ -185,7 +242,7 @@ describe('auto', function () {
it('auto error should pass partial results', function(done) {
async.auto({
task1: function(callback){
callback(false, 'result1');
callback(null, 'result1');
},
task2: ['task1', function(results, callback){
callback('testerror', 'result2');
Expand Down
48 changes: 48 additions & 0 deletions test/during.js
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@ describe('during', function() {
);
});

it('during canceling', (done) => {
Copy link
Collaborator

@hargasinski hargasinski Jun 12, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also add a test for canceling during in the check?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're going to deprecate during, making all whilst/until functions have an async test function, so no need for comprehensive test coverage.

let counter = 0;
async.during(
cb => cb(null, true),
cb => {
counter++
cb(counter === 2 ? false : null);
},
() => { throw new Error('should not get here')}
);
setTimeout(() => {
expect(counter).to.equal(2);
done();
}, 10)
})

it('doDuring', function(done) {
var call_order = [];

Expand Down Expand Up @@ -95,4 +111,36 @@ describe('during', function() {
}
);
});

it('doDuring canceling', (done) => {
let counter = 0;
async.doDuring(
cb => {
counter++
cb(counter === 2 ? false : null);
},
cb => cb(null, true),
() => { throw new Error('should not get here')}
);
setTimeout(() => {
expect(counter).to.equal(2);
done();
}, 10)
})

it('doDuring canceling in test', (done) => {
let counter = 0;
async.doDuring(
cb => {
counter++
cb(null, counter);
},
(n, cb) => cb(n === 2 ? false : null, true),
() => { throw new Error('should not get here')}
);
setTimeout(() => {
expect(counter).to.equal(2);
done();
}, 10)
})
});
Loading