Skip to content

Commit

Permalink
Merge pull request #171 from jeremycline/document-consumer
Browse files Browse the repository at this point in the history
Document the FMN consumer package
  • Loading branch information
jeremycline committed Feb 24, 2017
2 parents e44b359 + 3ac6112 commit 642ed4b
Show file tree
Hide file tree
Showing 5 changed files with 239 additions and 24 deletions.
20 changes: 20 additions & 0 deletions fmn/consumer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,23 @@
"""
The FMN consumer package contains all the services that process the incoming
`fedmsg`_ messages. The messages are processed in three steps:
1. The `fedmsg consumer`_ defined in :mod:`fmn.consumer.consumer` subscribes
to every fedmsg topic (``*``). It places these messages into a RabbitMQ
message queue called ``worker``. This message broker provides message
durability for FMN as it processes the messages.
2. One or more :mod:`fmn.consumer.worker` processes are started and consume
messages from the ``worker`` message queue. These worker processes take
each message and determine who should receive notifications based on their
message filters. They then record this information in the message and
publish the message to the ``backends`` queue.
3. A single :mod:`fmn.consumer.backend` process is run and is responsible for
sending the messages to users via IRC, email, etc. It consumes the messages
from the ``backends`` queue and dispatches them. It defines a backend interface
and new backends can be added to allow for new message mediums.
"""
from .consumer import FMNConsumer # noqa
from .producer import ConfirmationProducer # noqa
from .producer import DigestProducer # noqa
34 changes: 34 additions & 0 deletions fmn/consumer/backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@


def get_preferences():
"""
Load all preferences from the FMN database and return them.
Returns:
dict: A big dictionary of user preferences.
"""
print('get_preferences')
prefs = {}
for p in session.query(fmn.lib.models.Preference).all():
Expand All @@ -103,6 +109,17 @@ def get_preferences():


def update_preferences(openid, prefs):
"""
Update an existing preference dictionary loaded by :func:`get_preferences`
with the latest preferences for the provided openid.
Args:
openid (str): The openid of the user to fetch the new preferences for.
prefs (dict): The dictionary of existing preferences.
Returns:
dict: A big dictionary of user preferences.
"""
print("Refreshing preferences for %r" % openid)
for p in fmn.lib.models.Preference.by_user(session, openid):
prefs['%s__%s' % (p.openid, p.context_name)] = p
Expand All @@ -111,6 +128,15 @@ def update_preferences(openid, prefs):

@defer.inlineCallbacks
def run(connection):
"""
Ensure the various exchanges and queues are configured and set up a looping
call in the reactor for :func:`.read` on the ``refresh`` and ``backends``
queues.
Args:
connection (twisted_connection.TwistedProtocolConnection): The Pika
RabbitMQ connection to use.
"""

channel = yield connection.channel()
yield channel.basic_qos(prefetch_count=1)
Expand All @@ -135,7 +161,15 @@ def run(connection):

@defer.inlineCallbacks
def read(queue_object):
"""
Read a single message from the queue and dispatch it to the proper backend.
This is meant to be used with the looping call functionality of Twisted.
Args:
queue_object (twisted_connection.ClosableDeferredQueue): A queue of
messages to consume.
"""
ch, method, properties, body = yield queue_object.get()

global CNT, PREFS
Expand Down
72 changes: 71 additions & 1 deletion fmn/consumer/consumer.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
# An example fedmsg koji consumer
"""
This is a `fedmsg consumer`_ that subscribes to every topic on the message bus
it is connected to. It has two tasks. The first is to place all incoming
messages into a RabbitMQ message queue. The second is to manage the FMN caches.
FMN makes heavy use of caches since it needs to know who owns what packages and
what user notification preferences are, both of which require expensive API
queries to `FAS`_, `pkgdb`_, or the database.
.. _fedmsg consumer: http://www.fedmsg.com/en/latest/consuming/#the-hub-consumer-approach
.. _FAS: https://admin.fedoraproject.org/accounts/
.. _pkgdb: https://admin.fedoraproject.org/pkgdb/
"""

import datetime
import logging
Expand Down Expand Up @@ -26,6 +38,29 @@


def notify_prefs_change(openid):
"""
Publish a message to a fanout exchange notifying consumers about preference
updates.
Consumers can use these messages to refresh the process-local caches on a
user-by-user basis. To receive these messages, just bind the RabbitMQ
queue you're using to the ``refresh`` fanout exchange.
Messages are JSON-serialized and UTF-8 encoded.
Example::
{
"topic": "consumer.fmn.prefs.update",
"body": {
"topic": "consumer.fmn.prefs.update",
"msg_id": "<year>-<random-uuid>",
"msg": {
"openid": "<user's openid who updated their preferences>"
}
}
}
"""
import json
connection = pika.BlockingConnection(OPTS)
msg_id = '%s-%s' % (datetime.datetime.utcnow().year, uuid.uuid4())
Expand Down Expand Up @@ -55,6 +90,16 @@ def notify_prefs_change(openid):


class FMNConsumer(fedmsg.consumers.FedmsgConsumer):
"""
A `fedmsg consumer`_ that subscribes to all topics and re-publishes all
messages to the ``workers`` exchange.
Attributes:
topic (str): The topics this consumer is subscribed to. Set to ``*``
(all topics).
config_key (str): The key to set to ``True`` in the fedmsg config to
enable this consumer. The key is ``fmn.consumer.enabled``.
"""
topic = '*'
config_key = 'fmn.consumer.enabled'

Expand All @@ -80,9 +125,23 @@ def __init__(self, *args, **kwargs):
log.debug("FMNConsumer initialized")

def make_session(self):
"""
Initialize the database session and return it.

The session is obtained by calling ``fmn.lib.models.init`` with this
consumer's ``uri`` attribute.

Returns:
sqlalchemy.orm.scoping.scoped_session: An SQLAlchemy scoped session.
Calling it returns the current Session, creating it using the
scoped_session.session_factory if not present.
"""
return fmn.lib.models.init(self.uri)

def consume(self, raw_msg):
"""
This method is called when a message arrives on the fedmsg bus.
Args:
raw_msg (dict): The raw fedmsg deserialized to a Python dictionary.
"""
session = self.make_session()
try:
self.work(session, raw_msg)
Expand All @@ -92,6 +151,14 @@ def consume(self, raw_msg):
raise

def work(self, session, raw_msg):
"""
This method is called when a message arrives on the fedmsg bus by the
:meth:`.consume` method.
Args:
session (sqlalchemy.orm.session.Session): The SQLAlchemy session to use.
raw_msg (dict): The raw fedmsg deserialized to a Python dictionary.
"""
topic, msg = raw_msg['topic'], raw_msg['body']

for suffix in self.junk_suffixes:
Expand Down Expand Up @@ -196,5 +263,8 @@ def work(self, session, raw_msg):
time.time() - start, msg['msg_id'], msg['topic'])

def stop(self):
"""
Gracefully halt this fedmsg consumer.

Logs a cleanup notice at the ``info`` level and then delegates to the
parent class's ``stop`` method.
"""
log.info("Cleaning up FMNConsumer.")
super(FMNConsumer, self).stop()
53 changes: 52 additions & 1 deletion fmn/consumer/digests.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
# FMN worker calculating and sending digests
"""
This module is responsible for creating message digests.
Users can opt to receive digests based on time or the number of messages. When
a user has opted to receive a digest, the messages are placed in the database
by the :mod:`fmn.consumer.backend` Twisted app. This module then queries the
database, creates the digest messages, and re-publishes those messages to the
``backends`` exchange. The :mod:`fmn.consumer.backend` application then receives
the messages *again* and sends them.
"""
from __future__ import print_function


Expand Down Expand Up @@ -56,7 +65,26 @@


class FakeBackend(object):
"""
This class implements part of the backend interface defined in the
:class:`fmn.backends.base.BaseBackend` class.
Used in conjunction with :class:`fmn.consumer.producer.DigestProducer`, this
will send batch messages to the ``backends`` message queue.
Args:
name (str): The backend name. This should correspond to a real backend
name.
connection (pika.BlockingConnection): The connection to use when
communicating with RabbitMQ.
"""
def inform_workers(self, body):
"""
Queue a message in RabbitMQ.
Args:
body (dict): A JSON-serializable dictionary to send.
"""
queue = 'backends'
chan = self.connection.channel()
chan.exchange_declare(exchange=queue)
Expand All @@ -79,13 +107,36 @@ def __init__(self, name, connection):
self.connection = connection

def handle(self, session, recipient, msg, streamline=False):
"""
Called when a digest is composed of a single message.

This occurs when the time between digests has been reached and there
is only one message to send. In those cases, we should send it like a
normal message.

Nothing is delivered directly here: the call is described as a dictionary
(``function`` set to ``'handle'`` plus the ``recipient``, ``msg`` and
``streamline`` values) and handed to ``self.inform_workers``.

Args:
session (None): An unused argument.
recipient (dict): The recipient of the message and their settings.
msg (dict): The original message body.
streamline (bool): If false, it triggers a call to
:meth:`fmn.consumer.producer.DigestProducer.manage_batch` in
the IRC backend and only the IRC backend.
"""
self.inform_workers({
'function': 'handle',
'recipient': recipient,
'msg': msg,
'streamline': streamline})

def handle_batch(self, session, recipient, queued_messages):
"""
Called when a digest has more than one message in it.
Args:
session (None): An unused argument.
recipient (dict): The recipient of the message and their settings.
queued_messages (list): List of the original message bodies.
"""
self.inform_workers({
'function': 'handle_batch',
'recipient': recipient,
Expand Down
84 changes: 62 additions & 22 deletions fmn/consumer/worker.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,16 @@
# FMN worker figuring out for a fedmsg message the list of recipient and
# contexts
"""
This is a runnable Python module that uses Twisted to consume AMQP messages
processed by the :class:`fmn.consumer.consumer.FMNConsumer` `fedmsg consumer`_.
It determines a list of recipients and contexts for each fedmsg and constructs
a new message using the initial fedmsg and the recipient and context list.
It then publishes these messages to the ``backends`` exchange which is consumed
by the :mod:`fmn.consumer.backend` module's Twisted application.
.. _fedmsg consumer:
http://www.fedmsg.com/en/latest/consuming/#the-hub-consumer-approach
"""
from __future__ import print_function

import json
Expand Down Expand Up @@ -41,6 +52,12 @@


def get_preferences():
"""
Load all preferences from the FMN database and return them.
Returns:
dict: A big dictionary of user preferences.
"""
print('get_preferences')
session = fmn.lib.models.init(DB_URI)
prefs = fmn.lib.load_preferences(
Expand All @@ -54,6 +71,17 @@ def get_preferences():


def update_preferences(openid, prefs):
"""
Update an existing preference dictionary loaded by :func:`get_preferences`
with the latest preferences for the provided openid.
Args:
openid (str): The openid of the user to fetch the new preferences for.
prefs (dict): The dictionary of existing preferences.
Returns:
dict: A big dictionary of user preferences.
"""
log.info("Loading and caching preferences for %r" % openid)
old_preferences = [
p for p in prefs if p['user']['openid'] == openid]
Expand Down Expand Up @@ -93,26 +121,28 @@ def update_preferences(openid, prefs):

def inform_workers(raw_msg, context, recipients):
"""
Publish a message to the backends exchange for workers to consume
:param raw_msg: The original fedmsg that triggered this event.
:type raw_msg: dict
:param context: The type of backend to use (e.g. 'irc' or 'sse')
:type context: str
:param recipients: A list of recipients. The recipient is a dictionary
in the format:
{
"triggered_by_links": true,
"None": "sse-jcline.id.fedoraproject.org",
"markup_messages": false,
"user": "jcline.id.fedoraproject.org",
"filter_name": "hose",
"filter_oneshot": false,
"filter_id": 7,
"shorten_links": false,
"verbose": true,
}
:type recipients: list of dict
Publish a message to the backends exchange for the backend to send to
users.
Args:
raw_msg (dict): The original fedmsg that triggered this event.
context (str): The type of backend to use (e.g. 'irc' or 'sse')
recipients (list): A list of recipients. The recipient is a dictionary
in the format::
{
"triggered_by_links": true,
"None": "sse-jcline.id.fedoraproject.org",
"markup_messages": false,
"user": "jcline.id.fedoraproject.org",
"filter_name": "hose",
"filter_oneshot": false,
"filter_id": 7,
"shorten_links": false,
"verbose": true,
}
The values of these keys will vary based on user settings.
"""
queue = 'backends'
chan = connection.channel()
Expand All @@ -136,6 +166,16 @@ def inform_workers(raw_msg, context, recipients):


def callback(ch, method, properties, body):
"""
The callback attached to the Pika consumer.
This callback is called when a new message is pushed to the consumer by
RabbitMQ. The message is from the :class:`fmn.consumer.consumer.FMNConsumer`
and its format will either be the raw fedmsg or a notification to update the
local caches. The cache-update notification is dispatched by
:func:`fmn.consumer.consumer.notify_prefs_change` and its format is documented
on that function.
"""
start = time.time()

global CNT, connection, PREFS
Expand Down

0 comments on commit 642ed4b

Please sign in to comment.