A Ruby library that adds Dataflow variables (inspired by the Oz
language). Dataflow variables have the property that they can only
be bound/assigned to once, or have an equivalent value as an existing
assignment (see “unification”).
Dataflow variables must be declared before they are used, and can be
passed around as data without actually being bound. If the variable
gets used (in this library this means a method call) while being
unbound then the currently executing thread will suspend.
Ruby is Object Oriented (with the ability to mutate local, instance,
class, and global variables, and even constants), and on top of that
it has powerful reflection and meta-programming abilities. While these
features are useful for certain problems, they are not within the
declarative model. Staying in the declarative model gives one 2 advantages:
- It is easy to reason about what the program does
- Simple but powerful concurrency is possible
Ruby, like many other OO languages, is facing the hurdles of taking
advantage of the increase of processor cores within a simple parallel
programming model. This library lets you program Ruby in the
declarative concurrent model when you need to take advantage of multiple cores
(assuming a Ruby implementation that uses native threads in one way or
another).
The trick to this kind of programming is binding variables from other
threads. The nice thing is that many existing
libraries/classes/methods can still be used, just avoid
side-effects. Use regular Ruby threading to create threads, use
“local” or “declare” to create new variables, and use “unify” to bind
variables.
To install the latest release as a gem:
sudo gem install dataflow
#dataflow-gem @ freenode.net
# Local variables include Dataflow local do |x, y, z| # notice how the order automatically gets resolved Thread.new { unify y, x + 2 } Thread.new { unify z, y + 3 } Thread.new { unify x, 1 } z #=> 6 end
# Module methods version Dataflow.local do |x, y, z| # notice how the order automatically gets resolved Thread.new { Dataflow.unify y, x + 2 } Thread.new { Dataflow.unify z, y + 3 } Thread.new { Dataflow.unify x, 1 } z #=> 6 end # Note that a gobal Dataflow.declare is not supported
# Instance variables class AnimalHouse include Dataflow declare :small_cat, :big_cat def fetch_big_cat Thread.new { unify big_cat, small_cat.upcase } unify small_cat, 'cat' big_cat end end AnimalHouse.new.fetch_big_cat #=> 'CAT'
# Data-driven concurrency include Dataflow local do |stream, doubles, triples, squares| unify stream, Array.new(5) { Dataflow::Variable.new } Thread.new { unify doubles, stream.map {|n| n*2 } } Thread.new { unify triples, stream.map {|n| n*3 } } Thread.new { unify squares, stream.map {|n| n**2 } } Thread.new { stream.each {|x| unify x, rand(100) } } puts "original: #{stream.inspect}" puts "doubles: #{doubles.inspect}" puts "triples: #{triples.inspect}" puts "squares: #{squares.inspect}" end
# By-need trigger laziness include Dataflow local do |x, y, z| Thread.new { unify y, by_need { 4 } } Thread.new { unify z, x + y } Thread.new { unify x, by_need { 3 } } z #=> 7 end
# Need-later future expressions include Dataflow local do |x, y, z| unify y, need_later { 4 } unify z, need_later { x + y } unify x, need_later { 3 } z #=> 7 end
include Dataflow # flow without parameters local do |x| flow do # other stuff unify x, 1337 end x #=> 1337 end # flow with an output parameter local do |x| flow(x) do # other stuff 1337 end x #=> 1337 end
# barrier include Dataflow local do |lock1, lock2| flow { unify lock1, :unlocked } flow { unify lock2, :unlocked } barrier lock1, lock2 puts "Barrier broken!" end
# FutureQueue include Dataflow local do |queue, first, second| unify queue, Dataflow::FutureQueue.new queue.pop first queue.push 1 queue.push 2 queue.pop second first #=> 1 second #=> 2 end
Sometimes you may want to pack a data structure with variables that do not need to be referenced with labels. For those cases anonymous variables are a good choice, here are some options:
include Dataflow Array.new(3) { Dataflow::Variable.new } Array.new(3) { Dataflow.local } Array.new(3) { local } # and technically not anonymous Array.new(3) { local {|v| v } }
If you are having trouble and need to debug dataflow variables, simply call #inspect.
If the variable has already been bound, it call inspect on its bound value like normal.However, if the variable is not bound yet then you will get a special string that contains the proxies #id that you can use to track down which proxy objects are being passed around to which parts of your program:
include Dataflow local do |my_var| my_var.inspect # => #<Dataflow::Variable:2637860 unbound> end
By default both #flow and #need_later use Thread.fork as their fork method. Youc an access the fork method via Dataflow.forker.
If you would like to use a custom forker, simple set it to an object that responds to #call and internally calls a block passed to it (for an example of a synchronous forker, see spec/forker_spec.rb):
Dataflow.forker = MyClass.method(:fork_with_threadpool)
Also note that #flow is used interally by #need_later, in case you want to override that specifically.
Ports are an extension of the declarative concurrent model to support nondeterministic behavior. They accomplish this through the use of a single state variable. Ports are also inspired by the Oz language.
An Actor class in the style of Erlang message-passing processes is also provided. It makes use of the asynchronous behavior of ports, but otherwise uses no state variables.
include Dataflow local do |port, stream| unify port, Dataflow::Port.new(stream) Thread.new {port.send 2} Thread.new {port.send 8} Thread.new {port.send 1024} stream.take(3).sort #=> [2, 8, 1024] end
include Dataflow Ping = Actor.new { 3.times { case receive when "Ping" puts "Ping" Pong.send "Pong" end } } Pong = Actor.new { 3.times { case receive when "Pong" puts "Pong" Ping.send "Ping" end } } Actor.new { Ping.send "Ping" } Ping.join Pong.join
Most Ruby implmentations will not use method calls for equality
operations in base types/classes. This means equality between dataflow
variables and those base types will not behave as expected. Require
the following to get equality on base types that uses method calls,
while still passing rubyspec:
require "dataflow/equality"
The basis of dataflow variables around a language is not common among
popular languages and may be confusing to some. For an in-depth
introduction to the Oz language and the techniques used in this
library (including by_need triggers, port objects, and comparisons to Erlang message passing) see the book Concepts, Techniques, and Models of Computer Programming
larrytheliquid, amiller