Skip to content
Browse files

Eventwired!

(TODO: Review this commit for future improvements on Eventwire)
  • Loading branch information...
1 parent c848114 commit c3b950d0c057320b2f66be80aac7dbf970475be9 @cavalle committed Oct 16, 2011
View
2 Gemfile
@@ -5,6 +5,8 @@ gem 'rails', '3.0.0'
gem 'uuidtools'
gem 'ohm'
+gem 'eventwire', :git => 'git://github.com/cavalle/eventwire.git'
+
gem 'em-redis', :require => false
gem 'bunny', :require => false
View
9 Gemfile.lock
@@ -1,3 +1,10 @@
+GIT
+ remote: git://github.com/cavalle/eventwire.git
+ revision: 073cdf9e82f557779ea4e97c646ecae3b895df5a
+ specs:
+ eventwire (0.0.1)
+ json
+
GEM
remote: http://rubygems.org/
specs:
@@ -67,6 +74,7 @@ GEM
term-ansicolor (~> 1.0.5)
thor (>= 0.13.6)
i18n (0.4.2)
+ json (1.6.1)
json_pure (1.5.0)
launchy (0.3.7)
configuration (>= 0.0.5)
@@ -167,6 +175,7 @@ DEPENDENCIES
bunny
capybara
em-redis
+ eventwire!
ffi-rzmq
foreman
launchy
View
2 Procfile
@@ -1,2 +1,2 @@
web: bundle exec thin start
-bus: bundle exec rake event_bus:start
+bus: bundle exec rake environment eventwire:work --trace
View
14 config/initializers/infrastructure.rb
@@ -1,5 +1,13 @@
Rails.application.class.configure do
- config.event_bus = 'EventBus::Redis'
- config.event_subscribers = %w{ClientReport ClientDetailsReport AccountDetailsReport MoneyTransferSaga}
- config.to_prepare { EventBus.init }
+ config.to_prepare do
+ # Initialize Eventwire before each request so that using the InProcess driver
+ # in development the event handlers are declared only once
+ Eventwire.driver = Rails.env.test? ? 'InProcess' : 'Redis'
+ Eventwire.on_error do |ex|
+ raise ex
+ end
+ ClientReport; ClientDetailsReport; AccountDetailsReport; MoneyTransferSaga
+ end
end
+
+
View
4 lib/infrastructure/domain_repository.rb
@@ -1,6 +1,8 @@
module DomainRepository
class << self
+
+ include Eventwire::Publisher
def aggregates
Thread.current[:"DomainRepositoryCurrentStore"]
@@ -48,7 +50,7 @@ def save(event)
end
def publish(event)
- EventBus.publish(event)
+ publish_event(event.name, {:data => event.data})
end
end
View
11 lib/infrastructure/event_bus.rb
@@ -1,11 +0,0 @@
-module EventBus
- class << self
- attr_accessor :current
- delegate :publish, :subscribe, :wait_for_events, :start, :purge, :stop, :to => :current
-
- def init
- EventBus.current = Rails.configuration.event_bus.constantize.new
- Rails.configuration.event_subscribers.each(&:constantize)
- end
- end
-end
View
63 lib/infrastructure/event_bus/amqp.rb
@@ -1,63 +0,0 @@
-require 'amqp'
-require 'bunny'
-
-class EventBus::AMQP
-
- def publish(event)
- Bunny.run do |mq|
- mq.exchange(event.name, :type => :fanout).publish(event.id)
- end
- end
-
- def subscribe(event_name, handler_id, &handler)
- subscriptions << Subscription.new(event_name, handler_id, handler)
- end
-
- def wait_for_events
- next_tick
- end
-
- def next_tick
- return unless EM.reactor_running?
- t = Thread.current
- EM.next_tick { t.wakeup }
- Thread.stop
- end
-
- def purge
- Bunny.run do |mq|
- subscriptions.each { |s| s.purge(mq) }
- end
- end
-
- def start
- AMQP.start do
- subscriptions.each { |s| s.subscribe(MQ) }
- end
- end
-
- def stop
- AMQP.stop { EM.stop }
- end
-
- private
-
- def subscriptions
- @subscriptions ||= []
- end
-
- class Subscription < Struct.new(:event, :queue, :handler)
-
- def subscribe(mq)
- mq.queue(queue).bind(mq.fanout(event)).subscribe do |event_id|
- handler.call Event[event_id]
- end
- end
-
- def purge(mq)
- mq.queue(queue).purge
- end
-
- end
-
-end
View
21 lib/infrastructure/event_bus/in_process.rb
@@ -1,21 +0,0 @@
-class EventBus::InProcess
- def publish(event)
- subscriptions(event.name).each do |subscription|
- subscription.call(event)
- end
- end
-
- def subscriptions(event_name)
- @subscriptions ||= Hash.new
- @subscriptions[event_name] ||= Set.new
- end
-
- def subscribe(event_name, handler_id, &handler)
- subscriptions(event_name) << handler
- end
-
- def wait_for_events; end
- def purge; end
- def start; end
- def stop; end
-end
View
73 lib/infrastructure/event_bus/redis.rb
@@ -1,73 +0,0 @@
-require 'redis'
-require 'em-redis'
-
-class EventBus::Redis
- def publish(event)
- Redis.new.rpush event.name, event.id
- end
-
- def initialize
- @subscriptions = []
- @handlers = []
- end
-
- def subscribe(event_name, handler_id, &handler)
- @subscriptions << [event_name.to_s, handler_id]
- @handlers << [handler_id, handler]
- end
-
- def wait_for_events
- 7.times { next_tick }
- end
-
- def next_tick
- return unless EM.reactor_running?
- t = Thread.current
- EM.next_tick { t.wakeup }
- Thread.stop
- end
-
- def purge
- redis = Redis.new
- all_queues.each do |queue|
- redis.del queue
- end
- end
-
- def all_queues
- (@subscriptions + @handlers).map(&:first).uniq
- end
-
- def start
- EM.run do
- redis = EM::Protocols::Redis.connect
-
- @subscriptions.group_by(&:first).each do |event, subscriptions|
- subscribe_to_queue redis, event do |event_id|
- subscriptions.each do |event, queue|
- redis.rpush queue, event_id
- end
- end
- end
-
- @handlers.each do |queue, handler|
- subscribe_to_queue redis, queue do |event_id|
- handler.call Event[event_id]
- end
- end
-
- end
- end
-
- def subscribe_to_queue(redis, queue, &block)
- callback = Proc.new do |response|
- block.call(response) if response
- redis.lpop(queue, &callback)
- end
- redis.lpop(queue, &callback)
- end
-
- def stop
- EM.stop
- end
-end
View
51 lib/infrastructure/event_bus/zero.rb
@@ -1,51 +0,0 @@
-require 'ffi-rzmq'
-
-class EventBus::Zero
- def publish(event)
- ctx = ZMQ::Context.new
- s = ctx.socket ZMQ::PUSH
- s.connect("tcp://127.0.0.1:5560")
- s.send_string(event.id.to_s)
- s.close
- ctx.terminate
- end
-
- def subscriptions(event_name)
- @subscriptions ||= Hash.new
- @subscriptions[event_name.to_s] ||= Set.new
- end
-
- def subscribe(event_name, handler_id, &handler)
- subscriptions(event_name.to_s) << handler
- end
-
- def wait_for_events
- sleep(0.15) # next_tick
- end
-
- def purge
- end
-
- def start
- ctx = ZMQ::Context.new
- s = ctx.socket ZMQ::PULL
- s.bind("tcp://127.0.0.1:5560")
- @running = true
- while @running
- event_id = s.recv_string(ZMQ::NOBLOCK)
- next unless event_id
- event = Event[event_id]
- subscriptions(event.name).each do |subscription|
- subscription.call(event)
- end
- end
- ensure
- s.close
- ctx.terminate
- end
-
- def stop
- @running = false
- end
-
-end
View
12 lib/infrastructure/event_handler.rb
@@ -1,13 +1,13 @@
module EventHandler
+ include Eventwire::Subscriber::DSL
+
def on(*events, &block)
events.each do |event_name|
- ::EventBus.subscribe(event_name, "#{name}:#{increment_handlers_count}:#{event_name}", &block)
+ super(event_name) do |event|
+ event.data = event.data.to_hash.symbolize_keys
+ block.call(event)
+ end
end
end
- def increment_handlers_count
- @handlers_count ||= 0
- @handlers_count += 1
- end
-
end
View
5 lib/tasks/event_bus.rake
@@ -1,4 +1 @@
-desc "Start Event Bus"
-task "event_bus:start" => :environment do
- EventBus.start
-end
+require 'eventwire/tasks'
View
11 spec/acceptance/acceptance_helper.rb
@@ -8,21 +8,18 @@
config.before(:each) do
Ohm.flush
- EventBus.purge
- @t = Thread.new { EventBus.start }
- @t.abort_on_exception = true
end
config.after(:each) do
- EventBus.stop
- @t.join(1)
- @t.kill
Capybara.reset_sessions!
end
end
+Eventwire.on_error do |ex|
+ raise ex
+end
+
Capybara.app = Proc.new { |env|
- EventBus.wait_for_events
Rails.application.call(env)
}
View
1 spec/acceptance/support/commands.rb
@@ -28,7 +28,6 @@ def execute_command(*args)
DomainRepository.begin
result = "#{args.shift}_command_handler".camelize.constantize.new.execute(*args)
DomainRepository.commit
- EventBus.wait_for_events
result
end

0 comments on commit c3b950d

Please sign in to comment.
Something went wrong with that request. Please try again.