What is the goal with Celery/Kombu "native delayed messages"? Why is there no general interface? #10041
Basically the title. We use Redis as our broker and have been running some Redis-based logic on top of Celery to handle delayed tasks, because the default behavior (workers pull the messages immediately and hold them until the eta) was causing problems for us. I thought I should try to upstream this, so I looked for existing related code and found the delayed-delivery logic for RabbitMQ quorum queues in https://github.com/celery/celery/blob/main/celery/worker/consumer/consumer.py and https://github.com/celery/kombu/blob/main/kombu/transport/native_delayed_delivery.py. To me this does not look very extensible, and I was hoping for a more general interface that would also allow implementing "native delayed delivery" for other transports. I'm guessing this was mostly a workaround for the eta + quorum queue issue, but I might be missing something. Would it make sense to have a general interface on the Kombu transports for "native delayed delivery"?
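To illustrate the difference between holding delayed messages on the worker and holding them on the broker side, here is a minimal sketch. It is not the Redis logic mentioned above and not any Kombu API: `DelayedQueue`, `publish`, and `pop_due` are hypothetical names, and an in-memory heap stands in for the broker. On Redis, one common way to get the same effect is a sorted set scored by ETA (`ZADD` to store, `ZRANGEBYSCORE` to fetch what is due), but that mapping is an assumption here.

```python
import heapq
import itertools
import time
from typing import Any, List, Optional, Tuple


class DelayedQueue:
    """Hypothetical in-memory stand-in for a broker-side store ordered by ETA."""

    def __init__(self) -> None:
        # Entries are (eta, sequence, payload); the sequence number breaks
        # ties so payloads themselves are never compared.
        self._heap: List[Tuple[float, int, Any]] = []
        self._counter = itertools.count()

    def publish(self, payload: Any, eta: float) -> None:
        """Store a message until its ETA (a Unix timestamp) is reached."""
        heapq.heappush(self._heap, (eta, next(self._counter), payload))

    def pop_due(self, now: Optional[float] = None) -> List[Any]:
        """Remove and return every message whose ETA is <= now, earliest first."""
        now = time.time() if now is None else now
        due: List[Any] = []
        while self._heap and self._heap[0][0] <= now:
            _, _, payload = heapq.heappop(self._heap)
            due.append(payload)
        return due
```

With this shape, a worker only ever receives messages that are already due, instead of prefetching them and waiting for the eta locally.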
Replies: 2 comments
I guess it might take a while to get some feedback here (and on related discussions). I have implemented my use case in a package, https://pypi.org/project/celery-redis-plus/, and will aim to submit some changes to Celery/Kombu to add a general interface so that transports can implement "native delayed delivery" properly. As seen in my package, aside from each transport having to implement the native delayed delivery itself, the changes to Celery and Kombu are minor (one more bootstep, then one or more message properties/headers).
I didn't plan for it or think about it like that before... so there is nothing...