Ruby Dataflow Engine
Ruby
Switch branches/tags
Nothing to show
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
lib
.gitignore
README

README

Ruby DataFlow Engine, written by Ho-Sheng Hsiao <hosh@isshen.com>

Copyright (c) 2009 Isshen, LLC

- Each Processor (component) is run inside an EventMachine context
- Connectors use AMQP (via a broker such as RabbitMQ)
- The Journal uses a DataMapper object

TODO
- In-memory connector
- In-memory Journal 
- DataFlow can be redone from an arbitrary starting point
- May write a Blackboard system instead and reimplement the DSL on top of it

INSTALLATION

git submodule add git://github.com/hosh/ruby-dataflow.git lib/dataflow
git submodule init
git submodule update

If you want to load this with Merb, edit your initialization scripts. For example, in Merb config/init.rb:

Merb::BootLoader.before_app_loads do
  # Executed after dependencies have been loaded but before your app's classes
  # have loaded, so the dataflow library is required ahead of the app's own code.
  # NOTE(review): the path assumes the submodule was added at lib/dataflow (as in
  # the installation steps) and that the application root is on the load path — confirm.
  require 'lib/dataflow/lib/dataflow'
end


USAGE

http://pastie.org/private/hetwdvw3crgiv04nmg51qw

require 'pp' # Diagnostics
# Example engine built on the Dataflow::Engine DSL.
#
# Declares one dataflow, :tweet_cloud, whose steps are:
#
#   :download -> :map ------------\
#             -> :download_twits --+-> :reduce -> :complete
#
# Each `change` block receives the dataset (here called tweet_search) that
# `fetch_dataset` resolved for the incoming message.
class TweetSearchEngine
  include Dataflow::Engine

  dataflow :tweet_cloud do
    # Format used to serialize messages.
    serialize_as :json
    # Resolves an incoming message to the dataset the steps operate on.
    # get_tweet_cloud is not defined in this example; supply your own lookup.
    fetch_dataset lambda { |msg| get_tweet_cloud(msg) }
    # NOTE(review): presumably names the store used for journal entries — confirm.
    journal_with :tweet_journal_entries

    # Entry step of the dataflow.
    start_with :download do |tweet_search|
      tweet_search.download
    end

    # Error handler for the :download step.
    rescue_change :download do |error, tweet_search|
      p [error.inspect, tweet_search]
      # NOTE(review): `tweet_cloud` is not a block parameter here; this may have
      # been meant as `tweet_search.abort_download` — confirm against the DSL.
      tweet_cloud.abort_download
    end

    # :download -> :map — dumps every tweet for diagnostics.
    change :download, :map do |tweet_search|
      tweet_search.each do |tweet|
        pp tweet
      end
    end

    # :download -> :download_twits — fetches the profile of every user seen.
    change :download, :download_twits do |tweet_search|
      screen_names = tweet_search.all_users
      # The per-user tweet count is yielded but not needed here.
      screen_names.each do |screen_name, _tweets_count|
        Twit.fetch(screen_name) # Scrapes a Twitter Profile
      end
      # Return true explicitly rather than the result of #each.
      true
    end

    # Transition into :reduce from both :map and :download_twits.
    # NOTE(review): presumably waits for both source steps to finish — confirm.
    change [:map, :download_twits], :reduce do |tweet_search|
      tweet_search.reduce
    end

    # :reduce -> :complete — finalizes the search and notifies by e-mail.
    change :reduce, :complete do |tweet_search|
      tweet_search.complete
      EmailNotifier.notify(tweet_search)
    end

    # Terminal step of the dataflow.
    ends_with :complete
  end
end

# Then in config/init.rb start the background thread for the engine

Merb::BootLoader.after_app_loads do
  # Runs once the application's classes are loaded, so TweetSearchEngine exists.
  # The engine is started only in the development and production environments.
  engine_enabled = Merb.env == 'development' || Merb.env == 'production'
  TweetSearchEngine.start if engine_enabled
end