Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

Fix listening to two exchanges allowing to refresh the prefs when needed

  • Loading branch information...
pypingou committed Jun 9, 2016
1 parent a9f284f commit ef3a661809a7dbd84e2cc1c813e79af3069e48c4
Showing with 49 additions and 34 deletions.
  1. +49 −34 fmn/consumer/worker.py
@@ -25,6 +25,7 @@
fedmsg.meta.make_processors(**CONFIG)

DB_URI = CONFIG.get('fmn.sqlalchemy.uri', None)
session = fmn.lib.models.init(DB_URI)

from dogpile.cache import make_region
_cache = make_region(
@@ -56,14 +57,12 @@ def update_preferences(openid, prefs):
log.info("Loading and caching preferences for %r" % openid)
old_preferences = [
p for p in prefs if p['user']['openid'] == openid]
session = fmn.lib.models.init(DB_URI)
new_preferences = fmn.lib.load_preferences(
session, CONFIG, valid_paths,
cull_disabled=True,
openid=openid,
cull_backends=['desktop']
)
session.close()
prefs.extend(new_preferences)
for old_preference in old_preferences:
prefs.remove(old_preference)
@@ -89,14 +88,25 @@ def update_preferences(openid, prefs):
fedmsg_meta_fedora_infrastructure.pagure.email2fas = \
fmn.consumer.fmn_fasshim.email2fas

queue = 'workers'
connection = pika.BlockingConnection(OPTS)
channel = connection.channel()
channel.exchange_declare(exchange=queue, type='fanout')
ch = channel.queue_declare(queue, durable=True)
channel.queue_bind(exchange=queue, queue=queue)
print 'started at', ch.method.message_count
SEEN = []
def inform_workers(raw_msg, context, recipients):
chan = connection.channel()
chan.exchange_declare(exchange='backends', type='direct')
q = chan.queue_declare(queue, durable=True)
for i in range(q.method.consumer_count):
chan.basic_publish(
exchange='backends',
routing_key='',
body=json.dumps({
'context': context,
'recipients': recipients,
'raw_msg': raw_msg,
}),
properties=pika.BasicProperties(
delivery_mode=2
)
)
chan.close()


def callback(ch, method, properties, body):
start = time.time()
@@ -107,18 +117,13 @@ def callback(ch, method, properties, body):
topic, msg = raw_msg['topic'], raw_msg['body']
print topic

if msg['msg_id'] in SEEN:
print 'Already seen %s' % msg['msg_id']
channel.basic_cancel(delivery_tag=method.delivery_tag)
return

# If the user has tweaked their preferences on the frontend, then
# invalidate our entire in-memory cache of the fmn preferences
# database.
if '.fmn.' in topic:
openid = msg['msg']['openid']
PREFS = update_preferences(openid, PREFS)
if topic == 'consumer.fmn.prefs.update':
if topic == 'consumer.fmn.prefs.update': # msg from the consumer
log.debug(
"Done with refreshing prefs. %0.2fs %s",
time.time() - start,msg['topic'])
@@ -145,32 +150,41 @@ def callback(ch, method, properties, body):
if not recipients:
continue

chan = connection.channel()
chan.exchange_declare(exchange=queue, type='fanout')
q = chan.queue_declare(queue, durable=True)
for i in range(q.method.consumer_count):
chan.basic_publish(
exchange='backends',
routing_key='',
body=json.dumps({
'context': context,
'recipients': recipients,
'raw_msg': raw_msg,
}),
properties=pika.BasicProperties(
delivery_mode=2
)
)
chan.close()
for attempt in range(3):
try:
inform_workers(raw_msg, context, recipients)
break
except:
if attmpt == 2:
raise

ch.basic_ack(delivery_tag=method.delivery_tag)

log.debug("Done. %0.2fs %s %s",
time.time() - start, msg['msg_id'], msg['topic'])


connection = pika.BlockingConnection(OPTS)

queue = 'refresh'
channel = connection.channel()
channel.exchange_declare(exchange=queue, type='fanout')
refresh_q = channel.queue_declare(exclusive=True)
refresh_q_name = refresh_q.method.queue
channel.queue_bind(exchange=queue, queue=refresh_q_name)

queue = 'workers'
channel.exchange_declare(exchange=queue, type='direct')
workers_q = channel.queue_declare(queue, durable=True)
channel.queue_bind(exchange=queue, queue=queue)

print 'started at %s workers' % workers_q.method.message_count
print 'started at %s refresh' % refresh_q.method.message_count

# Make sure we leave any other messages in the queue
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='workers')
channel.basic_consume(callback, queue=queue)
channel.basic_consume(callback, queue=refresh_q_name)


try:
@@ -179,5 +193,6 @@ def callback(ch, method, properties, body):
except KeyboardInterrupt:
channel.cancel()
connection.close()
session.close()
finally:
print '%s tasks proceeded' % CNT

0 comments on commit ef3a661

Please sign in to comment.
You can’t perform that action at this time.