Permalink
Browse files

responders can now specify queue names so that you can run multiple a…

…nd still only get one response
  • Loading branch information...
1 parent b10a778 commit e5ace27069bdb4a0bacc75e08bdd8518805e5286 @a-musing-moose committed Jun 29, 2011
Showing with 63 additions and 17 deletions.
  1. +62 −16 amity/messaging.py
  2. +1 −1 scripts/responser.py
View
@@ -162,7 +162,8 @@ class Listener(object):
"""
connection = None
exchange = None
- consumers = []
+ consumers = None
+ _queues = None
def __init__(self, connection):
"""
@@ -173,6 +174,8 @@ def __init__(self, connection):
"""
self._set_connection(connection)
self.exchange = Exchange('event', 'topic')
+ self.consumers = []
+ self._queues = {}
def _set_connection(self, connection):
"""
@@ -195,6 +198,17 @@ def get_connection(self):
"""
return self.connection
+ def _get_queue(self, event_name):
+ """
+ Gets a fresh new queue
+ """
+ if event_name not in self._queues:
+ self._queues[event_name] = Queue(str(getpid()),
+ exchange=self.exchange,
+ routing_key=event_name,
+ durable=False)
+ return self._queues[event_name]
+
def _marshal(self, func):
"""
Wraps up the pass in func so it receive a nice tasty
@@ -210,7 +224,8 @@ def marshalled_func(body, message):
event.set(key, value)
for header, value in body['headers'].items():
event.set_header(header, value)
- func(event)
+ if func(event) is not False:
+ message.ack()
return marshalled_func
def register_callback(self, event_name, callback):
@@ -226,10 +241,7 @@ def register_callback(self, event_name, callback):
The callable to call when the event is received
"""
channel = self.get_connection().channel()
- queue = Queue(str(getpid()),
- exchange=self.exchange,
- routing_key=event_name,
- durable=False)
+ queue = self._get_queue(event_name)
consumer = Consumer(channel, queue)
consumer.register_callback(self._marshal(callback))
consumer.consume()
@@ -256,14 +268,49 @@ def listen(self):
except socket.timeout:
pass
-class Responder(Listener):
+class ExclusiveListener(Listener):
+ """
+ An extension of Listener which listens on a specified queue name
+
+ By specifying a specific queue name you can ensure that messages are
+ received by only one listener (hence the 'Exclusive' bit. ed.)
+ """
+ queue = None
+ queue_name = None
+
+ def __init__(self, connection, queue_name):
+ """
+ Arguments:
+
+ connection
+ The BrokerConnection to use
+
+ queue_name
+ The queue_name to bind to
+ """
+ Listener.__init__(self, connection)
+ self.queue_name = queue_name
+ self.queue = None
+
+ def _get_queue(self, event_name):
+ """
+ Returns the queue object to use
+ """
+ if self.queue is None:
+ self.queue = Queue(self.queue_name,
+ exchange=self.exchange,
+ routing_key=event_name,
+ durable=False)
+ return self.queue
+
+class Responder(ExclusiveListener):
"""
Base class for a listener that responds
"""
_emitter = None
- def __init__(self, connection):
- Listener.__init__(self, connection)
+ def __init__(self, connection, queue_name):
+ ExclusiveListener.__init__(self, connection, queue_name)
self._emitter = Emitter(self.connection)
def _marshal(self, func):
@@ -278,12 +325,11 @@ def marshalled_func(body, message):
Converts the kombu body into an Event object
and passes that to the handler function
"""
- event = Event()
- for key, value in body['values'].items():
- event.set(key, value)
- for header, value in body['headers'].items():
- event.set_header(header, value)
- self._emitter.emit(event.get_header('reply-to'), func(event))
+ event = Event(body['values'], body['headers'])
+ result = func(event)
+ if result is not False:
+ self._emitter.emit(event.get_header('reply-to'), result)
+ message.ack()
return marshalled_func
@@ -326,7 +372,7 @@ def call(self, event_name, event, timeout=10):
self._response = None
reply_to = str(uuid.uuid1())
event.set_header('reply-to', reply_to)
- listen = Listener(self.connection)
+ listen = ExclusiveListener(self.connection, reply_to)
listen.register_callback(reply_to, self._on_response)
self._emitter.emit(event_name, event)
listen.listen_once(timeout)
@@ -15,6 +15,6 @@ def responser(event):
response = event.get('msg')[::-1]
return Event({"msg": response})
-c = Responder(connection)
+c = Responder(connection, 'my_queue')
c.register_callback("event.reverse", responser)
c.listen()

0 comments on commit e5ace27

Please sign in to comment.