Skip to content

Commit

Permalink
fix(queues): Global config used everywhere
Browse files Browse the repository at this point in the history
This patch modifies the queues code to use manually instantiated configs
rather than the global config. This will allow for more flexibility in
configuring components. For example, the up-and-coming sharding feature
can construct configs based on the shard catalog rather than the local
INI file. Also, this change will make testing easier.

Also in this patch, the SQLite driver schema was consolidated into
one place in preparation for the sharding patch. With this change,
controllers can be instantiated and used independently, since all
the tables are set up when the driver is loaded, not in a piecemeal
fashion as before.

Change-Id: I1afa8ab8c7e6dd9a017f4a9e3a3b1fadbeb32806
Implements: blueprint remove-global-config
Closes-Bug: #1239725
  • Loading branch information
kgriffs committed Oct 16, 2013
1 parent c2c3a20 commit c67d5a4
Show file tree
Hide file tree
Showing 40 changed files with 431 additions and 389 deletions.
20 changes: 15 additions & 5 deletions marconi/common/transport/wsgi/helpers.py
Expand Up @@ -19,7 +19,7 @@
import six

import marconi.openstack.common.log as logging
from marconi.queues.transport import validation as validate
from marconi.queues.transport import validation


LOG = logging.getLogger(__name__)
Expand All @@ -46,22 +46,32 @@ def extract_project_id(req, resp, params):
and retry.'''))


def validate_queue_name(req, resp, params):
"""Hook for validating the queue name sepecified in requests.
def validate_queue_name(validate, req, resp, params):
"""Hook for validating the queue name specified in a request.
Validation is short-circuited if 'queue_name' does not
exist in `params`.
This hook depends on the `get_project` hook, which must be
installed upstream.
:param validate: A validator function that will
be used to check the queue name against configured
limits. functools.partial or a closure must be used to
set this first arg, and expose the remaining ones as
a Falcon hook interface.
:param req: Falcon request object
:param resp: Falcon response object
:param params: Responder params dict
"""

try:
validate.queue_name(params['queue_name'])
validate(params['queue_name'])
except KeyError:
# NOTE(kgriffs): queue_name not in params, so nothing to do
pass
except validate.ValidationFailed as ex:
except validation.ValidationFailed as ex:
project = params['project_id']
queue = params['queue_name'].decode('utf-8', 'replace')

Expand Down
1 change: 1 addition & 0 deletions marconi/proxy/transport/wsgi/driver.py
Expand Up @@ -39,6 +39,7 @@

]

cfg.CONF.register_opt(cfg.StrOpt('auth_strategy', default=''))
cfg.CONF.register_opts(_WSGI_OPTIONS,
group='proxy:drivers:transport:wsgi')

Expand Down
8 changes: 6 additions & 2 deletions marconi/proxy/transport/wsgi/queues.py
Expand Up @@ -40,13 +40,17 @@
from marconi.proxy.utils import (
forward, lookup, helpers, http, partition
)
from marconi.queues import storage # NOQA
from marconi.queues.storage import base as storage
from marconi.queues.transport import validation as validate
from marconi.queues.transport.wsgi import exceptions as wsgi_exceptions


LOG = log.getLogger(__name__)
STORAGE_LIMITS = cfg.CONF['queues:limits:storage']

CFG = cfg.CONF
CFG.register_opts(storage._LIMITS_OPTIONS, group=storage._LIMITS_GROUP)

STORAGE_LIMITS = cfg.CONF[storage._LIMITS_GROUP]


class Listing(object):
Expand Down
29 changes: 16 additions & 13 deletions marconi/queues/bootstrap.py
Expand Up @@ -22,18 +22,16 @@
from marconi.queues.storage import pipeline
from marconi.queues import transport # NOQA

LOG = log.getLogger(__name__)

_bootstrap_options = [
_DRIVER_OPTIONS = [
cfg.StrOpt('transport', default='wsgi',
help='Transport driver to use'),
cfg.StrOpt('storage', default='sqlite',
help='Storage driver to use'),
]

CFG = cfg.CONF
CFG.register_opts(_bootstrap_options, group="queues:drivers")

LOG = log.getLogger(__name__)
_DRIVER_GROUP = 'queues:drivers'


class Bootstrap(object):
Expand All @@ -48,35 +46,40 @@ def __init__(self, config_file=None, cli_args=None):
if config_file is not None:
default_file = [config_file]

CFG(project='marconi', prog='marconi-queues', args=cli_args or [],
default_config_files=default_file)
self.conf = cfg.ConfigOpts()
self.conf.register_opts(_DRIVER_OPTIONS, group=_DRIVER_GROUP)
self.driver_conf = self.conf[_DRIVER_GROUP]

self.conf(project='marconi', prog='marconi-queues',
args=cli_args or [], default_config_files=default_file)

log.setup('marconi')

@decorators.lazy_property(write=False)
def storage(self):
storage_name = CFG['queues:drivers'].storage
storage_name = self.driver_conf.storage
LOG.debug(_(u'Loading storage driver: ') + storage_name)

try:
mgr = driver.DriverManager('marconi.queues.storage',
storage_name,
invoke_on_load=True)

return pipeline.Driver(CFG, mgr.driver)
invoke_on_load=True,
invoke_args=[self.conf])
return pipeline.Driver(self.conf, mgr.driver)
except RuntimeError as exc:
LOG.exception(exc)
raise exceptions.InvalidDriver(exc)

@decorators.lazy_property(write=False)
def transport(self):
transport_name = CFG['queues:drivers'].transport
transport_name = self.driver_conf.transport
LOG.debug(_(u'Loading transport driver: ') + transport_name)

try:
mgr = driver.DriverManager('marconi.queues.transport',
transport_name,
invoke_on_load=True,
invoke_args=[self.storage])
invoke_args=[self.conf, self.storage])
return mgr.driver
except RuntimeError as exc:
LOG.exception(exc)
Expand Down
12 changes: 0 additions & 12 deletions marconi/queues/storage/__init__.py
@@ -1,20 +1,8 @@
"""Marconi Storage Drivers"""

from oslo.config import cfg

from marconi.queues.storage import base
from marconi.queues.storage import exceptions # NOQA

_STORAGE_LIMITS_OPTIONS = [
cfg.IntOpt('default_queue_paging', default=10,
help='Default queue pagination size'),

cfg.IntOpt('default_message_paging', default=10,
help='Default message pagination size')
]

cfg.CONF.register_opts(_STORAGE_LIMITS_OPTIONS, group='queues:limits:storage')

# Hoist classes into package namespace
ClaimBase = base.ClaimBase
DriverBase = base.DriverBase
Expand Down
29 changes: 29 additions & 0 deletions marconi/queues/storage/base.py
Expand Up @@ -17,10 +17,39 @@

import abc

from oslo.config import cfg

_LIMITS_OPTIONS = [
cfg.IntOpt('default_queue_paging', default=10,
help='Default queue pagination size'),

cfg.IntOpt('default_message_paging', default=10,
help='Default message pagination size')
]

_LIMITS_GROUP = 'queues:limits:storage'


class DriverBase(object):
"""Interface definition for storage drivers.
Connection information and driver-specific options are
loaded from the config file or the shard catalog.
:param conf: Driver configuration. Can be any
dict-like object containing the expected
options. Must at least include 'uri' which
provides connection options such as host and
port.
"""
__metaclass__ = abc.ABCMeta

def __init__(self, conf):
self.conf = conf

self.conf.register_opts(_LIMITS_OPTIONS, group=_LIMITS_GROUP)
self.limits_conf = self.conf[_LIMITS_GROUP]

@abc.abstractproperty
def queue_controller(self):
"""Returns the driver's queue controller."""
Expand Down
4 changes: 1 addition & 3 deletions marconi/queues/storage/mongodb/claims.py
Expand Up @@ -24,7 +24,6 @@
import datetime

from bson import objectid
from oslo.config import cfg

import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
Expand All @@ -34,7 +33,6 @@


LOG = logging.getLogger(__name__)
STORAGE_LIMITS = cfg.CONF['queues:limits:storage']


class ClaimController(storage.ClaimBase):
Expand Down Expand Up @@ -121,7 +119,7 @@ def create(self, queue, metadata, project=None, limit=None):
msg_ctrl = self.driver.message_controller

if limit is None:
limit = STORAGE_LIMITS.default_message_paging
limit = self.driver.limits_conf.default_message_paging

ttl = metadata['ttl']
grace = metadata['grace']
Expand Down
30 changes: 19 additions & 11 deletions marconi/queues/storage/mongodb/driver.py
Expand Up @@ -30,23 +30,31 @@

class Driver(storage.DriverBase):

@decorators.lazy_property()
def __init__(self, conf):
super(Driver, self).__init__(conf)

self.conf.register_opts(options.MONGODB_OPTIONS,
group=options.MONGODB_GROUP)

self.mongodb_conf = self.conf[options.MONGODB_GROUP]

@decorators.lazy_property(write=False)
def queues_database(self):
"""Database dedicated to the "queues" collection.
The queues collection is separated out into it's own database
to avoid writer lock contention with the messages collections.
"""

name = options.CFG.database + '_queues'
name = self.mongodb_conf.database + '_queues'
return self.connection[name]

@decorators.lazy_property()
@decorators.lazy_property(write=False)
def message_databases(self):
"""List of message databases, ordered by partition number."""

name = options.CFG.database
partitions = options.CFG.partitions
name = self.mongodb_conf.database
partitions = self.mongodb_conf.partitions

# NOTE(kgriffs): Partition names are zero-based, and
# the list is ordered by partition, which means that a
Expand All @@ -58,25 +66,25 @@ def message_databases(self):
return [self.connection[name + '_messages_p' + str(p)]
for p in range(partitions)]

@decorators.lazy_property()
@decorators.lazy_property(write=False)
def connection(self):
"""MongoDB client connection instance."""

if options.CFG.uri and 'replicaSet' in options.CFG.uri:
if self.mongodb_conf.uri and 'replicaSet' in self.mongodb_conf.uri:
MongoClient = pymongo.MongoReplicaSetClient
else:
MongoClient = pymongo.MongoClient

return MongoClient(options.CFG.uri)
return MongoClient(self.mongodb_conf.uri)

@property
@decorators.lazy_property(write=False)
def queue_controller(self):
return controllers.QueueController(self)

@property
@decorators.lazy_property(write=False)
def message_controller(self):
return controllers.MessageController(self)

@property
@decorators.lazy_property(write=False)
def claim_controller(self):
return controllers.ClaimController(self)
21 changes: 11 additions & 10 deletions marconi/queues/storage/mongodb/messages.py
Expand Up @@ -24,20 +24,17 @@
import datetime
import time

from oslo.config import cfg
import pymongo.errors
import pymongo.read_preferences

import marconi.openstack.common.log as logging
from marconi.openstack.common import timeutils
from marconi.queues import storage
from marconi.queues.storage import exceptions
from marconi.queues.storage.mongodb import options
from marconi.queues.storage.mongodb import utils


LOG = logging.getLogger(__name__)
STORAGE_LIMITS = cfg.CONF['queues:limits:storage']

# NOTE(kgriffs): This value, in seconds, should be at least less than the
# minimum allowed TTL for messages (60 seconds). Make it 45 to allow for
Expand Down Expand Up @@ -107,8 +104,9 @@ def __init__(self, *args, **kwargs):
super(MessageController, self).__init__(*args, **kwargs)

# Cache for convenience and performance
self._num_partitions = self.driver.mongodb_conf.partitions
self._queue_ctrl = self.driver.queue_controller
self._retry_range = range(options.CFG.max_attempts)
self._retry_range = range(self.driver.mongodb_conf.max_attempts)

# Create a list of 'messages' collections, one for each database
# partition, ordered by partition number.
Expand Down Expand Up @@ -162,7 +160,8 @@ def _ensure_indexes(self, collection):

def _collection(self, queue_name, project=None):
"""Get a partitioned collection instance."""
return self._collections[utils.get_partition(queue_name, project)]
return self._collections[utils.get_partition(self._num_partitions,
queue_name, project)]

def _backoff_sleep(self, attempt):
"""Sleep between retries using a jitter algorithm.
Expand All @@ -173,9 +172,10 @@ def _backoff_sleep(self, attempt):
:param attempt: current attempt number, zero-based
"""
seconds = utils.calculate_backoff(attempt, options.CFG.max_attempts,
options.CFG.max_retry_sleep,
options.CFG.max_retry_jitter)
conf = self.driver.mongodb_conf
seconds = utils.calculate_backoff(attempt, conf.max_attempts,
conf.max_retry_sleep,
conf.max_retry_jitter)

time.sleep(seconds)

Expand Down Expand Up @@ -353,7 +353,7 @@ def list(self, queue_name, project=None, marker=None, limit=None,
echo=False, client_uuid=None, include_claimed=False):

if limit is None:
limit = STORAGE_LIMITS.default_message_paging
limit = self.driver.limits_conf.default_message_paging

if marker is not None:
try:
Expand Down Expand Up @@ -594,7 +594,8 @@ def post(self, queue_name, messages, client_uuid, project=None):

message = _(u'Hit maximum number of attempts (%(max)s) for queue '
u'"%(queue)s" under project %(project)s')
message %= dict(max=options.CFG.max_attempts, queue=queue_name,
message %= dict(max=self.driver.mongodb_conf.max_attempts,
queue=queue_name,
project=project)

LOG.warning(message)
Expand Down
6 changes: 2 additions & 4 deletions marconi/queues/storage/mongodb/options.py
Expand Up @@ -19,7 +19,7 @@
from oslo.config import cfg


_MONGODB_OPTIONS = [
MONGODB_OPTIONS = [
cfg.StrOpt('uri', help='Mongodb Connection URI'),

# Database name
Expand Down Expand Up @@ -53,6 +53,4 @@
'same instant.')),
]

cfg.CONF.register_opts(_MONGODB_OPTIONS,
group='queues:drivers:storage:mongodb')
CFG = cfg.CONF['queues:drivers:storage:mongodb']
MONGODB_GROUP = 'queues:drivers:storage:mongodb'

0 comments on commit c67d5a4

Please sign in to comment.