-
Notifications
You must be signed in to change notification settings - Fork 0
/
pikaclient.py
206 lines (166 loc) · 7.2 KB
/
pikaclient.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
#!/usr/bin/env python
"""
Abstracting IOloop/AMQP operations used by gecko proper. This is
based on pika's tornado demo here:
https://github.com/pika/pika/blob/master/examples/demo_tornado.py
"""
import multiprocessing
import os
import pika
from pika.adapters.tornado_connection import TornadoConnection
import tornado
import logging
from pyrabbit.api import Server
logging.basicConfig(level=logging.DEBUG)
__author__ = 'Brian K. Jones'
__email__ = 'bkjones@gmail.com'
__since__ = '05/25/2011'
class PoolManager(object):
    """
    Takes in the AMQP config, which includes the info needed to connect and
    consume from the broker, as well as the min/max number of consumer
    threads to run, spawn/kill thresholds, etc.

    It doesn't connect or consume itself. It passes that on to the workers.
    This object's responsibility is limited to thread management and providing
    a singular interface with which to communicate w/ threads & get information
    about them.

    To get queue depths & anything else it needs to determine when/if to
    spawn/kill threads, it uses the HTTP interface of the RabbitMQ
    management plugin, so that's an external dependency.
    """
    def __init__(self, config):
        """
        :param config: dict-like config; reads config['AMQP']['AutoScaling']
            for scaling thresholds and config['AMQP']['Management'] for the
            RabbitMQ management-plugin HTTP endpoint.
        """
        self._config = config['AMQP']['AutoScaling']
        mgmt_cfg = config['AMQP']['Management']
        self.min_workers = self._config['min_workers']
        self.max_workers = self._config['max_workers']
        self.mps_max = self._config['mps_max']
        self.max_err = self._config['max_err']
        self.spawn_depth = self._config['spawn_depth']
        self.requeue_on_err = self._config['requeue_on_err']
        # HTTP client for the RabbitMQ management plugin (pyrabbit).
        self.broker = Server("%s:%s" % (mgmt_cfg['host'], mgmt_cfg['port']),
                             mgmt_cfg['user'], mgmt_cfg['password'])
        # Passed to workers
        self.worker_config = config['AMQP']
        # worker instances.
        self.workers = []
        self.worker_class = PikaClient

    def spawn_workers(self):
        """Starts a new worker, and registers it in self.workers.

        Returns True on success. Any exception raised while constructing or
        starting the worker is logged and re-raised.
        """
        try:
            # BUGFIX: was self.worker_class(self._config['AMQP']), but
            # self._config is already the AutoScaling sub-dict and has no
            # 'AMQP' key (guaranteed KeyError). Workers take the full AMQP
            # section, exactly as start() passes it.
            new_worker = self.worker_class(self.worker_config)
            new_worker.start()
        except Exception:
            logging.exception("Failed to spawn worker")
            raise
        self.workers.append(new_worker)
        logging.info("New worker added (%s workers)", len(self.workers))
        return True

    def reap_workers(self):
        """Loops over the workers and removes any that have died.
        """
        # BUGFIX: the original popped items out of self.workers while
        # iterating over it, which skips the element that follows each
        # removal (two consecutive dead workers -> the second one leaks).
        # Collect survivors, then replace the list contents in place.
        survivors = []
        for worker in self.workers:
            if worker.is_alive():
                survivors.append(worker)
            else:
                logging.info("Removing dead worker: %s", worker.name)
        self.workers[:] = survivors
        logging.info("%s workers running", len(self.workers))

    def poll_depth(self, vhost, queue):
        """Asks RabbitMQ about queue depths. Used to determine if we need to
        spawn more workers. Returns an integer.
        """
        return self.broker.get_queue_depth(vhost, queue)

    def start(self):
        """This is called by the end user's processor module to kick off
        the fireworks. It's basically this class's 'main()'.
        """
        worker = PikaClient(self.worker_config)
        logging.info("Starting a new worker")
        worker.start()
        logging.info("Worker started")
class PikaClient(multiprocessing.Process):
    """Consumer worker process.

    Opens an async connection to RabbitMQ via pika's TornadoConnection,
    drives it with a Tornado IOLoop, and passes each consumed message body
    to every handler registered through register_handler(). Broker setup
    happens in a callback chain: connect -> on_connected -> on_channel_open
    -> on_exchange_declared -> on_queue_declared -> on_queue_bound, after
    which dispatch() receives messages.
    """
    def __init__(self, config):
        """
        :param config: the 'AMQP' config section. Sub-dicts 'Connection',
            'Exchange', 'Queue', and 'Binding' are splatted into the
            corresponding pika calls in the callback chain below.
        """
        super(PikaClient, self).__init__()
        self.config = config
        # Default values
        self.connected = False    # set True in on_connected()
        self.connecting = False   # guards connect() against re-entry
        self.connection = None    # TornadoConnection, set in on_connected()
        self.channel = None       # set in on_channel_open()
        self.ioloop = tornado.ioloop.IOLoop.instance()
        # A place for us to keep messages sent to us by Rabbitmq
        self.messages = list()
        # A place for us to put pending messages while we're waiting to connect
        self.pending = list()
        # Registered handler objects; each must expose process(body).
        self.handlers = set()
    def register_handler(self, handler):
        """
        handler is an instance of some class that processes messages.
        It must implement a process(body) method -- see dispatch().
        """
        logging.debug("register_handler called...")
        self.handlers.add(handler)
    def run(self):
        """
        Start consuming messages, dispatching them to any registered handlers.

        NOTE(review): this os.fork()s inside a multiprocessing.Process.
        Each child taking the == 0 branch blocks forever in ioloop.start(),
        so it never loops again; the parent forks 3 children, consumes
        nothing itself, and falls off the end of run(). Confirm the fan-out
        is intentional -- the hard-coded 3 looks like it belongs in config.
        """
        for i in range(3):
            if os.fork() == 0:
                self.ioloop = tornado.ioloop.IOLoop.instance()
                logging.debug("Starting ioloop inside consume()")
                self.connect()
                self.ioloop.start()
    def dispatch(self, channel, method, header, body):
        # basic_consume callback: fan the raw message body out to every
        # registered handler. A handler exception propagates and aborts
        # delivery to the remaining handlers.
        logging.debug("CONSUMER: Got message: %s", body)
        logging.debug("Handlers available: %s", self.handlers)
        for handler in self.handlers:
            logging.debug("Routing to handler: %s", handler)
            handler.process(body)
    def connect(self):
        # Kick off the async connection; returns immediately, and setup
        # continues in on_connected(). No-op if a connect is in flight.
        # NOTE(review): credentials are hard-coded to guest/guest instead
        # of being read from self.config -- confirm that's intentional.
        if self.connecting:
            logging.debug('PikaClient: Already connecting to RabbitMQ')
            return
        logging.debug('PikaClient: Connecting to RabbitMQ')
        self.connecting = True
        credentials = pika.PlainCredentials('guest', 'guest')
        param = pika.ConnectionParameters(credentials=credentials,
                                          **self.config['Connection'])
        logging.debug("Pika params: %s" % param)
        try:
            TornadoConnection(param, on_open_callback=self.on_connected)
        except Exception as out:
            # Best-effort: log and carry on; connecting stays True.
            logging.error("There was a connection problem: %s", out)
    def on_connected(self, connection):
        # Connection-open callback: keep the connection, register the
        # close handler, and open a channel (continues in on_channel_open).
        logging.debug('PikaClient: Connected to RabbitMQ')
        self.connected = True
        self.connection = connection
        self.connection.add_on_close_callback(self.on_closed)
        logging.debug("Setting self.channel now...")
        self.connection.channel(self.on_channel_open)
    def on_channel_open(self, channel):
        # Channel-open callback: declare the configured exchange.
        logging.debug('PikaClient: Channel Open, Declaring Exchange')
        self.channel = channel
        self.channel.exchange_declare(callback=self.on_exchange_declared,
                                      **self.config['Exchange'])
    def on_exchange_declared(self, frame):
        # Exchange declared: declare the configured queue.
        logging.debug('PikaClient: Exchange Declared, Declaring Queue')
        self.channel.queue_declare(callback=self.on_queue_declared,
                                   **self.config['Queue'])
    def on_queue_declared(self, frame):
        # Queue declared: bind it to the exchange with the configured key.
        logging.debug('PikaClient: Queue Declared, Binding Queue')
        self.channel.queue_bind(exchange=self.config['Exchange']['exchange'],
                                queue=self.config['Queue']['queue'],
                                routing_key=self.config['Binding']['routing_key'],
                                callback=self.on_queue_bound)
    def on_queue_bound(self, frame):
        # Queue bound: start consuming. dispatch() gets each message.
        # no_ack=True: the broker considers messages delivered immediately,
        # so a handler failure loses the message (no redelivery).
        logging.debug('PikaClient: Queue Bound, Issuing Basic Consume!')
        self.channel.basic_consume(consumer_callback=self.dispatch,
                                   queue=self.config['Queue']['queue'],
                                   no_ack=True)
    def on_basic_cancel(self, frame):
        # Consumer-cancel callback: close the connection; on_closed() then
        # stops the IOLoop.
        logging.debug('PikaClient: Basic Cancel Ok')
        self.connection.close()
    def on_closed(self, connection):
        # We've closed our pika connection so stop
        tornado.ioloop.IOLoop.instance().stop()