Permalink
Fetching contributors…
Cannot retrieve contributors at this time
279 lines (236 sloc) 9.42 KB
from zope.interface import (
implementer,
providedBy,
)
from pyramid.interfaces import (
IDebugLogger,
IExecutionPolicy,
IRequest,
IRequestExtensions,
IRootFactory,
IRouteRequest,
IRouter,
IRequestFactory,
IRoutesMapper,
ITraverser,
ITweens,
)
from pyramid.events import (
ContextFound,
NewRequest,
NewResponse,
BeforeTraversal,
)
from pyramid.httpexceptions import HTTPNotFound
from pyramid.request import Request
from pyramid.view import _call_view
from pyramid.request import apply_request_extensions
from pyramid.threadlocal import RequestContext
from pyramid.traversal import (
DefaultRootFactory,
ResourceTreeTraverser,
)
@implementer(IRouter)
class Router(object):
debug_notfound = False
debug_routematch = False
def __init__(self, registry):
q = registry.queryUtility
self.logger = q(IDebugLogger)
self.root_factory = q(IRootFactory, default=DefaultRootFactory)
self.routes_mapper = q(IRoutesMapper)
self.request_factory = q(IRequestFactory, default=Request)
self.request_extensions = q(IRequestExtensions)
self.execution_policy = q(
IExecutionPolicy, default=default_execution_policy)
self.orig_handle_request = self.handle_request
tweens = q(ITweens)
if tweens is not None:
self.handle_request = tweens(self.handle_request, registry)
self.root_policy = self.root_factory # b/w compat
self.registry = registry
settings = registry.settings
if settings is not None:
self.debug_notfound = settings['debug_notfound']
self.debug_routematch = settings['debug_routematch']
def handle_request(self, request):
attrs = request.__dict__
registry = attrs['registry']
request.request_iface = IRequest
context = None
routes_mapper = self.routes_mapper
debug_routematch = self.debug_routematch
adapters = registry.adapters
has_listeners = registry.has_listeners
notify = registry.notify
logger = self.logger
has_listeners and notify(NewRequest(request))
# find the root object
root_factory = self.root_factory
if routes_mapper is not None:
info = routes_mapper(request)
match, route = info['match'], info['route']
if route is None:
if debug_routematch:
msg = ('no route matched for url %s' %
request.url)
logger and logger.debug(msg)
else:
attrs['matchdict'] = match
attrs['matched_route'] = route
if debug_routematch:
msg = (
'route matched for url %s; '
'route_name: %r, '
'path_info: %r, '
'pattern: %r, '
'matchdict: %r, '
'predicates: %r' % (
request.url,
route.name,
request.path_info,
route.pattern,
match,
', '.join([p.text() for p in route.predicates]))
)
logger and logger.debug(msg)
request.request_iface = registry.queryUtility(
IRouteRequest,
name=route.name,
default=IRequest)
root_factory = route.factory or self.root_factory
# Notify anyone listening that we are about to start traversal
#
# Notify before creating root_factory in case we want to do something
# special on a route we may have matched. See
# https://github.com/Pylons/pyramid/pull/1876 for ideas of what is
# possible.
has_listeners and notify(BeforeTraversal(request))
# Create the root factory
root = root_factory(request)
attrs['root'] = root
# We are about to traverse and find a context
traverser = adapters.queryAdapter(root, ITraverser)
if traverser is None:
traverser = ResourceTreeTraverser(root)
tdict = traverser(request)
context, view_name, subpath, traversed, vroot, vroot_path = (
tdict['context'],
tdict['view_name'],
tdict['subpath'],
tdict['traversed'],
tdict['virtual_root'],
tdict['virtual_root_path']
)
attrs.update(tdict)
# Notify anyone listening that we have a context and traversal is
# complete
has_listeners and notify(ContextFound(request))
# find a view callable
context_iface = providedBy(context)
response = _call_view(
registry,
request,
context,
context_iface,
view_name
)
if response is None:
if self.debug_notfound:
msg = (
'debug_notfound of url %s; path_info: %r, '
'context: %r, view_name: %r, subpath: %r, '
'traversed: %r, root: %r, vroot: %r, '
'vroot_path: %r' % (
request.url, request.path_info, context,
view_name, subpath, traversed, root, vroot,
vroot_path)
)
logger and logger.debug(msg)
else:
msg = request.path_info
raise HTTPNotFound(msg)
return response
def invoke_subrequest(self, request, use_tweens=False):
"""Obtain a response object from the Pyramid application based on
information in the ``request`` object provided. The ``request``
object must be an object that implements the Pyramid request
interface (such as a :class:`pyramid.request.Request` instance). If
``use_tweens`` is ``True``, the request will be sent to the
:term:`tween` in the tween stack closest to the request ingress. If
``use_tweens`` is ``False``, the request will be sent to the main
router handler, and no tweens will be invoked.
See the API for pyramid.request for complete documentation.
"""
request.registry = self.registry
request.invoke_subrequest = self.invoke_subrequest
extensions = self.request_extensions
if extensions is not None:
apply_request_extensions(request, extensions=extensions)
with RequestContext(request):
return self.invoke_request(request, _use_tweens=use_tweens)
def request_context(self, environ):
"""
Create a new request context from a WSGI environ.
The request context is used to push/pop the threadlocals required
when processing the request. It also contains an initialized
:class:`pyramid.interfaces.IRequest` instance using the registered
:class:`pyramid.interfaces.IRequestFactory`. The context may be
used as a context manager to control the threadlocal lifecycle:
.. code-block:: python
with router.request_context(environ) as request:
...
Alternatively, the context may be used without the ``with`` statement
by manually invoking its ``begin()`` and ``end()`` methods.
.. code-block:: python
ctx = router.request_context(environ)
request = ctx.begin()
try:
...
finally:
ctx.end()
"""
request = self.request_factory(environ)
request.registry = self.registry
request.invoke_subrequest = self.invoke_subrequest
extensions = self.request_extensions
if extensions is not None:
apply_request_extensions(request, extensions=extensions)
return RequestContext(request)
def invoke_request(self, request, _use_tweens=True):
"""
Execute a request through the request processing pipeline and
return the generated response.
"""
registry = self.registry
has_listeners = registry.has_listeners
notify = registry.notify
if _use_tweens:
handle_request = self.handle_request
else:
handle_request = self.orig_handle_request
try:
response = handle_request(request)
if request.response_callbacks:
request._process_response_callbacks(response)
has_listeners and notify(NewResponse(request, response))
return response
finally:
if request.finished_callbacks:
request._process_finished_callbacks()
def __call__(self, environ, start_response):
"""
Accept ``environ`` and ``start_response``; create a
:term:`request` and route the request to a :app:`Pyramid`
view based on introspection of :term:`view configuration`
within the application registry; call ``start_response`` and
return an iterable.
"""
response = self.execution_policy(environ, self)
return response(environ, start_response)
def default_execution_policy(environ, router):
with router.request_context(environ) as request:
try:
return router.invoke_request(request)
except Exception:
return request.invoke_exception_view(reraise=True)