Feature: specify concurrency level per queue #1599

Open
Tinche opened this issue Oct 24, 2013 · 19 comments

@Tinche

Tinche commented Oct 24, 2013

Right now, if I understand this correctly, a single Celery worker process acts as a single pool of processes/threads/green threads consuming from one or more queues.

How about allowing per queue (or per queue group) concurrency levels?

I realize this might be a little awkward, since the celery worker takes a lot of options which someone might want to specify per queue. Also I realize this is possible today by running multiple celery workers, but it feels awkward to have 6 services running an almost identical command.

My particular use case is running 1 worker process per queue, and using consistent hashing in either the broker or the producer to ensure tasks in a group are performed serially based on an arbitrary property, but still parallel if in different buckets.
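
For illustration, the producer-side hashing could look like this minimal sketch (the pick_queue helper, the process_order task, and the customer_id key are all hypothetical; it assumes six single-process queues named q1-q6, as in the example below):

    import hashlib

    N_QUEUES = 6  # one single-process worker per queue

    def pick_queue(key: str) -> str:
        # Stable hash of an arbitrary property: every task sharing a key
        # lands on the same queue and therefore runs serially, while tasks
        # with different keys can run in parallel on other queues.
        bucket = int(hashlib.md5(key.encode()).hexdigest(), 16) % N_QUEUES
        return "q%d" % (bucket + 1)

    # e.g.: process_order.apply_async(args=[order_id],
    #                                 queue=pick_queue(str(customer_id)))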

So, for example, when starting the celery worker, it would be possible to somehow specify the concurrency level per queue (using the Django integration here):

./manage.py celery worker -Q q1:1,q2:1,q3:1,q4:1,q5:1,q6:1

(meaning: run a process pool of size 1 on each of the queues q1-q6).
Maybe allow autoscaling to be specified as well:

./manage.py celery worker -Q video:1-3,audio:1,image:1

(maybe use a different delimiter than the colon)

What do you guys think?

@ask
Contributor

ask commented May 29, 2014

I think it's a neat idea, but 'concurrency level per queue' is tricky; it would make more sense as 'send messages from video to a process at index 1, 2 or 3' and 'messages from audio to the process at index 1'.

This has been possible since 3.1, where we use one IPC queue per process (with amqp or redis), and it's open for anyone to implement. Just needs a pull request :)

@ask ask closed this as completed May 29, 2014
@jsh2134

jsh2134 commented Apr 13, 2015

+1 vote for seeing this implemented. In for any follow-ups.

@cecemel

cecemel commented Aug 3, 2016

+1

@aleksej-paschenko

+1

@TiagoAssuncao

+1

@dakinshin

+1

@Timopheym

@ask Has there been any movement on this?

@auvipy
Member

auvipy commented Apr 25, 2018

Please stop pinging people on GitHub; Ask doesn't actively maintain Celery these days. If any of you have a proper design for the implementation, please open an issue on celery/cep.

@Timopheym

Sorry, I just want to know if it's possible to do somehow in Celery. It seems like a very commonly needed feature.

@auvipy
Member

auvipy commented Apr 25, 2018

OK, let's get the ball rolling: investigate what needs to be done for the feature, then open an issue on celery/cep describing the possible implementation details. Thanks.

@clokep
Contributor

clokep commented Apr 25, 2018

We do this by running separate workers for each queue and passing a different concurrency to each of them. We handle the different configurations with our configuration-management solution (Salt). It might not be the most elegant solution, and there's definitely a little overhead (additional processes, additional connections per worker), but as the original reporter noted, it works fairly well. 👍
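
For reference, a minimal sketch of that setup (the proj app and queue names are just examples; -Q, -c, and -n are the standard worker flags for queues, concurrency, and node name):

    celery -A proj worker -n video@%h -Q video -c 3
    celery -A proj worker -n audio@%h -Q audio -c 1
    celery -A proj worker -n image@%h -Q image -c 1

celery multi can condense these into a single command with per-node options such as -c:video 3, at the cost of a denser syntax.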

@auvipy
Member

auvipy commented Apr 25, 2018

Thanks for sharing your workaround.

@alberdonpi

+1

@deterb

deterb commented Jun 15, 2018

I do this with separate workers as well. I have them running under systemd template services, using environment files to configure each worker (a parent file for common options plus a worker-specific one named after the template instance). Agreed on the overhead, though I've also needed to scale out the number of workers because a single worker coordinator (note: I'm using prefork) can't keep up.
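
For anyone curious, a rough sketch of such a template unit (the paths, the proj app, and the CONCURRENCY variable are assumptions; each queue gets its own /etc/celery/<queue>.env defining CONCURRENCY):

    # /etc/systemd/system/celery@.service -- hypothetical template unit
    [Unit]
    Description=Celery worker for queue %i
    After=network.target

    [Service]
    EnvironmentFile=/etc/celery/common.env
    EnvironmentFile=/etc/celery/%i.env
    # %%h escapes to a literal %h, which celery expands to the hostname
    ExecStart=/usr/bin/celery -A proj worker -n %i@%%h -Q %i -c ${CONCURRENCY}
    Restart=on-failure

    [Install]
    WantedBy=multi-user.target

Workers are then started per queue, e.g. systemctl start celery@video celery@audio.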

@thedrow
Member

thedrow commented Jan 1, 2019

I'm thinking about this right now, and it will require us to design a more sophisticated scheduler.
There are of course algorithms that do this; an example is the Linux kernel, which can schedule a process on a limited set of CPUs.
Right now the only scheduling we do is put items in a queue, so the scheduling order is essentially FIFO.
To complicate things further, if we wanted a global concurrency limit per queue (that is, a limit across all workers instead of per worker), we'd need coordination.
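
To make the coordination requirement concrete: a global per-queue limit could be approximated with a shared counter in something like Redis. A minimal non-blocking sketch (the key scheme is hypothetical, and a production version would need TTLs or heartbeats so slots aren't leaked when a worker crashes):

    import redis

    r = redis.Redis()

    def try_acquire(queue: str, limit: int) -> bool:
        # INCR is atomic, so this counts running tasks across all workers.
        slots = r.incr("concurrency:%s" % queue)
        if slots > limit:
            r.decr("concurrency:%s" % queue)  # over the limit: roll back
            return False
        return True

    def release(queue: str) -> None:
        r.decr("concurrency:%s" % queue)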

@monstermac77

I think I have another workaround suggestion.

I've found myself really needing this feature as well. Specifically, I have about 25 queues (and growing), each of which has different concurrency requirements. Without this feature, that means spawning 25 Celery worker processes on each server in our cluster. I thought this would be fine, but the CPU overhead of each worker is a killer: the load average immediately shoots up to 20, even with no tasks executing.

However, I think chaining may come to the rescue here, although this solution is only applicable if you know all the tasks you need to complete in a given queue ahead of time. Say you currently have queues A, B, and C, with concurrencies 1, 2, and 3 respectively, and right now you have 3 workers, each handling one queue. Instead, you can stick all of your tasks for queue A into a single chain, the tasks for queue B into two chains, and C into three chains, and put them all in a single queue X, then spawn one Celery worker handling queue X with concurrency 6 (1+2+3). The chaining preserves your concurrency requirements, since tasks in a chain execute sequentially, and you never had to spawn more than one worker (no extra overhead); see the sketch below.
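
A sketch of the chaining trick for the A/B/C example above (the work task, the proj module, and queue X are hypothetical):

    from celery import chain
    from proj.tasks import work  # hypothetical task

    Q = "X"  # single shared queue, served by one worker with -c 6

    # Former queue A (concurrency 1): all of its tasks in one chain.
    a = chain(work.si(1).set(queue=Q), work.si(2).set(queue=Q))

    # Former queue B (concurrency 2): its tasks split across two chains.
    b1 = chain(work.si(3).set(queue=Q), work.si(4).set(queue=Q))
    b2 = chain(work.si(5).set(queue=Q), work.si(6).set(queue=Q))
    # (Queue C with concurrency 3 would get three chains, and so on.)

    # Tasks within a chain run strictly one after another, so each chain
    # behaves like a concurrency-1 "virtual queue" inside the shared pool.
    for c in (a, b1, b2):
        c.apply_async()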

@auvipy auvipy added this to the v5.0.0 milestone Jun 29, 2019
@wimoMisterX

+1

@guillaumerenault

+1

@AeroLad

AeroLad commented Aug 10, 2020

+1

I have a scenario requiring dynamic queues with differing concurrency per queue. As others have mentioned, although you can run separate workers per queue (with appropriate concurrency), this is not very efficient, nor does it work for dynamic queues that are assigned at runtime.

@thedrow thedrow modified the milestones: 5.0.0, Future Sep 24, 2020