# -*- coding: utf-8 -*-
"""AMQ related functionality.

:copyright: (c) 2009 - 2012 by Ask Solem.
:license: BSD, see LICENSE for more details.

"""
from __future__ import absolute_import

from datetime import timedelta
from weakref import WeakValueDictionary

from kombu import BrokerConnection, Consumer, Exchange, Producer, Queue
from kombu.common import entry_to_queue
from kombu.pools import ProducerPool

from celery import signals
from celery.utils import cached_property, uuid
from celery.utils.text import indent as textindent

from . import app_or_default
from . import routes as _routes

#: Human readable queue declaration.
QUEUE_FORMAT = """
. %(name)s exchange:%(exchange)s(%(exchange_type)s) binding:%(routing_key)s
"""
class Queues(dict):
"""Queue name⇒ declaration mapping.
:param queues: Initial list/tuple or dict of queues.
:keyword create_missing: By default any unknown queues will be
added automatically, but if disabled
the occurrence of unknown queues
in `wanted` will raise :exc:`KeyError`.
#: If set, this is a subset of queues to consume from.
#: The rest of the queues are then used for routing only.
_consume_from = None
def __init__(self, queues=None, default_exchange=None, create_missing=True):
self.aliases = WeakValueDictionary()
self.default_exchange = default_exchange
self.create_missing = create_missing
if isinstance(queues, (tuple, list)):
queues = dict((, q) for q in queues)
for name, q in (queues or {}).iteritems():
self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
def __getitem__(self, name):
return self.aliases[name]
except KeyError:
return dict.__getitem__(self, name)
def __setitem__(self, name, queue):
if self.default_exchange and (not or
not = self.default_exchange
dict.__setitem__(self, name, queue)
if queue.alias:
self.aliases[queue.alias] = queue
def __missing__(self, name):
if self.create_missing:
return self.add(self.new_missing(name))
raise KeyError(name)
def add(self, queue, **kwargs):
"""Add new queue.
:param queue: Name of the queue.
:keyword exchange: Name of the exchange.
:keyword routing_key: Binding key.
:keyword exchange_type: Type of exchange.
:keyword \*\*options: Additional declaration options.
if not isinstance(queue, Queue):
return self.add_compat(queue, **kwargs)
self[] = queue
return queue
def add_compat(self, name, **options):
# docs used to use binding_key as routing key
options.setdefault("routing_key", options.get("binding_key"))
q = self[name] = entry_to_queue(name, **options)
return q
def format(self, indent=0, indent_first=True):
"""Format routing table into string for log dumps."""
active = self.consume_from
if not active:
return ""
info = [QUEUE_FORMAT.strip() % {
"name": (name + ":").ljust(12),
"routing_key": q.routing_key}
for name, q in sorted(active.iteritems())]
if indent_first:
return textindent("\n".join(info), indent)
return info[0] + "\n" + textindent("\n".join(info[1:]), indent)
def select_subset(self, wanted):
"""Sets :attr:`consume_from` by selecting a subset of the
currently defined queues.
:param wanted: List of wanted queue names.
if wanted:
self._consume_from = dict((name, self[name]) for name in wanted)
def new_missing(self, name):
return Queue(name, Exchange(name), name)
def consume_from(self):
if self._consume_from is not None:
return self._consume_from
return self
class TaskProducer(Producer):
    """Kombu producer used to send task messages."""
    app = None
    #: Queues are declared explicitly via ``declare=`` in :meth:`publish`.
    auto_declare = False
    #: Default for retrying publication on connection failure.
    retry = False
    #: Default retry policy passed on to the transport.
    retry_policy = None

    def __init__(self, channel=None, exchange=None, *args, **kwargs):
        self.retry = kwargs.pop("retry", self.retry)
        self.retry_policy = kwargs.pop("retry_policy",
                                       self.retry_policy or {})
        exchange = exchange or self.exchange
        self.queues = self.app.amqp.queues  # shortcut
        super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)

    def delay_task(self, task_name, task_args=None, task_kwargs=None,
                   countdown=None, eta=None, task_id=None, taskset_id=None,
                   expires=None, exchange=None, exchange_type=None,
                   event_dispatcher=None, retry=None, retry_policy=None,
                   queue=None, now=None, retries=0, chord=None,
                   callbacks=None, errbacks=None, mandatory=None,
                   priority=None, immediate=None, routing_key=None,
                   serializer=None, delivery_mode=None, compression=None,
                   **kwargs):
        """Send task message."""
        retry = self.retry if retry is None else retry
        # merge default and custom policy
        _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
               else self.retry_policy)
        task_id = task_id or uuid()
        task_args = task_args or []
        task_kwargs = task_kwargs or {}
        if not isinstance(task_args, (list, tuple)):
            raise ValueError("task args must be a list or tuple")
        if not isinstance(task_kwargs, dict):
            raise ValueError("task kwargs must be a dictionary")
        if countdown:  # Convert countdown to ETA.
            now = now or self.app.now()
            eta = now + timedelta(seconds=countdown)
        if isinstance(expires, (int, float)):
            now = now or self.app.now()
            expires = now + timedelta(seconds=expires)
        eta = eta and eta.isoformat()
        expires = expires and expires.isoformat()

        body = {"task": task_name,
                "id": task_id,
                "args": task_args,
                "kwargs": task_kwargs,
                "retries": retries or 0,
                "eta": eta,
                "expires": expires,
                "utc": self.utc,
                "callbacks": callbacks,
                "errbacks": errbacks}
        if taskset_id:
            body["taskset"] = taskset_id
        if chord:
            body["chord"] = chord

        self.publish(body, exchange=exchange, mandatory=mandatory,
                     immediate=immediate, routing_key=routing_key,
                     serializer=serializer or self.serializer,
                     compression=compression or self.compression,
                     retry=retry, retry_policy=_rp,
                     delivery_mode=delivery_mode, priority=priority,
                     declare=[self.queues[queue]] if queue else [],
                     **kwargs)

        signals.task_sent.send(sender=task_name, **body)
        if event_dispatcher:
            # NOTE(review): the field list below was lost in extraction and
            # has been reconstructed -- verify against upstream history.
            event_dispatcher.send("task-sent", uuid=task_id,
                                  name=task_name,
                                  args=repr(task_args),
                                  kwargs=repr(task_kwargs),
                                  retries=retries,
                                  eta=eta,
                                  expires=expires)
        return task_id
class TaskPublisher(TaskProducer):
    """Deprecated version of :class:`TaskProducer`."""

    def __init__(self, channel=None, exchange=None, *args, **kwargs):
        # Old API allowed passing the app and a string exchange name
        # with a separate exchange_type keyword.
        self.app = app_or_default(kwargs.pop("app", self.app))
        self.retry = kwargs.pop("retry", self.retry)
        self.retry_policy = kwargs.pop("retry_policy",
                                       self.retry_policy or {})
        exchange = exchange or self.exchange
        if not isinstance(exchange, Exchange):
            exchange = Exchange(exchange,
                                kwargs.pop("exchange_type", "direct"))
        self.queues = self.app.amqp.queues  # shortcut
        super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
class TaskConsumer(Consumer):
    """Kombu consumer bound to the app's active task queues."""
    app = None

    def __init__(self, channel, queues=None, app=None, **kw):
        self.app = app or self.app
        # Default to the queues the app is configured to consume from.
        # NOTE(review): the queues default was truncated in extraction and
        # has been reconstructed -- verify against upstream history.
        super(TaskConsumer, self).__init__(channel,
                queues or self.app.amqp.queues.consume_from.values(), **kw)
class AMQP(object):
    """App-bound factory for AMQP related objects
    (queues, producers, consumers and routing)."""
    BrokerConnection = BrokerConnection
    Consumer = Consumer

    #: Cached and prepared routing table.
    _rtable = None

    def __init__(self, app):
        self.app = app

    def flush_routes(self):
        # Re-prepare the routing table from the current configuration.
        self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)

    def Queues(self, queues, create_missing=None):
        """Create new :class:`Queues` instance, using queue defaults
        from the current configuration."""
        conf = self.app.conf
        if create_missing is None:
            create_missing = conf.CELERY_CREATE_MISSING_QUEUES
        if not queues and conf.CELERY_DEFAULT_QUEUE:
            # Fall back to a single default queue.
            queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
                            exchange=self.default_exchange,
                            routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
        return Queues(queues, self.default_exchange, create_missing)

    def Router(self, queues=None, create_missing=None):
        """Returns the current task router."""
        return _routes.Router(self.routes, queues or self.queues,
                              self.app.either("CELERY_CREATE_MISSING_QUEUES",
                                              create_missing), app=self.app)

    @cached_property
    def TaskConsumer(self):
        """Return consumer configured to consume from the queues
        we are configured for (``app.amqp.queues.consume_from``)."""
        # NOTE(review): body reconstructed (original lost in extraction);
        # verify against upstream history.
        return self.app.subclass_with_self(TaskConsumer,
                                           reverse="amqp.TaskConsumer")
    get_task_consumer = TaskConsumer  # XXX compat

    @cached_property
    def TaskProducer(self):
        """Returns publisher used to send tasks.

        You should use `app.send_task` instead.

        """
        conf = self.app.conf
        # NOTE(review): body reconstructed (original lost in extraction);
        # verify against upstream history.
        return self.app.subclass_with_self(TaskProducer,
                reverse="amqp.TaskProducer",
                exchange=self.default_exchange,
                routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
                serializer=conf.CELERY_TASK_SERIALIZER,
                compression=conf.CELERY_MESSAGE_COMPRESSION,
                retry=conf.CELERY_TASK_PUBLISH_RETRY,
                retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
                utc=conf.CELERY_ENABLE_UTC)
    TaskPublisher = TaskProducer  # compat

    @cached_property
    def default_queue(self):
        return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]

    @cached_property
    def queues(self):
        """Queue name⇒ declaration mapping."""
        return self.Queues(self.app.conf.CELERY_QUEUES)

    @queues.setter  # noqa
    def queues(self, queues):
        return self.Queues(queues)

    @property
    def routes(self):
        # Prepare the routing table lazily on first access.
        if self._rtable is None:
            self.flush_routes()
        return self._rtable

    @cached_property
    def router(self):
        return self.Router()

    @cached_property
    def producer_pool(self):
        # NOTE(review): arguments reconstructed (original truncated in
        # extraction); verify against upstream history.
        return ProducerPool(self.app.pool, limit=self.app.pool.limit,
                            Producer=self.TaskProducer)
    publisher_pool = producer_pool  # compat alias

    @cached_property
    def default_exchange(self):
        return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
                        self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)