Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Fetching contributors…

Cannot retrieve contributors at this time

100 lines (79 sloc) 3.296 kb
# -*- coding: utf-8 -*-
"""
celery.routes
~~~~~~~~~~~~~
Contains utilities for working with task routes
(:setting:`CELERY_ROUTES`).
:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.
"""
from __future__ import absolute_import
from celery.exceptions import QueueNotFound
from celery.utils import lpmerge
from celery.utils.functional import firstmethod, mpromise
from celery.utils.imports import instantiate
_first_route = firstmethod("route_for_task")
class MapRoute(object):
"""Creates a router out of a :class:`dict`."""
def __init__(self, map):
self.map = map
def route_for_task(self, task, *args, **kwargs):
route = self.map.get(task)
if route:
return dict(route)
class Router(object):
def __init__(self, routes=None, queues=None, create_missing=False,
app=None):
from . import app_or_default
self.app = app_or_default(app)
self.queues = {} if queues is None else queues
self.routes = [] if routes is None else routes
self.create_missing = create_missing
def route(self, options, task, args=(), kwargs={}):
options = self.expand_destination(options) # expands 'queue'
if self.routes:
route = self.lookup_route(task, args, kwargs)
if route: # expands 'queue' in route.
return lpmerge(self.expand_destination(route), options)
if "queue" not in options:
options = lpmerge(self.expand_destination(
self.app.conf.CELERY_DEFAULT_QUEUE), options)
return options
def expand_destination(self, route):
# Route can be a queue name: convenient for direct exchanges.
if isinstance(route, basestring):
queue, route = route, {}
else:
# can use defaults from configured queue, but override specific
# things (like the routing_key): great for topic exchanges.
queue = route.pop("queue", None)
if queue: # expand config from configured queue.
try:
dest = self.queues[queue].as_dict()
except KeyError:
if not self.create_missing:
raise QueueNotFound(
"Queue %r is not defined in CELERY_QUEUES" % queue)
for key in "exchange", "routing_key":
if route.get(key) is None:
route[key] = queue
dest = self.app.amqp.queues.add(queue, **route).as_dict()
# needs to be declared by publisher
dest["queue"] = queue
return lpmerge(dest, route)
return route
def lookup_route(self, task, args=None, kwargs=None):
return _first_route(self.routes, task, args, kwargs)
def prepare(routes):
"""Expands the :setting:`CELERY_ROUTES` setting."""
def expand_route(route):
if isinstance(route, dict):
return MapRoute(route)
if isinstance(route, basestring):
return mpromise(instantiate, route)
return route
if routes is None:
return ()
if not isinstance(routes, (list, tuple)):
routes = (routes, )
return map(expand_route, routes)
Jump to Line
Something went wrong with that request. Please try again.