Configuring RabbitMQ Quorum queue for Celery #302

Closed
ljyca opened this issue Jan 14, 2020 · 10 comments

@ljyca

ljyca commented Jan 14, 2020

I added the following lines to my test application to try to configure a quorum queue.

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-queue-type': 'quorum'}),
]


Here are the errors I received. As suggested by Ing. Josue Balandrano Coronel in the user group, I am creating a new issue on GitHub. Thanks!

[2020-01-14 09:47:25,752: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 318, in start
    blueprint.start(self)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/celery/worker/consumer/consumer.py", line 596, in start
    c.loop(*c.loop_args())
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/celery/worker/loops.py", line 52, in asynloop
    consumer.consume()
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/kombu/messaging.py", line 477, in consume
    self._basic_consume(T, no_ack=no_ack, nowait=False)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/kombu/messaging.py", line 598, in _basic_consume
    no_ack=no_ack, nowait=nowait)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/kombu/entity.py", line 741, in consume
    arguments=self.consumer_arguments)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/channel.py", line 1567, in basic_consume
    returns_tuple=True
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/abstract_channel.py", line 68, in send_method
    return self.wait(wait, returns_tuple=returns_tuple)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/abstract_channel.py", line 88, in wait
    self.connection.drain_events(timeout=timeout)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/connection.py", line 505, in drain_events
    while not self.blocking_read(timeout):
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/connection.py", line 511, in blocking_read
    return self.on_inbound_frame(frame)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/method_framing.py", line 55, in on_frame
    callback(channel, method_sig, buf, None)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/connection.py", line 518, in on_inbound_method
    method_sig, payload, content,
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/abstract_channel.py", line 145, in dispatch_method
    listener(*args)
  File "/usr/local/share/venv/python37/lib/python3.7/site-packages/amqp/connection.py", line 648, in _on_close
    (class_id, method_id), ConnectionError)
amqp.exceptions.AMQPNotImplementedError: Basic.consume: (540) NOT_IMPLEMENTED - queue 'tasks' in vhost 'tasks' does not support global qos

@karolpawlowski

After updating from 2.3.2 to 2.5.2 (and Celery from 4.2.1 to 4.4.0), I get an error like this:

amqp.exceptions.NotFound: Basic.publish: (404) NOT_FOUND - no exchange 'websocket_broadcast' in vhost 'am_vhost'

Maybe this is connected?

@ashexpertVersion2

I have the same problem.

@ashexpertVersion2

Quorum queues don't support global QoS prefetch:
https://www.rabbitmq.com/quorum-queues.html#global-qos
but I couldn't find a way to disable this feature...

@auvipy
Member

auvipy commented Apr 29, 2020

@matusvalo

@matusvalo
Member

I will look at this issue soon.

@matusvalo
Member

I can confirm that quorum queues require per-consumer QoS prefetch. The prefetch can be set using basic_qos() [1] with a_global set to False:

def basic_qos(self, prefetch_size, prefetch_count, a_global,

Unfortunately, I do not have a recent RabbitMQ that supports quorum queues, but I suppose this should be executed before consuming from the queues:

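  • py-amqp:
    import amqp
    # PREFETCH_SIZE and PREFETCH_COUNT are placeholders for your own values
    c = amqp.Connection('broker.example.com')
    c.connect()
    chan = c.channel()
    # a_global=False requests per-consumer prefetch
    chan.basic_qos(PREFETCH_SIZE, PREFETCH_COUNT, False)
  • kombu:
    from kombu import Connection
    c = Connection('broker.example.com')
    chan = c.default_channel
    chan.basic_qos(PREFETCH_SIZE, PREFETCH_COUNT, False)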
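To make the call above easier to try without a broker, here is a minimal sketch. The helper name set_per_consumer_prefetch and the RecordingChannel stand-in are hypothetical and not part of py-amqp or kombu; the only assumption about a real channel is that it exposes basic_qos(prefetch_size, prefetch_count, a_global) as shown in this thread.

```python
class RecordingChannel:
    """Hypothetical stand-in for an AMQP channel.

    It records basic_qos calls instead of sending them to a broker.
    """

    def __init__(self):
        self.qos_calls = []

    def basic_qos(self, prefetch_size, prefetch_count, a_global):
        self.qos_calls.append((prefetch_size, prefetch_count, a_global))


def set_per_consumer_prefetch(channel, prefetch_count):
    """Request a per-consumer prefetch limit by passing a_global=False.

    prefetch_size=0 means no byte-based limit is requested.
    """
    channel.basic_qos(0, prefetch_count, False)


channel = RecordingChannel()
set_per_consumer_prefetch(channel, prefetch_count=10)
print(channel.qos_calls)  # prints [(0, 10, False)]
```

With a real broker, the same helper would presumably be called with the py-amqp channel or the kombu default_channel from the snippets above, before any consumer is started on that channel.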

By the way, the py-amqp docs also fail to describe the difference in semantics of the global parameter of basic_qos. Compare the table in [1] with the docstring:

py-amqp/amqp/channel.py

Lines 1843 to 1850 in 8d0b37c

a_global: boolean
apply to entire connection
By default the QoS settings apply to the current
channel only. If this field is set, they are applied
to the entire connection.
"""

[1] https://www.rabbitmq.com/consumer-prefetch.html
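The difference between the docstring and the table in [1] can be written down as a small lookup. This is only an illustrative sketch: the names Semantics, PREFETCH_SCOPE, prefetch_scope and usable_with_quorum_queue are hypothetical, and the scope descriptions paraphrase the table in [1] and the quorum queue documentation linked earlier in this thread.

```python
from enum import Enum


class Semantics(Enum):
    AMQP_SPEC = "AMQP 0-9-1 specification"
    RABBITMQ = "RabbitMQ"


# (semantics, value of the global flag) -> scope of the prefetch limit
PREFETCH_SCOPE = {
    (Semantics.AMQP_SPEC, False): "shared across all consumers on the channel",
    (Semantics.AMQP_SPEC, True): "shared across all consumers on the connection",
    (Semantics.RABBITMQ, False): "applied separately to each new consumer on the channel",
    (Semantics.RABBITMQ, True): "shared across all consumers on the channel",
}


def prefetch_scope(semantics, a_global):
    """Return the scope that a basic_qos call gets under the given semantics."""
    return PREFETCH_SCOPE[(semantics, bool(a_global))]


def usable_with_quorum_queue(a_global):
    """Quorum queues reject global QoS, so only a_global=False is usable."""
    return not a_global


for flag in (False, True):
    print(flag, "->", prefetch_scope(Semantics.RABBITMQ, flag))
```

Read this way, the py-amqp docstring describes the AMQP_SPEC rows, while a RabbitMQ broker behaves according to the RABBITMQ rows.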

@matusvalo
Member

It seems that Celery hardcodes QoS at the channel level:

https://github.com/celery/celery/blob/d5cddb09c7f703be38513116827e3ca80b0be899/celery/worker/consumer/tasks.py#L30-L38

I would create an issue on Celery asking to let users configure the global parameter of QoS prefetch. In the meantime, you can try to get the Connection object in your code and call default_channel.basic_qos(..., False) before you start draining messages.

But this is a question for someone who knows Celery better than I do.

@ashexpertVersion2

Can you describe more specifically how to force Celery to use this predefined Connection?

@matusvalo
Member

@ashexpertVersion2 As I said, I am not an expert in Celery. Try raising the question in the Celery issues.

@matusvalo
Member

@auvipy you can close this issue.

@auvipy auvipy closed this as completed May 3, 2020