Add "How does it work?" section with a little explanation #69

Open · allburov opened this issue Jul 12, 2022 · 11 comments

allburov commented Jul 12, 2022

Hi! I was looking at plugins for celery on GitHub and found celery-batches, but didn't quite get how it actually works.
It'd be great to add a "How does it work?" section to README.md, like the one at https://github.com/steinitzu/celery-singleton#how-does-it-work

For instance, I have a typical HTTP API setup:

  1. HTTP API service - gunicorn running django with a celery app inside (not a worker, just a publisher). gunicorn runs 5 processes of the django HTTP app, and django is the only code that calls celery tasks, like this: last_seen_user.delay(user=user_id, when=utcnow()) (see the sketch below).
  2. celery workers - run with celery --concurrency=4
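
For concreteness, the publisher side looks something like this (module and view names are made up):

```python
# Sketch of the publisher side (module and view names are hypothetical).
from datetime import datetime, timezone

from myproject.tasks import last_seen_user  # the celery task, defined elsewhere

def some_django_view(request):
    # The django process only publishes the task; a celery worker executes it.
    last_seen_user.delay(user=request.user.id, when=datetime.now(timezone.utc))
```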

How does celery-batches work in this setup?

The documentation says:

Task requests are buffered in memory (on a worker) until either the flush count or flush interval is reached.

Does this mean that task requests are buffered inside whichever of the 5 django processes handled the HTTP request?
I mean, if a user sends two HTTP requests and gunicorn routes them to different processes, then we have two local (in-memory) queues for this Batches task, even for the same user, and after flush_interval or flush_every (counted only for THAT process, not across all of them) we actually send the task to the celery broker - in our case, two tasks even though it was the same user.

clokep (Owner) commented Jul 12, 2022

Task requests are buffered in memory (on a worker) until either the flush count or flush interval is reached.

Does this mean that task requests are buffered inside whichever of the 5 django processes handled the HTTP request?
I mean, if a user sends two HTTP requests and gunicorn routes them to different processes, we'll have two local (in-memory) queues for this Batches task, even for the same user?

No, the tasks are queued in the celery workers; it doesn't matter whether they're sent from the same or different clients.

In your example, the tasks are sent from the gunicorn process(es) to the broker, as is normal with Celery. The celery worker consuming from that queue then buffers them in memory until the flush interval or flush count is reached, and then executes the task.
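
For reference, a minimal celery-batches task along these lines looks roughly like this (names and figures are illustrative; flush_every is the flush count, flush_interval is in seconds):

```python
from celery import Celery
from celery_batches import Batches

app = Celery("proj", broker="amqp://localhost")

@app.task(base=Batches, flush_every=100, flush_interval=10)
def last_seen_user(requests):
    # Invoked once with up to 100 buffered requests (or with whatever
    # arrived within 10 seconds), instead of once per .delay() call.
    for request in requests:
        print(request.kwargs["user"], request.kwargs["when"])
```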

allburov (Author) commented Jul 12, 2022

@clokep The Flash!
Thank you for the quick response :)

Oh, I see - the plugin sends the task to the broker immediately, then a worker gets the task from the queue but doesn't execute the "business" code until flush_interval or flush_every is reached?

How does it work with --concurrency=4 then? I didn't get that :( If the workers are independent processes, then celery may run two tasks for the above case?

clokep (Owner) commented Jul 12, 2022

Oh, I see - the plugin sends the task to the broker immediately, then a worker gets the task from the queue but doesn't execute the "business" code until flush_interval or flush_every is reached?

That statement is correct, but celery-batches doesn't do anything special for sending the task; this is all standard celery behavior.

How does it work with --concurrency=4 then? I didn't get that :( If the workers are independent, then celery may run two tasks for the above case?

The following assumes you're using the standard "prefork" configuration of a celery worker (although the explanation doesn't change much if you're using gevent, eventlet, or threads):

Celery workers have a "main" process which fetches tasks from the broker; by default it fetches up to "prefetch multiplier x concurrency" tasks (so if your prefetch multiplier is 100 and your concurrency is 4, it attempts to pull up to 400 items from the broker's queue). Once it has those in memory, it deserializes them and runs whatever their Strategy is -- for a normal celery task this essentially just means feeding them to each worker in the processing pool (I think this is done on a task-by-task basis, but I don't remember off the top of my head).
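
Spelled out in code (worker_prefetch_multiplier is Celery's real setting name; the figures are just the example's):

```python
from celery import Celery

app = Celery("proj", broker="amqp://localhost")

# Celery's default worker_prefetch_multiplier is 4; 100 matches the example above.
app.conf.worker_prefetch_multiplier = 100

# With --concurrency=4, the main process may reserve up to
# 100 * 4 = 400 messages from the broker at once.
```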

What celery-batches changes is that the "main" celery worker process queues tasks in memory until the flush interval or flush count is reached, and then sends that entire set of tasks to a worker in the processing pool together.

(None of this is really explained well in the celery documentation, as far as I know... celery-batches should do a better job of explaining the difference it makes, but that is hard without something else to refer to which discusses the "normal" way of doing things.)

allburov (Author) commented:

Great explanation - I didn't know these details about celery!
The plugin seems perfect then; it works exactly as I expected for a plugin with such functionality! :)

One more question left - will the flow be broken when we run three "main" workers on different physical nodes, or three workers on the same node, as the documentation shows? https://docs.celeryq.dev/en/stable/userguide/workers.html#starting-the-worker

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

Or will it be handled by the broker somehow?

clokep (Owner) commented Jul 12, 2022

One more question left - will the flow be broken when we run three "main" workers on different physical nodes, or three workers on the same node, as the documentation shows? docs.celeryq.dev/en/stable/userguide/workers.html#starting-the-worker

Each worker will have its own in-memory queue, so you need to think carefully about what you want the prefetch multiplier (and concurrency) to be in order to keep your resources well occupied. It will work fine, but it might not work at utmost efficiency.
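
For example, the prefetch can be tuned per worker at startup (this assumes Celery 5's --prefetch-multiplier flag):

$ celery -A proj worker --loglevel=INFO --concurrency=10 --prefetch-multiplier=100 -n worker1@%h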

allburov (Author) commented:

@clokep thank you for the detailed explanation!

I think we can add a "How does it work?" section to README.md and link to this issue :D It explains everything.

clokep (Owner) commented Jul 13, 2022

I think we can add a "How does it work?" section to README.md and link to this issue :D It explains everything.

I think this should get distilled and added to the documentation, yes.

kinoute (Contributor) commented Jul 15, 2022

Hello,

I agree that this is a great module, and more examples/docs would be awesome. I made my own system to bulk insert data into ES with Celery/RabbitMQ, but I encountered a lot of problems that I'm trying to solve, hopefully with this package.

For example, it seems (thanks to your answers here) that this package can be used with Celery in its default pool, "prefork". My solution can't: I keep documents in memory until a certain amount is reached, then bulk insert them. But I also added a thread timer to check every X minutes whether there are documents left over, in order to push them (for instance, when we don't receive new tasks).

The problem is that when Celery receives a SIGTERM (from Kubernetes, for instance), in prefork it doesn't wait for the thread timer to execute before terminating. Therefore, if I had documents waiting to be pushed, they got lost. I had to use the "solo" pool for that, which carefully waits for the timer to execute before terminating.
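
To make the failure mode concrete, the timer is essentially this (simplified sketch; names and numbers are made up):

```python
import threading

class DocumentBuffer:
    """Simplified sketch of my homegrown batching (not celery-batches)."""

    def __init__(self, flush_size=500, check_interval=60.0):
        self.docs = []
        self.flush_size = flush_size
        self.check_interval = check_interval
        self._schedule_check()

    def _schedule_check(self):
        # Celery does not track this timer as a task, so a SIGTERM in the
        # prefork pool can terminate the process before it ever fires.
        timer = threading.Timer(self.check_interval, self._check)
        timer.daemon = True
        timer.start()

    def _check(self):
        if self.docs:
            self.flush()
        self._schedule_check()

    def add(self, doc):
        self.docs.append(doc)
        if len(self.docs) >= self.flush_size:
            self.flush()

    def flush(self):
        # ... bulk-insert self.docs into ES here ...
        self.docs = []
```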

Unfortunately, Celery slowly increases its RAM usage, and I can't make use of the max-tasks-per-child and max-memory-per-child options because they are only available in the "prefork" pool.
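
For reference, the flags in question look like this (max-memory-per-child is in KiB):

$ celery -A proj worker --pool=prefork --max-tasks-per-child=1000 --max-memory-per-child=50000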

So I have a few questions:

  • Is your package compatible with the max-tasks-per-child and max-memory-per-child options, and if so, how does it handle them? If I set my limit to 1000 tasks per worker before letting Celery restart it, does your package count "batches" as "tasks"? (i.e. will it restart after 1000 batches, or after 1000 tasks/requests?)
  • Same for the max-memory-per-child option: by design, IIRC, if our worker reaches the limit, Celery will finish executing the task, stop listening to RabbitMQ, then safely start a new worker. Is that compatible with your solution?
  • Finally, it seems that in all your examples, your tasks use a class named Batches. How can we handle this when we already have tasks that use classes (i.e. @app.task(base=Bulker))? Is it possible? Does my "Bulker" class have to extend your Batches class instead of the Task class?

Thank you very much for your answers!

clokep (Owner) commented Jul 15, 2022

The problem is that when Celery receives a SIGTERM (from Kubernetes for instance), in prefork, it doesn't wait for the tread timer to execute before terminating.

I haven't run celery under kubernetes, but it should wait for any running tasks to finish.

Therefore, if I had documents waiting to be pushed, they got lost. I had to use the "solo" pool for that, which carefully waits for the timer to execute before terminating.

Do you have acks_late enabled for those tasks? That should ensure they get placed back in the queue (although that's for "standard" Celery; I'm not sure how that would interact with your system).
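
Something like this, reusing the illustrative task from earlier in the thread (again, the interaction with batching is untested):

```python
from celery import Celery
from celery_batches import Batches

app = Celery("proj", broker="amqp://localhost")

# acks_late is standard Celery; its interaction with batching is untested here.
@app.task(base=Batches, acks_late=True, flush_every=100, flush_interval=10)
def bulk_insert(requests):
    ...
```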

Is your package compatible with the max-tasks-per-child and max-memory-per-child options, and if so, how does it handle them?

I think you're just asking "what happens if I restart the worker while using this package?" -- I suspect whatever is in the queue (so a maximum of flush_every, unless you're using ETAs) would get dropped, but I haven't verified this. If you have time to test this and verify it is a bug, a separate issue would be appreciated.

Finally, it seems that in all your examples, your tasks use a class named Batches. How can we handle this when we already have tasks that use classes (i.e. @app.task(base=Bulker))? Is it possible? Does my "Bulker" class have to extend your Batches class instead of the Task class?

Yes.
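
That is, something along these lines (a sketch; the Bulker body and the numbers are yours to fill in):

```python
from celery import Celery
from celery_batches import Batches

app = Celery("proj", broker="amqp://localhost")

class Bulker(Batches):
    # Your existing base-class logic goes here; it now inherits the
    # batching behavior from Batches instead of celery.Task.
    pass

@app.task(base=Bulker, flush_every=1000, flush_interval=60)
def bulk_insert(requests):
    docs = [request.kwargs["doc"] for request in requests]
    # ... bulk-insert `docs` into Elasticsearch here ...
```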

kinoute (Contributor) commented Jul 15, 2022

I haven't run celery under kubernetes, but it should wait for any running tasks to finish.

It should, and it does. But a thread timer is not a task per se; it is something next to the task. Therefore, Celery thinks everything is done and terminates.

Do you have acks_late enabled for those tasks? That should ensure they get placed back in the queue (although that's for "standard" Celery; I'm not sure how that would interact with your system).

Yes, I have acks_late activated. But in the "solo" pool it's buggy: it always enables acks_late, even when you set it to False. You could think that's cool, since messages are placed back in the queue in case of a crash, but my documents waiting to be pushed are still lost. Only the last document added to my "waiting" Python list will get redelivered by Celery/RabbitMQ.

I think you're just asking "what happens if I restart the worker while using this package?" -- I suspect whatever is in the queue (so a maximum of flush_every, unless you're using ETAs) would get dropped, but I haven't verified this. If you have time to test this and verify it is a bug, a separate issue would be appreciated.

What do you mean by "dropped"? I will test, yes. My question was more: does the package respect these options and consider a whole batch as a single "task" (just as it would if we were using Celery normally)? In that case, Celery and your package will wait for the full batch to be executed before terminating/stopping listening to the queue, which would be great.

I'm just concerned about the same problem I have above with my solution: I want to be sure that when the Celery worker restarts thanks to these Celery options (either the number of tasks or the max memory being reached), all the documents in my batch will get processed, meaning no loss.

Right now, using these options alone, I don't have the control to trigger a bulk insert when these limits are reached. That is the whole problem. If I could detect that a Celery worker is about to restart because it has reached one of these settings' values, I could force a bulk insert and stop right there - no loss. If you have any ideas (sorry for the long message), please share.

Thanks!

clokep (Owner) commented Jul 15, 2022

I think you're just asking "what happens if I restart the worker while using this package?" -- I suspect whatever is in the queue (so a maximum of flush_every, unless you're using ETAs) would get dropped, but I haven't verified this. If you have time to test this and verify it is a bug, a separate issue would be appreciated.

What do you mean by "dropped"? I will test, yes. My question was more: does the package respect these options and consider a whole batch as a single "task" (just as it would if we were using Celery normally)? In that case, Celery and your package will wait for the full batch to be executed before terminating/stopping listening to the queue, which would be great.

I did some light testing, and it seems that the enqueued requests do get picked back up after a worker restart. (Alternatively, they should get picked up by a different worker, if one exists.)

If there's a running batch task, then yes, it gets treated as a single task and Celery will wait for it to finish during shutdown.

I'm just concerned about the same problem I have above with my solution: I want to be sure that when the Celery worker restarts thanks to these Celery options (either the number of tasks or the max memory being reached), all the documents in my batch will get processed, meaning no loss.

Right now, using these options alone, I don't have the control to trigger a bulk insert when these limits are reached. That is the whole problem. If I could detect that a Celery worker is about to restart because it has reached one of these settings' values, I could force a bulk insert and stop right there - no loss. If you have any ideas (sorry for the long message), please share.

I can't really answer questions about how this compares to your current solution or whether it will work for your workload, sorry.
