<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array">
    <added>
      <filename>lib/bunny/subscription08.rb</filename>
    </added>
    <added>
      <filename>lib/bunny/subscription09.rb</filename>
    </added>
    <added>
      <filename>lib/qrack/subscription.rb</filename>
    </added>
  </added>
  <modified type="array">
    <modified>
      <diff>@@ -38,9 +38,12 @@ Gem::Specification.new do |s|
 		&quot;lib/bunny/exchange09.rb&quot;,
 		&quot;lib/bunny/queue08.rb&quot;,
 		&quot;lib/bunny/queue09.rb&quot;,
+		&quot;lib/bunny/subscription08.rb&quot;,
+		&quot;lib/bunny/subscription09.rb&quot;,
 		&quot;lib/qrack/client.rb&quot;,
 		&quot;lib/qrack/channel.rb&quot;,
 		&quot;lib/qrack/queue.rb&quot;,
+		&quot;lib/qrack/subscription.rb&quot;,
 		&quot;lib/qrack/protocol/protocol08.rb&quot;,
 		&quot;lib/qrack/protocol/protocol09.rb&quot;,
 		&quot;lib/qrack/protocol/spec08.rb&quot;,</diff>
      <filename>bunny.gemspec</filename>
    </modified>
    <modified>
      <diff>@@ -22,7 +22,7 @@ q = b.queue('test1')
 q.publish('Hello everybody!')
 
 # get message from the queue
-msg = q.pop
+msg = q.pop[:payload]
 
 puts 'This is the message: ' + msg + &quot;\n\n&quot;
 </diff>
      <filename>examples/simple_08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -22,7 +22,7 @@ q = b.queue('test1')
 q.publish('Hello everybody!')
 
 # get message from the queue
-msg = q.pop
+msg = q.pop[:payload]
 
 puts 'This is the message: ' + msg + &quot;\n\n&quot;
 </diff>
      <filename>examples/simple_09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -22,7 +22,7 @@ q = b.queue('test1')
 q.publish('Testing acknowledgements')
 
 # get message from the queue
-msg = q.pop(:ack =&gt; true)
+msg = q.pop(:ack =&gt; true)[:payload]
 
 # acknowledge receipt of message
 q.ack</diff>
      <filename>examples/simple_ack_08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -22,7 +22,7 @@ q = b.queue('test1')
 q.publish('Testing acknowledgements')
 
 # get message from the queue
-msg = q.pop(:ack =&gt; true)
+msg = q.pop(:ack =&gt; true)[:payload]
 
 # acknowledge receipt of message
 q.ack</diff>
      <filename>examples/simple_ack_09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -44,18 +44,10 @@ exch = b.exchange('sorting_room')
 # bind queue to exchange
 q.bind(exch, :key =&gt; 'fred')
 
-# initialize counter
-i = 1
-
 # subscribe to queue
-begin
-	ret = q.subscribe(:consumer_tag =&gt; 'testtag1', :timeout =&gt; 30) do |msg|
-		puts &quot;#{i.to_s}: #{msg}&quot;
-		i+=1
-	end
-rescue Qrack::ClientTimeout
-	puts '==== simple_consumer_08.rb timed out - closing down ===='
-	q.unsubscribe(:consumer_tag =&gt; 'testtag1')
-	# close the connection
-	b.stop
-end
\ No newline at end of file
+q.subscribe(:consumer_tag =&gt; 'testtag1', :timeout =&gt; 30) do |msg|
+	puts &quot;#{q.subscription.message_count}: #{msg[:payload]}&quot;
+end
+
+# Close client
+b.stop
\ No newline at end of file</diff>
      <filename>examples/simple_consumer_08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -44,18 +44,10 @@ exch = b.exchange('sorting_room')
 # bind queue to exchange
 q.bind(exch, :key =&gt; 'fred')
 
-# initialize counter
-i = 1
-
 # subscribe to queue
-begin
-	ret = q.subscribe(:consumer_tag =&gt; 'testtag1', :timeout =&gt; 30) do |msg|
-		puts &quot;#{i.to_s}: #{msg}&quot;
-		i+=1
-	end
-rescue Qrack::ClientTimeout
-	puts '==== simple_consumer_09.rb timed out - closing down ===='
-	q.unsubscribe(:consumer_tag =&gt; 'testtag1')
-	# close the connection
-	b.stop
-end
\ No newline at end of file
+q.subscribe(:consumer_tag =&gt; 'testtag1', :timeout =&gt; 30) do |msg|
+	puts &quot;#{q.subscription.message_count}: #{msg[:payload]}&quot;
+end
+
+# Close client
+b.stop
\ No newline at end of file</diff>
      <filename>examples/simple_consumer_09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -30,9 +30,9 @@ q2.bind(exch)
 exch.publish('This message will be fanned out')
 
 # get message from the queues
-msg = q1.pop
+msg = q1.pop[:payload]
 puts 'This is the message from q1: ' + msg + &quot;\n\n&quot;
-msg = q2.pop
+msg = q2.pop[:payload]
 puts 'This is the message from q2: ' + msg + &quot;\n\n&quot;
 
 # close the client connection</diff>
      <filename>examples/simple_fanout_08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -30,9 +30,9 @@ q2.bind(exch)
 exch.publish('This message will be fanned out')
 
 # get message from the queues
-msg = q1.pop
+msg = q1.pop[:payload]
 puts 'This is the message from q1: ' + msg + &quot;\n\n&quot;
-msg = q2.pop
+msg = q2.pop[:payload]
 puts 'This is the message from q2: ' + msg + &quot;\n\n&quot;
 
 # close the client connection</diff>
      <filename>examples/simple_fanout_09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -32,7 +32,7 @@ header_exch.publish('Headers test msg 2', :headers =&gt; {'h1'=&gt;'z'})
 # get messages from the queue - should only be msg 1 that got through
 msg = &quot;&quot;
 until msg == :queue_empty do
-	msg = q.pop
+	msg = q.pop[:payload]
 	puts 'This is a message from the header_q1 queue: ' + msg + &quot;\n&quot; unless msg == :queue_empty
 end
 </diff>
      <filename>examples/simple_headers_08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -32,7 +32,7 @@ header_exch.publish('Headers test msg 2', :headers =&gt; {'h1'=&gt;'z'})
 # get messages from the queue - should only be msg 1 that got through
 msg = &quot;&quot;
 until msg == :queue_empty do
-	msg = q.pop
+	msg = q.pop[:payload]
 	puts 'This is a message from the header_q1 queue: ' + msg + &quot;\n&quot; unless msg == :queue_empty
 end
 </diff>
      <filename>examples/simple_headers_09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -38,20 +38,20 @@ sports_results.publish('British Lions 15 : South Africa 12', :key =&gt; 'rugby.resu
 # get message from the queues
 
 # soccer queue got the soccer message
-msg = soccer.pop
+msg = soccer.pop[:payload]
 puts 'This is a message from the soccer q: ' + msg + &quot;\n\n&quot;
 
 # cricket queue got the cricket message
-msg = cricket.pop
+msg = cricket.pop[:payload]
 puts 'This is a message from the cricket q: ' + msg + &quot;\n\n&quot;
 
 # rugby queue got the rugby message
-msg = rugby.pop
+msg = rugby.pop[:payload]
 puts 'This is a message from the rugby q: ' + msg + &quot;\n\n&quot;
 
 # allsport queue got all of the messages
 until msg == :queue_empty do
-	msg = allsport.pop
+	msg = allsport.pop[:payload]
 	puts 'This is a message from the allsport q: ' + msg + &quot;\n\n&quot; unless msg == :queue_empty
 end
 </diff>
      <filename>examples/simple_topic_08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -38,20 +38,20 @@ sports_results.publish('British Lions 15 : South Africa 12', :key =&gt; 'rugby.resu
 # get message from the queues
 
 # soccer queue got the soccer message
-msg = soccer.pop
+msg = soccer.pop[:payload]
 puts 'This is a message from the soccer q: ' + msg + &quot;\n\n&quot;
 
 # cricket queue got the cricket message
-msg = cricket.pop
+msg = cricket.pop[:payload]
 puts 'This is a message from the cricket q: ' + msg + &quot;\n\n&quot;
 
 # rugby queue got the rugby message
-msg = rugby.pop
+msg = rugby.pop[:payload]
 puts 'This is a message from the rugby q: ' + msg + &quot;\n\n&quot;
 
 # allsport queue got all of the messages
 until msg == :queue_empty do
-	msg = allsport.pop
+	msg = allsport.pop[:payload]
 	puts 'This is a message from the allsport q: ' + msg + &quot;\n\n&quot; unless msg == :queue_empty
 end
 </diff>
      <filename>examples/simple_topic_09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -7,12 +7,13 @@ end
 
 module Bunny
 
-	class ProtocolError &lt; StandardError; end
-	class ServerDownError &lt; StandardError; end
 	class ConnectionError &lt; StandardError; end
-	class MessageError &lt; StandardError; end
-	class ForcedConnectionCloseError &lt; StandardError; end
 	class ForcedChannelCloseError &lt; StandardError; end
+	class ForcedConnectionCloseError &lt; StandardError; end
+	class MessageError &lt; StandardError; end
+	class ProtocolError &lt; StandardError; end
+	class ServerDownError &lt; StandardError; end
+	class UnsubscribeError &lt; StandardError; end
 	
 	VERSION = '0.5.4'
 	
@@ -62,6 +63,7 @@ module Bunny
 			require 'bunny/exchange08'
 			require 'bunny/queue08'
 			require 'bunny/channel08'
+			require 'bunny/subscription08'
 			
 			@client = Bunny::Client.new(opts)
 		else
@@ -71,6 +73,7 @@ module Bunny
 			require 'bunny/exchange09'
 			require 'bunny/queue09'
 			require 'bunny/channel09'
+			require 'bunny/subscription09'
 			
 			@client = Bunny::Client09.new(opts)
 		end			</diff>
      <filename>lib/bunny.rb</filename>
    </modified>
    <modified>
      <diff>@@ -18,6 +18,7 @@ Queues must be attached to at least one exchange in order to receive messages fr
 	    @client = client
 	    @opts   = opts
       @delivery_tag = nil
+      @subscription = nil
 
       # Queues without a given name are named by the server and are generally
       # bound to the process that created them.
@@ -65,7 +66,7 @@ ask to confirm a single message or a set of messages up to and including a speci
 	
 		def ack(opts={})
 			# If delivery tag is nil then set it to 1 to prevent errors
-			self.delivery_tag = 1 if self.delivery_tag.nil?
+			self.delivery_tag = opts[:delivery_tag] || 1
 			
       client.send_frame(
         Qrack::Protocol::Basic::Ack.new({:delivery_tag =&gt; delivery_tag, :multiple =&gt; false}.merge(opts))
@@ -164,9 +165,7 @@ from queues if successful. If an error occurs raises _Bunny_::_ProtocolError_.
 Gets a message from a queue in a synchronous way. If error occurs, raises _Bunny_::_ProtocolError_.
 
 ==== OPTIONS:
-
-* &lt;tt&gt;:header =&gt; true or false (_default_)&lt;/tt&gt; - If set to _true_,
-  hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt; is returned. 
+ 
 * &lt;tt&gt;:no_ack =&gt; true (_default_) or false&lt;/tt&gt; - If set to _true_, the server does not expect an
   acknowledgement message from the client. If set to _false_, the server expects an acknowledgement
   message from the client and will re-queue the message if it does not receive one within a time specified
@@ -174,17 +173,13 @@ Gets a message from a queue in a synchronous way. If error occurs, raises _Bunny
 
 ==== RETURNS:
 
-If &lt;tt&gt;:header =&gt; true&lt;/tt&gt; returns hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt;. &lt;tt&gt;:delivery_details&lt;/tt&gt; is
-a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count}&lt;/tt&gt;. If 
-&lt;tt&gt;:header =&gt; false&lt;/tt&gt; only the message payload is returned.
+Hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt;. &lt;tt&gt;:delivery_details&lt;/tt&gt; is
+a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count}&lt;/tt&gt;.
 
 =end
 
 	  def pop(opts = {})
 			
-			# do we want the message header?
-			hdr = opts.delete(:header)
-			
 			# do we want to have to provide an acknowledgement?
 			ack = opts.delete(:ack)
 			
@@ -198,7 +193,7 @@ a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count
 			method = client.next_method
 			
 			if method.is_a?(Qrack::Protocol::Basic::GetEmpty) then
-				return :queue_empty
+				return {:header =&gt; nil, :payload =&gt; :queue_empty, :delivery_details =&gt; nil}
 			elsif	!method.is_a?(Qrack::Protocol::Basic::GetOk)
 				raise Bunny::ProtocolError, &quot;Error getting message from queue #{name}&quot;
 			end
@@ -216,7 +211,7 @@ a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count
 			end
 
 			# Return message with additional info if requested
-			hdr ? {:header =&gt; header, :payload =&gt; msg, :delivery_details =&gt; method.arguments} : msg
+			{:header =&gt; header, :payload =&gt; msg, :delivery_details =&gt; method.arguments}
 			
 	  end
 	
@@ -270,99 +265,14 @@ Returns hash {:message_count, :consumer_count}.
 	    {:message_count =&gt; method.message_count, :consumer_count =&gt; method.consumer_count}
 	  end
 
-=begin rdoc
-
-=== DESCRIPTION:
-
-Asks the server to start a &quot;consumer&quot;, which is a transient request for messages from a specific
-queue. Consumers last as long as the channel they were created on, or until the client cancels them
-with an _unsubscribe_. Every time a message reaches the queue it is passed to the _blk_ for
-processing. If error occurs, _Bunny_::_ProtocolError_ is raised.
-
-==== OPTIONS:
-* &lt;tt&gt;:header =&gt; true or false (_default_)&lt;/tt&gt; - If set to _true_, hash is delivered for each message
-  &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt;.
-* &lt;tt&gt;:consumer_tag =&gt; '_tag_'&lt;/tt&gt; - Specifies the identifier for the consumer. The consumer tag is
-  local to a connection, so two clients can use the same consumer tags. If this field is empty the
-  queue name is used.
-* &lt;tt&gt;:no_ack=&gt; true (_default_) or false&lt;/tt&gt; - If set to _true_, the server does not expect an
-  acknowledgement message from the client. If set to _false_, the server expects an acknowledgement
-  message from the client and will re-queue the message if it does not receive one within a time specified
-  by the server.
-* &lt;tt&gt;:exclusive =&gt; true or false (_default_)&lt;/tt&gt; - Request exclusive consumer access, meaning
-  only this consumer can access the queue.
-* &lt;tt&gt;:nowait =&gt; true or false (_default_)&lt;/tt&gt; - Ignored by Bunny, always _false_.
-* &lt;tt&gt;:timeout =&gt; number of seconds - The subscribe loop will continue to wait for
-  messages until terminated (Ctrl-C or kill command) or this timeout interval is reached.
-* &lt;tt&gt;:message_max =&gt; max number messages to process&lt;/tt&gt; - When the required number of messages
-  is processed subscribe loop is exited.
-
-==== RETURNS:
-
-If &lt;tt&gt;:header =&gt; true&lt;/tt&gt; returns hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt; for each message.
-&lt;tt&gt;:delivery_details&lt;/tt&gt; is a hash &lt;tt&gt;{:consumer_tag, :delivery_tag, :redelivered, :exchange, :routing_key}&lt;/tt&gt;.
-If &lt;tt&gt;:header =&gt; false&lt;/tt&gt; only message payload is returned.
-If &lt;tt&gt;:timeout =&gt; &gt; 0&lt;/tt&gt; is reached Qrack::ClientTimeout is raised
-
-=end
 	
-		def subscribe(opts = {}, &amp;blk)
-			# Get maximum amount of messages to process
-			message_max = opts[:message_max] || nil
-			return if message_max == 0
-						
-			# If a consumer tag is not passed in the server will generate one
-			consumer_tag = opts[:consumer_tag] || nil
-			
-			# ignore the :nowait option if passed, otherwise program will hang waiting for a
-			# response from the server causing an error.
-			opts.delete(:nowait)
-			
-			# do we want the message header?
-			hdr = opts.delete(:header)
-			
-			# do we want to have to provide an acknowledgement?
-			ack = opts.delete(:ack)
-			
-			client.send_frame(
-				Qrack::Protocol::Basic::Consume.new({ :queue =&gt; name,
-																	 		 :consumer_tag =&gt; consumer_tag,
-																	 		 :no_ack =&gt; !ack,
-																	 		 :nowait =&gt; false }.merge(opts))
-			)
-			
-			raise Bunny::ProtocolError,
-				&quot;Error subscribing to queue #{name}&quot; unless
-				client.next_method.is_a?(Qrack::Protocol::Basic::ConsumeOk)
-			
-			# Initialize message counter
-			counter = 0
-			
-			loop do
-        method = client.next_method(:timeout =&gt; opts[:timeout])
-			
-				# get delivery tag to use for acknowledge
-				self.delivery_tag = method.delivery_tag if ack
-			
-				header = client.next_payload
-
-		    # If maximum frame size is smaller than message payload body then message
-				# will have a message header and several message bodies				
-		    msg = ''
-				while msg.length &lt; header.size
-					msg += client.next_payload
-				end
-				
-				# pass the message and related info, if requested, to the block for processing
-				blk.call(hdr ? {:header =&gt; header, :payload =&gt; msg, :delivery_details =&gt; method.arguments} : msg)
-				
-				# Increment message counter
-				counter += 1
-				
-				# Exit loop if message_max condition met
-				break if !message_max.nil? and counter == message_max
-			end
+    def subscribe(opts = {}, &amp;blk)
+			# Create subscription
+			s = Bunny::Subscription.new(client, self, opts)
+			s.start(&amp;blk)
 			
+			# Reset when subscription finished
+			@subscription = nil
 		end
 		
 =begin rdoc
@@ -384,20 +294,27 @@ the server will not send any more messages for that consumer.
 =end
 		
 		def unsubscribe(opts = {})
-			consumer_tag = opts[:consumer_tag] || name
+			# Default consumer_tag from subscription if not passed in
+			consumer_tag = subscription ? subscription.consumer_tag : opts[:consumer_tag]
 			
-			# ignore the :nowait option if passed, otherwise program will hang waiting for a
-			# response from the server causing an error
-			opts.delete(:nowait)
+			# Must have consumer tag to tell server what to unsubscribe
+			raise Bunny::UnsubscribeError,
+				&quot;No consumer tag received&quot; if !consumer_tag
 			
-      client.send_frame( Qrack::Protocol::Basic::Cancel.new({ :consumer_tag =&gt; consumer_tag }.merge(opts)))
+      # Cancel consumer
+      client.send_frame( Qrack::Protocol::Basic::Cancel.new(:consumer_tag =&gt; consumer_tag,
+																														:nowait =&gt; false))
+
+      raise Bunny::UnsubscribeError,
+        &quot;Error unsubscribing from queue #{name}&quot; unless
+        client.next_method.is_a?(Qrack::Protocol::Basic::CancelOk)
 
-			raise Bunny::ProtocolError,
-				&quot;Error unsubscribing from queue #{name}&quot; unless
-				client.next_method.is_a?(Qrack::Protocol::Basic::CancelOk)
+			# Reset subscription
+			@subscription = nil
 				
-			# return confirmation
+			# Return confirmation
 			:unsubscribe_ok
+			
     end
 
 =begin rdoc</diff>
      <filename>lib/bunny/queue08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -166,9 +166,7 @@ from queues if successful. If an error occurs raises _Bunny_::_ProtocolError_.
 Gets a message from a queue in a synchronous way. If error occurs, raises _Bunny_::_ProtocolError_.
 
 ==== OPTIONS:
-
-* &lt;tt&gt;:header =&gt; true or false (_default_)&lt;/tt&gt; - If set to _true_,
-  hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt; is returned. 
+ 
 * &lt;tt&gt;:no_ack =&gt; true (_default_) or false&lt;/tt&gt; - If set to _true_, the server does not expect an
   acknowledgement message from the client. If set to _false_, the server expects an acknowledgement
   message from the client and will re-queue the message if it does not receive one within a time specified
@@ -176,17 +174,13 @@ Gets a message from a queue in a synchronous way. If error occurs, raises _Bunny
 
 ==== RETURNS:
 
-If &lt;tt&gt;:header =&gt; true&lt;/tt&gt; returns hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt;. &lt;tt&gt;:delivery_details&lt;/tt&gt; is
-a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count}&lt;/tt&gt;. If 
-&lt;tt&gt;:header =&gt; false&lt;/tt&gt; only the message payload is returned.
+Hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt;. &lt;tt&gt;:delivery_details&lt;/tt&gt; is
+a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count}&lt;/tt&gt;.
 
 =end
 
 	  def pop(opts = {})
 			
-			# do we want the message header?
-			hdr = opts.delete(:header)
-			
 			# do we want to have to provide an acknowledgement?
 			ack = opts.delete(:ack)
 			
@@ -201,7 +195,7 @@ a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count
 			method = client.next_method
 			
 			if method.is_a?(Qrack::Protocol09::Basic::GetEmpty) then
-				return :queue_empty
+				return {:header =&gt; nil, :payload =&gt; :queue_empty, :delivery_details =&gt; nil}
 			elsif	!method.is_a?(Qrack::Protocol09::Basic::GetOk)
 				raise Bunny::ProtocolError, &quot;Error getting message from queue #{name}&quot;
 			end
@@ -219,7 +213,7 @@ a hash &lt;tt&gt;{:delivery_tag, :redelivered, :exchange, :routing_key, :message_count
 			end
 
 			# Return message with additional info if requested
-			hdr ? {:header =&gt; header, :payload =&gt; msg, :delivery_details =&gt; method.arguments} : msg
+			{:header =&gt; header, :payload =&gt; msg, :delivery_details =&gt; method.arguments}
 			
 	  end
 	
@@ -273,100 +267,13 @@ Returns hash {:message_count, :consumer_count}.
 	    {:message_count =&gt; method.message_count, :consumer_count =&gt; method.consumer_count}
 	  end
 
-=begin rdoc
-
-=== DESCRIPTION:
-
-Asks the server to start a &quot;consumer&quot;, which is a transient request for messages from a specific
-queue. Consumers last as long as the channel they were created on, or until the client cancels them
-with an _unsubscribe_. Every time a message reaches the queue it is passed to the _blk_ for
-processing. If error occurs, _Bunny_::_ProtocolError_ is raised.
-
-==== OPTIONS:
-* &lt;tt&gt;:header =&gt; true or false (_default_)&lt;/tt&gt; - If set to _true_, hash is delivered for each message
-  &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt;.
-* &lt;tt&gt;:consumer_tag =&gt; '_tag_'&lt;/tt&gt; - Specifies the identifier for the consumer. The consumer tag is
-  local to a connection, so two clients can use the same consumer tags. If this field is empty the
-  queue name is used.
-* &lt;tt&gt;:no_ack=&gt; true (_default_) or false&lt;/tt&gt; - If set to _true_, the server does not expect an
-  acknowledgement message from the client. If set to _false_, the server expects an acknowledgement
-  message from the client and will re-queue the message if it does not receive one within a time specified
-  by the server.
-* &lt;tt&gt;:exclusive =&gt; true or false (_default_)&lt;/tt&gt; - Request exclusive consumer access, meaning
-  only this consumer can access the queue.
-* &lt;tt&gt;:nowait =&gt; true or false (_default_)&lt;/tt&gt; - Ignored by Bunny, always _false_.
-* &lt;tt&gt;:timeout =&gt; number of seconds - The subscribe loop will continue to wait for
-  messages until terminated (Ctrl-C or kill command) or this timeout interval is reached.
-* &lt;tt&gt;:message_max =&gt; max number messages to process&lt;/tt&gt; - When the required number of messages
-  is processed subscribe loop is exited.
-
-==== RETURNS:
-
-If &lt;tt&gt;:header =&gt; true&lt;/tt&gt; returns hash &lt;tt&gt;{:header, :delivery_details, :payload}&lt;/tt&gt; for each message.
-&lt;tt&gt;:delivery_details&lt;/tt&gt; is a hash &lt;tt&gt;{:consumer_tag, :delivery_tag, :redelivered, :exchange, :routing_key}&lt;/tt&gt;.
-If &lt;tt&gt;:header =&gt; false&lt;/tt&gt; only message payload is returned.
-If &lt;tt&gt;:timeout =&gt; &gt; 0&lt;/tt&gt; is reached Qrack::ClientTimeout is raised
-
-=end
-	
 		def subscribe(opts = {}, &amp;blk)
-			# Get maximum amount of messages to process
-			message_max = opts[:message_max] || nil
-			return if message_max == 0
-			
-			# If a consumer tag is not passed in the server will generate one
-			consumer_tag = opts[:consumer_tag] || nil
-			
-			# ignore the :nowait option if passed, otherwise program will hang waiting for a
-			# response from the server causing an error.
-			opts.delete(:nowait)
-			
-			# do we want the message header?
-			hdr = opts.delete(:header)
-			
-			# do we want to have to provide an acknowledgement?
-			ack = opts.delete(:ack)
-			
-			client.send_frame(
-				Qrack::Protocol09::Basic::Consume.new({ :reserved_1 =&gt; 0,
-																			 					:queue =&gt; name,
-																	 		 					:consumer_tag =&gt; consumer_tag,
-																	 		 					:no_ack =&gt; !ack,
-																	 		 					:nowait =&gt; false }.merge(opts))
-			)
-			
-			raise Bunny::ProtocolError,
-				&quot;Error subscribing to queue #{name}&quot; unless
-				client.next_method.is_a?(Qrack::Protocol09::Basic::ConsumeOk)
-				
-			# Initialize message counter
-			counter = 0
-			
-			loop do
-        method = client.next_method(:timeout =&gt; opts[:timeout])
-			
-				# get delivery tag to use for acknowledge
-				self.delivery_tag = method.delivery_tag if ack
-			
-				header = client.next_payload
-				
-		    # If maximum frame size is smaller than message payload body then message
-				# will have a message header and several message bodies				
-		    msg = ''
-				while msg.length &lt; header.size
-					msg += client.next_payload
-				end
-				
-				# pass the message and related info, if requested, to the block for processing
-				blk.call(hdr ? {:header =&gt; header, :payload =&gt; msg, :delivery_details =&gt; method.arguments} : msg)
-
-				# Increment message counter
-				counter += 1
-				
-				# Exit loop if message_max condition met
-				break if !message_max.nil? and counter == message_max
-			end
+			# Create subscription
+			s = Bunny::Subscription09.new(client, self, opts)
+			s.start(&amp;blk)
 			
+			# Reset when subscription finished
+			@subscription = nil
 		end
 		
 =begin rdoc
@@ -429,19 +336,25 @@ the server will not send any more messages for that consumer.
 =end
 		
 		def unsubscribe(opts = {})
-			consumer_tag = opts[:consumer_tag] || name
+			# Default consumer_tag from subscription if not passed in
+			consumer_tag = subscription ? subscription.consumer_tag : opts[:consumer_tag]
 			
-			# ignore the :nowait option if passed, otherwise program will hang waiting for a
-			# response from the server causing an error
-			opts.delete(:nowait)
+			# Must have consumer tag to tell server what to unsubscribe
+			raise Bunny::UnsubscribeError,
+				&quot;No consumer tag received&quot; if !consumer_tag
 			
-      client.send_frame( Qrack::Protocol09::Basic::Cancel.new({ :consumer_tag =&gt; consumer_tag }.merge(opts)))
+      # Cancel consumer
+      client.send_frame( Qrack::Protocol09::Basic::Cancel.new(:consumer_tag =&gt; consumer_tag,
+																														:nowait =&gt; false))
 
-			raise Bunny::ProtocolError,
-				&quot;Error unsubscribing from queue #{name}&quot; unless
-				client.next_method.is_a?(Qrack::Protocol09::Basic::CancelOk)
+      raise Bunny::UnsubscribeError,
+        &quot;Error unsubscribing from queue #{name}&quot; unless
+        client.next_method.is_a?(Qrack::Protocol09::Basic::CancelOk)
+
+			# Reset subscription
+			@subscription = nil
 				
-			# return confirmation
+			# Return confirmation
 			:unsubscribe_ok
 			
     end</diff>
      <filename>lib/bunny/queue09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -9,6 +9,7 @@ require 'transport/frame08'
 require 'qrack/client'
 require 'qrack/channel'
 require 'qrack/queue'
+require 'qrack/subscription'
 
 module Qrack
 	</diff>
      <filename>lib/qrack/qrack08.rb</filename>
    </modified>
    <modified>
      <diff>@@ -9,6 +9,7 @@ require 'transport/frame09'
 require 'qrack/client'
 require 'qrack/channel'
 require 'qrack/queue'
+require 'qrack/subscription'
 
 module Qrack
 	</diff>
      <filename>lib/qrack/qrack09.rb</filename>
    </modified>
    <modified>
      <diff>@@ -4,7 +4,7 @@ module Qrack
 	class Queue
 		
 		attr_reader :name, :client
-	  attr_accessor :delivery_tag
+	  attr_accessor :delivery_tag, :subscription
 	
 =begin rdoc
 </diff>
      <filename>lib/qrack/queue.rb</filename>
    </modified>
    <modified>
      <diff>@@ -63,7 +63,7 @@ describe Bunny do
 	
 	it &quot;should be able to pop a message complete with header and delivery details&quot; do
 		q = @b.queue('test1')
-		msg = q.pop(:header =&gt; true)
+		msg = q.pop()
 		msg.should be_an_instance_of(Hash)
 		msg[:header].should be_an_instance_of(Bunny::Protocol::Header)
 		msg[:payload].should == 'This is a test message'
@@ -74,7 +74,7 @@ describe Bunny do
 	it &quot;should be able to pop a message and just get the payload&quot; do
 		q = @b.queue('test1')
 		q.publish('This is another test message')
-		msg = q.pop
+		msg = q.pop[:payload]
 		msg.should == 'This is another test message'
 		q.message_count.should == 0
 	end
@@ -83,7 +83,7 @@ describe Bunny do
 		q = @b.queue('test1')
 		lg_msg = 'z' * 142000
 		q.publish(lg_msg)
-		msg = q.pop
+		msg = q.pop[:payload]
 		msg.should == lg_msg
 	end
 	
@@ -105,7 +105,7 @@ describe Bunny do
 		q = @b.queue('test1')
 		q.publish('This is another test message')
 		q.pop
-		msg = q.pop
+		msg = q.pop[:payload]
 		msg.should == :queue_empty
 	end
 	
@@ -113,9 +113,8 @@ describe Bunny do
 		q = @b.queue('test1')
 		5.times {q.publish('Yet another test message')}
 		q.message_count.should == 5
-		q.subscribe(:message_max =&gt; 0){|msg| x = 1}
+		q.subscribe(:message_max =&gt; 0)
 		q.message_count.should == 5
-		q.unsubscribe.should == :unsubscribe_ok
 		q.purge.should == :purge_ok
 	end
 	
@@ -123,16 +122,60 @@ describe Bunny do
 		q = @b.queue('test1')
 		5.times {q.publish('Yet another test message')}
 		q.message_count.should == 5
-		q.subscribe(:message_max =&gt; 5){|msg| x = 1}
-		q.unsubscribe.should == :unsubscribe_ok
+		q.subscribe(:message_max =&gt; 5)
 	end
-	
+
+	it &quot;should stop subscription after processing message_max messages &lt; total in queue&quot; do
+		q = @b.queue('test1')
+		@b.qos()
+		10.times {q.publish('Yet another test message')}
+		q.message_count.should == 10
+		q.subscribe(:message_max =&gt; 5, :ack =&gt; true)
+		q.message_count.should == 5
+		q.purge.should == :purge_ok
+	end
+
 	it &quot;should raise an error when delete fails&quot; do
 		q = @b.queue('test1')
 		lambda {q.delete(:queue =&gt; 'bogus')}.should raise_error(Bunny::ForcedChannelCloseError)
 		@b.channel.active.should == false
 	end
 
+  it &quot;should pass correct block parameters through on subscribe&quot; do
+    q = @b.queue('test1')
+    q.publish(&quot;messages pop\'n&quot;)
+
+    q.subscribe do |msg|
+      msg[:header].should be_an_instance_of Qrack::Protocol::Header
+      msg[:payload].should == &quot;messages pop'n&quot;
+      msg[:delivery_details].should_not be_nil
+
+      q.unsubscribe
+			break
+    end
+
+  end
+
+  it &quot;should finish processing subscription messages if break is called in block&quot; do
+    q = @b.queue('test1')
+    q.publish('messages in my quezen')
+
+    q.subscribe do |msg|
+      msg[:payload].should == 'messages in my quezen'
+      q.unsubscribe
+			break
+    end
+
+    5.times {|i| q.publish(&quot;#{i}&quot;)}
+		q.subscribe do |msg|
+		  if msg[:payload] == '4'
+				q.unsubscribe 
+				break
+			end
+		end
+
+  end
+
 	it &quot;should be able to be deleted&quot; do
 		q = @b.queue('test1')
 		res = q.delete</diff>
      <filename>spec/spec_08/queue_spec.rb</filename>
    </modified>
    <modified>
      <diff>@@ -31,7 +31,7 @@ describe Bunny do
 		@b.channel.active.should == false
 	end
 	
-	it &quot;should be able to bind to an exchange&quot; do
+	it &quot;should be able to bind to an existing exchange&quot; do
 		exch = @b.exchange('direct_exch')
 		q = @b.queue('test1')
 		q.bind(exch).should == :bind_ok
@@ -63,7 +63,7 @@ describe Bunny do
 	
 	it &quot;should be able to pop a message complete with header and delivery details&quot; do
 		q = @b.queue('test1')
-		msg = q.pop(:header =&gt; true)
+		msg = q.pop()
 		msg.should be_an_instance_of(Hash)
 		msg[:header].should be_an_instance_of(Bunny::Protocol09::Header)
 		msg[:payload].should == 'This is a test message'
@@ -74,7 +74,7 @@ describe Bunny do
 	it &quot;should be able to pop a message and just get the payload&quot; do
 		q = @b.queue('test1')
 		q.publish('This is another test message')
-		msg = q.pop
+		msg = q.pop[:payload]
 		msg.should == 'This is another test message'
 		q.message_count.should == 0
 	end
@@ -83,7 +83,7 @@ describe Bunny do
 		q = @b.queue('test1')
 		lg_msg = 'z' * 142000
 		q.publish(lg_msg)
-		msg = q.pop
+		msg = q.pop[:payload]
 		msg.should == lg_msg
 	end
 	
@@ -105,17 +105,16 @@ describe Bunny do
 		q = @b.queue('test1')
 		q.publish('This is another test message')
 		q.pop
-		msg = q.pop
+		msg = q.pop[:payload]
 		msg.should == :queue_empty
 	end
-	
+
 	it &quot;should stop subscription without processing messages if max specified is 0&quot; do
 		q = @b.queue('test1')
 		5.times {q.publish('Yet another test message')}
 		q.message_count.should == 5
-		q.subscribe(:message_max =&gt; 0){|msg| x = 1}
+		q.subscribe(:message_max =&gt; 0)
 		q.message_count.should == 5
-		q.unsubscribe.should == :unsubscribe_ok
 		q.purge.should == :purge_ok
 	end
 	
@@ -123,8 +122,17 @@ describe Bunny do
 		q = @b.queue('test1')
 		5.times {q.publish('Yet another test message')}
 		q.message_count.should == 5
-		q.subscribe(:message_max =&gt; 5){|msg| x = 1}
-		q.unsubscribe.should == :unsubscribe_ok
+		q.subscribe(:message_max =&gt; 5)
+	end
+	
+	it &quot;should stop subscription after processing message_max messages &lt; total in queue&quot; do
+		q = @b.queue('test1')
+		@b.qos()
+		10.times {q.publish('Yet another test message')}
+		q.message_count.should == 10
+		q.subscribe(:message_max =&gt; 5, :ack =&gt; true)
+		q.message_count.should == 5
+		q.purge.should == :purge_ok
 	end
 	
 	it &quot;should raise an error when delete fails&quot; do
@@ -133,24 +141,58 @@ describe Bunny do
 		@b.channel.active.should == false
 	end
 
+	it &quot;should pass correct block parameters through on subscribe&quot; do
+    q = @b.queue('test1')
+    q.publish(&quot;messages pop\'n&quot;)
+
+    q.subscribe do |msg|
+      msg[:header].should be_an_instance_of Qrack::Protocol09::Header
+      msg[:payload].should == &quot;messages pop'n&quot;
+      msg[:delivery_details].should_not be_nil
+
+      q.unsubscribe
+			break
+    end
+
+	end
+
+	it &quot;should finish processing subscription messages if break is called in block&quot; do
+	  q = @b.queue('test1')
+	  q.publish('messages in my quezen')
+
+	  q.subscribe do |msg|
+	    msg[:payload].should == 'messages in my quezen'
+	    q.unsubscribe
+			break
+	  end
+
+	  5.times {|i| q.publish(&quot;#{i}&quot;)}
+		q.subscribe do |msg|
+		  if msg[:payload] == '4'
+				q.unsubscribe 
+				break
+			end
+		end
+	end
+
 	it &quot;should be able to be deleted&quot; do
 		q = @b.queue('test1')
 		res = q.delete
 		res.should == :delete_ok
 		@b.queues.has_key?('test1').should be(false)
 	end
-	
+
 	it &quot;should ignore the :nowait option when deleted&quot; do
 		q = @b.queue('test0')
 		q.delete(:nowait =&gt; true)
 	end
 
-  it &quot;should support server named queues&quot; do
-    q = @b.queue
-    q.name.should_not == nil
+	it &quot;should support server named queues&quot; do
+	  q = @b.queue
+	  q.name.should_not == nil
 
-    @b.queue(q.name).should == q
-    q.delete
-  end
+	  @b.queue(q.name).should == q
+	  q.delete
+	end
 	
 end</diff>
      <filename>spec/spec_09/queue_spec.rb</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>b8b39af3359000e2cdf82ee05c857c7a996a08ab</id>
    </parent>
  </parents>
  <author>
    <name>Chris Duncan</name>
    <email>celldee@gmail.com</email>
  </author>
  <url>http://github.com/celldee/bunny/commit/5166c1e1e5bc367e5b21da1ca0fb8d6184038dd4</url>
  <id>5166c1e1e5bc367e5b21da1ca0fb8d6184038dd4</id>
  <committed-date>2009-09-13T17:38:24-07:00</committed-date>
  <authored-date>2009-09-13T17:38:24-07:00</authored-date>
  <message>Add Subscription classes and improve Queue#pop/subscribe/unsubscribe. Update examples, gemspec and test specs.</message>
  <tree>7b8da8ac0126ded6659e48a72c16de166f5d1dcb</tree>
  <committer>
    <name>Chris Duncan</name>
    <email>celldee@gmail.com</email>
  </committer>
</commit>
