Skip to content

Commit

Permalink
messenger: use stevedore to load named extensions
Browse files Browse the repository at this point in the history
Instead of using custom loaders, lets leverage stevedore to load named
extensions and then do the mapping.
  • Loading branch information
alvarolopez committed Feb 18, 2020
1 parent a330097 commit 015b389
Show file tree
Hide file tree
Showing 10 changed files with 113 additions and 184 deletions.
4 changes: 2 additions & 2 deletions caso/exception.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class RecordVersionNotFound(CasoException):
msg_fmt = "Version %(version)s of accounting record could not be found."


class ClassNotFound(CasoException):
msg_fmt = "Class %(class_name)s could not be found: %(exception)s."
class MessengerNotFound(CasoException):
msg_fmt = "Messengers %(names)s could not be found."


class LogstashConnectionError(CasoException):
Expand Down
117 changes: 0 additions & 117 deletions caso/loadables.py

This file was deleted.

58 changes: 53 additions & 5 deletions caso/loading.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,22 @@

import stevedore

from caso import exception

EXTRACTOR_NAMESPACE = "caso.extractors"
MESSENGER_NAMESPACE = "caso.messenger"


def _get_names(what):
mgr = stevedore.ExtensionManager(namespace=what)
return frozenset(mgr.names())


def _get(what):
mgr = stevedore.ExtensionManager(namespace=what,
propagate_map_exceptions=True)

return dict(mgr.map(lambda ext: (ext.entry_point.name, ext.plugin)))


def get_available_extractor_names():
Expand All @@ -25,8 +40,7 @@ def get_available_extractor_names():
:returns: A list of names.
:rtype: frozenset
"""
mgr = stevedore.ExtensionManager(namespace=EXTRACTOR_NAMESPACE)
return frozenset(mgr.names())
return _get_names(EXTRACTOR_NAMESPACE)


def get_available_extractors():
Expand All @@ -36,7 +50,41 @@ def get_available_extractors():
as the value.
:rtype: dict
"""
mgr = stevedore.ExtensionManager(namespace=EXTRACTOR_NAMESPACE,
propagate_map_exceptions=True)
return _get(EXTRACTOR_NAMESPACE)

return dict(mgr.map(lambda ext: (ext.entry_point.name, ext.plugin)))

def get_available_messenger_names():
"""Get the names of all the messengers that are available on the system.
:returns: A list of names.
:rtype: frozenset
"""
return _get_names(MESSENGER_NAMESPACE)


def get_available_messengers():
"""Retrieve all the messengers available on the system.
:returns: A dict with the entrypoint name as the key and the messenger
as the value.
:rtype: dict
"""
return _get(MESSENGER_NAMESPACE)


def get_enabled_messengers(names):
"""Retrieve all the enabled messengers on the system.
:returns: An extension manager.
"""

def cb(names):
raise exception.MessengerNotFound(names=",".join(list(names)))

mgr = stevedore.NamedExtensionManager(namespace=MESSENGER_NAMESPACE,
names=names,
name_order=True,
on_missing_entrypoints_callback=cb,
invoke_on_load=True,
propagate_map_exceptions=True)
return mgr
33 changes: 22 additions & 11 deletions caso/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,25 @@
from oslo_config import cfg

import caso.extract.manager
from caso import loading
import caso.messenger
from caso import utils

opts = [
cfg.ListOpt('messengers',
default=['caso.messenger.noop.NoopMessenger'],
help='List of messenger that will dispatch records. '
'valid values are %s' %
["%s.%s" % (i.__module__, i.__name__)
for i in caso.messenger.all_managers()]),
cfg.StrOpt('spooldir',
default='/var/spool/caso',
help='Spool directory.'),
cfg.ListOpt(
'messengers',
default=['noop'],
help='List of messengers that will dispatch records. '
'valid values are {}. You can specify more than '
'one messenger.'.format(
",".join(loading.get_available_messenger_names())
)
),
cfg.StrOpt(
'spooldir',
default='/var/spool/caso',
help='Spool directory.'
),
]

override_lock = cfg.StrOpt(
Expand Down Expand Up @@ -64,12 +70,17 @@ class Manager(object):
def __init__(self):
utils.makedirs(CONF.spooldir)

self.extractor_manager = caso.extract.manager.Manager()
self.messenger = caso.messenger.Manager()
self.extractor_manager = None
self.messenger = None

self.lock_path = CONF.lock_path

def run(self):
# Load the managers here to have the config options loaded and
# available
self.extractor_manager = caso.extract.manager.Manager()
self.messenger = caso.messenger.Manager()

@lockutils.synchronized("caso_should_not_run_in_parallel",
lock_path=self.lock_path, external=True)
def synchronized():
Expand Down
43 changes: 17 additions & 26 deletions caso/messenger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import six

from caso import exception
from caso import loadables
from caso import loading

CONF = cfg.CONF

Expand All @@ -35,31 +35,22 @@ def push(self, records):
"""Push the records."""


class Manager(loadables.BaseLoader):
class Manager(object):
def __init__(self):
super(Manager, self).__init__(BaseMessenger)
self.messengers = None

def _load_messengers(self):
self.messengers = [i()
for i in self.get_matching_classes(CONF.messengers)]
try:
self.mgr = loading.get_enabled_messengers(CONF.messengers)
except Exception as e:
# Capture exception so that we can continue working
LOG.error(e)
raise e

def push_to_all(self, records):
if self.messengers is None:
self._load_messengers()

for m in self.messengers:
try:
m.push(records)
except exception.RecordVersionNotFound:
# Oops, a messenger is using a weird version, stop working
LOG.error("Messenger '%s' is using an unknown "
"record version" % m.__class__.__name__)
raise
except Exception as e:
# Capture exception so that we can continue working
LOG.error(e)


def all_managers():
return Manager().get_all_classes()
try:
self.mgr.map_method("push", records)
except exception.RecordVersionNotFound as e:
# Oops, a messenger is using a weird version, stop working
LOG.error("Messenger is using an unknown record version")
raise e
except Exception as e:
# Capture exception so that we can continue working
LOG.error(e)
6 changes: 3 additions & 3 deletions caso/messenger/ssm.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
CONF.register_opts(opts, group="ssm")


__all__ = ["SsmMessager", "SSMMessengerV02", "SSMMessengerV04"]
__all__ = ["SsmMessenger", "SSMMessengerV02", "SSMMessengerV04"]


@six.add_metaclass(abc.ABCMeta)
Expand Down Expand Up @@ -88,11 +88,11 @@ class SSMMessengerV04(_SSMBaseMessenger):
version = "0.4"


class SsmMessager(SSMMessengerV02):
class SsmMessenger(SSMMessengerV02):
def __init__(self):
msg = ("Using deprecated caso.messenger.ssm.SsmMessager, "
"please use caso.messenger.ssm.SSMMessengerV02 if you "
"wish to continue usinf the 0.2 version of the record, "
"or refer to the cASO documentation.")
warnings.warn(msg, DeprecationWarning)
super(SsmMessager, self).__init__()
super(SsmMessenger, self).__init__()
9 changes: 0 additions & 9 deletions caso/tests/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,3 @@ def tearDown(self):
p.stop()

super(TestCasoManager, self).tearDown()

def test_dry_run(self):
self.flags(dry_run=True)
# NOTE(aloga): cannot patch a property of an instance, see
# https://code.google.com/p/mock/issues/detail?id=117
mngr = manager.Manager()
mngr.messenger.push_to_all.assert_not_called()
mngr.run()
self.assertFalse(mngr.messenger.push_to_all.called)
13 changes: 8 additions & 5 deletions doc/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,14 @@ of every option. You should check at least the following options:
a site. This is used if you have several endpoints within your site.
* ``projects`` (list value, default empty). List of the projects to extract
records from.
* ``messengers`` (list, default: ``caso.messenger.noop.NoopMessenger``). List
of the messengers to publish data to. Valid messenges are:
* ``caso.messenger.ssm.SSMMessengerV02`` for publishing APEL V0.2 records.
* ``caso.messenger.ssm.SSMMessengerV04`` for publishing APEL V0.4 records.
* ``caso.messenger.logstash.LogstashMessenger`` for publishing to Logstash.
* ``messengers`` (list, default: ``noop``). List of the messengers to publish
data to. Records will be pushed to all these messengers, in order. Valid
messengers shipped with cASO are:
* ``ssm`` for publishing APEL V0.4 records.
* ``logstash`` for publishing to Logstash.
* ``noop`` do nothing at all.
Note that there might be other messengers available in the system if they are
registered into the ``caso.messenger`` entry point namespace.
* ``mapping_file`` (default: ``/etc/caso/voms.json``). File containing the
mapping from VOs to local projects as configured in Keystone-VOMS, in the
form::
Expand Down
9 changes: 3 additions & 6 deletions etc/caso/caso.conf.sample
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,9 @@
# From caso
#

# List of messenger that will dispatch records. valid values are
# ['caso.messenger.noop.NoopMessenger',
# 'caso.messenger.logstash.LogstashMessenger',
# 'caso.messenger.ssm.SSMMessengerV02', 'caso.messenger.ssm.SSMMessengerV04',
# 'caso.messenger.ssm.SsmMessager'] (list value)
#messengers = caso.messenger.noop.NoopMessenger
# List of messengers that will dispatch records. valid values are
# ssm,logstash,noop. You can specify more than one messenger. (list value)
#messengers = noop

# Spool directory. (string value)
#spooldir = /var/spool/caso
Expand Down
Loading

0 comments on commit 015b389

Please sign in to comment.