Permalink
Browse files

Allow configuring the channel of a queue.e.g.CELERY_QUEUES = {foo: {e…

…xchange: foo, routing_key: foo, channel: 2, prefetch_count: 10}, default: {exchange: default, routing_key: default,}}Means messages to foo will be received independently of messages to default.
  • Loading branch information...
1 parent 1875eda commit 594a2bc8b2374434506b5dca789b92fc4ec7888a Ask Solem committed Oct 11, 2010
Showing with 9 additions and 0 deletions.
  1. +9 −0 celery/messaging.py
View
@@ -15,6 +15,7 @@
from celery import conf
from celery import signals
from celery.utils import gen_unique_id, mitemgetter, noop
+from celery.utils.compat import defaultdict
from celery.utils.functional import wraps
@@ -95,6 +96,7 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
type=exchange_type,
durable=self.durable,
auto_delete=self.auto_delete)
+
self.send(message_data, exchange=exchange,
**extract_msg_options(kwargs))
@@ -302,10 +304,17 @@ def get_consumer_set(connection, queues=None, **options):
"""
queues = queues or conf.get_queues()
cset = ConsumerSet(connection)
+ channels = defaultdict(lambda: connection.create_backend().channel)
+ channels[1] = cset.backend.channel
for queue_name, queue_options in queues.items():
queue_options = dict(queue_options)
+ channel = queue_options.pop("channel", 1)
+ prefetch_count = queue_options.pop("prefetch_count", 0)
queue_options["routing_key"] = queue_options.pop("binding_key", None)
consumer = Consumer(connection, queue=queue_name,
backend=cset.backend, **queue_options)
cset.consumers.append(consumer)
+ channel = consumer.channel = channels[int(channel)]
+ if prefetch_count:
+ channel.basic_qos(prefetch_count=prefetch_count)
return cset

0 comments on commit 594a2bc

Please sign in to comment.