/
pyamqplib.py
364 lines (282 loc) · 12.8 KB
/
pyamqplib.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
"""

`amqplib`_ backend for carrot.

.. _`amqplib`: http://barryp.org/software/py-amqplib/

"""
from amqplib.client_0_8 import transport
# amqplib's handshake mistakenly identifies as protocol version 1191,
# this breaks in RabbitMQ tip, which no longer falls back to
# 0-8 for unknown ids.
transport.AMQP_PROTOCOL_HEADER = "AMQP\x01\x01\x08\x00"
from amqplib import client_0_8 as amqp
from amqplib.client_0_8.exceptions import AMQPConnectionException
from amqplib.client_0_8.exceptions import AMQPChannelException
from amqplib.client_0_8.serialization import AMQPReader, AMQPWriter
from carrot.backends.base import BaseMessage, BaseBackend
from itertools import count
import socket
import warnings
import weakref
DEFAULT_PORT = 5672
class Connection(amqp.Connection):
    """amqplib connection able to wait on several channels at once,
    optionally with a socket read timeout."""

    def drain_events(self, allowed_methods=None, timeout=None):
        """Wait for an event on any channel.

        :keyword allowed_methods: collection of method signatures to
            accept, or ``None`` to accept any method.
        :keyword timeout: read timeout handed on to :meth:`wait_multi`.

        :returns: the return value of the dispatched AMQP method handler.

        """
        # Forward ``allowed_methods`` too, so the caller's filter is
        # honoured instead of being silently ignored.
        return self.wait_multi(self.channels.values(),
                               allowed_methods=allowed_methods,
                               timeout=timeout)

    def wait_multi(self, channels, allowed_methods=None, timeout=None):
        """Wait for an event on any of ``channels`` and dispatch it.

        :param channels: iterable of channel objects to listen on.
        :keyword allowed_methods: collection of method signatures to
            accept, or ``None`` to accept any method.
        :keyword timeout: read timeout passed to :meth:`_wait_multiple`.

        :returns: the return value of the handler found in the channel's
            ``_METHOD_MAP`` for the received method signature.

        :raises Exception: if the received method signature has no entry
            in the channel's ``_METHOD_MAP``.

        """
        chanmap = dict((chan.channel_id, chan) for chan in channels)
        chanid, method_sig, args, content = self._wait_multiple(
            chanmap.keys(), allowed_methods, timeout=timeout)

        channel = chanmap[chanid]

        if (content
                and channel.auto_decode
                and hasattr(content, 'content_encoding')):
            try:
                content.body = content.body.decode(content.content_encoding)
            except Exception:
                # Deliberate best effort: if the body cannot be decoded
                # it is delivered to the handler undecoded.
                pass

        amqp_method = channel._METHOD_MAP.get(method_sig, None)

        if amqp_method is None:
            raise Exception('Unknown AMQP method (%d, %d)' % method_sig)

        if content is None:
            return amqp_method(channel, args)
        return amqp_method(channel, args, content)

    def read_timeout(self, timeout=None):
        """Read one method from the peer, optionally with a timeout.

        :keyword timeout: socket timeout to apply for this single read.
            With ``None`` the socket timeout is left untouched.

        :returns: the result of ``self.method_reader.read_method()``.

        """
        if timeout is None:
            return self.method_reader.read_method()

        sock = self.transport.sock
        prev = sock.gettimeout()
        sock.settimeout(timeout)
        try:
            return self.method_reader.read_method()
        finally:
            # Always restore the previous timeout, even if the read raised.
            sock.settimeout(prev)

    def _wait_multiple(self, channel_ids, allowed_methods, timeout=None):
        """Return the next acceptable method for any of ``channel_ids``.

        Already queued methods are served first; otherwise methods are
        read from the peer until an acceptable one arrives. Methods that
        are not acceptable are appended to the method queue of the
        channel they were received on.

        :returns: a ``(channel_id, method_sig, args, content)`` tuple.

        """

        def acceptable(method_sig):
            # (20, 40) is always let through -- per the AMQP 0-8
            # class/method ids this is Channel.close.
            return (allowed_methods is None
                    or method_sig in allowed_methods
                    or method_sig == (20, 40))

        for channel_id in channel_ids:
            method_queue = self.channels[channel_id].method_queue
            for queued_method in method_queue:
                if acceptable(queued_method[0]):
                    # Removing during iteration is safe here because we
                    # return immediately afterwards.
                    method_queue.remove(queued_method)
                    method_sig, args, content = queued_method
                    return channel_id, method_sig, args, content

        # Nothing queued, need to wait for a method from the peer
        while True:
            channel, method_sig, args, content = self.read_timeout(timeout)

            if channel in channel_ids and acceptable(method_sig):
                return channel, method_sig, args, content

            # Not the channel and/or method we were looking for. Queue
            # this method for later
            self.channels[channel].method_queue.append((method_sig,
                                                        args,
                                                        content))

            #
            # If we just queued up a method for channel 0 (the Connection
            # itself) it's probably a close method in reaction to some
            # error, so deal with it right away.
            #
            if channel == 0:
                self.wait()
# NOTE: the docstring of this class doubles as the warning text --
# ``Backend.queue_declare`` passes ``QueueAlreadyExistsWarning.__doc__``
# as the message, so rewording it changes what users see at runtime.
class QueueAlreadyExistsWarning(UserWarning):
    """A queue with that name already exists, so a recently changed
    ``routing_key`` or other settings might be ignored unless you
    rename the queue or restart the broker."""
class Message(BaseMessage):
    """A message received by the broker.

    Usually you don't instantiate message objects yourself, but receive
    them using a :class:`carrot.messaging.Consumer`.

    :param backend: see :attr:`backend`.
    :param amqp_message: see :attr:`_amqp_message`.

    .. attribute:: body

        The message body.

    .. attribute:: delivery_tag

        The message delivery tag, uniquely identifying this message.

    .. attribute:: backend

        The message backend used.
        A subclass of :class:`carrot.backends.base.BaseBackend`.

    .. attribute:: _amqp_message

        A :class:`amqplib.client_0_8.basic_message.Message` instance.
        This is a private attribute and should not be accessed by
        production code.

    """

    def __init__(self, backend, amqp_message, **kwargs):
        self._amqp_message = amqp_message
        self.backend = backend

        # Fields copied from the amqplib message into the keyword
        # arguments for the base class; a field the message does not
        # have is passed as ``None``. Values taken from the message
        # override same-named entries supplied by the caller.
        copied_fields = ("body",
                         "delivery_tag",
                         "content_type",
                         "content_encoding",
                         "delivery_info")
        kwargs.update(dict((field, getattr(amqp_message, field, None))
                           for field in copied_fields))

        super(Message, self).__init__(backend, **kwargs)
class Backend(BaseBackend):
    """amqplib backend

    :param connection: see :attr:`connection`.

    .. attribute:: connection

        A :class:`carrot.connection.BrokerConnection` instance. An established
        connection to the broker.

    """
    default_port = DEFAULT_PORT
    # Exception types grouped as connection-level and channel-level errors.
    connection_errors = (AMQPConnectionException,
                         socket.error,
                         IOError,
                         OSError)
    channel_errors = (AMQPChannelException, )

    # Message class instantiated by :meth:`message_to_python`.
    Message = Message

    def __init__(self, connection, **kwargs):
        """Store the connection and an optional ``default_port`` override.

        :param connection: see :attr:`connection`.
        :keyword default_port: port used by :meth:`establish_connection`
            when the connection info has no port set.

        """
        self.connection = connection
        self.default_port = kwargs.get("default_port", self.default_port)
        self._channel_ref = None

    @property
    def _channel(self):
        """The current channel, or a false value if there is none.

        Only a weak reference to the channel is stored, so the result is
        ``None`` once the referenced channel has been garbage collected.

        """
        return callable(self._channel_ref) and self._channel_ref()

    @property
    def channel(self):
        """If no channel exists, a new one is requested."""
        if not self._channel:
            connection = self.connection.connection
            self._channel_ref = weakref.ref(connection.channel())
        return self._channel

    def establish_connection(self):
        """Establish connection to the AMQP broker.

        :returns: a new :class:`Connection`.

        :raises KeyError: if the hostname, user id or password is missing
            from the connection info.

        """
        conninfo = self.connection
        if not conninfo.hostname:
            raise KeyError("Missing hostname for AMQP connection.")
        if conninfo.userid is None:
            raise KeyError("Missing user id for AMQP connection.")
        if conninfo.password is None:
            raise KeyError("Missing password for AMQP connection.")
        if not conninfo.port:
            # NOTE(review): the default is filled in before ``host`` is
            # read below -- presumably ``host`` is derived from the
            # hostname and port; confirm against BrokerConnection.
            conninfo.port = self.default_port
        return Connection(host=conninfo.host,
                          userid=conninfo.userid,
                          password=conninfo.password,
                          virtual_host=conninfo.virtual_host,
                          insist=conninfo.insist,
                          ssl=conninfo.ssl,
                          connect_timeout=conninfo.connect_timeout)

    def close_connection(self, connection):
        """Close the AMQP broker connection."""
        connection.close()

    def queue_exists(self, queue):
        """Check if a queue has been declared.

        :param queue: name of the queue to look for.

        :raises AMQPChannelException: for any channel error whose reply
            code is not 404.

        :rtype bool:

        """
        try:
            # A passive declare only checks for existence.
            self.channel.queue_declare(queue=queue, passive=True)
        except AMQPChannelException as exc:
            if exc.amqp_reply_code == 404:
                return False
            # Bare ``raise`` keeps the original traceback intact.
            raise
        else:
            return True

    def queue_delete(self, queue, if_unused=False, if_empty=False):
        """Delete queue by name."""
        return self.channel.queue_delete(queue, if_unused, if_empty)

    def queue_purge(self, queue, **kwargs):
        """Discard all messages in the queue. This will delete the messages
        and results in an empty queue.

        Extra keyword arguments are accepted but not used.

        """
        return self.channel.queue_purge(queue=queue)

    def queue_declare(self, queue, durable, exclusive, auto_delete,
                      warn_if_exists=False, arguments=None):
        """Declare a named queue.

        :keyword warn_if_exists: if true and the queue already exists,
            emit a :class:`QueueAlreadyExistsWarning` before declaring.

        """
        if warn_if_exists and self.queue_exists(queue):
            warnings.warn(QueueAlreadyExistsWarning(
                QueueAlreadyExistsWarning.__doc__))

        return self.channel.queue_declare(queue=queue,
                                          durable=durable,
                                          exclusive=exclusive,
                                          auto_delete=auto_delete,
                                          arguments=arguments)

    def exchange_declare(self, exchange, type, durable, auto_delete):
        """Declare an named exchange."""
        return self.channel.exchange_declare(exchange=exchange,
                                             type=type,
                                             durable=durable,
                                             auto_delete=auto_delete)

    def queue_bind(self, queue, exchange, routing_key, arguments=None):
        """Bind queue to an exchange using a routing key."""
        return self.channel.queue_bind(queue=queue,
                                       exchange=exchange,
                                       routing_key=routing_key,
                                       arguments=arguments)

    def message_to_python(self, raw_message):
        """Wrap a raw amqplib message in a :attr:`Message` instance."""
        return self.Message(backend=self, amqp_message=raw_message)

    def get(self, queue, no_ack=False):
        """Receive a message from a declared queue by name.

        :returns: A :class:`Message` object if a message was received,
            ``None`` otherwise. If ``None`` was returned, it probably means
            there was no messages waiting on the queue.

        """
        raw_message = self.channel.basic_get(queue, no_ack=no_ack)
        if not raw_message:
            return None
        return self.message_to_python(raw_message)

    def declare_consumer(self, queue, no_ack, callback, consumer_tag,
                         nowait=False):
        """Declare a consumer."""
        return self.channel.basic_consume(queue=queue,
                                          no_ack=no_ack,
                                          callback=callback,
                                          consumer_tag=consumer_tag,
                                          nowait=nowait)

    def consume(self, limit=None):
        """Returns an iterator that waits for one message at a time.

        :keyword limit: stop after this many waits; a false value means
            no limit.

        Iteration also stops when the channel is no longer open. Each
        step yields ``True`` after a single ``channel.wait()``.

        """
        for total_message_count in count():
            # Plain ``return`` ends the generator. ``raise StopIteration``
            # inside a generator turns into RuntimeError under PEP 479.
            if limit and total_message_count >= limit:
                return

            if not self.channel.is_open:
                return

            self.channel.wait()
            yield True

    def cancel(self, consumer_tag):
        """Cancel a channel by consumer tag.

        Does nothing when the channel has no connection.

        """
        if not self.channel.connection:
            return
        self.channel.basic_cancel(consumer_tag)

    def close(self):
        """Close the channel if open."""
        if self._channel and self._channel.is_open:
            self._channel.close()
        # Drop the reference so the next ``channel`` access requests a
        # new channel.
        self._channel_ref = None

    def ack(self, delivery_tag):
        """Acknowledge a message by delivery tag."""
        return self.channel.basic_ack(delivery_tag)

    def reject(self, delivery_tag):
        """Reject a message by deliver tag."""
        return self.channel.basic_reject(delivery_tag, requeue=False)

    def requeue(self, delivery_tag):
        """Reject and requeue a message by delivery tag."""
        return self.channel.basic_reject(delivery_tag, requeue=True)

    def prepare_message(self, message_data, delivery_mode, priority=None,
                        content_type=None, content_encoding=None):
        """Encapsulate data into a AMQP message.

        :returns: an ``amqp.Message`` with ``delivery_mode`` set in its
            properties.

        """
        message = amqp.Message(message_data, priority=priority,
                               content_type=content_type,
                               content_encoding=content_encoding)
        message.properties["delivery_mode"] = delivery_mode
        return message

    def publish(self, message, exchange, routing_key, mandatory=None,
                immediate=None, headers=None):
        """Publish a message to a named exchange.

        :keyword headers: if given, stored in the message properties
            under ``"headers"`` before publishing.

        The channel is closed after publishing when ``mandatory`` or
        ``immediate`` is set.

        """
        if headers:
            message.properties["headers"] = headers

        self.channel.basic_publish(message, exchange=exchange,
                                   routing_key=routing_key,
                                   mandatory=mandatory,
                                   immediate=immediate)
        if mandatory or immediate:
            self.close()

    def qos(self, prefetch_size, prefetch_count, apply_global=False):
        """Request specific Quality of Service."""
        self.channel.basic_qos(prefetch_size, prefetch_count,
                               apply_global)

    def flow(self, active):
        """Enable/disable flow from peer."""
        self.channel.flow(active)