Skip to content
Browse files

adding a concurrency helper for consumers

  • Loading branch information...
1 parent e5ace27 commit 29cb05f39afc3942e503b06bbf3a9e6c419db1f6 Jonthan Moss committed
Showing with 53 additions and 13 deletions.
  1. +22 −0 amity/concurrency.py
  2. +7 −13 amity/messaging.py
  3. +24 −0 scripts/concurrent_consume.py
View
22 amity/concurrency.py
@@ -0,0 +1,22 @@
+"""
+amity.concurrency
+============
+
+For helping with concurrent processing
+
+:copyright: (c) 2011 by Jonathan Moss.
+
+"""
+
+from threading import Thread
+from functools import wraps
+
+def run_in_thread(f):
+ """
+ Wraps the passed in function in a thread so that it executes asynchronously
+ """
+ @wraps(f)
+ def wrapper(*args, **kwargs):
+ thread = Thread(target=f, args=args, kwargs=kwargs)
+ thread.start()
+ return wrapper
View
20 amity/messaging.py
@@ -145,7 +145,7 @@ def _get_producer(self):
channel = self.connection.channel()
self.producer = Producer(channel,
exchange=self._get_exchange(),
- serializer="json")
+ serializer='json')
return self.producer
def emit(self, event_name, event):
@@ -211,7 +211,7 @@ def _get_queue(self, event_name):
def _marshal(self, func):
"""
- Wraps up the pass in func so it receive a nice tasty
+ Wraps up the passed in func so it receive a nice tasty
Event object
"""
def marshalled_func(body, message):
@@ -219,13 +219,9 @@ def marshalled_func(body, message):
Converts the kombu body into an Event object
and passes that to the handler function
"""
- event = Event()
- for key, value in body['values'].items():
- event.set(key, value)
- for header, value in body['headers'].items():
- event.set_header(header, value)
- if func(event) is not False:
- message.ack()
+ event = Event(body['values'], body['headers'])
+ func(event)
+ message.ack()
return marshalled_func
def register_callback(self, event_name, callback):
@@ -326,10 +322,8 @@ def marshalled_func(body, message):
and passes that to the handler function
"""
event = Event(body['values'], body['headers'])
- result = func(event)
- if result is not False:
- self._emitter.emit(event.get_header('reply-to'), result)
- message.ack()
+ self._emitter.emit(event.get_header('reply-to'), func(event))
+ message.ack()
return marshalled_func
View
24 scripts/concurrent_consume.py
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+'''
+Created on Jun 19, 2011
+
+@author: mossj
+'''
+from amity.messaging import Listener
+from amity.concurrency import run_in_thread
+from kombu.connection import BrokerConnection
+
+connection = BrokerConnection('localhost', 'guest', 'guest', '/')
+
+@run_in_thread
+def lineItemLogger(event):
+ """
+ The @run_in_thread does what is says and wraps this function
+ in a thread
+ """
+ print "Message: '%s' FROM %s" % (event.get("msg"),
+ event.get_header('from'))
+
+c = Listener(connection)
+c.register_callback("lineitem.*", lineItemLogger)
+c.listen()

0 comments on commit 29cb05f

Please sign in to comment.
Something went wrong with that request. Please try again.