/
consumer.py
42 lines (35 loc) · 1.2 KB
/
consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
import errno
import zmq
class CircusConsumer(object):
    """Iterable ZeroMQ subscriber yielding ``(topic, message)`` pairs.

    A SUB socket is created on construction, connected to *endpoint* and
    subscribed to every entry of *topics*.  The instance is both a context
    manager and an iterable; iterating it blocks on the socket and releases
    the resources when the iteration ends.

    :param topics: iterable of subscription topics, each given as bytes or
        as text (text is encoded to UTF-8 before subscribing).
    :param context: optional existing ``zmq.Context`` to share.  When given,
        the caller keeps ownership and it is never destroyed here.
    :param endpoint: address of the publisher to connect to.
    """

    def __init__(self, topics, context=None, endpoint='tcp://127.0.0.1:5556'):
        self.topics = topics
        # A context handed in by the caller is borrowed; only a context
        # created here is ours to destroy in stop().
        self.keep_context = context is not None
        # Same ``is not None`` test as above so ownership and the choice of
        # context can never disagree.
        self.context = context if context is not None else zmq.Context()
        self.endpoint = endpoint
        self.pubsub_socket = self.context.socket(zmq.SUB)
        self.pubsub_socket.connect(self.endpoint)
        for topic in self.topics:
            self.pubsub_socket.setsockopt(zmq.SUBSCRIBE,
                                          self._encode_topic(topic))

    @staticmethod
    def _encode_topic(topic):
        """Return *topic* as bytes, encoding text topics as UTF-8."""
        # The SUBSCRIBE option takes a byte string; passing text through
        # unchanged fails on Python 3.
        if isinstance(topic, bytes):
            return topic
        return topic.encode('utf-8')

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the consumer's resources on context manager exit.

        Delegates to :meth:`stop`.  Returns ``None``, so an exception raised
        inside the ``with`` body keeps propagating.
        """
        self.stop()

    def __iter__(self):
        return self.iter_messages()

    def iter_messages(self):
        """Yield ``(topic, message)`` tuples until the generator is closed.

        Each received multipart message is unpacked into exactly two frames;
        a message with a different number of frames raises ``ValueError``.
        The consumer is stopped when the generator exits.
        """
        with self:
            while True:
                topic, message = self.pubsub_socket.recv_multipart()
                yield topic, message

    def stop(self):
        """Close the SUB socket and destroy the context if it is owned.

        The socket is always closed because this object created it, even
        when the context is shared; leaving it open would leak it.  A
        context supplied by the caller is left untouched.

        :raises zmq.ZMQError: if destroying the owned context fails for a
            reason other than ``EINTR``.
        """
        # linger=0 mirrors the destroy(0) below: nothing is waited for.
        self.pubsub_socket.close(linger=0)
        if self.keep_context:
            return
        try:
            self.context.destroy(0)
        except zmq.ZMQError as e:
            # An interrupted system call during teardown is not an error.
            if e.errno != errno.EINTR:
                raise