Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

Adjust the consumer so that all it does is send messages to the workers
  • Loading branch information...
pypingou committed May 11, 2016
1 parent 90aca51 commit ffd192f9b615083323e8ea49a5ff5cd888d944d0
Showing with 34 additions and 95 deletions.
  1. +34 −95 fmn/consumer/consumer.py
@@ -9,10 +9,16 @@
import fmn.rules.utils
import backends as fmn_backends

from dogpile.cache import make_region
_cache = make_region(
key_mangler=lambda key: "fmn.consumer:dogpile:" + key
)

from fmn.consumer.util import (
new_packager,
new_badges_user,
get_fas_email,
load_preferences,
)

import logging
@@ -34,66 +40,27 @@ def __init__(self, *args, **kwargs):
if not self.uri:
raise ValueError('fmn.sqlalchemy.uri must be present')

log.debug("Instantiating FMN backends")
backend_kwargs = dict(config=self.hub.config)
self.backends = {
'email': fmn_backends.EmailBackend(**backend_kwargs),
'irc': fmn_backends.IRCBackend(**backend_kwargs),
'android': fmn_backends.GCMBackend(**backend_kwargs),
#'rss': fmn_backends.RSSBackend,
}

# But, disable any of those backends that don't appear explicitly in
# our config.
for key, value in self.backends.items():
if key not in self.hub.config['fmn.backends']:
del self.backends[key]

# Also, check that we don't have something enabled that's not explicit
for key in self.hub.config['fmn.backends']:
if key not in self.backends:
raise ValueError("%r in fmn.backends (%r) is invalid" % (
key, self.hub.config['fmn.backends']))

# If debug is enabled, use the debug backend everywhere
if self.hub.config.get('fmn.backends.debug', False):
for key in self.backends:
log.debug('Setting %s to use the DebugBackend' % key)
self.backends[key] = fmn_backends.DebugBackend(
**backend_kwargs)

log.debug("Loading rules from fmn.rules")
self.valid_paths = fmn.lib.load_rules(root="fmn.rules")

# Initialize our in-memory cache of the FMN preferences database
self.cached_preferences_lock = threading.Lock()
self.cached_preferences = None
# Initialize our dogpile cache of the FMN preferences database
if not _cache.is_configured:
_cache.configure(
**self.hub.config['fmn.rules.cache']
)

session = self.make_session()
self.refresh_cache(session)
self.set_cache(session)
session.close()

log.debug("FMNConsumer initialized")

def refresh_cache(self, session, openid=None):
    """Refresh the shared in-memory preferences cache.

    With no ``openid`` (or on first use, when the cache is empty) the
    whole preferences list is reloaded from the database.  With an
    ``openid``, only that user's entries are swapped out: fresh entries
    are appended first, then the stale ones are removed.

    The shared lock serializes refreshes against readers in other
    consumer threads.
    """
    log.debug("Acquiring cached_preferences_lock")
    with self.cached_preferences_lock:
        needs_full_reload = not openid or self.cached_preferences is None
        if needs_full_reload:
            log.info("Loading and caching all preferences for all users")
            self.cached_preferences = fmn.lib.load_preferences(
                session, self.hub.config, self.valid_paths,
                cull_disabled=True, cull_backends=['desktop'])
            return

        log.info("Loading and caching preferences for %r" % openid)
        stale_entries = [entry for entry in self.cached_preferences
                         if entry['user']['openid'] == openid]
        fresh_entries = fmn.lib.load_preferences(
            session, self.hub.config, self.valid_paths,
            cull_disabled=True, openid=openid,
            cull_backends=['desktop'])
        # Append the fresh entries, then drop the stale ones, mirroring
        # the original extend-then-remove ordering.
        self.cached_preferences.extend(fresh_entries)
        for entry in stale_entries:
            self.cached_preferences.remove(entry)
def set_cache(self, session, openid=None):
    """Prime (or fetch) the dogpile cache of all users' preferences.

    :param session: database session handed to ``load_preferences``.
    :param openid: accepted for interface compatibility but currently
        unused -- the whole preferences set is cached under one key.
        NOTE(review): presumably intended for per-user invalidation
        later; confirm against callers.
    :returns: the cached preferences structure.
    """
    log.info("Loading and caching all preferences for all users")
    # BUG FIX: ``Region.get_or_create``'s third positional argument is
    # ``expiration_time``, not arguments for the creator.  Passing the
    # tuple there meant ``load_preferences`` would be called with no
    # arguments and raise TypeError.  Bind the arguments in a closure
    # so the creator can be invoked with zero arguments as dogpile
    # expects.
    return _cache.get_or_create(
        'preferences',
        lambda: load_preferences(
            session, self.hub.config, self.valid_paths),
    )

def make_session(self):
    """Open and return a new database session for the configured URI."""
    uri = self.uri
    return fmn.lib.models.init(uri)
@@ -189,53 +156,25 @@ def work(self, session, raw_msg):
# Compute, based on our in-memory cache of preferences, who we think
# should receive this message.

# First, make a thread-local copy of our shared cached prefs
preferences = list(self.cached_preferences)
# Shuffle it so that not all threads step through the list in the same
# order. This should cut down on competition for the dogpile lock when
# getting pkgdb info at startup.
random.shuffle(preferences)
# And do the real work of comparing every rule against the message.
results = fmn.lib.recipients(preferences, msg,
self.valid_paths, self.hub.config)

log.debug("Recipients found %i dt %0.2fs %s %s",
len(results), time.time() - start,
msg['msg_id'], msg['topic'])

# Let's look at the results of our matching operation and send stuff
# where we need to.
for context, recipients in results.items():
if not recipients:
continue
log.debug(" Considering %r with %i recips" % (
context, len(list(recipients))))
backend = self.backends[context]
for recipient in recipients:
user = recipient['user']
pref = fmn.lib.models.Preference.load(
session, user, context)

if not pref.should_batch:
log.debug(" Calling backend %r with %r" % (
backend, recipient))
backend.handle(session, recipient, msg)
else:
log.debug(" Queueing msg for digest")
fmn.lib.models.QueuedMessage.enqueue(
session, user, context, msg)
if ('filter_oneshot' in recipient
and recipient['filter_oneshot']):
log.debug(" Marking one-shot filter as fired")
idx = recipient['filter_id']
fltr = session.query(fmn.lib.models.Filter).get(idx)
fltr.fired(session)
import json
import pika
connection = pika.BlockingConnection()
channel = connection.channel()
channel.queue_declare('workers', durable=True)
channel.basic_publish(
exchange='',
routing_key='workers',
body=json.dumps(raw_msg),
properties=pika.BasicProperties(
delivery_mode=2
)
)
channel.close()
connection.close()

log.debug("Done. %0.2fs %s %s",
time.time() - start, msg['msg_id'], msg['topic'])

def stop(self):
    """Stop every notification backend, then the consumer itself.

    Gives each backend a chance to release its resources before the
    hub tears the consumer down.
    """
    log.info("Cleaning up FMNConsumer.")
    # ``iteritems()`` is Python-2-only and the key was never used;
    # iterate the values instead, which is equivalent and portable.
    for backend in self.backends.values():
        backend.stop()
    super(FMNConsumer, self).stop()

0 comments on commit ffd192f

Please sign in to comment.
You can’t perform that action at this time.