Skip to content

Commit

Permalink
Ack on the reception
Browse files Browse the repository at this point in the history
  • Loading branch information
coffeeaddict committed Jul 26, 2012
1 parent 94ce034 commit 961f450
Showing 1 changed file with 6 additions and 3 deletions.
9 changes: 6 additions & 3 deletions lib/amqp-hermes/receiver.rb
Expand Up @@ -6,13 +6,13 @@ class Receiver
attr_reader :messages, :queue, :exchange, :routing_key attr_reader :messages, :queue, :exchange, :routing_key
attr_accessor :_listening attr_accessor :_listening


def initialize(queue, topic="pub/sub", options={}) def initialize(queue, topic=nil, options={})
raise "You *MUST* specify a queue" if queue.nil? or queue.empty? raise "You *MUST* specify a queue" if queue.nil? or queue.empty?
@queue = queue @queue = queue


if topic.is_a? Hash if topic.is_a? Hash
options = topic.dup options = topic.dup
topic = options.delete(:topic) || "pub/sub" topic = options.delete(:topic)
end end


@routing_key = options.delete(:routing_key) @routing_key = options.delete(:routing_key)
Expand All @@ -25,6 +25,8 @@ def initialize(queue, topic="pub/sub", options={})
@handler = options.delete(:handler) || self @handler = options.delete(:handler) || self


options[:auto_delete] ||= true options[:auto_delete] ||= true

topic ||= "pub/sub"
@exchange = channel.topic(topic, options) @exchange = channel.topic(topic, options)


@messages = [] @messages = []
Expand All @@ -40,8 +42,9 @@ def listen


receiver.channel.queue(receiver.queue).bind( receiver.channel.queue(receiver.queue).bind(
receiver.exchange, :routing_key => receiver.routing_key receiver.exchange, :routing_key => receiver.routing_key
).subscribe do |headers, payload| ).subscribe(:ack => true) do |headers, payload|
handler.receive(AMQP::Hermes::Message.new(headers, payload)) handler.receive(AMQP::Hermes::Message.new(headers, payload))
headers.ack
end end
end end
end end
Expand Down

0 comments on commit 961f450

Please sign in to comment.