Skip to content

Commit

Permalink
Configuration options for redis and mongodb
Browse files Browse the repository at this point in the history
  • Loading branch information
rnaveiras committed Apr 29, 2012
1 parent 99fa77f commit 0e74e3d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Gemfile
Expand Up @@ -10,7 +10,7 @@ gem 'hashie'

unless ENV["CI"]
gem 'ruby-debug', :platforms => :mri_18
gem 'ruby-debug19', :platforms => :mri_19
gem 'debugger', :platforms => :mri_19
end

# Drivers
Expand Down
31 changes: 26 additions & 5 deletions lib/eventwire/adapters/mongo.rb
@@ -1,9 +1,12 @@
require 'mongo'

class Eventwire::Adapters::Mongo
DB_NAME = 'broker'
DEFAULT_OPTIONS = {:host => 'localhost', :port => 27017,
:safe => true, :db_name => 'broker',
:pool_timeout => 5, :pool_size => 5 }

def initialize
def initialize(options = {})
@options = DEFAULT_OPTIONS.merge(options)
@handlers = []
end

Expand All @@ -29,12 +32,12 @@ def start

loop do
@handlers.each do |queue_name, handler|
break unless @started
queue = db.collection(queue_name)
if event_data = queue.find_and_modify({:remove => true})
handler.call event_data['event_data']
end
end
break unless @started
end
end

Expand All @@ -43,11 +46,29 @@ def stop
end

def db
@db ||= Mongo::Connection.new('localhost', 27017, :safe => true).db(DB_NAME)
@db ||= Mongo::Connection.new(host, port, options).db(db_name)
end

def purge
Mongo::Connection.new.drop_database(DB_NAME)
Mongo::Connection.new(host, port, options).drop_database(db_name)
end

private

def host
@options[:host]
end

def port
@options[:port]
end

def options
{ :safe => @options[:safe], :pool_timeout => @options[:pool_timeout], :pool_size => @options[:pool_size] }
end

def db_name
@options[:db_name]
end

end
11 changes: 6 additions & 5 deletions lib/eventwire/adapters/redis.rb
Expand Up @@ -2,12 +2,13 @@
require 'em-redis'

class Eventwire::Adapters::Redis
def initialize
def initialize(options = {})
@options = options
@handlers = []
end

def publish(event_name, event_data = nil)
redis = ::Redis.new
redis = ::Redis.new(@options)
handlers = redis.smembers("event_handlers:#{event_name}")
handlers.each do |handler|
redis.rpush handler, event_data
Expand All @@ -16,7 +17,7 @@ def publish(event_name, event_data = nil)

def subscribe(event_name, handler_id, &handler)
@handlers << [handler_id, handler]
::Redis.new.sadd "event_handlers:#{event_name}", handler_id
::Redis.new(@options).sadd "event_handlers:#{event_name}", handler_id
end

def start
Expand All @@ -30,7 +31,7 @@ def start
end

def subscribe_to_queue(queue, redis = nil, &block)
redis ||= EM::Protocols::Redis.connect
redis ||= EM::Protocols::Redis.connect(@options)
redis.blpop(queue, 0) do |response|
block.call(response.last) if response
subscribe_to_queue(queue, redis, &block)
Expand All @@ -42,7 +43,7 @@ def stop
end

def purge
redis = ::Redis.new
redis = ::Redis.new(@options)
redis.flushdb
end

Expand Down
2 changes: 1 addition & 1 deletion spec/acceptance/project_management_spec.rb
Expand Up @@ -3,7 +3,7 @@

describe 'Project Management System' do

adapters = %w{InProcess AMQP Redis}
adapters = %w{InProcess AMQP Redis Mongo}

adapters.each do |adapter|

Expand Down

0 comments on commit 0e74e3d

Please sign in to comment.