Skip to content

Commit

Permalink
support basic.return
Browse files Browse the repository at this point in the history
  • Loading branch information
somic committed Feb 23, 2009
1 parent 62f44af commit bf01cb2
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 3 deletions.
34 changes: 34 additions & 0 deletions examples/mq/returned.rb
@@ -0,0 +1,34 @@

#
# This example demonstrates how to handle returns of undeliverable
# messages (when published with "immediate" flag) or unroutable
# messages (when published with "mandatory" flag)
#

$:.unshift File.dirname(__FILE__) + '/../../lib'
require 'mq'

EM.run {
AMQP.start :logging => true

amq = MQ.new
MQ.message_returned { |msg|
p "Broker returned a message"
p msg.reply_code
p msg.reply_text
p msg.original_header
p msg.original_body
}

# attempting to publish to a non-existent exchange with
# "immediate" flag set to true
amq.topic("test_topic_exch_does_not_exist").publish("test",
:key => 'foo.bar.baz', :immediate => true)

EM.add_timer(1) {
AMQP.stop { EM.stop }
}
}



39 changes: 36 additions & 3 deletions lib/mq.rb
Expand Up @@ -16,6 +16,21 @@ class << self

# Raised whenever an illegal operation is attempted.
class Error < StandardError; end

class ReturnedMessage
attr_reader :reply_code, :reply_text, :routing_key, :exchange
attr_reader :original_header, :original_body

def initialize(reason, header, body)
@reply_code = reason.reply_code
@reply_text = reason.reply_text
@routing_key = reason.routing_key
@exchange = reason.exchange
@original_header = header
@original_body = body
end
end

end

# The top-level class for building AMQP clients. This class contains several
Expand Down Expand Up @@ -167,8 +182,12 @@ def process_frame frame
when Frame::Body
@body << frame.payload
if @body.length >= @header.size
@header.properties.update(@method.arguments)
@consumer.receive @header, @body if @consumer
if @method.is_a? Protocol::Basic::Return
MQ.message_returned ReturnedMessage.new(@method, @header, @body)
else
@header.properties.update(@method.arguments)
@consumer.receive @header, @body if @consumer
end
@body = @header = @consumer = @method = nil
end

Expand Down Expand Up @@ -230,6 +249,11 @@ def process_frame frame
c.channels.delete @channel
c.close if c.channels.empty?
}

when Protocol::Basic::Return
@method = method
@header = nil
@body = ''
end
end
end
Expand Down Expand Up @@ -732,6 +756,15 @@ def self.error msg = nil, &blk
end
end

# Define what to do when a message is returned with basic.return
def self.message_returned msg = nil, &blk
if blk
@basic_return_callback = blk
else
@basic_return_callback.call(msg) if @basic_return_callback and msg
end
end

# Returns a hash of all the exchange proxy objects.
#
# Not typically called by client code.
Expand Down Expand Up @@ -816,4 +849,4 @@ class MQ
def MQ.id
Thread.current[:mq_id] ||= "#{`hostname`.strip}-#{Process.pid}-#{Thread.current.object_id}"
end
end
end

0 comments on commit bf01cb2

Please sign in to comment.