bwhitman / py-amqplib-wrapper

Python wrapper for amqplib to emulate SQS semantics

This URL has Read+Write access

py-amqplib-wrapper / amqp-wrapper.py
556f6daf » bwhitman 2009-04-27 first commit 1 import simplejson
2 import amqplib.client_0_8 as amqp
3
4 class Queue():
5 """Base class for AMQP queue service."""
6 def __init__(self, queue_name):
7 self.conn = amqp.Connection('HOST_HERE', userid='guest', password='guest', ssl=False)
8 self.queue_name = queue_name
9 self.ch = self.conn.channel()
10
11 def declare(self):
12 return self.ch.queue_declare(self.queue_name, passive=False, durable=True, exclusive=False, auto_delete=False)
13
14 def __len__(self):
15 """Return number of messages waiting in this queue"""
16 _,n_msgs,_ = self.declare()
17 return n_msgs
18
19 def consumers(self):
20 """Return how many clients are currently listening to this queue."""
21 _,_,n_consumers = self.declare()
22 return n_consumers
23
24
25 class QueueProducer(Queue):
26 def __init__(self, queue_name):
27 """Create new queue producer (guy that creates messages.)
28 Will create a queue if the given name does not already exist."""
29 Queue.__init__(self, queue_name)
30 self.ch.access_request('/data',active=True,read=False,write=True)
31 self.ch.exchange_declare('sqs_exchange', 'direct', durable=True, auto_delete=False)
32 qname, n_msgs, n_consumers = self.declare()
33 print "Connected to %s (%d msgs, %d consumers)" % (qname, n_msgs, n_consumers)
34 self.ch.queue_bind(self.queue_name, 'sqs_exchange', self.queue_name)
35
36 def delete(self):
37 """Delete a queue and closes the queue connection."""
38 self.ch.queue_delete(self.queue_name)
39 self.ch.close()
40
41 def write(self, message):
42 """Write a single message to the queue. Message can be a dict or a list or whatever."""
43 m = amqp.Message(simplejson.dumps(message), content_type='text/x-json')
44 self.ch.basic_publish(m, 'sqs_exchange', self.queue_name)
45
46 class QueueConsumer(Queue):
47 def __init__(self, queue_name):
48 """Create new queue consumer (guy that listens to messages.)"""
49 Queue.__init__(self, queue_name)
50 self.ch.access_request('/data',active=True, read=True, write=False)
51 self.ch.queue_bind(self.queue_name, 'sqs_exchange', self.queue_name)
52
53 def ack(self, delivery_tag):
54 """Acknowledge receipt of the message (which will remove it off the queue.)
55 Do this after you've completed your processing of the message.
56 Otherwise after some amount of time (about a minute) it will go back on the queue.
57 e.g.
58
59 (object, tag) = consumer.get()
60 if(object is not None):
61 error = doSomethingWithMessage(object)
62 if(error is None):
63 consumer.ack(tag)
64
65 """
66 self.ch.basic_ack(delivery_tag)
67
68 def get(self):
69 """Get a message. Returns the object and a delivery tag."""
70 m = self.ch.basic_get(self.queue_name)
71 if(m is not None):
72 try:
73 ret = simplejson.loads(m.body)
74 except ValueError:
75 print "Problem decoding json for body " + str(m.body) + ". deleting."
76 self.ack(m.delivery_tag)
77 return (None, None)
78 return (ret, m.delivery_tag)
79 else:
80 return (None,None)
81