Permalink
Browse files

Implement requeue strategies.

Current strategies are:

* linear
* exponential
* exponential_no_initial_delay

The code to implement requeue strategies was getting repeated in every
agent and given that this is a general thing to do it makes sense to add
it here.
  • Loading branch information...
Richard Heycock
Richard Heycock committed Mar 13, 2012
1 parent 493ed1a commit 55c201162e272967d98499aa72be1d20072f5601
Showing with 76 additions and 38 deletions.
  1. +76 −38 lib/smith/messaging/receiver.rb
@@ -105,6 +105,27 @@ def initialize(receiver, metadata, undecoded_payload)
logger.verbose { "Payload content: [queue]: #{denomalized_queue_name}, [metadata type]: #{metadata.type}, [message]: #{payload.inspect}" }
end
+ # Reply to a message. If reply_to header is not set a error will be logged
+ def reply(&block)
+ responder = Responder.new
+ if reply_to
+ responder.callback do |return_value|
+ Sender.new(@metadata.reply_to, :auto_delete => true).ready do |sender|
+ logger.verbose { "Replying on: #{@metadata.reply_to}" } if logger.level == 0
+ sender.publish(ACL::Payload.new(:default).content(return_value), sender.options.publish(:correlation_id => @metadata.message_id))
+ end
+ end
+ else
+ # Null responder. If a call on the responder is made log a warning. Something is wrong.
+ responder.callback do |return_value|
+ logger.error { "You are responding to a message that has no reply_to on queue: #{denomalized_queue_name}." }
+ logger.verbose { "Queue options: #{@metadata.exchange}." }
+ end
+ end
+
+ block.call(responder)
+ end
+
# acknowledge the message.
def ack(multiple=false)
@metadata.ack(multiple)
@@ -115,67 +136,84 @@ def reject(opts={})
@metadata.reject(opts)
end
+ def reply_to
+ @metadata.reply_to
+ end
+
# Republish the message to the end of the same queue. This is useful
# for when the agent encounters an error and needs to requeue the message.
- def requeue(delay, &block)
- # Sort out the options.
- opts = @receiver.send(:queue).opts.tap do |o|
- o.delete(:queue)
- o.delete(:exchange)
-
- o[:headers] = increment_retry_count(metadata.headers)
- o[:routing_key] = normalised_queue_name
- o[:type] = metadata.type
- end
+ def requeue(delay, count, strategy, &block)
+ requeue_with_strategy(delay, count, strategy) do
+
+ # Sort out the options. Receiver#queue is private hence the send. I know, I know.
+ opts = @receiver.send(:queue).opts.tap do |o|
+ o.delete(:queue)
+ o.delete(:exchange)
+
+ o[:headers] = increment_requeue_count(metadata.headers)
+ o[:routing_key] = normalised_queue_name
+ o[:type] = metadata.type
+ end
- logger.verbose { "Requeuing to: #{denomalized_queue_name}. [options]: #{opts}" }
- logger.verbose { "Requeuing to: #{denomalized_queue_name}. [message]: #{ACL::Payload.decode(@undecoded_payload, metadata.type)}" }
+ logger.verbose { "Requeuing to: #{denomalized_queue_name}. [options]: #{opts}" }
+ logger.verbose { "Requeuing to: #{denomalized_queue_name}. [message]: #{ACL::Payload.decode(@undecoded_payload, metadata.type)}" }
- EM.add_timer(delay) do
@receiver.send(:exchange).publish(@undecoded_payload, opts)
end
end
- def requeue_count
- metadata.headers['retry'] || 0
+ def on_requeue_error(&block)
+ @on_requeue_error = block
end
- # Reply to a message. If reply_to header is not set a error will be logged
- def reply(&block)
- responder = Responder.new
- if reply_to
- responder.callback do |return_value|
- Sender.new(@metadata.reply_to, :auto_delete => true).ready do |sender|
- logger.verbose { "Replying on: #{@metadata.reply_to}" } if logger.level == 0
- sender.publish(ACL::Payload.new(:default).content(return_value), sender.options.publish(:correlation_id => @metadata.message_id))
- end
- end
- else
- # Null responder. If a call on the responder is made log a warning. Something is wrong.
- responder.callback do |return_value|
- logger.error { "You are responding to a message that has no reply_to on queue: #{denomalized_queue_name}." }
- logger.verbose { "Queue options: #{@metadata.exchange}." }
- end
- end
+ def on_requeue(&block)
+ @on_requeue = block
+ end
- block.call(responder)
+ def current_requeue_number
+ metadata.headers['requeue'] || 0
end
# The payload type. This returns the protocol buffers class name as a string.
def payload_type
@metadata.type
end
- def reply_to
- @metadata.reply_to
- end
-
def queue_name
normalised_queue_name
end
private
+ def requeue_with_strategy(delay, count, strategy, &block)
+ if current_requeue_number < count
+ method = "#{strategy}_strategy".to_sym
+ if respond_to?(method, true)
+ cummulative_delay = send(method, delay)
+ @on_requeue.call(cummulative_delay, current_requeue_number + 1)
+ EM.add_timer(cummulative_delay) do
+ block.call(cummulative_delay, current_requeue_number + 1)
+ end
+ else
+ raise RuntimeError, "Unknown requeue strategy. #{method}"
+ end
+ else
+ @on_requeue_error.call(cummulative_delay, current_requeue_number)
+ end
+ end
+
+ def exponential_no_initial_delay_strategy(delay)
+ delay * (2 ** current_requeue_number - 1)
+ end
+
+ def exponential_strategy(delay)
+ delay * (2 ** current_requeue_number)
+ end
+
+ def linear_strategy(delay)
+ delay * (current_requeue_number + 1)
+ end
+
def denomalized_queue_name
@receiver.denomalized_queue_name
end
@@ -184,7 +222,7 @@ def normalised_queue_name
@receiver.queue_name
end
- def increment_retry_count(headers)
+ def increment_requeue_count(headers)
headers.tap do |m|
m['retry'] = (m['retry']) ? m['retry'] + 1 : 1
end

0 comments on commit 55c2011

Please sign in to comment.