-
Notifications
You must be signed in to change notification settings - Fork 0
/
outbound_consumer.py
57 lines (48 loc) · 1.75 KB
/
outbound_consumer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from celery.utils.log import get_task_logger
from celery import Celery
from celery import bootsteps
from kombu import Consumer, Queue
from utils import LocalConfigParser
from Db import Db
import json
rabbit_configs = LocalConfigParser.parse_configs("RABBIT")
sdp_configs = LocalConfigParser.parse_configs("SDP")
print "CONFIGS", rabbit_configs
host = rabbit_configs['rabbithost']
username = rabbit_configs['rabbitusername']
password = rabbit_configs['rabbitpassword']
port = rabbit_configs['rabbitport']
vhost = rabbit_configs['rabbitvhost']
sdp_url = sdp_configs["url"]
bulk_url = sdp_configs["bulk_url"]
BROKER_URL = 'amqp://{user}:{password}@{host}:{port}//'.format(
user=username,
password=password,
host=host,
port=port
)
app = Celery('consumer', broker=BROKER_URL)
logger = get_task_logger(__name__)
queue_list = ['pedQueue']
class AlertConsumerStep(bootsteps.ConsumerStep):
def get_consumers(self, channel):
consumers = []
for q in queue_list:
queue = Queue(q, routing_key=q)
consumers.append(Consumer(channel, queues=[queue],
callbacks=[self.on_message]))
return consumers
def on_message(self, body, message):
try:
print "GOT MESSAGE %r " % message
logger.info("Began consume message :::: got body :: %r " % body)
db = Db(logger)
result = db.terminate_sms_operator(json.loads(body))
logger.info("message terminate result :: %r " % result.status_code)
if result.status_code == 200:
message.ack()
message.ack()
except Exception, e:
message.requeue()
logger.info("Exception on message consume:: %r " % e)
app.steps['consumer'].add(AlertConsumerStep)