RabbitMQ priority support #2635
Comments
The AMQP priority property is already sent with the message |
So this is implemented, right? |
I manually added a priority queue to RabbitMQ. When trying to push tasks to it, Celery fails with
Seems like it fails to (re)declare the queue in Rabbit. |
Could it be that RabbitMQ doesn't conform to the standard in that case? |
The documentation is clearly very sparse and outdated on this point, but this works correctly for me when the queue has not been created previously:

    CELERY_QUEUES = (
        Queue('default', Exchange('default'), routing_key='default', queue_arguments={'x-max-priority': 8}),
    )

If you want stricter priority ordering, prefetching should probably also be kept to a minimum:

    CELERY_ACKS_LATE = True
    CELERYD_PREFETCH_MULTIPLIER = 1

Use add_task.apply_async(args=(arg1, arg2, arg3), priority=6) |
@ernestoalejo That is not actually true: because of this issue, RabbitMQ accepts and sets the priority attribute but does not respect it during message processing. |
I received the tasks in the correct priority order. I sent ~100 tasks at a lower priority and then two at a higher priority, and they were processed in the desired order, even when I sent those tasks several times in a row (with different parameters). Retries were processed after the other ones because I subtract 1 from the priority. It is definitely working for me. I can see the priority configured on the queue in the RabbitMQ management UI without any problem. Maybe something in your test didn't reproduce the correct settings? Did you try with a queue that was already created? For reference, this is my full configuration:

    if DEBUG:
        TASKQUEUE_BACKOFF = 10
    else:
        CELERYD_HIJACK_ROOT_LOGGER = False
        CELERY_DEFAULT_RATE_LIMIT = '60/m'
        TASKQUEUE_BACKOFF = 30
    CELERYD_TASK_TIME_LIMIT = 60
    CELERY_ACCEPT_CONTENT = ['json']
    CELERY_TASK_SERIALIZER = 'json'
    CELERY_ACKS_LATE = True
    CELERYD_PREFETCH_MULTIPLIER = 1
    CELERY_DEFAULT_QUEUE = 'default'
    CELERY_QUEUES = (
        Queue('default', Exchange('default'), routing_key='default', queue_arguments={'x-max-priority': 8}),
    )
    CELERYD_MAX_TASKS_PER_CHILD = 500
    CELERY_ENABLE_REMOTE_CONTROL = False

    app = Celery('tasks', broker='amqp://guest@XXX-taskqueue//')
    app.config_from_object('XXX.settings')
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

The file setting the task:

    foo_task.apply_async(args=(arg1, arg2), priority=7)

And my tasks:

    @shared_task(max_retries=10)
    def foo_task(arg1, arg2):
        try:
            _safe_foo_task(arg1, arg2)
        except Exception as err:
            logger.error('Task Exception: %s', str(err))
            traceback.print_exc()
            # Retries are published one priority level below the original (7 -> 6).
            foo_task.retry(exc=err, countdown=settings.TASKQUEUE_BACKOFF, priority=6) |
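The retry in the task above is published at priority 6, one below the original 7, which is why retried tasks end up behind fresh ones. A minimal sketch of that idea as a helper follows; `retry_priority` and its parameters are hypothetical names for illustration and are not part of Celery.

```python
def retry_priority(current, step=1, lowest=0):
    """Return the priority for a retry: `step` below `current`, never below `lowest`.

    Hypothetical helper; RabbitMQ treats larger numbers as higher priority,
    so lowering the number pushes the retry behind first-time tasks.
    """
    return max(lowest, current - step)


first_attempt = 7
retry = retry_priority(first_attempt)
print(retry)
```

The result could then be passed as the `priority` argument of the retry call instead of a hard-coded value.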
We've tested two configurations: 1 Two different priority queues
With
2 Different priority tasks inside one queue
And consumer
In the first case it takes tasks nearly one by one from both queues. @ernestoalejo Aren't you sending tasks of different priorities to different queues? If so, you will get the 2nd "high-priority" task sooner than the 100th "low-priority" one. Did you check the link I provided? It's quite strange that you managed to get a feature working that the developer says doesn't work. |
The link is from 2014; maybe priority queue support was added later. It's documented now anyway: https://www.rabbitmq.com/priority.html. I'm using the standard "rabbitmq:3.6-management" image in Docker. I considered priority between queues undefined behaviour, so I went directly for a single queue (your second example). Note, however, that you have to declare the maximum priority on the correct queue. In your example:

    CELERY_QUEUES = (
        Queue('important', Exchange('important'), routing_key='important', queue_arguments={'x-max-priority': 100}),
        Queue('default', Exchange('default'), routing_key='default', queue_arguments={'x-max-priority': 1}),
    )

has to be changed to:

    CELERY_QUEUES = (
        Queue('default', Exchange('default'), routing_key='default', queue_arguments={'x-max-priority': 100}),
    )

if you want to send tasks with a priority of 100. The command will be

Also note the warning in the documentation about hungry consumers. If the queue doesn't have a chance to see the full list, it can't order the messages. If it's a test task that does no work, add a sleep of one second or so to slow it down artificially and see the effect of the priority queue. |
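To illustrate why the maximum has to be declared on the queue that actually receives the tasks: RabbitMQ treats a message whose priority exceeds the queue's `x-max-priority` as if it had been published at that maximum. The sketch below is a toy in-memory model of that rule, not RabbitMQ or Celery code; `PriorityQueueModel` and its methods are hypothetical names.

```python
import heapq
import itertools


class PriorityQueueModel:
    """Toy model of a queue declared with x-max-priority (illustration only)."""

    def __init__(self, max_priority):
        self.max_priority = max_priority
        self._heap = []
        self._seq = itertools.count()  # keeps publish order within one priority

    def publish(self, body, priority=0):
        # Priorities above the declared maximum are treated as the maximum.
        effective = min(priority, self.max_priority)
        # heapq is a min-heap, so negate the priority to pop the highest first.
        heapq.heappush(self._heap, (-effective, next(self._seq), body))

    def get(self):
        return heapq.heappop(self._heap)[2]


# Declared with x-max-priority 1: priorities 1 and 100 collapse to the same
# level, so messages come out in publish order.
low_cap = PriorityQueueModel(max_priority=1)
low_cap.publish("normal", priority=1)
low_cap.publish("urgent", priority=100)
capped_order = [low_cap.get(), low_cap.get()]

# Declared with x-max-priority 100: the urgent message overtakes.
high_cap = PriorityQueueModel(max_priority=100)
high_cap.publish("normal", priority=1)
high_cap.publish("urgent", priority=100)
uncapped_order = [high_cap.get(), high_cap.get()]

print(capped_order)    # ['normal', 'urgent']
print(uncapped_order)  # ['urgent', 'normal']
```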
You are my personal hero, @ernestoalejo - you are right. Looks like per task |
Should we change this into a documentation issue then? |
Yes, we should make sure the documentation does not say this is unsupported anymore. |
Hey! I have just started using Celery and I'm trying to implement a sample application to understand prioritized tasking (with one queue) so that it will help a lot of other people as well 😄 I have currently added this code block provided here in the celery docs for prioritized tasking. It is a very simple config for celery, and I'm thinking I might be missing something very trivial as the tasks are just executed in the received order. Kindly have a quick look and let me know what the problem might be!
|
@vijeth-aradhya It's better to open a Stack Overflow question than to comment on this bug now that it is closed. Looking at your code, maybe the queue has no chance to prioritize the messages. Try with these two settings (adapt to your project as needed):
Prefetch multiplier is 4 by default, maybe your tasks are downloaded before the queue can sort them. |
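The effect of prefetching described above can be sketched with a small simulation. This is a toy model under simplifying assumptions (one worker, one slot freed per acknowledged task), not Celery or RabbitMQ code; `simulate` and the message names are hypothetical.

```python
import heapq
import itertools
from collections import deque


def simulate(prefetch):
    """Return the order in which a single worker processes messages."""
    seq = itertools.count()
    broker = []  # min-heap of (-priority, publish order, body)

    def publish(body, priority):
        heapq.heappush(broker, (-priority, next(seq), body))

    def fill(buffer):
        # The broker delivers messages until `prefetch` are unacknowledged.
        while broker and len(buffer) < prefetch:
            buffer.append(heapq.heappop(broker)[2])

    for i in range(4):
        publish(f"low-{i}", priority=1)

    buffer = deque()
    fill(buffer)                 # the worker connects and prefetches
    publish("high", priority=8)  # arrives after the prefetch has happened

    processed = []
    while buffer:
        processed.append(buffer.popleft())  # run the task and acknowledge it
        fill(buffer)                        # the ack frees one prefetch slot
    return processed


print(simulate(prefetch=4))  # ['low-0', 'low-1', 'low-2', 'low-3', 'high']
print(simulate(prefetch=1))  # ['low-0', 'high', 'low-1', 'low-2', 'low-3']
```

With a prefetch of 4 the low-priority messages have already left the broker before the high-priority one arrives, so the queue never gets to reorder them; with a prefetch of 1 the high-priority message is delivered as soon as the current task finishes.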
@ernestoalejo Special thanks for clearing this up so quickly! Yes, I will post on Stack Overflow from now on. I commented here because the thread was closely related to my question. My apologies for commenting on a closed issue! 😃 |
@ernestoalejo Many thanks for the answer! It is really helpful! |
As of RabbitMQ 3.5, RabbitMQ has priority support: https://www.rabbitmq.com/priority.html
As such, can it be added to Celery? Doesn't it just need to have the priority field (which is already in Task) passed to RabbitMQ?