<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array"/>
  <modified type="array">
    <modified>
      <diff>@@ -1,3 +1,4 @@
+.DS_Store
 .actionScriptProperties
 .flexLibProperties
 .project</diff>
      <filename>.gitignore</filename>
    </modified>
    <modified>
      <diff></diff>
      <filename>lib/flexunit.swc</filename>
    </modified>
    <modified>
      <diff>@@ -18,15 +18,17 @@
 package org.amqp.patterns.impl
 {
     import com.ericfeminella.utils.HashMap;
-
+    
     import flash.events.EventDispatcher;
     import flash.utils.ByteArray;
-
+    
     import org.amqp.BasicConsumer;
     import org.amqp.Command;
     import org.amqp.Connection;
     import org.amqp.ProtocolEvent;
     import org.amqp.headers.BasicProperties;
+    import org.amqp.methods.basic.Cancel;
+    import org.amqp.methods.basic.CancelOk;
     import org.amqp.methods.basic.Consume;
     import org.amqp.methods.basic.Deliver;
     import org.amqp.methods.queue.Declare;
@@ -36,8 +38,6 @@ package org.amqp.patterns.impl
 
     public class SubscribeClientImpl extends AbstractDelegate implements SubscribeClient, BasicConsumer, Dispatcher
     {
-        private const CONSUME_HANDLER:Function = consumeHandler;
-
         private var replyQueue:String = null;
         private var topics:HashMap = new HashMap();
         private var dispatcher:EventDispatcher = new EventDispatcher();
@@ -53,36 +53,39 @@ package org.amqp.patterns.impl
                 return;
             }
 
-            topics.put(key, callback);
+            topics.put(key, {callback:callback, consumerTag:null});
 
             if (replyQueue != null) {
-                dispatch(key, CONSUME_HANDLER);
+                dispatch(key, null);
             }else {
-                sendBuffer.buffer(key, CONSUME_HANDLER);
+                sendBuffer.buffer(key, null);
             }
         }
 
         public function unsubscribe(key:String):void {
+        	var cancel:Cancel = new Cancel();
+        	var topic:* = topics.getValue(key);
+        	
+        	cancel.consumertag = topic.consumerTag;
+            sessionHandler.dispatch(new Command(cancel));
+            sessionHandler.addEventListener(new org.amqp.methods.basic.CancelOk, onCancelOk);
+        	
+            dispatcher.removeEventListener(key, topic.callback);
             topics.remove(key);
-            dispatcher.removeEventListener(key, CONSUME_HANDLER);
         }
 
         public function dispatch(o:*, callback:Function):void {
+        	var consume:Consume = new Consume();
+            consume.queue = replyQueue;
+            consume.noack = true;
+            consume.consumertag = replyQueue + &quot;:&quot; + o;
+            sessionHandler.register(consume, this);
+        	
             bindQueue(exchange, replyQueue, o);
-            dispatcher.addEventListener(o, callback);
-        }
-
-        public function consumeHandler(event:CorrelatedMessageEvent):void {
-            var key:String = event.type;
-
-            if (topics.containsKey(key)) {
-                var topic:* = topics.getValue(key);
-                topic.call(null, event);
-            }
         }
 
         override protected function onChannelOpenOk(event:ProtocolEvent):void {
-            declareExchange(exchange,exchangeType);
+            declareExchange(exchange, exchangeType);
             setupReplyQueue();
         }
 
@@ -95,26 +98,26 @@ package org.amqp.patterns.impl
 
         override protected function onQueueDeclareOk(event:ProtocolEvent):void {
             replyQueue = getReplyQueue(event);
-            var consume:Consume = new Consume();
-            consume.queue = replyQueue;
-            consume.noack = true;
-            sessionHandler.register(consume, this);
             sendBuffer.drain();
         }
 
-        public function onConsumeOk(tag:String):void {}
+        public function onConsumeOk(tag:String):void {
+    	    var key:String = tag.split(&quot;:&quot;)[1];
+    	    var topic:* = topics.getValue(key);
+    	   
+    	    topic.consumerTag = tag;
+    	    topics.put(key, topic);
+    	   
+    	    dispatcher.addEventListener(key, topic.callback);
+        }
 
         public function onCancelOk(tag:String):void {}
 
         public function onDeliver(method:Deliver,
                                   properties:BasicProperties,
                                   body:ByteArray):void {
-            // replyto will always be null since we dont know the queue name
-            // ..maybe we set an optional user id in the constructor?
-            //if (properties.replyto != replyQueue) {
-                var result:* = serializer.deserialize(body);
-                dispatcher.dispatchEvent(new CorrelatedMessageEvent(properties.correlationid, result));
-            //}
+            var result:* = serializer.deserialize(body);
+            dispatcher.dispatchEvent(new CorrelatedMessageEvent(properties.correlationid, result));
         }
     }
 }
\ No newline at end of file</diff>
      <filename>src/org/amqp/patterns/impl/SubscribeClientImpl.as</filename>
    </modified>
    <modified>
      <diff></diff>
      <filename>test/lib/flexunit.swc</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>0557ca6665b37cebc895d1f975520f87efc22b41</id>
    </parent>
  </parents>
  <author>
    <name>Peter Kieltyka</name>
    <email>peter.kieltyka@nulayer.com</email>
  </author>
  <url>http://github.com/0x6e6562/as3-amqp/commit/ecfa04f09d9d374baf3a2581bcbcd5f3bf91659e</url>
  <id>ecfa04f09d9d374baf3a2581bcbcd5f3bf91659e</id>
  <committed-date>2008-08-26T11:18:11-07:00</committed-date>
  <authored-date>2008-08-26T11:18:11-07:00</authored-date>
  <message>Reworked SubscribeClientImpl so it creates a consumer for each subscription</message>
  <tree>2af9ac603bb80e12bf3b67736e85e7bce750549f</tree>
  <committer>
    <name>Peter Kieltyka</name>
    <email>peter.kieltyka@nulayer.com</email>
  </committer>
</commit>
