Skip to content

Commit

Permalink
Fix known problems with the interpretor
Browse files Browse the repository at this point in the history
* Bug reported on 2017-06-02 by @d3vilroot
* Parallel actions not being cancelled after
  synchronous early termination.
* Parallel queued actions being cancelled after
  synchronous early termination.
  • Loading branch information
Avaq committed Jun 5, 2017
1 parent 2849063 commit e24fa0f
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 29 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"eslint-config-warp": "^1.2.0",
"eslint-plugin-markdown": "^1.0.0-beta.6",
"fantasy-land": "^3.0.0",
"fantasy-states": "^0.2.1",
"fun-task": "^1.5.1",
"istanbul": "^0.4.5",
"jsverify": "^0.8.2",
Expand Down
36 changes: 21 additions & 15 deletions src/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -565,51 +565,57 @@ Sequence.prototype._finally = function Sequence$finally(other){

Sequence.prototype._fork = function Sequence$_fork(rej, res){

const stack = new Denque(this._actions);
const queue = new Denque(this._actions.length);
const cold = new Denque(this._actions);
let action, cancel = noop, future = this._spawn, it, settled, async;

function settle(m){
settled = true;
future = m;
if(!(future instanceof Sequence)) return;
for(let i = future._actions.length - 1; i >= 0; i--) stack.unshift(future._actions[i]);
future = future._spawn;
if(future instanceof Sequence){
for(let i = future._actions.length - 1; i >= 0; i--) cold.unshift(future._actions[i]);
future = future._spawn;
}
if(async) drain();
}

function early(m, terminator){
cancel();
if(action !== terminator){
action.cancel();
while((it = queue.shift()) && it !== terminator) it.cancel();
while(async && (it = queue.shift()) && it !== terminator) it.cancel();
}
settle(m);
if(async) drain();
}

function rejected(x){
settle(action.rejected(x));
if(async) drain();
}

function resolved(x){
settle(action.resolved(x));
if(async) drain();
}

function runActions(){
let i = 0;
const hot = new Array(cold.length);
while(it = cold.shift()){
const tmp = it.run(early);
if(settled) break;
hot[i++] = tmp;
}
while(--i >= 0) if(settled) hot[i].cancel(); else queue.unshift(hot[i]);
}

function drain(){
async = false;
while(action = stack.shift() || queue.shift()){
while(action = cold.shift() || queue.shift()){
settled = false;
cancel = future._fork(rejected, resolved);
if(settled) continue;
action = action.run(early);
if(settled) continue;
while(it = stack.shift()){
const tmp = it.run(early);
if(settled) break;
queue.push(tmp);
}
runActions();
if(settled) continue;
async = true;
return;
Expand All @@ -623,7 +629,7 @@ Sequence.prototype._fork = function Sequence$_fork(rej, res){
cancel();
action && action.cancel();
while(it = queue.shift()) it.cancel();
stack.clear();
cold.clear();
cancel = noop;
};

Expand Down
12 changes: 0 additions & 12 deletions test/2.computation.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,18 +72,6 @@ describe('Computation', () => {
actual.fork(_ => done(), U.failRes);
});

it('prevents chains from running twice', done => {
const m = Future((rej, res) => {
res(1);
res(1);
});
m.map(x => {
done();
return x;
})
.fork(U.failRej, U.noop);
});

it('stops continuations from being called after cancellation', done => {
Future((rej, res) => {
setTimeout(res, 20, 1);
Expand Down
27 changes: 25 additions & 2 deletions test/2.sequence.test.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import {Future, of, never, after} from '../index.es.js';
import {expect} from 'chai';
import {add, bang, noop, error, assertResolved, assertRejected} from './util';
import {resolved, rejected} from './futures';
import {resolved, rejected, resolvedSlow} from './futures';
import {Sequence, Core} from '../src/core';
import {StateT} from 'fantasy-states';

describe('Sequence', () => {

Expand Down Expand Up @@ -333,14 +334,24 @@ describe('Sequence', () => {
setTimeout(done, 40, null);
});

it('cancels running actions when one early-terminates asynchronously', done => {
const slow = new Sequence(Future(() => {
const id = setTimeout(done, 50, new Error('Not terminated'));
return () => clearTimeout(id);
}));
const m = slow.race(slow).race(slow).race(slow).race(resolvedSlow);
m.fork(noop, noop);
setTimeout(done, 100, null);
});

it('does not forget about actions to run after early termination', () => {
const m = new Sequence(after(30, 'a'))
.race(new Sequence(after(20, 'b')))
.map(x => `${x}c`);
return assertResolved(m, 'bc');
});

it('does not run early terminating actions twice', done => {
it('does not run early terminating actions twice, or cancel them', done => {
const mock = Object.create(Core);
mock._fork = (l, r) => r(done()) || (() => done(error));
const m = new Sequence(after(30, 'a')).map(x => `${x}b`).race(mock);
Expand All @@ -362,4 +373,16 @@ describe('Sequence', () => {

});

describe('Bug 2017-06-02, reported by @d3vilroot', () => {

const Middleware = StateT(Future);
const slow = Middleware.lift(after(10, null));
const program = slow.chain(_ => slow.chain(_ => slow)).evalState(null);

it('does not occur', done => {
program.fork(done, _ => done());
});

});

});

0 comments on commit e24fa0f

Please sign in to comment.