Skip to content
Jean Hugues Robert edited this page Mar 19, 2014 · 11 revisions

Fluids are streams. They are made of water sources. See Water. Fluids provide a "fluent" API similar to the one on arrays. However, whereas an array has a fixed content that methods operate on, fluids have an evolving content that is processed according to methods describing what to do when new values are pushed into the fluid.

Fluid, from water

There is a "fluid" API. It returns a "fluid" object that implements the usual suspects (.filter(), .map(), .reduce(), etc). Fluids have a water delegate and duck type as water source objects, ie wherever a water source is required, a fluid does the job.

var source  = Water();
var source2 = Fluid();
var sink    = Water();
Water.fluid()
  .from( source  )
  .from( source2 )
    .filter( function( v ){ return v > 0; } )
    .map(    function( v ){ return v * 10;       } )
    .reduce( function( p, v ){ return p + v;  }, 0 )
    .tap(    function( v ){ console.log( v );    } )
  .to( sink );
source(  -1 ); // nothing
source(   1 ); // => 10
source(  -2 ); // nothing
source(   2 ); // => 30
source2( 10 ); // => 130
source2.from( [ 1, 1 ] ); // => 140, => 150
console.log( sink() ); => 150

console.log( sink() ); // => 30

Note: all functions can return a promise (or a boxon) for async outcome delivery. undefined is never propagated.

var f = Water.fluid()

Create a fluid. A kind of "stream".

var f = Water.fluid( source )

Create a fluid, from a source. The source can be some water or another fluid. This is equivalent to var f = Water.fluid().from( source );.

Note: when source is an array of values, to avoid early values propagation, ie values beeing pushed and processed before the dataflow graph is actually built, consider using Water.fluid.hold().

a_fluid.from( a_source )

Set up a connection between a source and a fluid. The source pushes data into the fluid. The source can be a water source or another fluid. If the source is an array, each item is a pushed, using fluid.push( v );. When a source is pushed into a fluid, a connection between that source and the fluid is established.

Note: using a_fluid.from( [ v1, v2, v3.. ] ) it is possible to push values before the dataflow graph is fully built and consequently the values don't flow in that graph. It is possible to hold the value until the graph is built and then release them. See f.hold() and f.release().

a_fluid.push( a_value )

Push a value into a fluid. Some values are processed in a special way. If the value is a fluid or a water source, a connection is established so that data from that source will flow into the fluid. If the value is an error, ie {water:w,error:e} it will flow transparently unless captured by a .failure(m); mapper or by an .if().failure()... branch. As always, undefined does not flow (but you can wrap it, see Water.wrap()).

Note: the special values cannot propagate, unless they are wrapped. See Water.wrap( v ) and Water.unwrap( v ).

If a value is pushed while a change propagation is going on, the push is queued (using fluid.effect( fn );) and will occur when all changes got properly propagated.

a_fluid.hold() & a_fluid.release()

When values are pushed into a fluid, they are processed immediately, unless there is a change propagation occurring, in which case the values are queued to be processed after the propagation is done.

It is possible to control the queueing of values even when no change propagation is running. Using a_fluid.hold() a "nested" counter is incremented and that forces pushed values to be queued. When enough call to fluid.release() are done and when the "nested" counter reaches zero, the queued values are pushed again. This occurs in FIFO order.

var msgs = [ "hello", "world" ];
Water.fluid().from( msgs ).log();                  // => nothing, values are pushed too soon
Water.fluid().hold().from( msgs ).log().release(); // => hello world
Water.fluid().log().from( msgs );                  // => hello world

Note: the "nested" counter is a global counter, it is shared by all fluids. This implementation detail may change in the future in favor to a "per fluid" counter (march 2014).

Note: Water.fluid.hold() and Water.fluid.release() are provided to control queueing globally.

a_fluid.to( a_sink )

Connect the fluid with another fluid or water.

a_fluid.close()

Closes the fluid. A special error is propagated: { water: f, error: "close" }. After a close, pushes are ignored and a_fluid.closed is true. Only f.final() can capture a "close" error, .failure() skips them.

a_fluid.concat(...)

Works like Array::concat(). If an argument is a fluid or a water source, instead of an array, data will be sourced from that source, a connection is established between the source and the fluid.

a_fluid.fail( error )

Push an error into the fluid. Error flow transparently unless captured using .failure( mapper ). Note: if no error is provided, "fail" is assumed.

a_fluid.water()

Return the underlying water object.

a_fluid.value()

Return the current value of the underlying water object. This is equivalent to a_fluid.water()().

Array like methods

Methods .filter(), .map(), .reduce(), .forEach(), .find(), .findIndex(), .indexOf(), .join(), .every() and .some() work like similar methods for arrays. Instead of returning a new array, it's a new fluid that is returned. Errors are skipped.

Note: the index is relative to the instant the method is invoked, ie index 0 is the first value, index 1 is the second value and so on. By default these values are not stored but they can be stored if source_fluid.stateful() was called. In that case, source_fluid.state()[index] retrieves the stored value.

a_fluid.raw( transform )

Like f.map( transform ) but errors (besides "close") are not skipped.

a_fluid.flatmap( fn )

Works like fluid.map( fn ); unless fn returns an array. When an array is returned, the first item is immediately produced and the other items are queued to be pushed for processing during the next change propagation.

var routed = a_fluid.route( rules )

Select route to follow. Each property of the rules object names a fluid where values will flow when the associated predicate succeeds. Unrouted values are transmitted transparently.

function is_even( n ){ return n % 1 === 0; };
var source = Water.fluid().hold().from( [ 0, 1, 2, 3, 4 ] )
var routed = source.route( { even: is_even } );
var even_numbers = routed.even.stateful.state();
var odd_numbers = routed.stateful.state();
source.release();
console.log( even_numbers ); // => 0, 2, 4
console.log( odd_numbers  ); // => 1, 3

var main = a_fluid.branch( p )

Branch fluid according to a predicate. The resulting branch is accessible using syntax main.branch.

var source = Water.fluid().hold().from( [ 0, 1, 2, 3, 4 ] )
var main = source.branch( is_even );
main.branch.log( "Even" );
main.log(        "Odd"  );
source.release(); // Even 0 Odd 1 Even 2 Odd 3 Even 4

a_fluid.junction(), .repeat()

Reunite stacked branches into a new fluid. Branches are stacked using f.stack(). f.junction() reunites all the not yet reunited branches since last call to f.route() or f.branch(), ie it clears the stack after doing new_fluid.from( stacked_fluid ) for each stacked fluid. In addition, the unrouted values are reunited too.

var routed = source.route( {
  even:  is_even,
  prime: is_prime,
  nan:   is_NaN
}).
routed.even.map( function( v ){ return v * 10; } ).stack();
routed.prim.filter().stack();
routed.nan.map( "Nan" ).stack();
routed.junction().log();
source.from( [ 0, 1, 2, 3, 4, 9, NaN ] ); // => 0, 20, 40, 9, NaN 

If the reunited values needs to be processed in a loop, please use .repeat() instead of .junction(). The reunited values will flow again into the last .route(). Contrary to .junction(), the unrouted values are not reunited, they are the output of the fluid. ie: a value exits the loop when no route matches for it.

a_fluid.if( p )

Nested if/else/else_if/end_if branching.

source.if( is_even ).log( "Even" ).else().log( "Odd" ).end_if();

The changes introduced in the branches are reunited at the .end_if() junction.

source
.if( is_odd )
  .map( function( v ){ return v + 1; } )
.end_if()
.log()
.from( [ 1, 2 ] ); // => 2 2

If there is no .else() clause, the values that don't get routed to the branch are kept unchanged when reunited with the routed values.

Syntax .else_if() helps avoid excessive nesting. Values are reunited at the final .end_if() junction.

source.if( is_even )
  .map( function( v ){ return v * 10; } ).
.else_if(  is_prime )
  .filter()
.else_if(  is_NaN )
  .map( "Not a number" )
.end_if().log();
source.from( [ 
  0,  // even number * 10 => 0
  1,  // odd & prime, filtered out
  2,  // even number * 10 => 20
  3,  // odd & prime, filtered out
  4,  // even number * 10 => 40
  9,  // not routed => 9
  NaN // => Not a number
] ); // =>  0 20 40 9 Not a number

a_fluid.while()...end_while()

Works like .if()...end_if() but the values loop until the predicate fails.

f.while( function( v ){ return v.length; } )
  .map( function( v ){
    do_something( v.pop() );
    return v;
  } )
.end_while()
.assert( function( v ){ !v.length } );

a_fluid.stateful( [array] )

Fluid are usually stateless, values flow but are not memorized. If access to previous values is required, a fluid can become stateful. The optional array is the new state, it defaults to a new empty array.

a_fluid.stateless()

If the fluid was stateful, the state array is cleared and no longer updated.

a_fluid.state( [index], [new_value] )

Return the fluid's state, an array of previous values, or null. Only stateful fluids have a state. If an index is provided, this is a getter. If a new value is provided too, this is a setter. Note: negative indexes are valid, -1 means "last item".

a_fluid.tap( fn )

Install a callback called when a new value is available. Note: errors are skipped.

a_fluid.once( fn )

This is similar to .tap( cb ) but the callback is executed once only, please use Water.again() to reinstall it if so desired.

a_fluid.failure( [fn] )

Map errors. If a value is returned by function fn, it replaces the error. Alias: a_fluid.catch(). If fn is not a function, it specifies the value to use on error. To process "close" special error, use f.final().

f.failure( "default" ).log();
f.fail() // => default

If fn is not provided, the error is unwrapped. Normal (non error) values are skipped.

f.if().failure()
  .map( "default" )
.end_if()
.log();
f.fail() // => default

f.final( [fn] )

Like f.failure() but for the special "close" error only.

f.first( initializer )

The initializer is called when a new value is available. If it succeeds, ie if it returns value that is not an error, that value replaces the new value and the initializer is not called anymore.

var quoter = Water.fluid();
var resource;
quoter
.first( function( first ){
  resource = alloc_resource();
  return first`;
})
  .map( xxx )
  .filter( xxx )
  .reduce( xxx )
.failure( function( err ){
  console.log( "Quoter error", err );
})
.final( function(){
  free_resource( resource );
});
quoter.push( "Hello" ).close();

a_fluid.define( fn )

Create a fluid based function. fn( input_fluid ); is called once to setup dependencies and then a function is returned. When that function is called, f( v [, cb ] ); the v value is pushed into the input fluid and the output is provided, either now or via a f( err, r ); callback if cb was provided.

Object oriented fluids

Fluids defines "member" functions that operate on them. If different types of fluids are required, with members defined differently, a new type of fluids can be created using fluid.subclass().


// Moving average over a window of samples, approximation using exponential filter
function exp_average( p, v, window ){ return ( p * (window - 1) + v ) / window; }

// Fluids of numbers
var numbers = Water.fluid.subclass();

// Define numbers.average( n_samples ), using an exponential filter
numbers.method( function average( window ){
  return this.reduce( function( p, v ){ return exp_average( p, v, window ); } )
});

// Fluids of complex numbers. { r: real_part, i: imaginary_part }
var complex_numbers = numbers.subclass();

// Define complex_numbers.average(), averages both real and imaginary parts
complex_numbers.method( function average( window ){
  return this.reduce( function( p, v ){
    return { r: exp_average( p.r, v.r ), i: exp_average( p.i, v.i ) };
  } );
});

Water.fluid.method( [name], fn )

Define a new fluid method. If name is not provided, it defaults to the fn function's name.

Water.fluid.method( function square(){ return this.map( function( v ){ return v * v ; } ); } );
var f = Water.fluid();
f.square().log( "Squared" );
f.push( 10 ); // => Squared 100
f.push( 20 ); // => Squared 400

It is also possible to define anonymous functions instead of new fluid members: var f = Water.fluid.method( fn ). Once defined, that function can be applied: var output = f( input ). both input and output are fluids.

Note: members are defined globally. If different types of fluids are required, with members unique to them, use syntax: my_fluid = Water.fluid.subclass(); and then my_fluid.method( function mmm(){ ... } );.

var square = Water.fluid.method( function(){
  return this.map( function( it ){ return it * it; } );
});
var source = Water.fluid();
square( source ).log( "Squared" );
source.push( 10 ); => Squared 100

fluid_class.mixin( [ target ], source, )

Populate target with members from source, target defaults to the fluid class's prototype.

var strings = Water.fluid.subclass();
strings.mixin( {
  match: function( pattern ){ return this.flatmap( function( v ){
    var r = v.match( pattern );
    if( !r )return
    return r;
  },
  capitalize: function(){ return this.map( function( v ){ return v.toUpperCase(); } );
  log: function(){
     var args = [ "String" ].concat( Array.prototype.slice.call( arguments ); );
     return this.super.log.call( this, args );
  }
})

var str_stream = strings();
str_stream.capitalize().log();
str_stream.push( "Hello, world!" ); // => String HELLO, WORLD!

This works well when a data flow graph handles fluids that are all of the same type, ie when they carry values of identical types. However, sometimes the same data flow graph must handle different types of fluids, ie fluids with values of different types. When the type of an output fluid is not the same as the one of the sources' one, there is a need to specify the type of fluid that a transform function will push values into. This requires a special type of "mapper", one that convert input values from the current fluid into values that fit the new output fluid's type. Such mappers are "casters".


// Integers are numbers without decimals
var integers = Water.fluid.subclass();

// Redefine "log()" for integers, adds an "Integer" prefix to messages
integers.method( function log(){
  args = Array.prototype.slice.call( arguments );
  args = [ "Integer" ].concat( args );
  return this.super.log.call( this, args );
} );

// To convert numbers with decimals into integers, select the nearest integer
var source = Water.fluid();
source
.cast( integers, function( r ){ return Math.round( r ); } )
.log(); // This is integers' log, not the base log()

source.push( 0.4 ); // => Integer 0
source.push( 0.6 ); // => Integer 1

Aliases

f.filter(), f.where() and f.select() are the same.

f.map() and f.step() too.

f.tap() and f.subscribe() idem.

f.failure() and f.catch() also.

Misc

ToDo: .merge(), .zip() ToDo: .pluck(), .min(), .max(), .count(), .fork()/.join(), .head()/.first(), .throttle(), .take(), .uniq(), etc, etc.

ToDo: memory management. It probably leaks at lot.