npm install asynchelper
While writing nodejs code, it is not uncommon to have to do something like this:
- get database connection
- count records for user
- if count is 0, insert record for user
- create user game descriptor
- return JSON with user data
Another use case would be:
- get facebook credentials
- make a post to your REST
- make a get to another REST
- increment REDIS for API call track
these use cases translated into plain node code, are a callback nightmare.
Not to mention error control flow.
Not to mention timeout constraints (you really don't want this to take more than 200 milliseconds)
What if you need to know when all this code ends execution.
And then:
- how about having both use cases running at the same time ?
- how about not having more than 4 of these (remember scarce database connection number ?) running at the same time ?
AsyncHelper is a general purpose library to handle asynchronous processing in nodejs.
The goal is to expose a clean interface, not only for code readability, but to keep under control all the synchronous operations that could eventually span in the lifecycle of a node js application, and to keep scarce resources under fine control.
The library relies on the concept of a Condition
for signaling asynchronous code activity and a Future
returned
to the developer, so she can be notified about computation results.
The Condition
allows to choreograph complex operations and have a fast short-circuit mechanism to allow your
code to progress whenever the conditions are met or not. A Condition
is hence a super-powered boolean events broadcaster.
Conditions are defined to fail-fast because it met its criteria, or because of built-in timeout control.
The main object to interact with is a Dispatcher
object.
Since sometimes is important to handle different scarce resources at a time, as much as needed Dispatcher
objects
can be created. It makes sense to interact with different pieces of software separately like dealing with your database
or an exposed REST API.
The creation of a Dispatcher
, relies on objects instead of calling AsyncHelper.waterfall( [], callback )
for well
reasons:
- concurrency purposes. A dispatcher can set a maximum number of concurrent operations, so that you can keep under fine control the scarce resources of your application, like for example, database connections.
- keep track of running operations, and pending running operations.
A dispatcher is hence a Queue
, served as a first-in first-out, with interesting control flow capabilities.
How you get notified about dispatched tasks status is by means of Future objects
.
For each element submitted for execution to a Dispatcher
, a Future
object will be returned in exchange.
A Future
object offers a more formal and complete notification scheme than a callback:
- a future can be checked for its value. Either not set (still pending execution), and if set, whether is an Error.
- multiple observers can be listening to the future's value change.
- it is latched by a
Condition
, so its value can only be set once. And the future's observers will only be notified once.
const future = _dispatcher.waterfall( [...] );
Future objects expose three different callback notification mechanisms (in fact they are the same, but with some sugar icing):
future.onValueset( (f:Future<number>) => {
// check future's value.
// it will be either a number or Error.
});
Promisify a future object:
future.then(
(v:number) => {
// waterfall returned v
},
(e:Error) => {
// waterfall errored
}
);
notify on a nodejs standard callback:
future.node(
(err:Error, v:number) => {
// waterfall returned v or Error
}
);
But what can be actually be submitted to a Dispatcher
.
There are three operations a Dispatcher can execute:
waterfall
of nodejs-style functions.ParallelCondition
objects.- arbitrary functions with time control.
waterfall
-ing is the most common scenario for the AsyncHelper.
It allows to have fine timing control for the whole function waterfall, and pass-through error control. A thrown Error in
any of the functions can be catch and propagated to the Future
as an error value.
Bonus points: if an Error is thrown, you can get a detailed string representation of the waterfall status at the
moment of throw, and all parameters passed to each waterfall function by accessing sequenceStackTrace
on the error
object.
const _d = new Dispatcher(4); // 4 concurrent elements.
// so up to 4 submitted elements will be executing concurrently.
// all others will be queued until another element ends execution.
_d.waterfall(
[
function f1() {
...
},
function f2( err:Error, ret_from_prev_function : any ) {
}
],
2000, // 2 seconds to execute the waterfall
true // if a function of the waterfall throws an error, halt execution
)
.onValueSet( (f:Future) {
// the future has value set.
});
- Functions must comply with nodejs's callback convention: (e:Error, args:any).
- Functions return value will be propagated to the next function. The latest waterfall function return value
will be propagated as the
Future
object's value. - Functions can't be fat arrow functions. Internally, the
Dispatcher
sets this as the waterfall control function itself, otherwise, it won't be able to propagate returned values from function to function. - Functions must return a value. If no the function has no return value it is expected that the next waterfall function will be invoked as a callback from something invoked in the current waterfall's function.
- You can set properties on this. The execution context will be a per-waterfall submission function which handles the control flow.
With all this, a real life use case for the waterfall could be:
game_dispatcher.waterfall(
[
function() {
// set the next waterfall function as postgres' connect callback.
pg.connect( DU.PG_CONN_STRING, this );
},
function (err:Error, client:pg.Client, done:()=>void) {
if (DU.HandleError(err, client, done)) {
// stop waterfall, and propagate error to the Future value.
throw err;
}
// pass postgres specific objects to next waterfall functions
this.props = {
client : client,
done : done
};
this.props.client.query(
"update client_created_games " +
"set" +
" context= $1," +
" status= $2," +
" ...",
[
...
],
this)
},
function( err:Error /*, result:pg.ResultBuilder*/) {
if (DU.HandleError(err, this.props.client, this.props.done)) {
throw err;
}
this.props.client.query(
"select " +
" id," +
" client_id, ...",
[
...
],
this
);
},
function(err:Error, result:pg.ResultBuilder) {
if (DU.HandleError(err, this.props.client, this.props.done)) {
throw err;
}
// return postgres connection to the pool
this.props.done();
if ( result.rows.length===1 ) {
// future's value
return 12345;
} else {
// future's value
throw new Error("An error.");
}
}
],
1000, // take a second as much to execute waterfall functions. 0 for no timeout. optional
true // error pass-through enabled ?
).onValueSet(
// check future.getValue() for an error.
(future:Future) => ...
);
A ParallelCondition
can be scheduled for execution. The future's value will be a boolean, indicating Condition
met
or not.
Remember, the Condition
might have short-circuited early, and does not mean all Condition
operations are done, only
the Condition
value has been set.
const pc = new ParallelCondition( ... );
_dispatcher.submitCondition( pc ).then(
(condition_result:boolean) => {
// the condition result was successful or not
},
(err:Error) => {
// an error ocurred
}
);
This Dispatcher
function submits an arbitrary function for execution. It also exposes timeout control.
Since a dispatcher does not have any knowledge of the execution context, the function has to be bound with its parameters.
_dispatcher.submit(
function() {
},
0 // no timeout
);
function fn( a,b,c ) {
...
}
// execute fn with a maximum time of 100 milliseconds.
_dispatcher.submit( fn.bind(null,3,4,5), 100 );
The method Dispatcher.addIsEmptyListener( (d:Dispatcher)=>void )
will be invoked whenever a dispatcher has drained
all its scheduled tasks.
Though the most interesting object in AsyncHelper is the Dispatcher
, it exposes some really useful objects as well.
Internally, Dispatcher
relies on choreographing these objects, but they could also be used directly
Conditions are the basic building blocks. A condition is a simple object wrapping a boolean variable, and exposing events based on its changes.
export type SignalObserver = (...args:any[])=>void;
const c = new Condition()
.onTrue( (c:Condition) => {
})
.onFalse( (c:Condition) => {
})
.onStateChange( (c:Condition) => {
});
Initially, a Condition
has no internal state. It is nor true, nor false.
When it has its values set, the onXXX
callbacks will be invoked.
Internally, a Condition
uses a Signal
object, which is useful to register multiple function callbacks
for each event.
c
.onTrue( (c:Condition) => {...} )
.onTrue( (c:Condition) => {...} );
c.setTrue(); // will notify to both callbacks.
A value is set by externally calling:
c.setTrue();
c.setFalse();
can also be reseted
c.setNotSet();
A Condition can also have value set based on a Timer
const _condition : Condition = new Condition()
.onTrue( (c:Condition) => {
...
})
.onFalse( (c:Condition) => {
// invoked in 200ms unless _condition.setTrue() is called before.
})
.setTimeout( 200 );
A simple boolean variable can be leveraged with a whole tree of Condition
objects, which
will be represented by a ConditionTree
. A tree is defined by one or more Condition
and
a boolean operator.
ConditionTree
objects short circuit, and notify onXXX
methods as fast as possible.
The evaluation is totally lazy, and may eventually happen when any of the Conditions
inside the tree fire.
const c0 = new Condition();
const c1 = new Condition();
const c2 = new Condition();
const ct : ConditionTree = new ConditionTree( BOOLEAN_OPERATOR.AND )
.onFalse( (ct : Condition) => {
})
.addCondition( c0 )
.addCondition( c1 )
.addCondition( c2 );
c0.setTrue();
// ct still has no value.
c1.setFalse(); // short circuit, ct evaluates to false now,
// regardless c1 and c2 have values set later.
c1.setTrue(); // it does not change ct's value.
Condition trees can be nested:
const c0 = new Condition();
const c1 = new Condition();
const c2 = new Condition();
const c3 = new Condition();
const ct0 : ConditionTree = new ConditionTree( BOOLEAN_OPERATOR.OR )
.addCondition( c0 )
.addCondition( c1 );
const ct1 : ConditionTree = new ConditionTree( BOOLEAN_OPERATOR.OR )
.addCondition( c2 )
.addCondition( c3 );
const ctct : ConditionTree = new ConditionTree( BOOLEAN_OPERATOR.OR )
.addCondition( ct0 )
.addCondition( ct1 )
.onTrue( ( c : Condition ) {
console.log( c.getCurrentValue() ); // BOOL_OPERATOR.TRUE | BOOL_OPERATOR.FALSE |BOOL_OPERATOR.NOT_SET
});
c2.setTrue(); // ctct invoked onTrue();
ParallelCondition
objects are asynchronous ConditionTree
s. It is initialized from a collection of
ParallelCondition
and or callbacks of the form: (c:Condition, index:number)=>void
.
The idea with this object is each of the elements supplied at construction time will be executed on the next tick.
Elements supplied are either other ParallelCondition
objects, or worker functions which will notify their
activity result on the Condition
object supplied as parameter.
The ParallelCondition
evaluates as an AND ConditionTree
, so it expects every action to notify back with setTrue().
It will shot-circuit fast, so any action notifying in its parameter Condition
with setFalse()
will make the
ParallelCondition
evaluate as false, and notify immediately onFalse()
regardless all parallel operations have
ended.
A real use case for this object is:
- write some activity to the database
- get postgres connection
- insert
- check result
- notify on the parameter condition
- (and) send some content to redis
- (and) get some XHR content
// Wait for **all** these things to happen, with a 3 second limit.
function do_postgres_stuff( _condition:Condition, index:number ) {
// more on the `Dispatcher` beast later
// Execute the sequence of functions,
// + take a maximum of 2 seconds to execute, otherwise set an error as result
// + if an error is thrown in any function, halt sequence execution.
//
_dispatcher.submitNodeSequence(
[
function getConnection() {
},
function insertContent( e:Error, ...) {
},
function checkResult( e:Error, ...) {
}
],
true, // error fall through
2000 // max timeout
).onValueSet(
( f : Future ) => {
const f : any = future.getValue();
if (f instanceof Error) {
// you can read f.sequenceStacktrace to get detailed info of what went wrong
console.log( f.sequenceStackTrace );
_condition.setFalse();
} else {
_condition.setTrue();
}
}
);
}
function do_redis_stuff( _condition:Condition, index:number ) {
...
}
function do_xhr_stuff( _condition:Condition, index:number ) {
...
}
const pc = new ParallelCondition([
do_posgres_stuff,
do_redis_stuff,
do_xhr_stuff
]).onTrue( (c : Condition) => {
// all operations ended correctly
}).onFalse( (c : Condition) => {
// something went wrong.
// condition short-circuited, some operations may not have ended yet.
}).
setTimeout( 3000 ); // take 3 secs as much to process the condition, othewise, onFalse will be invoked.