Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Speccing Queue#subscribe and refactoring it to handle bunny 0.6.0. Se…

…e code comments for details.
  • Loading branch information...
commit 5735bbf600e27ae9743191d6de45e40e66513715 1 parent 645909f
@caius caius authored
View
23 lib/warren/adapters/bunny_adapter.rb
@@ -61,19 +61,36 @@ def self.publish queue_name, payload, &blk
#
# Expects a block and raises NoBlockGiven if no block is given.
#
+ # The block is passed up to two arguments, depending on how many
+ # you ask for. The first one is always required, which is the message
+ # passed over the queue (after being unpacked by filters.)
+ # The (optional) second argument is a hash of headers bunny gives us
+ # containing extra data about the message.
+ #
+ # Warren::Queue.subscribe(:default) {|msg, payload| puts msg, payload }
+ #
def self.subscribe queue_name, &block
raise NoBlockGiven unless block_given?
queue_name = self.queue_name if queue_name == :default
# todo: check if its a valid queue?
do_connect(queue_name) do |queue|
- msg = queue.pop
- return if msg == :queue_empty
- block.call(Warren::MessageFilter.unpack(msg))
+ handle_bunny_message(queue.pop, &block)
end
end
private
+ # Called when a message is pulled off the queue by bunny.
+ #
+ def self.handle_bunny_message headers, &block
+ payload = headers.delete(:payload)
+ return if payload == :queue_empty
+ # unpack it
+ payload = Warren::MessageFilter.unpack(payload)
+ # Call our block with as many args as we need to
+ block.arity == 1 ? block.call(payload) : block.call(payload, headers)
+ end
+
#
# Connects and does the stuff its told to!
#
View
37 spec/warren/adapters/bunny_adapter_spec.rb
@@ -40,6 +40,43 @@
end
end
+ describe "subscribe" do
+ it "should override subscribe" do
+ Warren::Queue::BunnyAdapter.methods(false).should include("subscribe")
+ end
+
+ it "should accept a subscribe block with one argument" do
+ blk = lambda do |one|
+ one.should == "my message"
+ end
+
+ send_headers_to_bunny_with &blk
+ end
+
+ it "should accept a subscribe block with two arguments" do
+ blk = lambda do | message, headers |
+ message.should == "my message"
+ headers.should == {:some => :header}
+ end
+
+ headers = {
+ :payload => "my message",
+ :some => :header
+ }
+ Warren::Queue::BunnyAdapter.__send__(:handle_bunny_message, headers, &blk)
+ end
+
+ def send_headers_to_bunny_with &blk
+ headers = {
+ :payload => "my message",
+ :some => :header
+ }
+ Warren::Queue::BunnyAdapter.__send__(:handle_bunny_message, headers, &blk)
+ end
+ end
+
+ protected
+
def setup_config_object
@options = {
:host => "localhost",
Please sign in to comment.
Something went wrong with that request. Please try again.