Permalink
Fetching contributors…
Cannot retrieve contributors at this time
300 lines (244 sloc) 10.3 KB
import operator
import threading
from zope.interface import implementer
from zope.interface.registry import Components
from pyramid.compat import text_
from pyramid.decorator import reify
from pyramid.interfaces import (
IIntrospector,
IIntrospectable,
ISettings,
)
from pyramid.path import (
CALLER_PACKAGE,
caller_package,
)
empty = text_('')
class Registry(Components, dict):
""" A registry object is an :term:`application registry`.
It is used by the framework itself to perform mappings of URLs to view
callables, as well as servicing other various framework duties. A registry
has its own internal API, but this API is rarely used by Pyramid
application developers (it's usually only used by developers of the
Pyramid framework and Pyramid addons). But it has a number of attributes
that may be useful to application developers within application code,
such as ``settings``, which is a dictionary containing application
deployment settings.
For information about the purpose and usage of the application registry,
see :ref:`zca_chapter`.
The registry may be used both as an :class:`pyramid.interfaces.IDict` and
as a Zope component registry.
These two ways of storing configuration are independent.
Applications will tend to prefer to store information as key-values
whereas addons may prefer to use the component registry to avoid naming
conflicts and to provide more complex lookup mechanisms.
The application registry is usually accessed as ``request.registry`` in
application code. By the time a registry is used to handle requests it
should be considered frozen and read-only. Any changes to its internal
state should be done with caution and concern for thread-safety.
"""
# for optimization purposes, if no listeners are listening, don't try
# to notify them
has_listeners = False
_settings = None
def __init__(self, package_name=CALLER_PACKAGE, *args, **kw):
# add a registry-instance-specific lock, which is used when the lookup
# cache is mutated
self._lock = threading.Lock()
# add a view lookup cache
self._clear_view_lookup_cache()
if package_name is CALLER_PACKAGE:
package_name = caller_package().__name__
Components.__init__(self, package_name, *args, **kw)
dict.__init__(self)
def _clear_view_lookup_cache(self):
self._view_lookup_cache = {}
def __nonzero__(self):
# defeat bool determination via dict.__len__
return True
@reify
def package_name(self):
return self.__name__
def registerSubscriptionAdapter(self, *arg, **kw):
result = Components.registerSubscriptionAdapter(self, *arg, **kw)
self.has_listeners = True
return result
def registerSelfAdapter(self, required=None, provided=None, name=empty,
info=empty, event=True):
# registerAdapter analogue which always returns the object itself
# when required is matched
return self.registerAdapter(lambda x: x, required=required,
provided=provided, name=name,
info=info, event=event)
def queryAdapterOrSelf(self, object, interface, default=None):
# queryAdapter analogue which returns the object if it implements
# the interface, otherwise it will return an adaptation to the
# interface
if not interface.providedBy(object):
return self.queryAdapter(object, interface, default=default)
return object
def registerHandler(self, *arg, **kw):
result = Components.registerHandler(self, *arg, **kw)
self.has_listeners = True
return result
def notify(self, *events):
if self.has_listeners:
# iterating over subscribers assures they get executed
[ _ for _ in self.subscribers(events, None) ]
# backwards compatibility for code that wants to look up a settings
# object via ``registry.getUtility(ISettings)``
def _get_settings(self):
return self._settings
def _set_settings(self, settings):
self.registerUtility(settings, ISettings)
self._settings = settings
settings = property(_get_settings, _set_settings)
@implementer(IIntrospector)
class Introspector(object):
def __init__(self):
self._refs = {}
self._categories = {}
self._counter = 0
def add(self, intr):
category = self._categories.setdefault(intr.category_name, {})
category[intr.discriminator] = intr
category[intr.discriminator_hash] = intr
intr.order = self._counter
self._counter += 1
def get(self, category_name, discriminator, default=None):
category = self._categories.setdefault(category_name, {})
intr = category.get(discriminator, default)
return intr
def get_category(self, category_name, default=None, sort_key=None):
if sort_key is None:
sort_key = operator.attrgetter('order')
category = self._categories.get(category_name)
if category is None:
return default
values = category.values()
values = sorted(set(values), key=sort_key)
return [
{'introspectable': intr,
'related': self.related(intr)}
for intr in values
]
def categorized(self, sort_key=None):
L = []
for category_name in self.categories():
L.append((category_name, self.get_category(category_name,
sort_key=sort_key)))
return L
def categories(self):
return sorted(self._categories.keys())
def remove(self, category_name, discriminator):
intr = self.get(category_name, discriminator)
if intr is None:
return
L = self._refs.pop(intr, [])
for d in L:
L2 = self._refs[d]
L2.remove(intr)
category = self._categories[intr.category_name]
del category[intr.discriminator]
del category[intr.discriminator_hash]
def _get_intrs_by_pairs(self, pairs):
introspectables = []
for pair in pairs:
category_name, discriminator = pair
intr = self._categories.get(category_name, {}).get(discriminator)
if intr is None:
raise KeyError((category_name, discriminator))
introspectables.append(intr)
return introspectables
def relate(self, *pairs):
introspectables = self._get_intrs_by_pairs(pairs)
relatable = ((x,y) for x in introspectables for y in introspectables)
for x, y in relatable:
L = self._refs.setdefault(x, [])
if x is not y and y not in L:
L.append(y)
def unrelate(self, *pairs):
introspectables = self._get_intrs_by_pairs(pairs)
relatable = ((x,y) for x in introspectables for y in introspectables)
for x, y in relatable:
L = self._refs.get(x, [])
if y in L:
L.remove(y)
def related(self, intr):
category_name, discriminator = intr.category_name, intr.discriminator
intr = self._categories.get(category_name, {}).get(discriminator)
if intr is None:
raise KeyError((category_name, discriminator))
return self._refs.get(intr, [])
@implementer(IIntrospectable)
class Introspectable(dict):
order = 0 # mutated by introspector.add
action_info = None # mutated by self.register
def __init__(self, category_name, discriminator, title, type_name):
self.category_name = category_name
self.discriminator = discriminator
self.title = title
self.type_name = type_name
self._relations = []
def relate(self, category_name, discriminator):
self._relations.append((True, category_name, discriminator))
def unrelate(self, category_name, discriminator):
self._relations.append((False, category_name, discriminator))
def _assert_resolved(self):
assert undefer(self.discriminator) is self.discriminator
@property
def discriminator_hash(self):
self._assert_resolved()
return hash(self.discriminator)
def __hash__(self):
self._assert_resolved()
return hash((self.category_name,) + (self.discriminator,))
def __repr__(self):
self._assert_resolved()
return '<%s category %r, discriminator %r>' % (self.__class__.__name__,
self.category_name,
self.discriminator)
def __nonzero__(self):
return True
__bool__ = __nonzero__ # py3
def register(self, introspector, action_info):
self.discriminator = undefer(self.discriminator)
self.action_info = action_info
introspector.add(self)
for relate, category_name, discriminator in self._relations:
discriminator = undefer(discriminator)
if relate:
method = introspector.relate
else:
method = introspector.unrelate
method(
(self.category_name, self.discriminator),
(category_name, discriminator)
)
class Deferred(object):
""" Can be used by a third-party configuration extender to wrap a
:term:`discriminator` during configuration if an immediately hashable
discriminator cannot be computed because it relies on unresolved values.
The function should accept no arguments and should return a hashable
discriminator."""
def __init__(self, func):
self.func = func
@reify
def value(self):
result = self.func()
del self.func
return result
def resolve(self):
return self.value
def undefer(v):
""" Function which accepts an object and returns it unless it is a
:class:`pyramid.registry.Deferred` instance. If it is an instance of
that class, its ``resolve`` method is called, and the result of the
method is returned."""
if isinstance(v, Deferred):
v = v.resolve()
return v
class predvalseq(tuple):
""" A subtype of tuple used to represent a sequence of predicate values """
pass
global_registry = Registry('global')