Skip to content

Commit

Permalink
add_task_queue must account for Queues.__missing__. Closes #1079
Browse files Browse the repository at this point in the history
  • Loading branch information
ask committed Nov 21, 2012
1 parent fee2cad commit 0bdbcd4
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions celery/worker/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -820,16 +820,20 @@ def maybe_shutdown(self):
def add_task_queue(self, queue, exchange=None, exchange_type=None,
routing_key=None, **options):
cset = self.task_consumer
try:
q = self.app.amqp.queues[queue]
except KeyError:
queues = self.app.amqp.queues
# Must use in' here, as __missing__ will automatically
# create queues when CELERY_CREATE_MISSING_QUEUES is enabled.
# (Issue #1079)
if queue in queues:
q = queues[queue]
else:
exchange = queue if exchange is None else exchange
exchange_type = 'direct' if exchange_type is None \
else exchange_type
q = self.app.amqp.queues.select_add(queue,
exchange=exchange,
exchange_type=exchange_type,
routing_key=routing_key, **options)
exchange_type = ('direct' if exchange_type is None
else exchange_type)
q = queues.select_add(queue,
exchange=exchange,
exchange_type=exchange_type,
routing_key=routing_key, **options)
if not cset.consuming_from(queue):
cset.add_queue(q)
cset.consume()
Expand Down

0 comments on commit 0bdbcd4

Please sign in to comment.