<?xml version="1.0" encoding="UTF-8"?>
<commit>
  <added type="array">
    <added>
      <filename>hookah/tests/__init__.py</filename>
    </added>
    <added>
      <filename>hookah/tests/test_queue.py</filename>
    </added>
  </added>
  <modified type="array">
    <modified>
      <diff>@@ -1,4 +1,5 @@
 *.pyc
+_trial_temp
 twistd.log
 twistd.pid
-twisted/plugins/dropin.cache
\ No newline at end of file
+twisted/plugins/dropin.cache</diff>
      <filename>.gitignore</filename>
    </modified>
    <modified>
      <diff>@@ -1,7 +1,7 @@
 from twisted.internet import reactor
 from twisted.web import client, error, http
 from twisted.web.resource import Resource
-from hookah import pubsub
+from hookah import queue
 import urllib
 import sys
 
@@ -44,7 +44,7 @@ class DispatchResource(Resource):
             del request.args['_topic']
 
             data_params = urllib.urlencode(request.args, doseq=True)
-            pubsub.dispatch_queue.put({
+            queue.put('dispatch', {
                 'topic' : topic_param,
                 'data' : data_params,
                 'content_type' : 'application/x-www-form-urlencoded',</diff>
      <filename>hookah/dispatch.py</filename>
    </modified>
    <modified>
      <diff>@@ -6,59 +6,58 @@ from twisted.internet import defer
 import urllib
 import time
 
-import dispatch
-
-fetch_queue = defer.DeferredQueue()
-dispatch_queue = defer.DeferredQueue()
-verify_queue = defer.DeferredQueue()
+from hookah import dispatch, queue, stream
 
 subscriptions = dict() # Key: topic, Value: list of subscriber callback URLs
 
-def baseN(num,b,numerals=&quot;0123456789abcdefghijklmnopqrstuvwxyz&quot;): 
-    return ((num == 0) and  &quot;0&quot; ) or (baseN(num // b, b).lstrip(&quot;0&quot;) + numerals[num % b])
+class FetchQueue(queue.Queue):
+    def receivedMessage(self, url):
+        subscribers = len(subscriptions.get(url, []))
+        if subscribers:
+            print &quot;Fetching %s for %s subscribers&quot; % (url, subscribers)
+            client.getPage(url, headers={'X-Hub-Subscribers': subscribers}) \
+                .addCallbacks(lambda p: queue.put('dispatch', {'topic': url, 'data': p, 'content_type': 'application/atom+xml'}))
 
-def do_fetch(url):
-    subscribers = len(subscriptions.get(url, []))
-    if subscribers:
-        print &quot;Fetching %s for %s subscribers&quot; % (url, subscribers)
-        client.getPage(url, headers={'X-Hub-Subscribers': subscribers}) \
-            .addCallbacks(lambda p: dispatch_queue.put({'topic': url, 'data': p, 'content_type': 'application/atom+xml'}))
-    fetch_queue.get().addCallback(do_fetch)
-    
-def do_dispatch(message):
-    subscribers = subscriptions.get(message['topic'], None)
-    if subscribers:
-        print &quot;Dispatching new content for %s to %s subscribers&quot; % (message['topic'], len(subscribers))
-        for subscriber in subscribers:
-            dispatch.post_and_retry(subscriber, message['data'], content_type=message['content_type'])
-    dispatch_queue.get().addCallback(do_dispatch)
+class DispatchQueue(queue.Queue):
+    def receivedMessage(self, message):
+        subscribers = subscriptions.get(message['topic'], None)
+        if subscribers:
+            print &quot;Dispatching new content for %s to %s subscribers&quot; % (message['topic'], len(subscribers))
+            for subscriber in subscribers:
+                dispatch.post_and_retry(subscriber, message['data'], content_type=message['content_type'])
+        if message['topic'] in stream.listeners:
+            for request in stream.listeners[message['topic']]:
+                request.queue.put(message['data'])
 
-def do_verify(to_verify):
-    print &quot;Verifying %s as a subscriber to %s&quot; % (to_verify['callback'], to_verify['topic'])
-    challenge = baseN(abs(hash(time.time())), 36)
-    verify_token = to_verify.get('verify_token', None)
-    payload = {'hub.mode': to_verify['mode'], 'hub.topic': to_verify['topic'], 'hub.challenge': challenge}
-    if verify_token:
-        payload['hub.verify_token'] = verify_token
-    url = '?'.join([to_verify['callback'], urllib.urlencode(payload)])
-    def success(page):
-        if challenge in page:
-            if to_verify['mode'] == 'subscribe':
-                if not to_verify['topic'] in subscriptions:
-                    subscriptions[to_verify['topic']] = []
-                subscriptions[to_verify['topic']].append(to_verify['callback'])
+class VerifyQueue(queue.Queue):
+    def receivedMessage(self, to_verify):
+        print &quot;Verifying %s as a subscriber to %s&quot; % (to_verify['callback'], to_verify['topic'])
+        challenge = baseN(abs(hash(time.time())), 36)
+        verify_token = to_verify.get('verify_token', None)
+        payload = {'hub.mode': to_verify['mode'], 'hub.topic': to_verify['topic'], 'hub.challenge': challenge}
+        if verify_token:
+            payload['hub.verify_token'] = verify_token
+        url = '?'.join([to_verify['callback'], urllib.urlencode(payload)])
+        def success(page):
+            if challenge in page:
+                if to_verify['mode'] == 'subscribe':
+                    if not to_verify['topic'] in subscriptions:
+                        subscriptions[to_verify['topic']] = []
+                    subscriptions[to_verify['topic']].append(to_verify['callback'])
+                else:
+                    subscriptions[to_verify['topic']].remove(to_verify['callback'])
+                if 'onsuccess' in to_verify:
+                    to_verify['onsuccess'](page)
             else:
-                subscriptions[to_verify['topic']].remove(to_verify['callback'])
-            if 'onsuccess' in to_verify:
-                to_verify['onsuccess'](page)
-        else:
+                if 'onfail' in to_verify:
+                    to_verify['onfail'](page)
+        def failure(x):
             if 'onfail' in to_verify:
-                to_verify['onfail'](page)
-    def failure(x):
-        if 'onfail' in to_verify:
-            to_verify['onfail'](x)
-    client.getPage(url).addCallbacks(success, failure)
-    verify_queue.get().addCallback(do_verify)
+                to_verify['onfail'](x)
+        client.getPage(url).addCallbacks(success, failure)
+
+def baseN(num,b,numerals=&quot;0123456789abcdefghijklmnopqrstuvwxyz&quot;): 
+    return ((num == 0) and  &quot;0&quot; ) or (baseN(num // b, b).lstrip(&quot;0&quot;) + numerals[num % b])
 
 class SubscribeResource(Resource):
     isLeaf = True
@@ -95,11 +94,11 @@ class SubscribeResource(Resource):
                 request.finish()
             to_verify['onsuccess'] = lambda x: finish_success(request)
             to_verify['onfail'] = lambda x: finish_failed(request)
-            verify_queue.put(to_verify)
+            queue.put('verify', to_verify)
             return NOT_DONE_YET
             
         elif verify == 'async':
-            verify_queue.put(to_verify)
+            queue.put('verify', to_verify)
             request.setResponseCode(http.ACCEPTED)
             return &quot;202 Scheduled for verification&quot;
 
@@ -116,9 +115,13 @@ class PublishResource(Resource):
             return &quot;400 Bad request: Expected 'hub.mode' and 'hub.url'&quot;
         
         if mode == 'publish':
-            fetch_queue.put(url)
+            queue.put('fetch', url)
             request.setResponseCode(http.NO_CONTENT)
             return &quot;204 Published&quot;
         else:
             request.setResponseCode(http.BAD_REQUEST)
-            return &quot;400 Bad request: Unrecognized mode&quot;
\ No newline at end of file
+            return &quot;400 Bad request: Unrecognized mode&quot;
+
+queue.register('dispatch', DispatchQueue())
+queue.register('verify', VerifyQueue())
+queue.register('fetch', FetchQueue())
\ No newline at end of file</diff>
      <filename>hookah/pubsub.py</filename>
    </modified>
    <modified>
      <diff>@@ -1,35 +1,33 @@
 from twisted.web import client, error, http, server
 from twisted.web.resource import Resource
+from hookah import queue
 
 # known sessions, session list
 # request gets a queue
 # buffer between main queue and request queue
 # push adds to buffer queue, adds to any request buffers
 
-listeners = {}
-
-def touch_active_sessions():
-    for topic in listeners:
-        if listeners[topic]:
-            for request in listeners[topic]:
-                request.getSession().touch()
-
-def __mk_session_exp_cb(self, sid):
-    def f():
-        print &quot;Expired session&quot;, sid
-        del sessions[sid]
-    return f
+listeners = {} # Key: topic, Value: list of requests listening
     
-def __req_finished(whatever, sid):
-    sessions[sid] = None
 
 class StreamResource(Resource):
     isLeaf = True
     
     def render_GET(self, request):
-        session = request.getSession()
-        if session.uid not in sessions:
-            sessions[session.uid] = request
-            session.notifyOnExpire(__mk_session_exp_cb(session.uid))
-        request.notifyFinish().addBoth(__req_finished, session.uid)
-        return server.NOT_DONE_YET
\ No newline at end of file
+        topic = request.args.get('topic', [None])[0]
+        if not topic:
+            return &quot;No topic&quot;
+        if not topic in listeners:
+            listeners[topic] = []
+        request.queue = queue.Queue(lambda m: self._send(request, m))
+        listeners[topic].append(request)
+        request.setHeader('Content-Type', 'application/json')
+        request.setHeader('Transfer-Encoding', 'chunked')
+        request.notifyFinish().addBoth(self._finished, topic, request)
+        return server.NOT_DONE_YET
+    
+    def _finished(self, whatever, topic, request):
+        listeners[topic].remove(request)
+
+    def _send(self, request, message):
+        request.write(message)
\ No newline at end of file</diff>
      <filename>hookah/stream.py</filename>
    </modified>
    <modified>
      <diff>@@ -3,7 +3,7 @@ from twisted.web import client, error, http, static
 from twisted.web.resource import Resource
 from twisted.internet import task
 
-import dispatch, pubsub, stream
+from hookah import dispatch, pubsub, stream
 
 class HookahResource(Resource):
     isLeaf = False
@@ -69,11 +69,6 @@ class HookahResource(Resource):
     
     @classmethod
     def setup(cls):
-        # These should probably go somewhere else
-        pubsub.fetch_queue.get().addCallback(pubsub.do_fetch)
-        pubsub.dispatch_queue.get().addCallback(pubsub.do_dispatch)
-        pubsub.verify_queue.get().addCallback(pubsub.do_verify)
-        
         r = cls()
         r.putChild('dispatch', dispatch.DispatchResource())
         r.putChild('subscribe', pubsub.SubscribeResource())</diff>
      <filename>hookah/web.py</filename>
    </modified>
  </modified>
  <removed type="array"/>
  <parents type="array">
    <parent>
      <id>0dcd90a04b303e37871b17b8d0ade6855f59b8b0</id>
    </parent>
  </parents>
  <author>
    <name>Jeff Lindsay</name>
    <email>progrium@gmail.com</email>
  </author>
  <url>http://github.com/progrium/hookah/commit/91565e0fcbb91e0efe6dda7b7faf86c7f8e3dc05</url>
  <id>91565e0fcbb91e0efe6dda7b7faf86c7f8e3dc05</id>
  <committed-date>2009-07-27T22:56:48-07:00</committed-date>
  <authored-date>2009-07-27T22:56:48-07:00</authored-date>
  <message>integrated new queue module, started playing with tests in twisted, and got the stream module mostly functional</message>
  <tree>dcae4650b3383332974e1ff954e3be1dd44b3ee3</tree>
  <committer>
    <name>Jeff Lindsay</name>
    <email>progrium@gmail.com</email>
  </committer>
</commit>
