Skip to content

Commit

Permalink
Prevent interpretor from running concurrent actions twice
Browse files Browse the repository at this point in the history
  • Loading branch information
Avaq committed Jun 7, 2017
1 parent ca9f5b5 commit 446a27f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 33 deletions.
80 changes: 47 additions & 33 deletions src/internal/interpretor.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
/*eslint no-cond-assign:0 */
/*eslint no-cond-assign:0, no-constant-condition:0 */

import Denque from 'denque';
import {noop} from './fn';
Expand Down Expand Up @@ -36,6 +36,20 @@ export default Sequence => function interpretor(rej, res){
if(async) drain();
}

//This function serves as a rejection handler for our current future.
//It will tell the current action that the future rejected, and it will
//settle the current tick with the action's answer to that.
function rejected(x){
settle(action.rejected(x));
}

//This function serves as a resolution handler for our current future.
//It will tell the current action that the future resolved, and it will
//settle the current tick with the action's answer to that.
function resolved(x){
settle(action.resolved(x));
}

//This function is passed into actions when they are "warmed up".
//If the action decides that it has its result, without the need to await
//anything else, then it can call this function to force "early termination".
Expand All @@ -53,44 +67,44 @@ export default Sequence => function interpretor(rej, res){
settle(m);
}

//This function serves as a rejection handler for our current future.
//It will tell the current action that the future rejected, and it will
//settle the current tick with the action's answer to that.
function rejected(x){
settle(action.rejected(x));
}

//This function serves as a resolution handler for our current future.
//It will tell the current action that the future resolved, and it will
//settle the current tick with the action's answer to that.
function resolved(x){
settle(action.resolved(x));
//This function serves to kickstart concurrent computations.
//Takes all actions from the cold queue *back-to-front*, and calls run() on
//each of them, passing them the "early" function. If any of them settles (by
//calling early()), we abort. After warming up all actions in the cold queue,
//we warm up the current action as well.
function warmupActions(){
while(it = cold.pop()){
it = it.run(early);
if(settled) return;
queue.unshift(it);
}
action = action.run(early);
}

//This function represents our main execution loop.
//It will take an action off the cold queue, which will only have items if
//we are synchronously processing actions, or if new cold items were just
//added by our previous settle. If there are no actions on the cold queue, it
//will take one from the hot queue. If there are no actions on the hot queue
//either, we fork the current future using the user provided continuations.
//Then, we perform the following steps in order. If any of these steps cause
//a settle, we stop and continue to the next tick synchronously:
//1. We fork the current future.
//2. We "warm up" all actions on the cold queue back-to-front.
//3. We "warm up" the current action which we shifted from the queue earlier.
//4. We return from the function if nothing caused a settle, we are now async.
//When we refer to a "tick", we mean the execution of the body inside the
//primary while-loop of this function.
//Every tick follows the following algorithm:
// 1. We try to take an action from the cold queue, if we fail, go to step 2.
// 1a. We fork the future.
// 1b. We warmupActions() if the we haven't settled yet.
// 2. We try to take an action from the hot queue, if we fail, go to step 3.
// 2a. We fork the Future, if settles, we continue to the next tick.
// 3. If we couldn't take actions from either queues, we fork the Future into
// the user provided continuations. This is the end of the interpretation.
// 4. If we did take an action from one of queues, but none of the steps
// caused a settle(), it means we are asynchronously waiting for something
// to settle and start the next tick, so we return from the function.
function drain(){
async = false;
while(action = cold.shift() || queue.shift()){
while(true){
settled = false;
cancel = future._fork(rejected, resolved);
if(settled) continue;
while(it = cold.pop()){
it = it.run(early);
if(!settled) queue.unshift(it);
}
if(settled) continue;
action = action.run(early);
if(action = cold.shift()){
cancel = future._fork(rejected, resolved);
if(!settled) warmupActions();
}else if(action = queue.shift()){
cancel = future._fork(rejected, resolved);
}else break;
if(settled) continue;
async = true;
return;
Expand Down
7 changes: 7 additions & 0 deletions test/2.sequence.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,13 @@ describe('Sequence', () => {
m.fork(noop, noop);
});

it('does not run run concurrent computations twice', done => {
let ran = false;
const mock = Future(_ => { ran ? done(error) : (ran = true) });
const m = new Sequence(resolvedSlow).chain(_ => resolvedSlow).race(mock);
m.fork(done, _ => done());
});

it('returns a cancel function which cancels all running actions', done => {
let i = 0;
const started = _ => void i++;
Expand Down

0 comments on commit 446a27f

Please sign in to comment.