From 961f45027f9c5070683fb04e0d30f52eed304ac8 Mon Sep 17 00:00:00 2001 From: "Hartog C. de Mik" Date: Thu, 26 Jul 2012 14:07:15 +0200 Subject: [PATCH] Ack on the reception --- lib/amqp-hermes/receiver.rb | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/lib/amqp-hermes/receiver.rb b/lib/amqp-hermes/receiver.rb index 4db7e76..09b05a8 100644 --- a/lib/amqp-hermes/receiver.rb +++ b/lib/amqp-hermes/receiver.rb @@ -6,13 +6,13 @@ class Receiver attr_reader :messages, :queue, :exchange, :routing_key attr_accessor :_listening - def initialize(queue, topic="pub/sub", options={}) + def initialize(queue, topic=nil, options={}) raise "You *MUST* specify a queue" if queue.nil? or queue.empty? @queue = queue if topic.is_a? Hash options = topic.dup - topic = options.delete(:topic) || "pub/sub" + topic = options.delete(:topic) end @routing_key = options.delete(:routing_key) @@ -25,6 +25,8 @@ def initialize(queue, topic="pub/sub", options={}) @handler = options.delete(:handler) || self options[:auto_delete] ||= true + + topic ||= "pub/sub" @exchange = channel.topic(topic, options) @messages = [] @@ -40,8 +42,9 @@ def listen receiver.channel.queue(receiver.queue).bind( receiver.exchange, :routing_key => receiver.routing_key - ).subscribe do |headers, payload| + ).subscribe(:ack => true) do |headers, payload| handler.receive(AMQP::Hermes::Message.new(headers, payload)) + headers.ack end end end