Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Modernize Kombu integration

  • Loading branch information...
commit a4f2874e824377c3c112f158f4885667e819c910 1 parent 7e74e9b
@ask authored
Showing with 296 additions and 359 deletions.
  1. +4 −5 celery/__compat__.py
  2. +116 −136 celery/app/amqp.py
  3. +23 −23 celery/app/base.py
  4. +3 −6 celery/app/builtins.py
  5. +10 −9 celery/app/control.py
  6. +2 −4 celery/app/routes.py
  7. +10 −16 celery/app/task.py
  8. +2 −2 celery/beat.py
  9. +1 −1  celery/bin/camqadm.py
  10. +1 −1  celery/bin/celery.py
  11. +2 −2 celery/contrib/migrate.py
  12. +14 −20 celery/events/__init__.py
  13. +4 −0 celery/local.py
  14. +1 −1  celery/task/__init__.py
  15. +3 −7 celery/task/sets.py
  16. +2 −22 celery/tests/app/test_amqp.py
  17. +4 −4 celery/tests/app/test_app.py
  18. +7 −3 celery/tests/app/test_control.py
  19. +28 −21 celery/tests/app/test_routes.py
  20. +5 −7 celery/tests/bin/test_celeryd.py
  21. +5 −1 celery/tests/config.py
  22. +1 −1  celery/tests/events/test_events.py
  23. +1 −0  celery/tests/events/test_snapshot.py
  24. +0 −6 celery/tests/tasks/test_sets.py
  25. +8 −7 celery/tests/tasks/test_tasks.py
  26. +4 −4 celery/tests/utilities/test_info.py
  27. +1 −1  celery/worker/consumer.py
  28. +32 −47 docs/userguide/routing.rst
  29. +1 −1  examples/eventlet/bulk_task_producer.py
  30. +1 −1  funtests/suite/config.py
View
9 celery/__compat__.py
@@ -27,7 +27,7 @@ def getappattr(path):
"""Gets attribute from the current_app recursively,
e.g. getappattr("amqp.get_task_consumer")``."""
from celery import current_app
- return reduce(lambda a, b: getattr(a, b), [current_app] + path)
+ return current_app._rgetattr(path)
def _compat_task_decorator(*args, **kwargs):
@@ -59,11 +59,10 @@ def _compat_periodic_task_decorator(*args, **kwargs):
},
"messaging": {
"TaskPublisher": "amqp.TaskPublisher",
- "ConsumerSet": "amqp.ConsumerSet",
"TaskConsumer": "amqp.TaskConsumer",
"establish_connection": "broker_connection",
"with_connection": "with_default_connection",
- "get_consumer_set": "amqp.get_task_consumer",
+ "get_consumer_set": "amqp.TaskConsumer",
},
"registry": {
"tasks": "tasks",
@@ -76,7 +75,7 @@ def _compat_periodic_task_decorator(*args, **kwargs):
"time_limit": "control.time_limit",
"ping": "control.ping",
"revoke": "control.revoke",
- "discard_all": "control.discard_all",
+ "discard_all": "control.purge",
"inspect": "control.inspect",
}
}
@@ -166,7 +165,7 @@ def get_compat_module(pkg, name):
def prepare(attr):
if isinstance(attr, basestring):
- return Proxy(getappattr, (attr.split('.'), ))
+ return Proxy(getappattr, (attr, ))
return attr
attrs = dict(COMPAT_MODULES[pkg.__name__][name])
View
252 celery/app/amqp.py
@@ -12,11 +12,11 @@
from __future__ import absolute_import
from datetime import timedelta
+from weakref import WeakValueDictionary
-from kombu import BrokerConnection, Exchange
-from kombu import compat as messaging
-from kombu import pools
-from kombu.common import maybe_declare
+from kombu import BrokerConnection, Consumer, Exchange, Producer, Queue
+from kombu.common import entry_to_queue, maybe_declare
+from kombu.pools import ProducerPool
from celery import signals
from celery.utils import cached_property, lpmerge, uuid
@@ -26,18 +26,14 @@
#: Human readable queue declaration.
QUEUE_FORMAT = """
-. %(name)s exchange:%(exchange)s (%(exchange_type)s) \
-binding:%(binding_key)s
+. %(name)s exchange:%(exchange)s(%(exchange_type)s) binding:%(routing_key)s
"""
class Queues(dict):
"""Queue name⇒ declaration mapping.
- Celery will consult this mapping to find the options
- for any queue by name.
-
- :param queues: Initial mapping.
+ :param queues: Initial list/tuple or dict of queues.
"""
#: If set, this is a subset of queues to consume from.
@@ -45,12 +41,20 @@ class Queues(dict):
_consume_from = None
def __init__(self, queues):
+ self.aliases = WeakValueDictionary()
+ if isinstance(queues, (tuple, list)):
+ queues = dict((q.name, q) for q in queues)
dict.__init__(self)
- for queue_name, options in (queues or {}).items():
- self.add(queue_name, **options)
+ for name, q in (queues or {}).items():
+ self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
- def add(self, queue, exchange=None, routing_key=None,
- exchange_type="direct", **options):
+ def __getitem__(self, name):
+ try:
+ return dict.__getitem__(self, key)
+ except KeyError:
+ return self.aliases[key]
+
+ def add(self, queue, **kwargs):
"""Add new queue.
:param queue: Name of the queue.
@@ -60,16 +64,24 @@ def add(self, queue, exchange=None, routing_key=None,
:keyword \*\*options: Additional declaration options.
"""
- q = self[queue] = self.options(exchange, routing_key,
- exchange_type, **options)
- return q
+ if not isinstance(queue, Queue):
+ return self.add_compat(queue, **kwargs)
+ self[queue.name] = queue
+ if queue.alias:
+ self.aliases[queue.alias] = queue
+ return queue
+
+ def add_compat(self, name, **options):
+ # docs used to use binding_key as routing key
+ options.setdefault("routing_key", options.get("binding_key"))
+ self[name] = queue = entry_to_queue(name, **options)
+ return queue
def options(self, exchange, routing_key,
exchange_type="direct", **options):
"""Creates new option mapping for queue, with required
keys present."""
return dict(options, routing_key=routing_key,
- binding_key=routing_key,
exchange=exchange,
exchange_type=exchange_type)
@@ -78,18 +90,19 @@ def format(self, indent=0, indent_first=True):
active = self.consume_from
if not active:
return ""
- info = [QUEUE_FORMAT.strip() % dict(
- name=(name + ":").ljust(12), **config)
- for name, config in sorted(active.iteritems())]
+ info = [QUEUE_FORMAT.strip() % {
+ "name": (name + ":").ljust(12),
+ "exchange": q.exchange.name,
+ "exchange_type": q.exchange.type,
+ "routing_key": q.routing_key}
+ for name, q in sorted(active.iteritems())]
if indent_first:
return text.indent("\n".join(info), indent)
return info[0] + "\n" + text.indent("\n".join(info[1:]), indent)
def select_subset(self, wanted, create_missing=True):
- """Select subset of the currently defined queues.
-
- Does not return anything: queues not in `wanted` will
- be discarded in-place.
+ """Sets :attr:`consume_from` by selecting a subset of the
+ currently defined queues.
:param wanted: List of wanted queue names.
:keyword create_missing: By default any unknown queues will be
@@ -102,15 +115,18 @@ def select_subset(self, wanted, create_missing=True):
acc = {}
for queue in wanted:
try:
- options = self[queue]
+ Q = self[queue]
except KeyError:
if not create_missing:
raise
- options = self.options(queue, queue)
- acc[queue] = options
+ Q = self.new_missing(queue)
+ acc[queue] = Q
self._consume_from = acc
self.update(acc)
+ def new_missing(self, name):
+ return Queue(name, Exchange(name), name)
+
@property
def consume_from(self):
if self._consume_from is not None:
@@ -118,56 +134,33 @@ def consume_from(self):
return self
@classmethod
- def with_defaults(cls, queues, default_exchange, default_exchange_type):
+ def with_defaults(cls, queues, default_exchange):
"""Alternate constructor that adds default exchange and
exchange type information to queues that does not have any."""
- if queues is None:
- queues = {}
- for opts in queues.values():
- opts.setdefault("exchange", default_exchange),
- opts.setdefault("exchange_type", default_exchange_type)
- opts.setdefault("binding_key", default_exchange)
- opts.setdefault("routing_key", opts.get("binding_key"))
- return cls(queues)
+ queues = cls(queues if queues is not None else {})
+ for q in queues.itervalues():
+ if not q.exchange or not q.exchange.name:
+ q.exchange = default_exchange
+ if not q.routing_key:
+ q.routing_key = default_exchange.name
+ return queues
-class TaskPublisher(messaging.Publisher):
+class TaskProducer(Producer):
auto_declare = False
retry = False
retry_policy = None
- _queue_cache = {}
- _exchange_cache = {}
- def __init__(self, *args, **kwargs):
- self.app = kwargs.pop("app")
+ def __init__(self, channel=None, exchange=None, *args, **kwargs):
+ self.app = kwargs.get("app") or self.app
self.retry = kwargs.pop("retry", self.retry)
self.retry_policy = kwargs.pop("retry_policy",
self.retry_policy or {})
- self.utc = kwargs.pop("enable_utc", False)
- super(TaskPublisher, self).__init__(*args, **kwargs)
-
- def _get_queue(self, name):
- if name not in self._queue_cache:
- options = self.app.amqp.queues[name]
- self._queue_cache[name] = messaging.entry_to_queue(name, **options)
- return self._queue_cache[name]
-
- def _get_exchange(self, name, type=None):
- if name not in self._exchange_cache:
- self._exchange_cache[name] = Exchange(name,
- type=type or self.exchange_type,
- durable=self.durable,
- auto_delete=self.auto_delete,
- )
- return self._exchange_cache[name]
-
- def _declare_queue(self, name, retry=False, retry_policy={}):
- maybe_declare(self._get_queue(name), self.channel,
- retry=retry, **retry_policy)
-
- def _declare_exchange(self, name, type=None, retry=False, retry_policy={}):
- maybe_declare(self._get_exchange(name, type), self.channel,
- retry=retry, **retry_policy)
+ exchange = exchange or self.exchange
+ if not isinstance(exchange, Exchange):
+ exchange = Exchange(exchange,
+ kwargs.get("exchange_type") or self.exchange_type)
+ super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
def delay_task(self, task_name, task_args=None, task_kwargs=None,
countdown=None, eta=None, task_id=None, taskset_id=None,
@@ -183,14 +176,6 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
_retry_policy = self.retry_policy
if retry_policy: # merge default and custom policy
_retry_policy = dict(_retry_policy, **retry_policy)
-
- # declare entities
- if queue:
- self._declare_queue(queue, retry, _retry_policy)
- else:
- self._declare_exchange(exchange, exchange_type,
- retry, _retry_policy)
-
task_id = task_id or uuid()
task_args = task_args or []
task_kwargs = task_kwargs or {}
@@ -222,15 +207,14 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
if chord:
body["chord"] = chord
- do_retry = retry if retry is not None else self.retry
- send = self.send
- if do_retry:
- send = connection.ensure(self, self.send, **_retry_policy)
- send(body, exchange=exchange, mandatory=mandatory,
+ self.publish(body, exchange=exchange, mandatory=mandatory,
immediate=immediate, routing_key=routing_key,
serializer=serializer or self.serializer,
delivery_mode=delivery_mode,
- compression=compression or self.compression)
+ compression=compression or self.compression,
+ retry=retry, retry_policy=retry_policy,
+ declare=[self.app.amqp.queues[queue]] if queue else [])
+
signals.task_sent.send(sender=task_name, **body)
if event_dispatcher:
event_dispatcher.send("task-sent", uuid=task_id,
@@ -241,33 +225,21 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
eta=eta,
expires=expires)
return task_id
+TaskPublisher = TaskProducer # compat
- def __exit__(self, *exc_info):
- try:
- self.release()
- except AttributeError:
- self.close()
+class TaskConsumer(Consumer):
+ app = None
-class PublisherPool(pools.ProducerPool):
-
- def __init__(self, app):
- self.app = app
- super(PublisherPool, self).__init__(self.app.pool,
- limit=self.app.pool.limit)
-
- def create_producer(self):
- conn = self.connections.acquire(block=True)
- pub = self.app.amqp.TaskPublisher(conn, auto_declare=False)
- conn._producer_chan = pub.channel
- return pub
+ def __init__(self, channel, queues=None, app=None, **kw):
+ self.app = app or self.app
+ super(TaskConsumer, self).__init__(channel,
+ queues or self.app.amqp.queues.consume_from.values(), **kw)
class AMQP(object):
BrokerConnection = BrokerConnection
- Publisher = messaging.Publisher
- Consumer = messaging.Consumer
- ConsumerSet = messaging.ConsumerSet
+ Consumer = Consumer
#: Cached and prepared routing table.
_rtable = None
@@ -283,12 +255,10 @@ def Queues(self, queues):
from the current configuration."""
conf = self.app.conf
if not queues and conf.CELERY_DEFAULT_QUEUE:
- queues = {conf.CELERY_DEFAULT_QUEUE: {
- "exchange": conf.CELERY_DEFAULT_EXCHANGE,
- "exchange_type": conf.CELERY_DEFAULT_EXCHANGE_TYPE,
- "binding_key": conf.CELERY_DEFAULT_ROUTING_KEY}}
- return Queues.with_defaults(queues, conf.CELERY_DEFAULT_EXCHANGE,
- conf.CELERY_DEFAULT_EXCHANGE_TYPE)
+ queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
+ exchange=self.default_exchange,
+ routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
+ return Queues.with_defaults(queues, self.default_exchange)
def Router(self, queues=None, create_missing=None):
"""Returns the current task router."""
@@ -296,51 +266,55 @@ def Router(self, queues=None, create_missing=None):
self.app.either("CELERY_CREATE_MISSING_QUEUES",
create_missing), app=self.app)
- def TaskConsumer(self, *args, **kwargs):
+ @cached_property
+ def TaskConsumer(self):
"""Returns consumer for a single task queue."""
- default_queue_name, default_queue = self.get_default_queue()
- defaults = dict({"queue": default_queue_name}, **default_queue)
- defaults["routing_key"] = defaults.pop("binding_key", None)
- return self.Consumer(*args, **lpmerge(defaults, kwargs))
+ return self.app.subclass_with_self(TaskConsumer,
+ reverse="amqp.TaskConsumer")
+
+ def queue_or_default(self, q):
+ if q:
+ return self.queues[q] if not isinstance(q, Queue) else q
+ return self.default_queue
- def TaskPublisher(self, *args, **kwargs):
+ @cached_property
+ def TaskProducer(self):
"""Returns publisher used to send tasks.
You should use `app.send_task` instead.
"""
conf = self.app.conf
- _, default_queue = self.get_default_queue()
- defaults = {"exchange": default_queue["exchange"],
- "exchange_type": default_queue["exchange_type"],
- "routing_key": conf.CELERY_DEFAULT_ROUTING_KEY,
- "serializer": conf.CELERY_TASK_SERIALIZER,
- "compression": conf.CELERY_MESSAGE_COMPRESSION,
- "retry": conf.CELERY_TASK_PUBLISH_RETRY,
- "retry_policy": conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
- "enable_utc": conf.CELERY_ENABLE_UTC,
- "app": self.app}
- return TaskPublisher(*args, **lpmerge(defaults, kwargs))
-
- def get_task_consumer(self, connection, queues=None, **kwargs):
+ return self.app.subclass_with_self(TaskProducer,
+ reverse="amqp.TaskProducer",
+ exchange=self.default_exchange,
+ exchange_type=self.default_exchange.type,
+ routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
+ serializer=conf.CELERY_TASK_SERIALIZER,
+ compression=conf.CELERY_MESSAGE_COMPRESSION,
+ retry=conf.CELERY_TASK_PUBLISH_RETRY,
+ retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
+ utc=conf.CELERY_ENABLE_UTC)
+ TaskPublisher = TaskProducer # compat
+
+ def get_task_consumer(self, channel, *args, **kwargs):
"""Return consumer configured to consume from all known task
queues."""
- return self.ConsumerSet(connection,
- from_dict=queues or self.queues.consume_from,
- channel=connection.default_channel,
- **kwargs)
+ return self.TaskConsumer(channel, *args, **kwargs)
- def get_default_queue(self):
- """Returns `(queue_name, queue_options)` tuple for the queue
- configured to be default (:setting:`CELERY_DEFAULT_QUEUE`)."""
- q = self.app.conf.CELERY_DEFAULT_QUEUE
- return q, self.queues[q]
+ @cached_property
+ def default_queue(self):
+ return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
@cached_property
def queues(self):
"""Queue name⇒ declaration mapping."""
return self.Queues(self.app.conf.CELERY_QUEUES)
+ @queues.setter
+ def queues(self, queues): # noqa
+ return self.Queues(queues)
+
@property
def routes(self):
if self._rtable is None:
@@ -353,4 +327,10 @@ def router(self):
@cached_property
def publisher_pool(self):
- return PublisherPool(self.app)
+ return ProducerPool(self.app.pool, limit=self.app.pool.limit,
+ Producer=self.TaskProducer)
+
+ @cached_property
+ def default_exchange(self):
+ return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
+ self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)
View
46 celery/app/base.py
@@ -40,7 +40,7 @@
def _unpickle_appattr(reverse_name, args):
"""Given an attribute name and a list of args, gets
the attribute from the current app and calls it."""
- return getattr(get_current_app(), reverse_name)(*args)
+ return get_current_app()._rgetattr(reverse_name)(*args)
class Celery(object):
@@ -158,33 +158,21 @@ def config_from_cmdline(self, argv, namespace="celery"):
def send_task(self, name, args=None, kwargs=None, countdown=None,
eta=None, task_id=None, publisher=None, connection=None,
- connect_timeout=None, result_cls=None, expires=None,
- queues=None, **options):
+ result_cls=None, expires=None, queues=None, **options):
if self.conf.CELERY_ALWAYS_EAGER: # pragma: no cover
warnings.warn(AlwaysEagerIgnored(
"CELERY_ALWAYS_EAGER has no effect on send_task"))
- router = self.amqp.Router(queues)
result_cls = result_cls or self.AsyncResult
-
+ router = self.amqp.Router(queues)
options.setdefault("compression",
self.conf.CELERY_MESSAGE_COMPRESSION)
options = router.route(options, name, args, kwargs)
- exchange = options.get("exchange")
- exchange_type = options.get("exchange_type")
-
- with self.default_connection(connection, connect_timeout) as conn:
- publish = publisher or self.amqp.TaskPublisher(conn,
- exchange=exchange,
- exchange_type=exchange_type)
- try:
- new_id = publish.delay_task(name, args, kwargs,
- task_id=task_id,
- countdown=countdown, eta=eta,
- expires=expires, **options)
- finally:
- publisher or publish.close()
- return result_cls(new_id)
+ with self.default_producer(publisher) as producer:
+ return result_cls(producer.delay_task(name, args, kwargs,
+ task_id=task_id,
+ countdown=countdown, eta=eta,
+ expires=expires, **options))
def broker_connection(self, hostname=None, userid=None,
password=None, virtual_host=None, port=None, ssl=None,
@@ -206,16 +194,25 @@ def broker_connection(self, hostname=None, userid=None,
**transport_options or {}))
@contextmanager
- def default_connection(self, connection=None, connect_timeout=None):
+ def default_connection(self, connection=None, *args, **kwargs):
if connection:
yield connection
else:
with self.pool.acquire(block=True) as connection:
yield connection
+ @contextmanager
+ def default_producer(self, producer=None):
+ if producer:
+ yield producer
+ else:
+ with self.amqp.publisher_pool.acquire(block=True) as producer:
+ yield producer
+
+
def with_default_connection(self, fun):
- """With any function accepting `connection` and `connect_timeout`
- keyword arguments, establishes a default connection if one is
+ """With any function accepting a `connection`
+ keyword argument, establishes a default connection if one is
not already passed to it.
Any automatically established connection will be closed after
@@ -317,6 +314,9 @@ def __reduce__(self):
return type(name or Class.__name__, (Class, ), attrs)
+ def _rgetattr(self, path):
+ return reduce(getattr, [self] + path.split('.'))
+
def __repr__(self):
return "<%s %s:0x%x>" % (self.__class__.__name__,
self.main or "__main__", id(self), )
View
9 celery/app/builtins.py
@@ -87,12 +87,9 @@ def run(self, tasks, result, setid):
return app.TaskSetResult(result.id,
[subtask(task).apply(taskset_id=setid)
for task in tasks])
- with app.pool.acquire(block=True) as conn:
- with app.amqp.TaskPublisher(conn) as publisher:
- [subtask(task).apply_async(
- taskset_id=setid,
- publisher=publisher)
- for task in tasks]
+ with app.default_producer() as pub:
+ [subtask(task).apply_async(taskset_id=setid, publisher=pub)
+ for task in tasks]
parent = get_current_task()
if parent:
parent.request.children.append(result)
View
19 celery/app/control.py
@@ -14,6 +14,7 @@
from __future__ import with_statement
from kombu.pidbox import Mailbox
+from kombu.utils import cached_property
from . import app_or_default
@@ -26,13 +27,14 @@ def flatten_reply(reply):
class Inspect(object):
+ app = None
- def __init__(self, control, destination=None, timeout=1, callback=None,
- connection=None):
+ def __init__(self, destination=None, timeout=1, callback=None,
+ connection=None, app=None):
+ self.app = app or self.app
self.destination = destination
self.timeout = timeout
self.callback = callback
- self.control = control
self.connection = connection
def _prepare(self, reply):
@@ -45,7 +47,7 @@ def _prepare(self, reply):
return by_node
def _request(self, command, **kwargs):
- return self._prepare(self.control.broadcast(command,
+ return self._prepare(self.app.control.broadcast(command,
arguments=kwargs,
destination=self.destination,
callback=self.callback,
@@ -103,9 +105,9 @@ def __init__(self, app=None):
self.app = app_or_default(app)
self.mailbox = self.Mailbox("celeryd", type="fanout")
- def inspect(self, destination=None, timeout=1, callback=None):
- return Inspect(self, destination=destination, timeout=timeout,
- callback=callback)
+ @cached_property
+ def inspect(self):
+ return self.app.subclass_with_self(Inspect, reverse="control.inspect")
def purge(self, connection=None):
"""Discard all waiting tasks.
@@ -117,8 +119,7 @@ def purge(self, connection=None):
"""
with self.app.default_connection(connection) as conn:
- return self.app.amqp.get_task_consumer(connection=conn)\
- .discard_all()
+ return self.app.amqp.TaskConsumer(conn).purge()
discard_all = purge
def revoke(self, task_id, destination=None, terminate=False,
View
6 celery/app/routes.py
@@ -64,7 +64,7 @@ def expand_destination(self, route):
if queue: # expand config from configured queue.
try:
- dest = dict(self.queues[queue])
+ dest = self.queues[queue].as_dict()
except KeyError:
if not self.create_missing:
raise QueueNotFound(
@@ -72,11 +72,9 @@ def expand_destination(self, route):
for key in "exchange", "routing_key":
if route.get(key) is None:
route[key] = queue
- dest = dict(self.app.amqp.queues.add(queue, **route))
+ dest = self.app.amqp.queues.add(queue, **route).as_dict()
# needs to be declared by publisher
dest["queue"] = queue
- # routing_key and binding_key are synonyms.
- dest.setdefault("routing_key", dest.get("binding_key"))
return lpmerge(dest, route)
return route
View
26 celery/app/task.py
@@ -16,6 +16,7 @@
import sys
import threading
+from kombu import Exchange
from kombu.utils import cached_property
from celery import current_app
@@ -369,7 +370,7 @@ def get_publisher(self, connection=None, exchange=None,
connect_timeout=None, exchange_type=None, **options):
"""Get a celery task message publisher.
- :rtype :class:`~celery.app.amqp.TaskPublisher`:
+ :rtype :class:`~celery.app.amqp.TaskProducer`:
.. warning::
@@ -381,23 +382,16 @@ def get_publisher(self, connection=None, exchange=None,
>>> # ... do something with publisher
>>> publisher.connection.close()
- or used as a context::
-
- >>> with self.get_publisher() as publisher:
- ... # ... do something with publisher
-
"""
exchange = self.exchange if exchange is None else exchange
if exchange_type is None:
exchange_type = self.exchange_type
connection = connection or self.establish_connection(connect_timeout)
- return self._get_app().amqp.TaskPublisher(connection=connection,
- exchange=exchange,
- exchange_type=exchange_type,
- routing_key=self.routing_key,
- **options)
+ return self._get_app().amqp.TaskProducer(connection,
+ exchange=exchange and Exchange(exchange, exchange_type),
+ routing_key=self.routing_key, **options)
- def get_consumer(self, connection=None, connect_timeout=None):
+ def get_consumer(self, connection=None, queues=None, **kwargs):
"""Get message consumer.
:rtype :class:`kombu.messaging.Consumer`:
@@ -414,10 +408,10 @@ def get_consumer(self, connection=None, connect_timeout=None):
>>> consumer.connection.close()
"""
- connection = connection or self.establish_connection(connect_timeout)
- return self._get_app().amqp.TaskConsumer(connection=connection,
- exchange=self.exchange,
- routing_key=self.routing_key)
+ app = self._get_app()
+ connection = connection or self.establish_connection()
+ return app.amqp.TaskConsumer(connection,
+ queues or app.amqp.queue_or_default(self.queue), **kwargs)
def delay(self, *args, **kwargs):
"""Star argument version of :meth:`apply_async`.
View
4 celery/beat.py
@@ -156,7 +156,7 @@ def __init__(self, schedule=None, max_interval=None,
self.max_interval = (max_interval
or app.conf.CELERYBEAT_MAX_LOOP_INTERVAL
or self.max_interval)
- self.Publisher = Publisher or app.amqp.TaskPublisher
+ self.Publisher = Publisher or app.amqp.TaskProducer
if not lazy:
self.setup_schedule()
@@ -306,7 +306,7 @@ def connection(self):
@cached_property
def publisher(self):
- return self.Publisher(connection=self._ensure_connected())
+ return self.Publisher(self._ensure_connected())
@property
def info(self):
View
2  celery/bin/camqadm.py
@@ -320,7 +320,7 @@ def respond(self, retval):
def _reconnect(self):
"""Re-establish connection to the AMQP server."""
self.conn = self.connect(self.conn)
- self.chan = self.conn.channel()
+ self.chan = self.conn.default_channel
self.needs_reconnect = False
@property
View
2  celery/bin/celery.py
@@ -183,7 +183,7 @@ def run(self, what=None, *_, **kw):
raise Error("unknown topic %r (choose one of: %s)" % (
what, available))
with self.app.broker_connection() as conn:
- self.app.amqp.get_task_consumer(conn).declare()
+ self.app.amqp.TaskConsumer(conn).declare()
topics[what](conn.manager)
list_ = command(list_, "list")
View
4 celery/contrib/migrate.py
@@ -67,10 +67,10 @@ def migrate_tasks(source, dest, timeout=1.0, app=None,
def update_state(body, message):
state.count += 1
- producer = app.amqp.TaskPublisher(dest)
+ producer = app.amqp.TaskProducer(dest)
if migrate is None:
migrate = partial(migrate_task, producer)
- consumer = app.amqp.get_task_consumer(source)
+ consumer = app.amqp.TaskConsumer(source)
consumer.register_callback(update_state)
if callback is not None:
callback = partial(callback, state)
View
34 celery/events/__init__.py
@@ -24,6 +24,7 @@
from kombu.common import eventloop
from kombu.entity import Exchange, Queue
from kombu.messaging import Consumer, Producer
+from kombu.utils import cached_property
from celery.app import app_or_default
from celery.utils import uuid
@@ -91,7 +92,7 @@ def __exit__(self, *exc_info):
self.close()
def enable(self):
- self.publisher = Producer(self.channel or self.connection.channel(),
+ self.publisher = Producer(self.channel or self.connection,
exchange=event_exchange,
serializer=self.serializer)
self.enabled = True
@@ -139,8 +140,6 @@ def close(self):
"""Close the event dispatcher."""
self.mutex.locked() and self.mutex.release()
if self.publisher is not None:
- if not self.channel: # close auto channel.
- self.publisher.channel.close()
self.publisher = None
@@ -225,25 +224,20 @@ class Events(object):
def __init__(self, app=None):
self.app = app
- def Receiver(self, connection, handlers=None, routing_key="#",
- node_id=None):
- return EventReceiver(connection,
- handlers=handlers,
- routing_key=routing_key,
- node_id=node_id,
- app=self.app)
-
- def Dispatcher(self, connection=None, hostname=None, enabled=True,
- channel=None, buffer_while_offline=True):
- return EventDispatcher(connection,
- hostname=hostname,
- enabled=enabled,
- channel=channel,
- app=self.app)
+ @cached_property
+ def Receiver(self):
+ return self.app.subclass_with_self(EventReceiver,
+ reverse="events.Receiver")
+ @cached_property
+ def Dispatcher(self):
+ return self.app.subclass_with_self(EventDispatcher,
+ reverse="events.Dispatcher")
+
+ @cached_property
def State(self):
- from .state import State as _State
- return _State()
+ return self.app.subclass_with_self("celery.events.state:State",
+ reverse="events.State")
@contextmanager
def default_dispatcher(self, hostname=None, enabled=True,
View
4 celery/local.py
@@ -45,6 +45,10 @@ def __name__(self):
return self._get_current_object().__name__
@property
+ def __module__(self):
+ return self._get_current_object().__module__
+
+ @property
def __doc__(self):
return self._get_current_object().__doc__
View
2  celery/task/__init__.py
@@ -35,7 +35,7 @@ def __call__(self, *args, **kwargs):
__path__=__path__,
__doc__=__doc__,
current=current,
- discard_all=Proxy(lambda: current_app.control.discard_all),
+ discard_all=Proxy(lambda: current_app.control.purge),
backend_cleanup=Proxy(
lambda: current_app.tasks["celery.backend_cleanup"]
),
View
10 celery/task/sets.py
@@ -39,12 +39,8 @@ def apply_async(self, connection=None, connect_timeout=None,
with app.default_connection(connection, connect_timeout) as conn:
setid = taskset_id or uuid()
- pub = publisher or self.Publisher(connection=conn)
- try:
- results = self._async_results(setid, pub)
- finally:
- if not publisher: # created by us.
- pub.close()
+ pub = publisher or self.Publisher(conn)
+ results = self._async_results(setid, pub)
result = app.TaskSetResult(setid, results)
parent = get_current_task()
@@ -84,7 +80,7 @@ def _set_tasks(self, tasks):
tasks = property(_get_tasks, _set_tasks)
def _get_Publisher(self):
- return self._Publisher or self.app.amqp.TaskPublisher
+ return self._Publisher or self.app.amqp.TaskProducer
def _set_Publisher(self, Publisher):
self._Publisher = Publisher
View
24 celery/tests/app/test_amqp.py
@@ -23,25 +23,6 @@ def test_declare(self):
publisher.exchange.name = None
publisher.declare()
- def test_exit_AttributeError(self):
- publisher = self.app.amqp.TaskPublisher(self.app.broker_connection())
- publisher.close = Mock()
- publisher.release = Mock()
- publisher.release.side_effect = AttributeError()
- publisher.__exit__()
- publisher.close.assert_called_with()
-
- def test_ensure_declare_queue(self, q="x1242112"):
- publisher = self.app.amqp.TaskPublisher(Mock())
- self.app.amqp.queues.add(q, q, q)
- publisher._declare_queue(q, retry=True)
- self.assertTrue(publisher.connection.ensure.call_count)
-
- def test_ensure_declare_exchange(self, e="x9248311"):
- publisher = self.app.amqp.TaskPublisher(Mock())
- publisher._declare_exchange(e, "direct", retry=True)
- self.assertTrue(publisher.connection.ensure.call_count)
-
def test_retry_policy(self):
pub = self.app.amqp.TaskPublisher(Mock())
pub.channel.connection.client.declared_entities = set()
@@ -94,7 +75,6 @@ def test_setup(self):
p1 = r1 = pool.acquire()
p2 = r2 = pool.acquire()
- delattr(r1.connection, "_producer_chan")
r1.release()
r2.release()
r1 = pool.acquire()
@@ -119,5 +99,5 @@ def test_queues_format(self):
def test_with_defaults(self):
self.assertEqual(
- self.app.amqp.queues.with_defaults(None, "celery", "direct"),
- {})
+ self.app.amqp.queues.with_defaults(None,
+ self.app.amqp.default_exchange), {})
View
8 celery/tests/app/test_app.py
@@ -6,6 +6,8 @@
from mock import Mock, patch
from pickle import loads, dumps
+from kombu import Exchange
+
from celery import Celery
from celery import app as _app
from celery.app import defaults
@@ -263,22 +265,20 @@ def send(self, type, **fields):
entities = conn.declared_entities
- pub = self.app.amqp.TaskPublisher(conn, exchange="foo_exchange")
- self.assertNotIn(pub._get_exchange("foo_exchange"), entities)
+ pub = self.app.amqp.TaskPublisher(conn,
+ exchange=Exchange("foo_exchange"))
dispatcher = Dispatcher()
self.assertTrue(pub.delay_task("footask", (), {},
exchange="moo_exchange",
routing_key="moo_exchange",
event_dispatcher=dispatcher))
- self.assertIn(pub._get_exchange("moo_exchange"), entities)
self.assertTrue(dispatcher.sent)
self.assertEqual(dispatcher.sent[0][0], "task-sent")
self.assertTrue(pub.delay_task("footask", (), {},
event_dispatcher=dispatcher,
exchange="bar_exchange",
routing_key="bar_exchange"))
- self.assertIn(pub._get_exchange("bar_exchange"), entities)
def test_error_mail_sender(self):
x = ErrorMail.subject % {"name": "task_name",
View
10 celery/tests/app/test_control.py
@@ -49,10 +49,14 @@ def _resets(*args, **kwargs):
class test_inspect(Case):
def setUp(self):
- app = app_or_default()
+ app = self.app = app_or_default()
self.c = Control(app=app)
+ self.prev, app.control = app.control, self.c
self.i = self.c.inspect()
+ def tearDown(self):
+ self.app.control = self.prev
+
def test_prepare_reply(self):
self.assertDictEqual(self.i._prepare([{"w1": {"ok": 1}},
{"w2": {"ok": 1}}]),
@@ -138,8 +142,8 @@ def setUp(self):
def tearDown(self):
del(self.app.control)
- def test_discard_all(self):
- self.control.discard_all()
+ def test_purge(self):
+ self.control.purge()
@with_mock_broadcast
def test_broadcast(self):
View
49 celery/tests/app/test_routes.py
@@ -3,6 +3,7 @@
from functools import wraps
+from kombu import Exchange
from kombu.utils.functional import maybe_promise
from celery import current_app
@@ -45,41 +46,49 @@ def __inner(*args, **kwargs):
a_queue = {"exchange": "fooexchange",
"exchange_type": "fanout",
- "binding_key": "xuzzy"}
+ "routing_key": "xuzzy"}
b_queue = {"exchange": "barexchange",
"exchange_type": "topic",
- "binding_key": "b.b.#"}
+ "routing_key": "b.b.#"}
d_queue = {"exchange": current_app.conf.CELERY_DEFAULT_EXCHANGE,
"exchange_type": current_app.conf.CELERY_DEFAULT_EXCHANGE_TYPE,
"routing_key": current_app.conf.CELERY_DEFAULT_ROUTING_KEY}
-class test_MapRoute(Case):
+class RouteCase(Case):
+
+ def assertAnswer(self, answer, expected):
+ self.assertEqual(answer["exchange"].name, expected["exchange"])
+ self.assertEqual(answer["routing_key"], expected["routing_key"])
+ if "queue" in expected:
+ self.assertEqual(answer["queue"], expected["queue"])
+
+
+class test_MapRoute(RouteCase):
@with_queues(foo=a_queue, bar=b_queue)
def test_route_for_task_expanded_route(self):
- expand = E(current_app.conf.CELERY_QUEUES)
+ expand = E(current_app.amqp.queues)
route = routes.MapRoute({mytask.name: {"queue": "foo"}})
- self.assertDictContainsSubset(a_queue,
- expand(route.route_for_task(mytask.name)))
+ self.assertAnswer(expand(route.route_for_task(mytask.name)), a_queue)
self.assertIsNone(route.route_for_task("celery.awesome"))
@with_queues(foo=a_queue, bar=b_queue)
def test_route_for_task(self):
- expand = E(current_app.conf.CELERY_QUEUES)
+ expand = E(current_app.amqp.queues)
route = routes.MapRoute({mytask.name: b_queue})
self.assertDictContainsSubset(b_queue,
expand(route.route_for_task(mytask.name)))
self.assertIsNone(route.route_for_task("celery.awesome"))
def test_expand_route_not_found(self):
- expand = E(current_app.conf.CELERY_QUEUES)
+ expand = E(current_app.amqp.queues)
route = routes.MapRoute({"a": {"queue": "x"}})
with self.assertRaises(QueueNotFound):
expand(route.route_for_task("a"))
-class test_lookup_route(Case):
+class test_lookup_route(RouteCase):
def test_init_queues(self):
router = routes.Router(queues=None)
@@ -89,15 +98,14 @@ def test_init_queues(self):
def test_lookup_takes_first(self):
R = routes.prepare(({mytask.name: {"queue": "bar"}},
{mytask.name: {"queue": "foo"}}))
- router = routes.Router(R, current_app.conf.CELERY_QUEUES)
- self.assertDictContainsSubset(b_queue,
- router.route({}, mytask.name,
- args=[1, 2], kwargs={}))
+ router = routes.Router(R, current_app.amqp.queues)
+ self.assertAnswer(router.route({}, mytask.name,
+ args=[1, 2], kwargs={}), b_queue)
@with_queues()
def test_expands_queue_in_options(self):
R = routes.prepare(())
- router = routes.Router(R, current_app.conf.CELERY_QUEUES,
+ router = routes.Router(R, current_app.amqp.queues,
create_missing=True)
# apply_async forwards all arguments, even exchange=None etc,
# so need to make sure it's merged correctly.
@@ -114,10 +122,10 @@ def test_expands_queue_in_options(self):
self.assertIn("queue", route)
@with_queues(foo=a_queue, bar=b_queue)
- def test_expand_destaintion_string(self):
- x = routes.Router({}, current_app.conf.CELERY_QUEUES)
+ def test_expand_destination_string(self):
+ x = routes.Router({}, current_app.amqp.queues)
dest = x.expand_destination("foo")
- self.assertEqual(dest["exchange"], "fooexchange")
+ self.assertEqual(dest["exchange"].name, "fooexchange")
@with_queues(foo=a_queue, bar=b_queue, **{
current_app.conf.CELERY_DEFAULT_QUEUE: d_queue})
@@ -125,10 +133,9 @@ def test_lookup_paths_traversed(self):
R = routes.prepare(({"celery.xaza": {"queue": "bar"}},
{mytask.name: {"queue": "foo"}}))
router = routes.Router(R, current_app.amqp.queues)
- self.assertDictContainsSubset(a_queue,
- router.route({}, mytask.name,
- args=[1, 2], kwargs={}))
- self.assertEqual(router.route({}, "celery.poza"),
+ self.assertAnswer(router.route({}, mytask.name,
+ args=[1, 2], kwargs={}), a_queue)
+ self.assertAnswer(router.route({}, "celery.poza"),
dict(d_queue, queue=current_app.conf.CELERY_DEFAULT_QUEUE))
View
12 celery/tests/bin/test_celeryd.py
@@ -11,6 +11,7 @@
from nose import SkipTest
from billiard import current_process
+from kombu import Exchange, Queue
from celery import Celery
from celery import platforms
@@ -166,9 +167,9 @@ def test_init_queues(self):
c = app.conf
p, app.amqp.queues = app.amqp.queues, app.amqp.Queues({
"celery": {"exchange": "celery",
- "binding_key": "celery"},
+ "routing_key": "celery"},
"video": {"exchange": "video",
- "binding_key": "video"}})
+ "routing_key": "video"}})
try:
worker = self.Worker(queues=["video"])
worker.init_queues()
@@ -184,11 +185,8 @@ def test_init_queues(self):
worker = self.Worker(queues=["image"])
worker.init_queues()
self.assertIn("image", app.amqp.queues.consume_from)
- self.assertDictContainsSubset({"exchange": "image",
- "routing_key": "image",
- "binding_key": "image",
- "exchange_type": "direct"},
- app.amqp.queues["image"])
+ self.assertEqual(Queue("image", Exchange("image"),
+ routing_key="image"), app.amqp.queues["image"])
finally:
app.amqp.queues = p
View
6 celery/tests/config.py
@@ -2,6 +2,8 @@
import os
+from kombu import Queue
+
BROKER_TRANSPORT = "memory"
#: Don't want log output when running suite.
@@ -15,7 +17,9 @@
CELERY_DEFAULT_QUEUE = "testcelery"
CELERY_DEFAULT_EXCHANGE = "testcelery"
CELERY_DEFAULT_ROUTING_KEY = "testcelery"
-CELERY_QUEUES = {"testcelery": {"binding_key": "testcelery"}}
+CELERY_QUEUES = (
+ Queue("testcelery", routing_key="testcelery"),
+)
CELERY_ENABLE_UTC = True
View
2  celery/tests/events/test_events.py
@@ -109,7 +109,6 @@ def test_enabled_disable(self):
dispatcher2.disable()
self.assertFalse(dispatcher.enabled)
self.assertIsNone(dispatcher.publisher)
- self.assertTrue(created_channel.closed)
self.assertFalse(dispatcher2.channel.closed,
"does not close manually provided channel")
@@ -119,6 +118,7 @@ def test_enabled_disable(self):
finally:
channel.close()
connection.close()
+ self.assertTrue(created_channel.closed)
class test_EventReceiver(AppCase):
View
1  celery/tests/events/test_snapshot.py
@@ -117,6 +117,7 @@ def Receiver(self, *args, **kwargs):
def setUp(self):
self.app = app_or_default()
self.prev, self.app.events = self.app.events, self.MockEvents()
+ self.app.events.app = self.app
def tearDown(self):
self.app.events = self.prev
View
6 celery/tests/tasks/test_sets.py
@@ -158,12 +158,6 @@ def xyz():
_tls.current_task = None
xyz.request.clear()
- # must close publisher
- ts._Publisher = Mock()
- ts._Publisher.return_value = Mock()
- ts.apply_async()
- self.assertTrue(ts._Publisher.return_value.close.called)
-
def test_apply(self):
applied = [0]
View
15 celery/tests/tasks/test_tasks.py
@@ -207,7 +207,7 @@ def test_AsyncResult(self):
def assertNextTaskDataEqual(self, consumer, presult, task_name,
test_eta=False, test_expires=False, **kwargs):
- next_task = consumer.fetch()
+ next_task = consumer.queues[0].get()
task_data = next_task.decode()
self.assertEqual(task_data["id"], presult.id)
self.assertEqual(task_data["task"], task_name)
@@ -251,8 +251,8 @@ def test_regular_task(self):
consumer = T1.get_consumer()
with self.assertRaises(NotImplementedError):
consumer.receive("foo", "foo")
- consumer.discard_all()
- self.assertIsNone(consumer.fetch())
+ consumer.purge()
+ self.assertIsNone(consumer.queues[0].get())
# Without arguments.
presult = T1.delay()
@@ -282,10 +282,10 @@ def test_regular_task(self):
name="George Costanza", test_eta=True, test_expires=True)
# Discarding all tasks.
- consumer.discard_all()
+ consumer.purge()
T1.apply_async()
- self.assertEqual(consumer.discard_all(), 1)
- self.assertIsNone(consumer.fetch())
+ self.assertEqual(consumer.purge(), 1)
+ self.assertIsNone(consumer.queues[0].get())
self.assertFalse(presult.successful())
T1.backend.mark_as_done(presult.id, result=None)
@@ -356,6 +356,7 @@ def test_get_publisher(self):
exchange="foo")
self.assertEqual(p.exchange.name, "foo")
p = increment_counter.get_publisher(connection, auto_declare=False,
+ exchange="foo",
exchange_type="fanout")
self.assertEqual(p.exchange.type, "fanout")
@@ -434,7 +435,7 @@ def test_counter_taskset(self):
taskset_id = taskset_res.taskset_id
consumer = increment_counter.get_consumer()
for subtask in subtasks:
- m = consumer.fetch().payload
+ m = consumer.queues[0].get().payload
self.assertDictContainsSubset({"taskset": taskset_id,
"task": increment_counter.name,
"id": subtask.id}, m)
View
8 celery/tests/utilities/test_info.py
@@ -21,15 +21,15 @@
QUEUES = {"queue1": {
"exchange": "exchange1",
"exchange_type": "type1",
- "binding_key": "bind1"},
+ "routing_key": "bind1"},
"queue2": {
"exchange": "exchange2",
"exchange_type": "type2",
- "binding_key": "bind2"}}
+ "routing_key": "bind2"}}
-QUEUE_FORMAT1 = """. queue1: exchange:exchange1 (type1) binding:bind1"""
-QUEUE_FORMAT2 = """. queue2: exchange:exchange2 (type2) binding:bind2"""
+QUEUE_FORMAT1 = """. queue1: exchange:exchange1(type1) binding:bind1"""
+QUEUE_FORMAT2 = """. queue2: exchange:exchange2(type2) binding:bind2"""
class test_Info(Case):
View
2  celery/worker/consumer.py
@@ -591,7 +591,7 @@ def reset_connection(self):
# Re-establish the broker connection and setup the task consumer.
self.connection = self._open_connection()
debug("Connection established.")
- self.task_consumer = self.app.amqp.get_task_consumer(self.connection,
+ self.task_consumer = self.app.amqp.TaskConsumer(self.connection,
on_decode_error=self.on_decode_error)
# QoS: Reset prefetch window.
self.qos = QoS(self.task_consumer, self.initial_prefetch_count)
View
79 docs/userguide/routing.rst
@@ -63,9 +63,12 @@ configuration:
.. code-block:: python
- CELERY_QUEUES = {"default": {"exchange": "default",
- "binding_key": "default"}}
+ from kombu import Exchange, Queue
+
CELERY_DEFAULT_QUEUE = "default"
+ CELERY_QUEUES = (
+ Queue("default", Exchange("default"), routing_key="default"),
+ )
.. _routing-autoqueue-details:
@@ -99,21 +102,20 @@ configuration:
.. code-block:: python
+ from kombu import Queue
+
CELERY_DEFAULT_QUEUE = "default"
- CELERY_QUEUES = {
- "default": {
- "binding_key": "task.#",
- },
- "feed_tasks": {
- "binding_key": "feed.#",
- },
- }
+ CELERY_QUEUES = (
+ Queue("default", routing_key="task.#"),
+ Queue("feed_tasks", routing_key="feed.#"),
+ )
CELERY_DEFAULT_EXCHANGE = "tasks"
CELERY_DEFAULT_EXCHANGE_TYPE = "topic"
CELERY_DEFAULT_ROUTING_KEY = "task.default"
-:setting:`CELERY_QUEUES` is a map of queue names and their
-exchange/type/binding_key, if you don't set exchange or exchange type, they
+:setting:`CELERY_QUEUES` is a list of :class:`~kombu.entitity.Queue`
+instances.
+If you don't set the exchange or exchange type values for a key, these
will be taken from the :setting:`CELERY_DEFAULT_EXCHANGE` and
:setting:`CELERY_DEFAULT_EXCHANGE_TYPE` settings.
@@ -159,19 +161,14 @@ just specify a custom exchange and exchange type:
.. code-block:: python
- CELERY_QUEUES = {
- "feed_tasks": {
- "binding_key": "feed.#",
- },
- "regular_tasks": {
- "binding_key": "task.#",
- },
- "image_tasks": {
- "binding_key": "image.compress",
- "exchange": "mediatasks",
- "exchange_type": "direct",
- },
- }
+ from kombu import Exchange, Queue
+
+ CELERY_QUEUES = (
+ Queue("feed_tasks", routing_key="feed.#"),
+ Queue("regular_tasks", routing_key="task.#"),
+ Queue("image_tasks", exchange=Exchange("mediatasks", type="direct"),
+ routing_key="image.compress"),
+ )
If you're confused about these terms, you should read up on AMQP.
@@ -253,29 +250,17 @@ One for video, one for images and one default queue for everything else:
.. code-block:: python
- CELERY_QUEUES = {
- "default": {
- "exchange": "default",
- "binding_key": "default"},
- "videos": {
- "exchange": "media",
- "binding_key": "media.video",
- },
- "images": {
- "exchange": "media",
- "binding_key": "media.image",
- }
- }
+ from kombu import Exchange, Queue
+
+ CELERY_QUEUES = (
+ Queue("default", Exchange("default"), routing_key="default"),
+ Queue("videos", Exchange("media"), routing_key="media.video"),
+ Queue("images", Exchange("media"), routing_key="media.image"),
+ )
CELERY_DEFAULT_QUEUE = "default"
CELERY_DEFAULT_EXCHANGE_TYPE = "direct"
CELERY_DEFAULT_ROUTING_KEY = "default"
-.. note::
-
- In Celery the `routing_key` is the key used to send the message,
- while `binding_key` is the key the queue is bound with. In the AMQP API
- they are both referred to as the routing key.
-
.. _amqp-exchange-types:
Exchange types
@@ -461,16 +446,16 @@ One for video, one for images and one default queue for everything else:
CELERY_QUEUES = {
"default": {
"exchange": "default",
- "binding_key": "default"},
+ "routing_key": "default"},
"videos": {
"exchange": "media",
"exchange_type": "topic",
- "binding_key": "media.video",
+ "routing_key": "media.video",
},
"images": {
"exchange": "media",
"exchange_type": "topic",
- "binding_key": "media.image",
+ "routing_key": "media.image",
}
}
CELERY_DEFAULT_QUEUE = "default"
View
2  examples/eventlet/bulk_task_producer.py
@@ -49,7 +49,7 @@ def _run(self):
def _producer(self):
connection = current_app.broker_connection()
- publisher = current_app.amqp.TaskPublisher(connection)
+ publisher = current_app.amqp.TaskProducer(connection)
inqueue = self.inqueue
while 1:
View
2  funtests/suite/config.py
@@ -16,7 +16,7 @@
CELERY_DEFAULT_QUEUE = "testcelery"
CELERY_DEFAULT_EXCHANGE = "testcelery"
CELERY_DEFAULT_ROUTING_KEY = "testcelery"
-CELERY_QUEUES = {"testcelery": {"binding_key": "testcelery"}}
+CELERY_QUEUES = {"testcelery": {"routing_key": "testcelery"}}
CELERYD_LOG_COLOR = False
Please sign in to comment.
Something went wrong with that request. Please try again.