Skip to content

Commit

Permalink
Merge pull request #169 from fedora-infra/requeue
Browse files Browse the repository at this point in the history
requeue
  • Loading branch information
jeremycline committed Feb 22, 2017
2 parents d46464e + 6e2d66a commit e44b359
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 19 deletions.
5 changes: 4 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ python:
- "3.4"
- "3.5"

before_install:
- sudo apt-get -qq update
- sudo apt-get install -y swig

install:
- sudo apt-get install swig -y
- pip install --upgrade setuptools pip
- pip install -r dev-requirements.txt
- pip install -e .
Expand Down
63 changes: 45 additions & 18 deletions fmn/consumer/backend.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# FMN worker figuring out for a fedmsg message the list of recipient and
# contexts
# -*- coding: utf-8 -*-
"""
The FMN backend is a `Twisted`_ application that consumes messages that have
been queued by FMN workers and sends them to the user over the configured
medium (IRC, email, etc) by using "backends".
.. _Twisted: https://twistedmatrix.com/
"""
from __future__ import print_function


Expand Down Expand Up @@ -139,7 +145,6 @@ def read(queue_object):

data = json.loads(body)
topic = data.get('topic', '')
function = data.get('function', '')

if '.fmn.' in topic:
openid = data['body']['msg']['openid']
Expand All @@ -150,24 +155,46 @@ def read(queue_object):
ch.basic_ack(delivery_tag=method.delivery_tag)
return

# This is a special handler for messages placed in the ``backends`` queue by
# the :mod:`fmn.consumer.digests` module.

# This module places messages in the queue in two different formats. The
# first format is used when there is only one message in the digest, and
# the second is used when there are multiple messages in the digest.

# This is in dire need of refactoring, but for the moment this is used to
# check to see if this is a special message by checking for the existence of
# the ``function`` key which has a string value set to either ``handle`` or
# ``handle_batch``, and then calling the appropriate backend function.
function = data.get('function', '')
if function:
print("Got a function call to %s" % function)
if function == 'handle':
backend = backends[data['backend']]
backend.handle(session,
data['recipient'],
data['msg'],
data['streamline'])
ch.basic_ack(delivery_tag=method.delivery_tag)
elif function == 'handle_batch':
backend = backends[data['backend']]
backend.handle_batch(session,
data['recipient'],
data['queued_messages'])
try:
if function == 'handle':
backend = backends[data['backend']]
backend.handle(session,
data['recipient'],
data['msg'],
data['streamline'])
ch.basic_ack(delivery_tag=method.delivery_tag)
elif function == 'handle_batch':
backend = backends[data['backend']]
backend.handle_batch(session,
data['recipient'],
data['queued_messages'])
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
print("Unknown function")
return
except Exception:
# Republishing the message will place the message at the back of
# work queue so the message doesn't get lost, but also doesn't hold
# up new messages.
logging.exception('Message failed, requeueing')
ch.basic_publish(exchange='', routing_key='backends', body=body,
properties=pika.BasicProperties(delivery_mode=2))
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
print("Unknown function")
return
return

recipients, context, raw_msg = \
data['recipients'], data['context'], data['raw_msg']['body']
Expand Down

0 comments on commit e44b359

Please sign in to comment.