Flowlib: Another Scala Dataflow Library
Flowlib is a compact library for asynchronous programming within a single JVM.
- Compared to futures and promises, flowlib has a message passing model and allows arbitrary message processing graphs. There is a DSL to describe these graphs, with operators to split and join message flows in sum or product style. The degree of concurrency is tunable at each processing node.
- Compared to Akka actors, message flows are statically typed, and many flow control details that would otherwise be programmed by hand are handled automatically. On the other hand, flowlib is confined to a single JVM instance.
- Compared to Spark, Storm or Akka clusters, flowlib is for smaller asynchronous systems. There are many of these!
- Compared to Functional Reactive Programming, there is no global synchronization of message flows (signals and behaviours in FRP) nor any other glitch suppression strategy.
Important Caveats:
- This is an experimental library for now. I hope to have something well-proven soon.
- This page is in draft form until I test my examples and post more complete example code.
- The source is on GitHub but the repository will likely be renamed to remove the reference to actors.
- Updated 29/12/2013 to reflect API changes: Action now has a type parameter, run() now takes a Supervisor parameter, and (p: Process) * (n: Int) == p*n no longer holds.
Without Further Ado: A Graph
Here is a toy processing graph in the flowlib DSL:
import au.com.langdale.async.Flow._
trait SampleGraph extends SampleDecls {
  val N = 2
  def graph =
    InitialData :- urls :-> Fetcher :- raw :-> Splitter -: (
      urls ->: Dedup -: urls ->: Fetcher &
      text ->: Filer*N -: metrics ->: Reporter )
}
By convention, uppercase identifiers such as Fetcher are processes and lowercase identifiers such as raw are labels for message flows.
The connecting operator :- attaches a flow label to a process, forming a projection, and :-> connects this projection to a target process. These operators have right-associative equivalents, ->: and -: respectively, which are useful for expressing fan-out as opposed to fan-in structures.
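The right-associative forms rely on a general Scala rule, not on anything specific to flowlib. An operator whose name ends in a colon is right-associative, and its right operand is the receiver, so a ->: b means b.->:(a). The toy Expr class below is hypothetical and is not flowlib's graph type. It only records how chained expressions are grouped, so the two groupings can be compared.

```scala
// Hypothetical toy type, NOT flowlib's API: it only records how Scala
// groups chained operators so that the grouping can be inspected.
final case class Expr(show: String) {
  // Left-associative: `a :-> b` means a.:->(b); chains group left to right.
  def :->(that: Expr): Expr = Expr(s"($show :-> ${that.show})")

  // Right-associative because the name ends in ':': `a ->: b` means b.->:(a),
  // so `this` is the right operand; chains group right to left.
  def ->:(that: Expr): Expr = Expr(s"(${that.show} ->: $show)")
}

object AssocDemo {
  val a = Expr("a")
  val b = Expr("b")
  val c = Expr("c")

  val left: Expr = a :-> b :-> c  // ((a :-> b) :-> c)
  val right: Expr = a ->: b ->: c // (a ->: (b ->: c))
}
```

Scala also sets precedence by an operator's first character. Among the operators used here, & binds most loosely. Operators starting with : (such as :- and :->) bind more tightly than &. Operators starting with - (such as -: and ->:) bind more tightly still, and * binds tightest.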
The & operator combines graphs or projections. More about the representation of graphs and projections later.
The * operator multiplies a process: it indicates how many parallel instances should be executed.
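To make the roles of & and * more concrete, here is a hypothetical toy model. Proc, Edge and Graph are illustrative names, not flowlib's classes, and this is not how flowlib represents graphs. In the model, a graph is a list of labelled edges and & concatenates two graphs. p * n returns a new value asking for n parallel instances of p, and leaves p itself unchanged.

```scala
// Hypothetical toy model, NOT flowlib's graph representation.
final case class Proc(name: String, instances: Int = 1) {
  // `p * n` returns a new value asking for n times as many parallel
  // instances; p itself is immutable and unchanged.
  def *(n: Int): Proc = copy(instances = instances * n)
}

// A connection from one process to another along a named flow.
final case class Edge(from: Proc, label: String, to: Proc)

final case class Graph(edges: List[Edge]) {
  // `g & h` combines two graphs into one holding the edges of both.
  def &(that: Graph): Graph = Graph(edges ++ that.edges)

  // Distinct processes (with their instance counts) appearing in the graph.
  def processes: Set[Proc] = edges.flatMap(e => List(e.from, e.to)).toSet
}

object GraphDemo {
  val Splitter = Proc("Splitter")
  val Filer = Proc("Filer")
  val Reporter = Proc("Reporter")
  val N = 2

  val graph: Graph =
    Graph(List(Edge(Splitter, "text", Filer * N))) &
      Graph(List(Edge(Filer * N, "metrics", Reporter)))

  // Splitter (1) + Filer (2) + Reporter (1)
  val totalInstances: Int = graph.processes.toList.map(_.instances).sum
}
```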
We left out the declarations of the identifiers. Let's put them in a separate trait without committing to the types just yet.
trait SampleDecls {
  type Text
  type Address
  type Metric
  val raw, text = label[Text]
  val urls = label[Address]
  val metrics = label[Metric]
  val Fetcher, Splitter, Dedup, Filer, Reporter, Supervisor: Process
}
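Each label carries a message type, and that is what lets flows be checked statically. Here is a minimal sketch of the idea, assuming a label is just an object with a phantom type parameter. This Label and Message are hypothetical, not flowlib's definitions. The sketch also shows a plain Scala fact: `val x, y = e` evaluates e once per name, so raw and text are two distinct labels.

```scala
// Hypothetical sketch, NOT flowlib's Label: the type parameter A is a
// phantom that exists only at compile time. Labels compare by identity.
final class Label[A]

final case class Message[A](label: Label[A], value: A)

object TypedFlows {
  def label[A]: Label[A] = new Label[A]

  type Text = String
  type Address = java.net.URI

  // `val x, y = e` evaluates e once per name, so these are two distinct labels.
  val raw, text = label[Text]
  val urls = label[Address]

  val fetched = Message(urls, new java.net.URI("http://example.com/"))
  // Message(urls, "not a URI")  // rejected at compile time: String is not an Address
}
```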
Processes
On to the definition of processes. Let's take Dedup as an example:
trait SampleProcesses {
  def dedup[Message](flow: Label[Message]) = new Process {
    def description = "remove duplicate messages"
    def action = loop(Set.empty)
    private def loop(seen: Set[Message]): Action[Nothing] =
      input(flow) { message =>
        if (seen contains message) loop(seen)
        else output(flow, message) { loop(seen + message) }
      }
  }
  // more process definitions here ...
}
This defines a dedup method that will create a process with a given message type and flow label. The parameters make dedup potentially usable in different positions within a graph or in different graphs.
The process is constructed in continuation-passing style as follows:
- The action member is the process entry point.
- The input method returns an Action that will be dispatched when a message is available on the port labelled flow.
- The function message => ... is a continuation that is invoked when the input action is dispatched. It returns a new Action to be dispatched.
- The output method returns an Action that will be dispatched when the message can be delivered on the output port labelled flow.
- The passed block is a continuation that is invoked when the output action is dispatched.
- Action has a type parameter, in this case Nothing, which indicates that this series of actions loops indefinitely.
Any other type U indicates that a stop(u: U) action may be encountered, which terminates the series of actions.
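The dispatching described above can be modelled in a few lines. The sketch below is a hypothetical, single-threaded stand-in for the library's scheduler, and it makes several simplifying assumptions. Action, Input, Output, Stop, Cps and drive are invented names. Unlike flowlib's Action, each action also fixes its message type M as a second type parameter. The drive function simply feeds messages from a list. The sketch shows the same dedup loop (result type Nothing) alongside a process that ends with stop.

```scala
// Hypothetical model, NOT flowlib's Action type. An action over messages of
// type M either waits for input, emits an output, or stops with a result U.
sealed trait Action[M, U]
final case class Input[M, U](k: M => Action[M, U]) extends Action[M, U]
final case class Output[M, U](message: M, k: () => Action[M, U]) extends Action[M, U]
final case class Stop[M, U](result: U) extends Action[M, U]

object Cps {
  def input[M, U](k: M => Action[M, U]): Action[M, U] = Input[M, U](k)

  // The continuation is passed by name so it runs only after the output.
  def output[M, U](message: M)(k: => Action[M, U]): Action[M, U] =
    Output[M, U](message, () => k)

  def stop[M, U](result: U): Action[M, U] = Stop[M, U](result)

  // Same shape as dedup above: the immutable `seen` set is passed from
  // continuation to continuation. U = Nothing: this action never stops.
  def dedup[M]: Action[M, Nothing] = loop(Set.empty[M])

  private def loop[M](seen: Set[M]): Action[M, Nothing] =
    input[M, Nothing] { m =>
      if (seen contains m) loop(seen)
      else output[M, Nothing](m)(loop(seen + m))
    }

  // An action that can stop: after n distinct messages it ends with their set.
  def firstDistinct[M](n: Int): Action[M, Set[M]] = collect(n, Set.empty[M])

  private def collect[M](n: Int, seen: Set[M]): Action[M, Set[M]] =
    if (seen.size >= n) stop[M, Set[M]](seen)
    else input[M, Set[M]](m => collect(n, seen + m))

  // Single-threaded stand-in for a scheduler: feed inputs from a list, record
  // outputs, and return when the action stops or the inputs run out.
  @annotation.tailrec
  def drive[M, U](a: Action[M, U], in: List[M], out: Vector[M]): (Vector[M], Option[U]) =
    a match {
      case Stop(u)      => (out, Some(u))
      case Output(m, k) => drive[M, U](k(), in, out :+ m)
      case Input(k) =>
        in match {
          case m :: rest => drive[M, U](k(m), rest, out)
          case Nil       => (out, None) // still waiting for input: no result
        }
    }
}

object CpsDemo {
  val (deduped, dedupResult) =
    Cps.drive[String, Nothing](Cps.dedup[String], List("a", "b", "a", "c", "b"), Vector.empty)

  val (_, stopped) =
    Cps.drive[Int, Set[Int]](Cps.firstDistinct[Int](2), List(1, 1, 2, 3), Vector.empty)
}
```

Running dedup over a, b, a, c, b emits a, b, c and never produces a result. firstDistinct(2) over 1, 1, 2, 3 stops with Set(1, 2) before it reads the 3.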
A Note About Process State
The dedup process must keep track of the messages it has already seen, which it does using an immutable Set[Message]. Successive versions of this set are passed from continuation to continuation.
It might be tempting to use a mutable set here and make it a member of Process. That is a common style in actor programming, but it is not suitable for processes.
The library assumes that Process is immutable: it may not have var members or mutable members. Among other things, this enables the * operator and allows a given graph to be run more than once.
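A small experiment shows why a mutable member gets in the way of running the same process value again. The classes below are hypothetical, deliberately simplified (a "run" just processes one list), and not flowlib code. The mutable version remembers messages from its first run. The immutable version threads its Set through a loop, so each run starts fresh.

```scala
import scala.collection.mutable

// Hypothetical illustration, NOT flowlib code: a "run" just pushes one batch
// of messages through a process value and returns what it emits.
trait BatchProcess {
  def run(in: List[String]): List[String]
}

// Actor style: `seen` is a mutable member, so it is shared by every run
// (and would be shared by every replica) of this one value.
final class MutableDedup extends BatchProcess {
  private val seen = mutable.Set.empty[String]
  // Set.add returns true only when the element was not already present.
  def run(in: List[String]): List[String] = in.filter(m => seen.add(m))
}

// Process style: the set is threaded through the loop, so every run starts
// fresh and the same value can be run again (or replicated) safely.
object ImmutableDedup extends BatchProcess {
  def run(in: List[String]): List[String] = {
    @annotation.tailrec
    def loop(rest: List[String], seen: Set[String], out: List[String]): List[String] =
      rest match {
        case Nil => out.reverse
        case m :: ms =>
          if (seen contains m) loop(ms, seen, out)
          else loop(ms, seen + m, m :: out)
      }
    loop(in, Set.empty, Nil)
  }
}

object ReuseDemo {
  val batch = List("x", "y", "x")

  private val mutableDedup = new MutableDedup
  val mutableFirst = mutableDedup.run(batch)  // List(x, y)
  val mutableSecond = mutableDedup.run(batch) // List(): all "seen" in the first run

  val immutableFirst = ImmutableDedup.run(batch)  // List(x, y)
  val immutableSecond = ImmutableDedup.run(batch) // List(x, y) again
}
```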
Putting it Together
To complete the example we need to bring the graph and the process definitions together:
object Sample extends SampleGraph with SampleProcesses {
  type Address = java.net.URI
  val Dedup = dedup(urls)
  // commit the remaining types and processes here ...
  val procmap = run(graph, Supervisor)
  println(s"Started ${procmap.size} processes!")
}
This fills in the Address type and creates a Dedup process whose input and output will both be given the urls label. (The remaining types and processes are omitted for brevity.)
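Leaving Address abstract in the declarations and committing to it in Sample is ordinary Scala. A trait can declare abstract type members and methods, code in other traits can be written against them, and a concrete object supplies the definitions. Here is a hypothetical miniature of the same pattern; the names are illustrative only.

```scala
// Hypothetical names, NOT flowlib's: one trait declares an abstract type,
// another is written against it, and a concrete object commits to it.
trait AddressDecls {
  type Address
  def parse(s: String): Address
}

trait AddressGraph extends AddressDecls {
  // Uses only the abstract type: no representation has been chosen here.
  def distinctAddresses(raw: List[String]): List[Address] = raw.map(parse).distinct
}

object ConcreteSample extends AddressGraph {
  type Address = java.net.URI
  def parse(s: String): Address = new java.net.URI(s)
}
```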
The run(graph, Supervisor) method puts everything in motion. It creates a network of sites, connected by communication channels, that mirrors the given graph, and executes the corresponding process from the graph at each site.
The given Supervisor process is also executed. It is connected to the predefined errors port of all the other processes and receives messages reporting errors and process termination.
The run method returns a map from processes to the sites at which they are executing.
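The sketch below is only a rough mental model of what run is described as doing. It uses plain JVM threads and blocking queues rather than flowlib's runtime. Each process gets its own site (a thread), and sites are joined by channels that mirror a linear graph. Each site reports to a supervisor queue when it terminates or fails, and the function returns a map from process names to sites. SiteSketch, Stage, Event and the None end-of-stream marker are all assumptions made for this illustration.

```scala
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue}

// Hypothetical sketch, NOT flowlib's runtime. Each process runs at its own
// "site" (a thread); sites are joined by channels (queues) that mirror a
// linear graph; each site reports to a supervisor queue when it ends.
object SiteSketch {
  sealed trait Event
  final case class Terminated(process: String) extends Event
  final case class Failed(process: String, error: Exception) extends Event

  // Messages travel as Options; None marks the end of the stream.
  type Channel = BlockingQueue[Option[String]]

  // A stateless process: each input message yields zero or more outputs.
  final case class Stage(name: String, step: String => List[String])

  private def startSite(stage: Stage, in: Channel, out: Channel,
                        supervisor: BlockingQueue[Event]): Thread = {
    val body = new Runnable {
      def run(): Unit =
        try {
          var msg = in.take()
          while (msg.isDefined) {
            stage.step(msg.get).foreach(m => out.put(Some(m)))
            msg = in.take()
          }
          out.put(None) // pass end-of-stream downstream
          supervisor.put(Terminated(stage.name))
        } catch {
          case e: Exception => supervisor.put(Failed(stage.name, e))
        }
    }
    val site = new Thread(body, stage.name)
    site.start()
    site
  }

  // Start one site per stage, chained by fresh channels. Returns a map from
  // process names to sites, plus the channel carrying the final outputs.
  def run(stages: List[Stage], source: Channel,
          supervisor: BlockingQueue[Event]): (Map[String, Thread], Channel) =
    stages.foldLeft((Map.empty[String, Thread], source)) {
      case ((sites, in), stage) =>
        val out: Channel = new LinkedBlockingQueue[Option[String]]()
        (sites + (stage.name -> startSite(stage, in, out, supervisor)), out)
    }
}

object SiteDemo {
  import SiteSketch._

  // Runs in a method rather than an object initializer: worker threads call
  // lambdas compiled into this object, which would block while it initializes.
  def demo(): (List[String], Set[Event]) = {
    val supervisor = new LinkedBlockingQueue[Event]()
    val source: Channel = new LinkedBlockingQueue[Option[String]]()
    val stages = List(
      Stage("Splitter", s => s.split(" ").toList),
      Stage("Upper", s => List(s.toUpperCase))
    )
    val (sites, sink) = SiteSketch.run(stages, source, supervisor)

    List("a b", "c").foreach(s => source.put(Some(s)))
    source.put(None)

    // Drain the final channel up to the end-of-stream marker.
    val outputs = Iterator.continually(sink.take()).takeWhile(_.isDefined).map(_.get).toList
    // One termination notice is expected per site.
    val events = sites.keys.toList.map(_ => supervisor.take()).toSet
    (outputs, events)
  }
}
```

This sketch handles only linear pipelines of stateless stages, and its supervisor is a plain queue rather than a process. flowlib, as described above, supports arbitrary graphs and runs the Supervisor as a process.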