diff --git a/lib/em-synchrony/amqp.rb b/lib/em-synchrony/amqp.rb index 619957e..b2bd8b7 100644 --- a/lib/em-synchrony/amqp.rb +++ b/lib/em-synchrony/amqp.rb @@ -1,5 +1,6 @@ begin require "amqp" + require "amq/protocol" rescue LoadError => error raise "Missing EM-Synchrony dependency: gem install amqp" end @@ -7,6 +8,7 @@ module EventMachine module Synchrony module AMQP + class Error < RuntimeError; end class << self def sync &blk @@ -38,10 +40,35 @@ class Channel < ::AMQP::Channel def initialize(*params, &block) f = Fiber.current super(*params, &EM::Synchrony::AMQP.sync_cb(f)) - Fiber.yield + channel, open_ok = Fiber.yield + raise Error.new unless open_ok.is_a?(::AMQ::Protocol::Channel::OpenOk) + channel + end + + %w[direct fanout topic headers].each do |type| + line = __LINE__ + 2 + code = <<-EOF + alias :a#{type} :#{type} + def #{type}(name = 'amq.#{type}', opts = {}) + if exchange = find_exchange(name) + extended_opts = Exchange.add_default_options(:#{type}, name, opts, nil) + validate_parameters_match!(exchange, extended_opts) + exchange + else + register_exchange(Exchange.new(self, :#{type}, name, opts)) + end + end + EOF + module_eval(code, __FILE__, line) end - %w[direct fanout topic headers queue queue! flow prefetch recover tx_select tx_commit tx_rollback reset] + alias :aqueue! :queue! + def queue!(name, opts = {}) + queue = Queue.new(self, name, opts) + register_queue(queue) + end + + %w[queue flow prefetch recover tx_select tx_commit tx_rollback reset] .each do |type| module_eval %[ alias :a#{type} :#{type} @@ -56,7 +83,9 @@ class Exchange < ::AMQP::Exchange def initialize(channel, type, name, opts = {}, &block) f = Fiber.current super(channel, type, name, opts, &EM::Synchrony::AMQP.sync_cb(f)) - Fiber.yield + exchange, declare_ok = Fiber.yield + raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Exchange::DeclareOk) + exchange end %w[publish delete] @@ -74,7 +103,9 @@ class Queue < ::AMQP::Queue def initialize(*params) f = Fiber.current super(*params, &EM::Synchrony::AMQP.sync_cb(f)) - Fiber.yield + queue, declare_ok = Fiber.yield + raise Error.new unless declare_ok.is_a?(::AMQ::Protocol::Queue::DeclareOk) + queue end alias :asubscribe :subscribe diff --git a/spec/amqp_spec.rb b/spec/amqp_spec.rb index 1b43725..87b5b83 100644 --- a/spec/amqp_spec.rb +++ b/spec/amqp_spec.rb @@ -24,7 +24,7 @@ it "should yield until the channel is created" do EM.synchrony do connection = EM::Synchrony::AMQP.connect - channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection) + channel = EM::Synchrony::AMQP::Channel.new(connection) channel.should be_kind_of(EM::Synchrony::AMQP::Channel) EM.stop end @@ -33,8 +33,8 @@ it "should yield until the queue is created" do EM.synchrony do connection = EM::Synchrony::AMQP.connect - channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection) - queue, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true) + channel = EM::Synchrony::AMQP::Channel.new(connection) + queue = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queue1", :auto_delete => true) EM.stop end end @@ -42,8 +42,20 @@ it "should yield until the exchange is created" do EM.synchrony do connection = EM::Synchrony::AMQP.connect - channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection) - exchange, declare_ok = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout") + channel = EM::Synchrony::AMQP::Channel.new(connection) + + exchange = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.exchange") + exchange.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + + direct = channel.fanout("test.em-synchrony.direct") + fanout = channel.fanout("test.em-synchrony.fanout") + topic = channel.fanout("test.em-synchrony.topic") + headers = channel.fanout("test.em-synchrony.headers") + + direct.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + fanout.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + topic.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) + headers.should be_kind_of(EventMachine::Synchrony::AMQP::Exchange) EM.stop end end @@ -52,11 +64,11 @@ publish_number = 10 EM.synchrony do connection = EM::Synchrony::AMQP.connect - channel, open_ok = EM::Synchrony::AMQP::Channel.new(connection) - ex, declare_ok = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout") + channel = EM::Synchrony::AMQP::Channel.new(connection) + ex = EM::Synchrony::AMQP::Exchange.new(channel, :fanout, "test.em-synchrony.fanout") - q1, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.1", :auto_delete => true) - q2, declare_ok = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.2", :auto_delete => true) + q1 = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.1", :auto_delete => true) + q2 = EM::Synchrony::AMQP::Queue.new(channel, "test.em-synchrony.queues.2", :auto_delete => true) q1.bind(ex) q2.bind(ex)