/
messaging.py
81 lines (59 loc) · 2.23 KB
/
messaging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
from Queue import Empty
from itertools import cycle
class Queue(object):
def __init__(self, backend, name):
self.name = name
self.backend = backend
def put(self, payload):
self.backend.put(self.name, payload)
def get(self):
payload = self.backend.get(self.name)
if payload is not None:
return payload
raise Empty
def purge(self):
self.backend.purge(self.name)
def __repr__(self):
return "<Queue: %s>" % repr(self.name)
class QueueSet(object):
"""A set of queues that operates as one."""
def __init__(self, backend, queues):
self.backend = backend
self.queue_names = list(queues)
self._get_many = getattr(self.backend, "get_many", None)
self.get = self._emulated
if self._get_many:
self.get = self._native
# attributes below are only used in emulation mode.
# queues could be a PriorityQueue as well to support
# priorities.
self.queues = map(self.backend.Queue, self.queue_names)
# an infinite cycle through all the queues.
self.cycle = cycle(self.queues)
# A set of all the queue names, so we can match when we've
# tried all of them.
self.all = frozenset(self.queue_names)
def _native(self, timeout=None):
return self._get_many(self.queue_names, timeout=timeout)
def _emulated(self, timeout=None):
"""Get the next message avaiable in the queue.
:returns: The message and the name of the queue it came from as
a tuple.
:raises Empty: If there are no more items in any of the queues.
"""
# A set of queues we've already tried.
tried = set()
while True:
# Get the next queue in the cycle, and try to get an item off it.
queue = self.cycle.next()
try:
item = queue.get()
except Empty:
# raises Empty when we've tried all of them.
tried.add(queue.name)
if tried == self.all:
raise
else:
return item, queue.name
def __repr__(self):
return "<QueueSet: %s>" % repr(self.queue_names)