Permalink
Browse files

Added AMQP basic support

  • Loading branch information...
1 parent 9521261 commit c3a069e0d0b2edb89e22e4eac5c43f7f974dcc97 Camille Meulien committed Nov 24, 2011
Showing with 202 additions and 0 deletions.
  1. +1 −0 Gemfile
  2. +112 −0 lib/em-synchrony/amqp.rb
  3. +88 −0 spec/amqp_spec.rb
  4. +1 −0 spec/helper/all.rb
View
1 Gemfile
@@ -14,4 +14,5 @@ group :development do
gem 'em-redis', '~> 0.3.0'
gem 'em-hiredis'
gem 'mongo'
+ gem 'amqp'
end
View
112 lib/em-synchrony/amqp.rb
@@ -0,0 +1,112 @@
+begin
+ require "amqp"
+rescue LoadError => error
+ raise "Missing EM-Synchrony dependency: gem install amqp"
+end
+
+module EventMachine
+ module Synchrony
+ module AMQP
+
+ class << self
+ def sync &blk
+ fiber = Fiber.current
+ blk.call(fiber)
+ Fiber.yield
+ end
+
+ def sync_cb fiber
+ Proc.new do |*args|
+ if fiber == Fiber.current
+ return *args
+ else
+ fiber.resume *args
+ end
+ end
+ end
+
+ %w[connect start run].each do |type|
+ module_eval %[
+ def #{type}(*params)
+ sync { |f| ::AMQP.#{type}(*params, &sync_cb(f)) }
+ end
+ ]
+ end
+ end
+
+ class Channel < ::AMQP::Channel
+ def initialize(*params, &block)
+ f = Fiber.current
+ super(*params, &EM::Synchrony::AMQP.sync_cb(f))
+ Fiber.yield
+ end
+
+ %w[direct fanout topic headers queue queue! flow prefetch recover tx_select tx_commit tx_rollback reset]
+ .each do |type|
+ module_eval %[
+ alias :a#{type} :#{type}
+ def #{type}(*params)
+ EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
+ end
+ ]
+ end
+ end
+
+ class Exchange < ::AMQP::Exchange
+ def initialize(channel, type, name, opts = {}, &block)
+ f = Fiber.current
+ super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
+ Fiber.yield
+ end
+
+ %w[publish delete]
+ .each do |type|
+ module_eval %[
+ alias :a#{type} :#{type}
+ def #{type}(*params)
+ EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
+ end
+ ]
+ end
+ end
+
+ class Queue < ::AMQP::Queue
+ def initialize(*params)
+ f = Fiber.current
+ super(*params, &EM::Synchrony::AMQP.sync_cb(f))
+ Fiber.yield
+ end
+
+ alias :asubscribe :subscribe
+ def subscribe &block
+ Fiber.new do
+ asubscribe(&EM::Synchrony::AMQP.sync_cb(Fiber.current))
+ loop { block.call(Fiber.yield) }
+ end.resume
+ end
+
+ %w[bind rebind unbind delete purge pop unsubscribe status]
+ .each do |type|
+ module_eval %[
+ alias :a#{type} :#{type}
+ def #{type}(*params)
+ EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
+ end
+ ]
+ end
+ end
+
+ class Session < ::AMQP::Session
+ %w[disconnect].each do |type|
+ module_eval %[
+ alias :a#{type} :#{type}
+ def #{type}(*params)
+ EM::Synchrony::AMQP.sync { |f| self.a#{type}(*params, &EM::Synchrony::AMQP.sync_cb(f)) }
+ end
+ ]
+ end
+ end
+
+ end
+ end
+end
View
88 spec/amqp_spec.rb
@@ -0,0 +1,88 @@
+require "spec/helper/all"
+require 'pp'
+require 'ruby-debug'
+
+describe EM::Synchrony::AMQP do
+
+ it "should yield until connection is ready" do
+ EM.synchrony do
+ connection = EM::Synchrony::AMQP.connect
+ connection.connected?.should be_true
+ EM.stop
+ end
+ end
+
+ it "should yield until disconnection is complete" do
+ EM.synchrony do
+ connection = EM::Synchrony::AMQP.connect
+ connection.disconnect
+ connection.connected?.should be_false
+ EM.stop
+ end
+ end
+
+ it "should yield until the channel is created" do
+ EM.synchrony do
+ connection = EM::Synchrony::AMQP.connect
+ channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
+ channel.should be_kind_of(EM::Synchrony::AMQP::Channel)
+ EM.stop
+ end
+ end
+
+ it "should yield until the queue is created" do
+ EM.synchrony do
+ connection = EM::Synchrony::AMQP.connect
+ channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
+ queue, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true)
+ EM.stop
+ end
+ end
+
+ it "should yield until the exchange is created" do
+ EM.synchrony do
+ connection = EM::Synchrony::AMQP.connect
+ channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
+ exchange, declare_ok = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")
+ EM.stop
+ end
+ end
+
+ it "should publish and receive messages" do
+ publish_number = 10
+ EM.synchrony do
+ connection = EM::Synchrony::AMQP.connect
+ channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
+ ex, declare_ok = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")
+
+ q1, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.1", :auto_delete => true)
+ q2, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.2", :auto_delete => true)
+
+ q1.bind(ex)
+ q2.bind(ex)
+
+ q1_nb, q2_nb = 0, 0
+ stop_cb = proc { EM.stop if q1_nb + q2_nb == 2 * publish_number }
+
+ q1.subscribe do |meta, msg|
+ msg.should match(/^Bonjour [0-9]+/)
+ q1_nb += 1
+ stop_cb.call
+ end
+
+ q2.subscribe do |meta, msg|
+ msg.should match(/^Bonjour [0-9]+/)
+ q2_nb += 1
+ stop_cb.call
+ end
+
+ Fiber.new do
+ publish_number.times do |n|
+ ex.publish("Bonjour #{n}")
+ EM::Synchrony.sleep(0.1)
+ end
+ end.resume
+ end
+ end
+
+end
View
1 spec/helper/all.rb
@@ -10,6 +10,7 @@
require 'lib/em-synchrony/em-mongo'
require 'lib/em-synchrony/em-redis'
require 'lib/em-synchrony/em-hiredis'
+require 'lib/em-synchrony/amqp'
require 'helper/tolerance_matcher'
require 'helper/stub-http-server'

0 comments on commit c3a069e

Please sign in to comment.