Permalink
Browse files

New execution options: queue and terms. Terms being used to replace w…

…ords in

binding keys to create a routing key, e.g:

    CELERY_QUEUES = {"videos": {
                        "exchange": "media",
                        "binding_key": "video.#"}}

    apply_async(queue="videos", terms=("dvd", ))
    -> "video.dvd"

    binding_key = "stock.#.status.*"
    apply_async(terms=("us", "nasdaq"))
    -> "stock.us.status.nasdaq"
  • Loading branch information...
Ask Solem
Ask Solem committed Mar 7, 2010
1 parent 0f383b4 commit 7fa0c81bbd817752c1329ffb942386ced0398df7
Showing with 46 additions and 4 deletions.
  1. +1 −1 celery/execute/__init__.py
  2. +23 −2 celery/messaging.py
  3. +22 −1 celery/task/base.py
@@ -6,7 +6,7 @@
from celery.messaging import with_connection
from celery.messaging import TaskPublisher
-extract_exec_options = mattrgetter("routing_key", "exchange",
+extract_exec_options = mattrgetter("queue", "routing_key", "exchange",
"immediate", "mandatory",
"priority", "serializer")
View
@@ -3,18 +3,22 @@
Sending and Receiving Messages
"""
+import re
import socket
from datetime import datetime, timedelta
from carrot.connection import DjangoBrokerConnection
from carrot.messaging import Publisher, Consumer, ConsumerSet
-from billiard.utils.functional import wraps
+from billiard.utils.functional import wraps, curry
from celery import conf
from celery import signals
from celery.utils import gen_unique_id, mitemgetter, noop
+TERMS_TO_FORMAT = curry(re.compile(r"\.[\#\*]").sub, ".%s")
+
+
MSG_OPTIONS = ("mandatory", "priority",
"immediate", "routing_key",
"serializer")
@@ -32,6 +36,15 @@ class TaskPublisher(Publisher):
exchange_type = default_queue["exchange_type"]
routing_key = conf.DEFAULT_ROUTING_KEY
serializer = conf.TASK_SERIALIZER
+ queues = conf.routing_table
+
+ def get_queue_config(self, name):
+ config = dict(self.queues[name])
+ config.pop("binding_key", None)
+ return config
+
+ def use_terms(self, rkey, terms):
+ return TERMS_TO_FORMAT(rkey) % terms
def __init__(self, *args, **kwargs):
super(TaskPublisher, self).__init__(*args, **kwargs)
@@ -44,11 +57,15 @@ def __init__(self, *args, **kwargs):
_queues_declared = True
def delay_task(self, task_name, task_args=None, task_kwargs=None,
- countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
+ countdown=None, eta=None, task_id=None, taskset_id=None,
+ queue=None, terms=None, **kwargs):
"""Delay task for execution by the celery nodes."""
task_id = task_id or gen_unique_id()
+ if queue:
+ kwargs = dict(kwargs, **self.get_queue_config(queue))
+
if countdown: # Convert countdown to ETA.
eta = datetime.now() + timedelta(seconds=countdown)
@@ -64,6 +81,10 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
if taskset_id:
message_data["taskset"] = taskset_id
+ if terms is not None:
+ kwargs["routing_key"] = self.as_terms(kwargs.get("routing_key"),
+ terms)
+
self.send(message_data, **extract_msg_options(kwargs))
signals.task_sent.send(sender=task_name, **message_data)
View
@@ -76,6 +76,26 @@ class Task(object):
however if you want a periodic task, you should subclass
:class:`PeriodicTask` instead.
+ .. attribute:: queue
+
+ Name of queue in ``CELERY_QUEUES`` used for default routing settings.
+ If ``CELERY_QUEUES`` is defined as the following::
+
+ .. code-block:: python
+
+ CELERY_QUEUES = {"video": {
+ "exchange": "media",
+ "exchange_type": "topic",
+ "binding_key": "media.video.#",
+ }}
+
+ Setting :attr:`queue` to ``"video"``, will use those settings,
+ except for ``binding_key``, as defaults when applying tasks, i.e.:
+
+ >>> Task.apply_async(args, kwargs, exchange="media",
+ exchange_type="topic")
+
+
.. attribute:: routing_key
Override the global default ``routing_key`` for this task.
@@ -152,20 +172,21 @@ class Task(object):
abstract = True
autoregister = True
type = "regular"
+ queue = None
exchange = None
routing_key = None
immediate = False
mandatory = False
priority = None
ignore_result = conf.IGNORE_RESULT
+ exchange_type = conf.DEFAULT_EXCHANGE_TYPE
disable_error_emails = False
max_retries = 3
default_retry_delay = 3 * 60
serializer = conf.TASK_SERIALIZER
rate_limit = conf.DEFAULT_RATE_LIMIT
rate_limit_queue_type = Queue
backend = default_backend
- exchange_type = conf.DEFAULT_EXCHANGE_TYPE
MaxRetriesExceededError = MaxRetriesExceededError

0 comments on commit 7fa0c81

Please sign in to comment.