Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Safe amqp adapter

  • Loading branch information...
commit 4d366c14b837a1dcd0902eee50866f817b052671 1 parent 5eccd0e
@rnaveiras rnaveiras authored
View
1  lib/eventwire/adapters.rb
@@ -2,6 +2,7 @@ module Eventwire
module Adapters
autoload :InProcess, 'eventwire/adapters/in_process'
autoload :AMQP, 'eventwire/adapters/amqp'
+ autoload :SafeAMQP, 'eventwire/adapters/safe_amqp'
autoload :Bunny, 'eventwire/adapters/bunny'
autoload :Redis, 'eventwire/adapters/redis'
autoload :Mongo, 'eventwire/adapters/mongo'
View
82 lib/eventwire/adapters/safe_amqp.rb
@@ -0,0 +1,82 @@
+require 'bunny'
+require 'amqp'
+require 'amqp/extensions/rabbitmq'
+
+class Eventwire::Adapters::SafeAMQP
+ def initialize(options = {})
+ @options = options
+ end
+
+ def publish(event_name, event_data = nil)
+ connect_asynch do |conn|
+ AMQP::Channel.new(conn) do |channel|
+ channel.confirm_select
+ channel.on_ack do |basic_ack|
+ channel.close
+ stop
+ end
+ fanout = channel.fanout(event_name.to_s, :durable => true)
+ fanout.publish(event_data, :persistent => true)
+ end
+ end
+ end
+
+ def subscribe(event_name, handler_id, &handler)
+ subscriptions << [event_name, handler_id, handler]
+ end
+
+ def subscribe?(event_name, handler_id)
+ subscriptions.any? {|s| s[0] == event_name && s[1] == handler_id }
+ end
+
+ def start
+ connect_asynch do |conn|
+ (@channel ||= AMQP::Channel.new(conn, AMQP::Channel.next_channel_id, :prefetch => 1)).tap do |channel|
@pacoguzman Collaborator

Why did you keep an instance variable here?

@rnaveiras Collaborator

Probably in this version is not need it anymore, but original is because we need to avoid open a new channel every time. If you remember is a bug that fixed in the original sync version and in the standard eventwire adapter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
+ subscriptions.each {|subscription| bind_subscription(channel, *subscription) }
+ end
+ end
+ end
+
+ def stop
+ AMQP.stop { EM.stop }
+ end
+
+ def purge
+ connect_synch do |conn|
+ subscriptions.group_by(&:first).each do |event_name, _|
+ conn.exchange(event_name.to_s, :type => :fanout, :durable => true).delete
+ end
+ subscriptions.group_by(&:second).each do |handler_id, _|
+ conn.queue(handler_id.to_s, :durable => true).delete
+ end
+ end
+ end
+
+ def subscriptions
+ @subscriptions ||= []
+ end
+
+ def bind_subscription(channel, event_name, handler_id, handler)
+
+ fanout = channel.fanout(event_name.to_s, :durable => true)
+ queue = channel.queue(handler_id.to_s, :durable => true)
+
+ queue.bind(fanout).subscribe(:ack => true) do |metadata, json_data|
+ metadata.ack
+ handler.call json_data
+ end
+ end
+
+ def connect_asynch(&block)
+ if AMQP.connection && !AMQP.closing?
+ block.call(AMQP.connection)
+ else
+ AMQP.start(@options, &block)
+ end
+ end
+
+ def connect_synch(&block)
+ Bunny.run(@options, &block)
+ end
+
+end
View
28 spec/integration/adapters/safe_amqp_spec.rb
@@ -0,0 +1,28 @@
+require 'integration/adapters/adapters_helper'
+
+describe_adapter Eventwire::Adapters::SafeAMQP do
+ it_should_behave_like 'an adapter with single-process support'
+ it_should_behave_like 'an adapter with multi-process support'
+
+ describe 'with options given' do
+ let(:options) { stub }
+
+ subject { Eventwire::Adapters::SafeAMQP.new(options) }
+
+ it 'should connect with options on subscribe' do
+ AMQP.should_receive(:start).with(options)
+ subject.subscribe :event_name, :handler_id
+ subject.start
+ end
+
+ it 'should connect with options on publish' do
+ AMQP.should_receive(:start).with(options)
+ subject.publish :event_name
+ end
+
+ it 'should connect with options on purge' do
+ Bunny.should_receive(:run).with(options)
+ subject.purge
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.