Skip to content

Commit

Permalink
Split DigestProducer into a separate process
Browse files Browse the repository at this point in the history
This will make digestproducer be in a separate process where isntead
of calling backends directly, it submits messages to the queue which
indicate the backend and method to call, and the arguments to pass.

Signed-off-by: Patrick Uiterwijk <puiterwijk@redhat.com>
  • Loading branch information
puiterwijk committed Sep 27, 2016
1 parent 87e9e7f commit 7e4ac43
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 0 deletions.
20 changes: 20 additions & 0 deletions fmn/consumer/backend.py
Expand Up @@ -139,6 +139,7 @@ def read(queue_object):

data = json.loads(body)
topic = data.get('topic', '')
method = data.get('method', '')

if '.fmn.' in topic:
openid = data['body']['msg']['openid']
Expand All @@ -149,6 +150,25 @@ def read(queue_object):
ch.basic_ack(delivery_tag=method.delivery_tag)
return

if method:
print "Got a method call to %s" % method
if method == 'handle':
backend = backends[data['backend']]
backend.handle(session,
data['recipient'],
data['msg'],
data['streamline'])
ch.basic_ack(delivery_tag=method.delivery_tag)
elif method == 'handle_batch':
backend = backends[data['backend']]
backend.handle_batch(session,
data['recipient'],
data['queued_messages'])
ch.basic_ack(delivery_tag=method.delivery_tag)
else:
print "Unknown method"
return

recipients, context, raw_msg = \
data['recipients'], data['context'], data['raw_msg']['body']

Expand Down
130 changes: 130 additions & 0 deletions fmn/consumer/digests.py
@@ -0,0 +1,130 @@
# FMN worker calculating and sending digests


import json
import logging
import time
import random

import pika
import fedmsg
import fedmsg.meta
import fedmsg_meta_fedora_infrastructure

from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task

import fmn.lib
import fmn.rules.utils
import fmn.consumer.backends as fmn_backends
import fmn.consumer.producer as fmn_producers

import fmn.consumer.fmn_fasshim
from fedmsg_meta_fedora_infrastructure import fasshim

log = logging.getLogger("fmn")
log.setLevel('DEBUG')
CONFIG = fedmsg.config.load_config()
fedmsg.meta.make_processors(**CONFIG)

DB_URI = CONFIG.get('fmn.sqlalchemy.uri', None)
session = fmn.lib.models.init(DB_URI)

fmn.consumer.fmn_fasshim.make_fas_cache(**CONFIG)
# Monkey patch fedmsg_meta modules
fasshim.nick2fas = fmn.consumer.fmn_fasshim.nick2fas
fasshim.email2fas = fmn.consumer.fmn_fasshim.email2fas
fedmsg_meta_fedora_infrastructure.supybot.nick2fas = \
fmn.consumer.fmn_fasshim.nick2fas
fedmsg_meta_fedora_infrastructure.anitya.email2fas = \
fmn.consumer.fmn_fasshim.email2fas
fedmsg_meta_fedora_infrastructure.bz.email2fas = \
fmn.consumer.fmn_fasshim.email2fas
fedmsg_meta_fedora_infrastructure.mailman3.email2fas = \
fmn.consumer.fmn_fasshim.email2fas
fedmsg_meta_fedora_infrastructure.pagure.email2fas = \
fmn.consumer.fmn_fasshim.email2fas

CNT = 0

log.debug("Instantiating FMN digest producer")
backend_kwargs = dict(config=CONFIG)


OPTS = pika.ConnectionParameters(
heartbeat_interval=0,
retry_delay=2,
)
connection = pika.BlockingConnection(OPTS)



class FakeBackend(object):
def inform_workers(self, body):
queue = 'backends'
chan = self.connection.channel()
chan.exchange_declare(exchange=queue)
chan.queue_declare(queue, durable=True)

body['backend'] = self.name

chan.basic_publish(
exchange='',
routing_key=queue,
body=json.dumps(body),
properties=pika.BasicProperties(
delivery_mode=2
)
)
chan.close()

def __init__(self, name, connection):
self.name = name
self.connection = connection

def handle(self, session, recipient, msg, streamline=False):
self.inform_workers({
'method': 'handle',
'recipient': recipient,
'msg': msg,
'streamline': steamline})

def handle_batch(self, session, recipient, queued_messages):
self.inform_workers({
'method': 'handle_batch',
'recipient': recipient,
'queued_messages': queued_messages})

backends = {
'email': FakeBackend('email', connection),
'irc': FakeBackend('irc', connection),
'android': FakeBackend('android', connection),
#'rss': fmn_backends.RSSBackend,
}

# But, disable any of those backends that don't appear explicitly in
# our config.
for key, value in backends.items():
if key not in CONFIG['fmn.backends']:
del backends[key]

# Also, check that we don't have something enabled that's not explicit
for key in CONFIG['fmn.backends']:
if key not in backends:
raise ValueError("%r in fmn.backends (%r) is invalid" % (
key, CONFIG['fmn.backends']))


frequency = CONFIG.get('fmn.digest_frequency', 10)
digest_producer = fmn_producers.DigestProducer(
session, backends)
lc4 = task.LoopingCall(digest_producer.work)
lc4.start(frequency)


try:
print 'Starting producing'
reactor.run()
finally:
connection.close()
session.close()
14 changes: 14 additions & 0 deletions systemd/fmn-digests@.service
@@ -0,0 +1,14 @@
[Unit]
Description=FMN.consumer digester #%i
After=network.target
Documentation=https://github.com/fedora-infra/fmn.consumer/

[Service]
ExecStart=/usr/bin/python2 /usr/lib/python2.7/site-packages/fmn/consumer/digests.py
Type=simple
User=root
Group=root
Restart=on-failure

[Install]
WantedBy=multi-user.target

0 comments on commit 7e4ac43

Please sign in to comment.