Permalink
Browse files

generate unique consumer tags and clean up on unsubscribe

  • Loading branch information...
1 parent da099b8 commit 197385bfb40158a2d57f4d4dcc9fbde7aebec038 @tmm1 tmm1 committed Sep 9, 2008
Showing with 15 additions and 4 deletions.
  1. +8 −2 lib/mq.rb
  2. +7 −2 lib/mq/queue.rb
View
@@ -65,14 +65,14 @@ def process_frame frame
succeed
when Protocol::Basic::CancelOk
- @consumer = queues[ method.consumer_tag ]
+ @consumer = consumers[ method.consumer_tag ]
@consumer.cancelled
when Protocol::Basic::Deliver
@method = method
@header = nil
@body = ''
- @consumer = queues[ method.consumer_tag ]
+ @consumer = consumers[ method.consumer_tag ]
when Protocol::Channel::Close
raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]} on #{@channel}"
@@ -140,6 +140,12 @@ def rpcs
@rcps ||= {}
end
+ # queue objects keyed on their consumer tags
+
+ def consumers
+ @consumers ||= {}
+ end
+
private
def log *args
View
@@ -42,10 +42,13 @@ def delete opts = {}
end
def subscribe opts = {}, &blk
+ @consumer_tag = "#{name}-#{Kernel.rand(999_999_999_999)}"
+ @mq.consumers[@consumer_tag] = self
+
@on_msg = blk
@mq.callback{
@mq.send Protocol::Basic::Consume.new({ :queue => name,
- :consumer_tag => name,
+ :consumer_tag => @consumer_tag,
:no_ack => true,
:nowait => true }.merge(opts))
}
@@ -55,7 +58,7 @@ def subscribe opts = {}, &blk
def unsubscribe opts = {}, &blk
@on_cancel = blk
@mq.callback{
- @mq.send Protocol::Basic::Cancel.new({ :consumer_tag => name }.merge(opts))
+ @mq.send Protocol::Basic::Cancel.new({ :consumer_tag => @consumer_tag }.merge(opts))
}
self
end
@@ -73,6 +76,8 @@ def receive headers, body
def cancelled
@on_cancel.call if @on_cancel
@on_cancel = @on_msg = nil
+ @mq.consumers.delete @consumer_tag
+ @consumer_tag = nil
end
private

0 comments on commit 197385b

Please sign in to comment.