Skip to content

Commit

Permalink
Prevent Future.parallel from cancelling settled computations
Browse files Browse the repository at this point in the history
  • Loading branch information
Avaq committed Jul 6, 2017
1 parent 3905da6 commit 720b298
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 11 deletions.
26 changes: 15 additions & 11 deletions src/parallel.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {Core, Resolved, isFuture} from './core';
import {invalidArgument, invalidFuture} from './internal/throw';
import {show, mapArray, partial1} from './internal/fn';
import {noop, show, mapArray, partial1} from './internal/fn';
import {isUnsigned, isArray} from './internal/is';

const check$parallel = (m, i) => isFuture(m) ? m : invalidFuture(
Expand Down Expand Up @@ -28,20 +28,24 @@ Parallel.prototype._fork = function Parallel$_fork(rej, res){
for(let n = 0; n < _max; n++) cancels[n] && cancels[n]();
}

function Parallel$fork$rej(reason){
Parallel$fork$cancelAll();
rej(reason);
}

function Parallel$fork$run(future, idx, cancelSlot){
cancels[cancelSlot] = future._fork(Parallel$fork$rej, function Parallel$fork$res(value){
function Parallel$fork$run(idx, cancelSlot){
cancels[cancelSlot] = _futures[idx]._fork(function Parallel$fork$rej(reason){
cancels[cancelSlot] = noop;
Parallel$fork$cancelAll();
rej(reason);
}, function Parallel$fork$res(value){
out[idx] = value;
if(i < _length) Parallel$fork$run(_futures[i], i++, cancelSlot);
else if(++i - _max === _length) res(out);

if(i < _length){
Parallel$fork$run(i++, cancelSlot);
} else {
cancels[cancelSlot] = noop;
if(++i - _max === _length) res(out);
}
});
}

for(let n = 0; n < _max; n++) Parallel$fork$run(_futures[n], n, n);
for(let n = 0; n < _max; n++) Parallel$fork$run(n, n);

return Parallel$fork$cancelAll;

Expand Down
18 changes: 18 additions & 0 deletions test/5.parallel.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,24 @@ describe('Parallel', () => {
}, 30);
});

it('[GH #123] does not cancel settled computations', done => {
const m1 = Object.create(F.mock);
const m2 = Object.create(F.mock);

m1._fork = (rej, res) => {
setTimeout(res, 10, 1);
return () => done(U.error);
};

m2._fork = rej => {
setTimeout(rej, 20, 2);
return () => done(U.error);
};

parallel(2, [m1, m2]).fork(U.noop, U.noop);
setTimeout(done, 50, null);
});

it('does not resolve after being cancelled', done => {
const cancel = parallel(1, [F.resolvedSlow, F.resolvedSlow])
.fork(U.failRej, U.failRes);
Expand Down

0 comments on commit 720b298

Please sign in to comment.