Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Reorganized infrastructure code, particularly event buses, so that li…

…bzeromq is not required unless EventBus::Zero is used
  • Loading branch information...
commit 0eb4a5511c768ed31a6ba322d12db93b7acbfef1 1 parent 4cda5a3
@cavalle authored
View
7 Gemfile
@@ -4,9 +4,10 @@ gem 'rails', '3.0.0'
gem 'uuidtools'
gem 'ohm'
-gem 'carrot'
-gem 'ffi-rzmq'
-gem 'amqp', :require => 'mq'
+
+gem 'carrot', :require => false
+gem 'ffi-rzmq', :require => false
+gem 'amqp', :require => false
group :development, :test do
gem 'rspec-rails'
View
10 README.md
@@ -16,10 +16,6 @@ For persistence the app uses [Redis](http://code.google.com/p/redis/). So a Redi
$ brew install redis
$ redis-server
-
-You also need libzeromq:
-
- $ brew install zeromq
With all that set, the test suite should pass by just running:
@@ -59,9 +55,11 @@ This is the equivalent to the missing `app/models` and you'll find the entities
Reports are subscribed to events from the domain model and update themselves according to those events. That way they are always in sync but totally uncoupled from the domain model. The reporting repository is denormalized and persisted with Redis.
-**app/infrastructure**
+**lib/infrastructure**
+
+Infrastructure libraries. Most of the magic is here. Including implementations of the Event Bus using different technologies (ZeroMQ, AMQP, Redis…)
-Typically the contents of this directory would be inside `lib` (or a plugin or gem) but, for development purposes, it turned out more convenient to put them in directory added to the `load_path`
+Configuration and initialization can be found at `config/initializers/infrastructure.rb` (if you want to try alternative Event Buses you should look here)
**spec/acceptance**
View
175 app/infrastructure/event_bus.rb
@@ -1,175 +0,0 @@
-module EventBus
- class << self
- attr_accessor :current
- delegate :publish, :subscribe, :wait_for_events, :start, :purge, :stop, :to => :current
-
- def init
- EventBus.current = Rails.configuration.event_bus.constantize.new
- Rails.configuration.event_subscribers.each(&:constantize)
- end
- end
-end
-
-class InProcessEventBus
- def publish(event)
- subscriptions(event.name).each do |subscription|
- subscription.call(event)
- end
- end
-
- def subscriptions(event_name)
- @subscriptions ||= Hash.new
- @subscriptions[event_name] ||= Set.new
- end
-
- def subscribe(event_name, &handler)
- subscriptions(event_name) << handler
- end
-
- def wait_for_events; end
- def purge; end
- def start; end
- def stop; end
-end
-
-class RedisEventBus
- def publish(event)
- Redis.new.publish "events", event.id
- end
-
- def subscriptions(event_name)
- @subscriptions ||= Hash.new
- @subscriptions[event_name.to_s] ||= Set.new
- end
-
- def subscribe(event_name, &handler)
- subscriptions(event_name.to_s) << handler
- end
-
- def wait_for_events
- sleep(0.05) # next_tick
- end
-
- def purge
- Redis.new.del "events"
- end
-
- def start
- Redis.new.subscribe("events") do |on|
- on.message do |channel, event_id|
- event = Event[event_id]
- subscriptions(event.name).each do |subscription|
- subscription.call(event)
- end
- end
- end
- end
-
- def stop; end
-end
-
-class AMQPEventBus
- def publish(event)
- Carrot.queue('events').publish(event.id)
- end
-
- def subscriptions(event_name)
- @subscriptions ||= Hash.new
- @subscriptions[event_name.to_s] ||= Set.new
- end
-
- def subscribe(event_name, &handler)
- subscriptions(event_name.to_s) << handler
- end
-
- def wait_for_events
- sleep(0.1) # next_tick
- end
-
- def purge
- Carrot.queue("events").purge
- end
-
- def start
- AMQP.start do
- MQ.new.queue("events").subscribe do |event_id|
- event = Event[event_id]
- subscriptions(event.name).each do |subscription|
- subscription.call(event)
- end
- end
- end
- end
-
- def stop
- AMQP.stop { EM.stop }
- wait_for_events
- end
-end
-
-class ZeroMQEventBus
- def publish(event)
- ctx = ZMQ::Context.new
- s = ctx.socket ZMQ::PUSH
- s.connect("tcp://127.0.0.1:5560")
- s.send_string(event.id.to_s)
-
- # Release socket after a delay since ZMQ
- # doesn't flush if we close right away
- Thread.new do
- sleep(1)
- s.close
- ctx.terminate
- end
- end
-
- def subscriptions(event_name)
- @subscriptions ||= Hash.new
- @subscriptions[event_name.to_s] ||= Set.new
- end
-
- def subscribe(event_name, &handler)
- subscriptions(event_name.to_s) << handler
- end
-
- def wait_for_events
- sleep(0.1) # next_tick
- end
-
- def purge
- end
-
- def start
- ctx = ZMQ::Context.new
- s = ctx.socket ZMQ::PULL
- s.bind("tcp://127.0.0.1:5560")
- @running = true
- while @running
- event_id = s.recv_string(ZMQ::NOBLOCK)
- next unless event_id
- event = Event[event_id]
- subscriptions(event.name).each do |subscription|
- subscription.call(event)
- end
- end
- ensure
- s.close
- ctx.terminate
- end
-
- def stop
- @running = false
- sleep(0.1) until available_port?('127.0.0.1', 5560)
- end
-
- private
-
- def available_port?(host, port)
- server = TCPServer.new(host, port)
- return true
- rescue
- return false
- ensure
- server.close if server
- end
-end
View
2  config/application.rb
@@ -17,7 +17,7 @@ class Application < Rails::Application
# -- all .rb files in that directory are automatically loaded.
# Custom directories with classes and modules you want to be autoloadable.
- # config.autoload_paths += %W(#{config.root}/extras)
+ config.autoload_paths += %W(#{config.root}/lib/infrastructure)
# Only load the plugins named here, in the order given (default is alphabetical).
# :all can be used as a placeholder for all plugins not explicitly named.
View
2  config/initializers/infrastructure.rb
@@ -1,5 +1,5 @@
Rails.application.class.configure do
- config.event_bus = 'RedisEventBus'
+ config.event_bus = 'EventBus::Redis'
config.event_subscribers = %w{ClientReport ClientDetailsReport AccountDetailsReport MoneyTransferSaga}
config.to_prepare { EventBus.init }
end
View
0  app/infrastructure/command_bus.rb → lib/infrastructure/command_bus.rb
File renamed without changes
View
0  app/infrastructure/domain_repository.rb → lib/infrastructure/domain_repository.rb
File renamed without changes
View
0  app/infrastructure/entity.rb → lib/infrastructure/entity.rb
File renamed without changes
View
0  app/infrastructure/event.rb → lib/infrastructure/event.rb
File renamed without changes
View
11 lib/infrastructure/event_bus.rb
@@ -0,0 +1,11 @@
+module EventBus
+ class << self
+ attr_accessor :current
+ delegate :publish, :subscribe, :wait_for_events, :start, :purge, :stop, :to => :current
+
+ def init
+ EventBus.current = Rails.configuration.event_bus.constantize.new
+ Rails.configuration.event_subscribers.each(&:constantize)
+ end
+ end
+end
View
41 lib/infrastructure/event_bus/amqp.rb
@@ -0,0 +1,41 @@
+require 'carrot'
+require 'mq'
+
+class EventBus::AMQP
+ def publish(event)
+ Carrot.queue('events').publish(event.id)
+ end
+
+ def subscriptions(event_name)
+ @subscriptions ||= Hash.new
+ @subscriptions[event_name.to_s] ||= Set.new
+ end
+
+ def subscribe(event_name, &handler)
+ subscriptions(event_name.to_s) << handler
+ end
+
+ def wait_for_events
+ sleep(0.15) # next_tick
+ end
+
+ def purge
+ Carrot.queue("events").purge
+ end
+
+ def start
+ AMQP.start do
+ MQ.new.queue("events").subscribe do |event_id|
+ event = Event[event_id]
+ subscriptions(event.name).each do |subscription|
+ subscription.call(event)
+ end
+ end
+ end
+ end
+
+ def stop
+ AMQP.stop { EM.stop }
+ wait_for_events
+ end
+end
View
21 lib/infrastructure/event_bus/in_process.rb
@@ -0,0 +1,21 @@
+class EventBus::InProcess
+ def publish(event)
+ subscriptions(event.name).each do |subscription|
+ subscription.call(event)
+ end
+ end
+
+ def subscriptions(event_name)
+ @subscriptions ||= Hash.new
+ @subscriptions[event_name] ||= Set.new
+ end
+
+ def subscribe(event_name, &handler)
+ subscriptions(event_name) << handler
+ end
+
+ def wait_for_events; end
+ def purge; end
+ def start; end
+ def stop; end
+end
View
35 lib/infrastructure/event_bus/redis.rb
@@ -0,0 +1,35 @@
+class EventBus::Redis
+ def publish(event)
+ Redis.new.publish "events", event.id
+ end
+
+ def subscriptions(event_name)
+ @subscriptions ||= Hash.new
+ @subscriptions[event_name.to_s] ||= Set.new
+ end
+
+ def subscribe(event_name, &handler)
+ subscriptions(event_name.to_s) << handler
+ end
+
+ def wait_for_events
+ sleep(0.05) # next_tick
+ end
+
+ def purge
+ Redis.new.del "events"
+ end
+
+ def start
+ Redis.new.subscribe("events") do |on|
+ on.message do |channel, event_id|
+ event = Event[event_id]
+ subscriptions(event.name).each do |subscription|
+ subscription.call(event)
+ end
+ end
+ end
+ end
+
+ def stop; end
+end
View
68 lib/infrastructure/event_bus/zero.rb
@@ -0,0 +1,68 @@
+require 'ffi-rzmq'
+
+class EventBus::Zero
+ def publish(event)
+ ctx = ZMQ::Context.new
+ s = ctx.socket ZMQ::PUSH
+ s.connect("tcp://127.0.0.1:5560")
+ s.send_string(event.id.to_s)
+
+ # Release socket after a delay since ZMQ
+ # doesn't flush if we close right away
+ Thread.new do
+ sleep(1)
+ s.close
+ ctx.terminate
+ end
+ end
+
+ def subscriptions(event_name)
+ @subscriptions ||= Hash.new
+ @subscriptions[event_name.to_s] ||= Set.new
+ end
+
+ def subscribe(event_name, &handler)
+ subscriptions(event_name.to_s) << handler
+ end
+
+ def wait_for_events
+ sleep(0.15) # next_tick
+ end
+
+ def purge
+ end
+
+ def start
+ ctx = ZMQ::Context.new
+ s = ctx.socket ZMQ::PULL
+ s.bind("tcp://127.0.0.1:5560")
+ @running = true
+ while @running
+ event_id = s.recv_string(ZMQ::NOBLOCK)
+ next unless event_id
+ event = Event[event_id]
+ subscriptions(event.name).each do |subscription|
+ subscription.call(event)
+ end
+ end
+ ensure
+ s.close
+ ctx.terminate
+ end
+
+ def stop
+ @running = false
+ sleep(0.1) until available_port?('127.0.0.1', 5560)
+ end
+
+ private
+
+ def available_port?(host, port)
+ server = TCPServer.new(host, port)
+ return true
+ rescue
+ return false
+ ensure
+ server.close if server
+ end
+end
View
0  app/infrastructure/event_handler.rb → lib/infrastructure/event_handler.rb
File renamed without changes
View
0  app/infrastructure/report.rb → lib/infrastructure/report.rb
File renamed without changes
Please sign in to comment.
Something went wrong with that request. Please try again.