Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Implemented correctly the Channel methods direct, fanout, topic, head…
…ers and queue!

Added error handling into Channel, Exchange and Queue initialize, this way the coding style is closer to synchronous
  • Loading branch information
Camille Meulien committed Nov 27, 2011
1 parent c3a069e commit bd3aa58
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 13 deletions.
39 changes: 35 additions & 4 deletions lib/em-synchrony/amqp.rb
@@ -1,12 +1,14 @@
begin
require "amqp"
require "amq/protocol"
rescue LoadError => error
raise "Missing EM-Synchrony dependency: gem install amqp"
end

module EventMachine
module Synchrony
module AMQP
class Error < RuntimeError; end

class << self
def sync &blk
Expand Down Expand Up @@ -38,10 +40,35 @@ class Channel < ::AMQP::Channel
def initialize(*params, &block)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
Fiber.yield
channel, open_ok = Fiber.yield
raise Error.new unless open_ok.is_a?(::AMQ::Protocol::Channel::OpenOk)
channel
end

%w[direct fanout topic headers].each do |type|
line = __LINE__ + 2
code = <<-EOF
alias :a#{type} :#{type}
def #{type}(name = 'amq.#{type}', opts = {})
if exchange = find_exchange(name)
extended_opts = Exchange.add_default_options(:#{type}, name, opts, nil)
validate_parameters_match!(exchange, extended_opts)
exchange
else
register_exchange(Exchange.new(self, :#{type}, name, opts))
end
end
EOF
module_eval(code, __FILE__, line)
end

%w[direct fanout topic headers queue queue! flow prefetch recover tx_select tx_commit tx_rollback reset]
alias :aqueue! :queue!
def queue!(name, opts = {})
queue = Queue.new(self, name, opts)
register_queue(queue)
end

%w[queue flow prefetch recover tx_select tx_commit tx_rollback reset]
.each do |type|
module_eval %[
alias :a#{type} :#{type}
Expand All @@ -56,7 +83,9 @@ class Exchange < ::AMQP::Exchange
def initialize(channel, type, name, opts = {}, &block)
f = Fiber.current
super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f))
Fiber.yield
exchange, declare_ok = Fiber.yield
raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk)
exchange
end

%w[publish delete]
Expand All @@ -74,7 +103,9 @@ class Queue < ::AMQP::Queue
def initialize(*params)
f = Fiber.current
super(*params, &EM::Synchrony::AMQP.sync_cb(f))
Fiber.yield
queue, declare_ok = Fiber.yield
raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Queue::DeclareOk)
queue
end

alias :asubscribe :subscribe
Expand Down
30 changes: 21 additions & 9 deletions spec/amqp_spec.rb
Expand Up @@ -24,7 +24,7 @@
it "should yield until the channel is created" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
channel = EM::Synchrony::AMQP::Channel.new(connection)
channel.should be_kind_of(EM::Synchrony::AMQP::Channel)
EM.stop
end
Expand All @@ -33,17 +33,29 @@
it "should yield until the queue is created" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
queue, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true)
channel = EM::Synchrony::AMQP::Channel.new(connection)
queue = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true)
EM.stop
end
end

it "should yield until the exchange is created" do
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
exchange, declare_ok = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")
channel = EM::Synchrony::AMQP::Channel.new(connection)

exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.exchange")
exchange.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)

direct = channel.fanout("test.em-synchrony.direct")
fanout = channel.fanout("test.em-synchrony.fanout")
topic = channel.fanout("test.em-synchrony.topic")
headers = channel.fanout("test.em-synchrony.headers")

direct.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
fanout.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
topic.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
headers.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange)
EM.stop
end
end
Expand All @@ -52,11 +64,11 @@
publish_number = 10
EM.synchrony do
connection = EM::Synchrony::AMQP.connect
channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection)
ex, declare_ok = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")
channel = EM::Synchrony::AMQP::Channel.new(connection)
ex = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout")

q1, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.1", :auto_delete => true)
q2, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.2", :auto_delete => true)
q1 = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.1", :auto_delete => true)
q2 = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.2", :auto_delete => true)

q1.bind(ex)
q2.bind(ex)
Expand Down

0 comments on commit bd3aa58

Please sign in to comment.