Same task runs multiple times at once? #4400

Open
jenstroeger opened this issue Nov 20, 2017 · 57 comments

@jenstroeger

jenstroeger commented Nov 20, 2017

This issue is a repost of an unanswered Google Groups post, “Same task runs multiple times?”

> ./bin/celery -A celery_app report

software -> celery:4.1.0 (latentcall) kombu:4.1.0 py:3.6.1
            billiard:3.5.0.3 redis:2.10.6
platform -> system:Linux arch:64bit, ELF imp:CPython
loader   -> celery.loaders.app.AppLoader
settings -> transport:redis results:redis://localhost:6379/2

broker_url: 'redis://localhost:6379/2'
result_backend: 'redis://localhost:6379/2'
task_serializer: 'json'
result_serializer: 'json'
accept_content: ['json']
timezone: 'Europe/Berlin'
enable_utc: True
imports: 'tasks'
task_routes: {
 'tasks': {'queue': 'celery-test-queue'}}

My application schedules a single group of two, sometimes three tasks, each with its own ETA within one hour. When the ETA arrives, I see the following in my Celery log:

[2017-11-20 09:55:34,470: INFO/ForkPoolWorker-2] Task tasks._test_exec[bd08ab85-28a8-488f-ba03-c2befde10054] succeeded in 33.81780316866934s: None
[2017-11-20 09:55:34,481: INFO/ForkPoolWorker-2] Task tasks._test_exec[bd08ab85-28a8-488f-ba03-c2befde10054] succeeded in 0.009824380278587341s: None
[2017-11-20 09:55:34,622: INFO/ForkPoolWorker-2] Task tasks._test_exec[bd08ab85-28a8-488f-ba03-c2befde10054] succeeded in 0.14010038413107395s: None
…
[2017-11-20 09:55:37,890: INFO/ForkPoolWorker-8] Task tasks._test_exec[bd08ab85-28a8-488f-ba03-c2befde10054] succeeded in 0.012678759172558784s: None
[2017-11-20 09:55:37,891: INFO/ForkPoolWorker-2] Task tasks._test_exec[bd08ab85-28a8-488f-ba03-c2befde10054] succeeded in 0.01177949644625187s: None
[2017-11-20 09:55:37,899: INFO/ForkPoolWorker-8] Task tasks._test_exec[bd08ab85-28a8-488f-ba03-c2befde10054] succeeded in 0.008250340819358826s: None
…

This can repeat dozens of times. Note the first task’s 33-second execution time, and the use of different workers!

I have no explanation for this behavior, and would like to understand what’s going on here.

@georgepsarakis
Contributor

Maybe this is related to visibility timeout?

@jenstroeger
Author

@georgepsarakis Could you please elaborate on your suspicion?

@georgepsarakis
Contributor

As far as I know, this is a known issue for broker transports that lack the built-in acknowledgement semantics of AMQP. If a task's completion time exceeds the visibility timeout, the task is assigned to a new worker, so you may see tasks being executed in parallel.

@jenstroeger
Author

jenstroeger commented Nov 24, 2017

@georgepsarakis So if the task is scheduled far ahead in the future, then I might see the above? The “visibility timeout” addresses that? From the documentation you linked:

The default visibility timeout for Redis is 1 hour.

Meaning that if the worker does not ack the task within the hour (i.e. run it?), that task is sent to another worker, which wouldn’t ack it either, and so on… Indeed, this seems to be the case, judging by the caveats section of the documentation, the related issue celery/kombu#337, and this quote from a blog:

But when developers just start using it, they regularly face abnormal behaviour of workers, specifically multiple execution of the same task by several workers. The reason which causes it is a visibility timeout setting.

Looks like setting the visibility_timeout to 31,540,000 seconds (one year) might be a quick fix.
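
A minimal configuration sketch of the quick fix discussed here, assuming the Redis broker from the report above. Rather than a full year, it sizes the timeout from the longest ETA the application schedules. `LONGEST_EXPECTED_ETA` and `SAFETY_MARGIN` are hypothetical values chosen for illustration, not taken from the thread.

```python
from datetime import timedelta

# Hypothetical: the longest ETA/countdown this application ever schedules.
LONGEST_EXPECTED_ETA = timedelta(hours=2)
# Hypothetical: extra headroom so an ETA never sits right at the limit.
SAFETY_MARGIN = timedelta(minutes=30)

# Celery settings, using the lowercase names shown in the report above.
broker_url = "redis://localhost:6379/2"
broker_transport_options = {
    # Seconds a delivered message may stay unacknowledged before redelivery.
    "visibility_timeout": int((LONGEST_EXPECTED_ETA + SAFETY_MARGIN).total_seconds()),
}
```

A longer timeout also delays the redelivery of messages held by workers that really did die, so the value is a trade-off rather than a free fix.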

@georgepsarakis
Contributor

I would say that if you increase the visibility timeout to 2 hours, your tasks will be executed only once.

So if you combine:

  • Redis broker
  • Late acknowledgement
  • ETA equal to or above the visibility timeout

you get multiple executions of the task.

I think what happens is:

  • After one hour passes, one worker process starts processing the task.
  • A second worker sees that this message has been in the queue for longer than the visibility timeout and is being processed by another worker.
  • The message is restored to the queue.
  • Another worker starts processing the same message.
  • The above happens for all worker processes.

Looking into the Redis transport implementation, you will notice that it uses Sorted Sets, passing the queued time as a score to zadd. The message is restored by comparing that timestamp against an interval equal to the visibility timeout.

Hope this explains the internals of the Redis transport a bit.
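
The restore behaviour described above can be illustrated with a toy model. This is a simplified simulation, not kombu's actual code: a plain dict stands in for the sorted set, and `simulate_deliveries` and its parameters are hypothetical names. It counts how often a single unacknowledged ETA message would be delivered before its ETA.

```python
def simulate_deliveries(eta_seconds, visibility_timeout, check_interval=60):
    """Count deliveries of one ETA message before its ETA (toy model).

    The message is delivered at t=0 and stays unacknowledged until the ETA.
    Whenever its last delivery is at least `visibility_timeout` seconds old,
    it is restored to the queue and delivered again.
    """
    # message id -> time of last delivery (plays the role of the zadd score)
    unacked = {"msg-1": 0}
    deliveries = 1

    # Periodic restore check, stopping just before the ETA is reached.
    for now in range(check_interval, eta_seconds, check_interval):
        for message_id, delivered_at in list(unacked.items()):
            if now - delivered_at >= visibility_timeout:
                unacked[message_id] = now  # redelivered: the score is refreshed
                deliveries += 1
    return deliveries


# ETA four hours ahead with a one-hour timeout, then with a five-hour timeout.
print(simulate_deliveries(4 * 3600, 1 * 3600))
print(simulate_deliveries(4 * 3600, 5 * 3600))
```

In this model the message is delivered again once per timeout period while it waits, and only once when the timeout exceeds the ETA.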

@jenstroeger
Author

jenstroeger commented Nov 27, 2017

@georgepsarakis, I’m now thoroughly confused. If a task’s ETA is set for two months from now, why would a worker pick it up one hour after the task has been scheduled? Am I missing something?

My (incorrect?) assumption is that:

  • I schedule a task with an ETA at any time in the future; then
  • the task (i.e. its marshaled arguments) and ETA will sit in the queue until ETA arrives; then
  • a worker begins processing the task at ETA.

Your “I think what happens is:” above is quite different from my assumption.

@zivsu

zivsu commented Dec 17, 2017

I also encountered the same problem. Have you solved it? @jenstroeger

Thanks!

@georgepsarakis
Contributor

@jenstroeger that does not sound like a feasible flow. I think the worker just continuously requeues the message in order to postpone execution until the ETA condition is finally met. The concept of the queue is to distribute messages as soon as they arrive, so the worker examines the message and just requeues it.

Please note that this is my guess; I am not really aware of the internals of the ETA implementation.
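
Whatever the exact internals, the practical rule in the Redis caveats of the Celery documentation is that an ETA should stay below the visibility timeout. A small guard along these lines could check that before scheduling. `eta_is_safe` is a hypothetical helper, and the 3600-second default mirrors the one-hour default quoted earlier in the thread.

```python
from datetime import datetime, timedelta, timezone


def eta_is_safe(eta, visibility_timeout_seconds=3600, now=None):
    """Return True if the ETA falls inside the visibility timeout window.

    `eta` and `now` are timezone-aware datetimes; `now` defaults to the
    current UTC time.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    return (eta - now) < timedelta(seconds=visibility_timeout_seconds)


# Example with a fixed reference time so the result is reproducible.
reference = datetime(2017, 11, 20, 9, 0, tzinfo=timezone.utc)
print(eta_is_safe(reference + timedelta(minutes=30), now=reference))
print(eta_is_safe(reference + timedelta(hours=2), now=reference))
```

A caller could raise the timeout, pick another broker, or split the delay into shorter hops when the check fails.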

@jenstroeger
Author

@zivsu, as mentioned above I’ve set the visibility_timeout to a very large number and that seems to have resolved the symptoms. However, as @georgepsarakis points out, that seems to be a poor approach.

I do not know the cause of the original problem nor how to address it properly.

@zivsu

zivsu commented Dec 21, 2017

@jenstroeger I read in a blog post that changing visibility_timeout cannot solve the problem completely, so I changed my broker to RabbitMQ.

@jenstroeger
Author

jenstroeger commented Dec 22, 2017

@zivsu, can you please share the link to the blog? Did you use Redis before?

@zivsu

zivsu commented Dec 22, 2017

@jenstroeger I can't find the blog post. I used Redis as the broker before. For scheduled tasks, I chose RabbitMQ to keep the error from happening again.

@Anton-Shutik

I have exactly the same issue. My config is:
django==1.11.6
celery==4.2rc2
django-celery-beat==1.0.1

settings:

CELERY_ENABLE_UTC = True
# CELERY_TIMEZONE = 'America/Los_Angeles'

That is the only working combination of these settings. I also have to schedule my periodic tasks in the UTC timezone.
If you enable CELERY_TIMEZONE or disable CELERY_ENABLE_UTC, periodic tasks start running multiple times.

@ghost

ghost commented Apr 27, 2018

I have the same problem. The ETA task executes multiple times when using Redis as a broker.
Is there any way to solve this?
It looks like changing the broker from Redis to RabbitMQ solves the problem.

@auvipy auvipy added this to the v5.0.0 milestone Apr 27, 2018
@2ps

2ps commented May 31, 2018

Using redis, there is a well-known issue when you specify a timezone other than UTC. To work around the issue, subclass the default app, and add your own timezone handling function:

from celery import Celery


class MyAppCelery(Celery):
    def now(self):
        """Return the current time and date as a datetime."""
        from datetime import datetime
        return datetime.now(self.timezone)

Hope that helps anyone else that is running into this problem.

@auvipy auvipy modified the milestones: v5.0.0, v4.3 May 31, 2018
@chrisconlan

I get this problem sometimes when frequently restarting celery jobs with beat on multicore machines. I've gotten in the habit of running ps aux | grep celery then kill <each_pid> to resolve it.

Best advice I have is to always make sure you see the "restart DONE" message before disconnecting from the machine.

@zetaab

zetaab commented Oct 10, 2018

{"log":"INFO 2018-10-09 17:41:08,468 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T17:41:08.468912644Z"}
{"log":"INFO 2018-10-09 17:41:08,468 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T17:41:08.468955918Z"}
{"log":"INFO 2018-10-09 19:46:04,293 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T19:46:04.293780045Z"}
{"log":"INFO 2018-10-09 19:46:04,293 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T19:46:04.293953621Z"}
{"log":"INFO 2018-10-09 20:46:04,802 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T20:46:04.802819711Z"}
{"log":"INFO 2018-10-09 20:46:04,802 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T20:46:04.802974829Z"}
{"log":"INFO 2018-10-09 21:46:05,335 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T21:46:05.336081133Z"}
{"log":"INFO 2018-10-09 21:46:05,335 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T21:46:05.336107517Z"}
{"log":"INFO 2018-10-09 22:46:05,900 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T22:46:05.901078395Z"}
{"log":"INFO 2018-10-09 22:46:05,900 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T22:46:05.901173663Z"}
{"log":"INFO 2018-10-09 23:46:06,484 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T23:46:06.485276904Z"}
{"log":"INFO 2018-10-09 23:46:06,484 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-09T23:46:06.485415253Z"}
{"log":"INFO 2018-10-10 00:46:07,072 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T00:46:07.072529828Z"}
{"log":"INFO 2018-10-10 00:46:07,072 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T00:46:07.072587887Z"}
{"log":"INFO 2018-10-10 01:46:07,602 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T01:46:07.60325321Z"}
{"log":"INFO 2018-10-10 01:46:07,602 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T01:46:07.603327426Z"}
{"log":"INFO 2018-10-10 02:46:08,155 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T02:46:08.155868992Z"}
{"log":"INFO 2018-10-10 02:46:08,155 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T02:46:08.155921893Z"}
{"log":"INFO 2018-10-10 03:46:08,753 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T03:46:08.75401387Z"}
{"log":"INFO 2018-10-10 03:46:08,753 strategy celery.worker.strategy 1 140031597243208 Received task: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f]  ETA:[2018-10-10 04:00:00+00:00] \n","stream":"stderr","time":"2018-10-10T03:46:08.754056891Z"}
{"log":"DEBUG 2018-10-10 04:00:00,013 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:70\n","stream":"stderr","time":"2018-10-10T04:00:00.013548928Z"}
{"log":"DEBUG 2018-10-10 04:00:00,013 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:70\n","stream":"stderr","time":"2018-10-10T04:00:00.013592318Z"}
{"log":"DEBUG 2018-10-10 04:00:00,013 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:71\n","stream":"stderr","time":"2018-10-10T04:00:00.014000106Z"}
{"log":"DEBUG 2018-10-10 04:00:00,013 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:71\n","stream":"stderr","time":"2018-10-10T04:00:00.014167558Z"}
{"log":"DEBUG 2018-10-10 04:00:00,014 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:64\n","stream":"stderr","time":"2018-10-10T04:00:00.014661348Z"}
{"log":"DEBUG 2018-10-10 04:00:00,014 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:64\n","stream":"stderr","time":"2018-10-10T04:00:00.014684354Z"}
{"log":"DEBUG 2018-10-10 04:00:00,014 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:65\n","stream":"stderr","time":"2018-10-10T04:00:00.01514884Z"}
{"log":"DEBUG 2018-10-10 04:00:00,014 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:65\n","stream":"stderr","time":"2018-10-10T04:00:00.015249646Z"}
{"log":"DEBUG 2018-10-10 04:00:00,015 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:66\n","stream":"stderr","time":"2018-10-10T04:00:00.01571124Z"}
{"log":"DEBUG 2018-10-10 04:00:00,015 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:66\n","stream":"stderr","time":"2018-10-10T04:00:00.01580249Z"}
{"log":"DEBUG 2018-10-10 04:00:00,019 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:68\n","stream":"stderr","time":"2018-10-10T04:00:00.019260948Z"}
{"log":"DEBUG 2018-10-10 04:00:00,019 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:68\n","stream":"stderr","time":"2018-10-10T04:00:00.019322151Z"}
{"log":"DEBUG 2018-10-10 04:00:00,245 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:70\n","stream":"stderr","time":"2018-10-10T04:00:00.245159563Z"}
{"log":"DEBUG 2018-10-10 04:00:00,245 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:70\n","stream":"stderr","time":"2018-10-10T04:00:00.245177267Z"}
{"log":"DEBUG 2018-10-10 04:00:00,245 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:67\n","stream":"stderr","time":"2018-10-10T04:00:00.245338722Z"}
{"log":"DEBUG 2018-10-10 04:00:00,245 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:67\n","stream":"stderr","time":"2018-10-10T04:00:00.245351289Z"}
{"log":"DEBUG 2018-10-10 04:00:00,256 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:65\n","stream":"stderr","time":"2018-10-10T04:00:00.256770035Z"}
{"log":"DEBUG 2018-10-10 04:00:00,256 request celery.worker.request 1 140031597243208 Task accepted: main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] pid:65\n","stream":"stderr","time":"2018-10-10T04:00:00.256788689Z"}
{"log":"INFO 2018-10-10 04:00:00,371 trace celery.app.trace 68 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.35710329699213617s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.371967002Z"}
{"log":"INFO 2018-10-10 04:00:00,371 trace celery.app.trace 68 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.35710329699213617s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.371983293Z"}
{"log":"INFO 2018-10-10 04:00:00,387 trace celery.app.trace 69 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.10637873200175818s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.388119538Z"}
{"log":"INFO 2018-10-10 04:00:00,387 trace celery.app.trace 69 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.10637873200175818s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.388166317Z"}
{"log":"INFO 2018-10-10 04:00:00,404 trace celery.app.trace 70 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.16254851799749304s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.404834545Z"}
{"log":"INFO 2018-10-10 04:00:00,404 trace celery.app.trace 70 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.16254851799749304s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.404862208Z"}
{"log":"INFO 2018-10-10 04:00:00,421 trace celery.app.trace 65 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.1654666289978195s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.421607856Z"}
{"log":"INFO 2018-10-10 04:00:00,421 trace celery.app.trace 65 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.1654666289978195s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.421674687Z"}
{"log":"INFO 2018-10-10 04:00:00,438 trace celery.app.trace 67 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.19588526099687442s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.438295459Z"}
{"log":"INFO 2018-10-10 04:00:00,438 trace celery.app.trace 67 140031597243208 Task main.batch.sendspam[2a6e5dc8-5fd2-40bd-8f65-7e7334a14b3f] succeeded in 0.19588526099687442s: None\n","stream":"stderr","time":"2018-10-10T04:00:00.438311386Z"}
...

If we check the “Received task” timestamps, the worker receives a new task with the same id every hour. The result is that all ETA messages are sent more than 10 times. It looks like RabbitMQ is the only option if we want to use ETA.
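
The hourly pattern can be checked directly from the log. The sketch below takes three of the “Received task” timestamps copied from the lines above and computes the gap between consecutive deliveries. The helper name `delivery_gaps` is made up for illustration.

```python
from datetime import datetime

# "Received task" timestamps copied from the log above.
RECEIVED_AT = [
    "2018-10-09 19:46:04,293",
    "2018-10-09 20:46:04,802",
    "2018-10-09 21:46:05,335",
]


def delivery_gaps(timestamps):
    """Return the seconds elapsed between consecutive log timestamps."""
    parsed = [datetime.strptime(ts, "%Y-%m-%d %H:%M:%S,%f") for ts in timestamps]
    return [
        (later - earlier).total_seconds()
        for earlier, later in zip(parsed, parsed[1:])
    ]


for gap in delivery_gaps(RECEIVED_AT):
    print(f"{gap:.1f} s between deliveries")
```

Each gap comes out at roughly 3600 seconds, which lines up with the one-hour default visibility timeout quoted earlier in the thread.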

@Dedal-O

Dedal-O commented Jan 2, 2019

I recently hit a similar bug. Also, ps aux | grep celery showed twice as many processes as workers started. Appending the parameter --pool gevent to the command launching the Celery workers lowered the number of processes to the exact number of started workers plus celery beat. Now I'm watching my task execution.

@auvipy auvipy modified the milestones: v4.3, v5.0.0 Jan 8, 2019
@killthekitten

Might another solution be disabling ack emulation entirely? i.e. "broker_transport_options": {"ack_emulation": False}. Any drawbacks for short-running tasks / countdowns?

@auvipy
Member

auvipy commented Oct 14, 2020

@thedrow thedrow added this to To do in Celery 5.1.0 Feb 24, 2021
@thedrow thedrow moved this from To do to Backlog in Celery 5.1.0 Mar 23, 2021
@auvipy auvipy modified the milestones: 5.1.0, 5.2 Mar 28, 2021
@xiaozuo7

This seems to be a feasible approach: https://github.com/cameronmaske/celery-once

@thedrow thedrow moved this from Backlog to Postponed in Celery 5.1.0 May 3, 2021
@omya3

omya3 commented Jul 7, 2021

@ErikKalkoken we end up doing exactly that.

from functools import wraps


def semaphore(fn):
    @wraps(fn)
    def wrapper(self_origin, *args, **kwargs):
        # UID and AgreementsRedis come from the poster's own code base.
        cache_name = f"{UID}-{args[0].request.id}-semaphore"
        agreement_redis = AgreementsRedis()
        # SET with nx=True only succeeds if the key does not exist yet.
        if not agreement_redis.redis.set(cache_name, "", ex=30, nx=True):
            raise Exception("...")
        try:
            return fn(self_origin, *args, **kwargs)
        finally:
            agreement_redis.redis.delete(cache_name)

    return wrapper

The code above is not used for Celery, but the logic for Celery's multiple executions is the same: you just need to get the task_id and set the cache. So far it is working fine.

Hi, did this actually work? I have the same issue where the Celery workers execute the same task multiple times. Does this actually prevent multiple workers from executing the same task, or does it prevent them from executing the same code?

@xiaozuo7

xiaozuo7 commented Jul 8, 2021

@omya3 I tried it and it doesn’t seem to work. I suggest changing the broker from Redis to RabbitMQ; it solves the problem very well. One more thing: Redis as a broker has scheduling problems. It pulls tasks every hour, so tasks with an ETA are executed repeatedly. The maintainers said this would be fixed in v5.1.0, but I am not sure.

@gurland

gurland commented Jul 31, 2021

I have the same problem. ETA tasks are executed multiple times with a very short interval in between (around 0.001 seconds).

@tomharvey
Contributor

tomharvey commented Aug 11, 2021

Looking at the same problem right now. gevent seems to be what's causing it - or at least, switching to eventlet does not show this behaviour.

Seems to be a few seconds between the workers receiving the task again. Sometimes, the first worker has started the task before it is received again. It can be received many times by the same worker, or it can be received many times by different workers.

@daymien

daymien commented Nov 4, 2021

Looking at the same problem right now. gevent seems to be what's causing it - or at least, switching to eventlet does not show this behaviour.

Seems to be a few seconds between the workers receiving the task again. Sometimes, the first worker has started the task before it is received again. It can be received many times by the same worker, or it can be received many times by different workers.

Did not work for me. Same issue with eventlet.

@akshaykalia

Same for me. I am having the same issue with eventlet.

@amirrezafahimi

amirrezafahimi commented Jan 23, 2022

I had the same problem. I set my task to run in the future (24 hours later) and the same problem occurred. Using Redis as a broker for Celery causes the issue.
You can set BROKER_TRANSPORT_OPTIONS = {'visibility_timeout': 43200}, but this solution has its own disadvantages. According to the documentation, a long visibility timeout will delay the redelivery of ‘lost’ tasks in the event of a power failure or forcefully terminated workers. So I tried using RabbitMQ instead, but that solution needs some consideration as well. Again according to the Celery documentation, you should configure RabbitMQ and set consumer_timeout to a longer value to avoid a PreconditionFailed error.

@amirhoseinbidar

amirhoseinbidar commented Jan 31, 2022

I had the same problem, but it seems that if a task executes multiple times, all the executions run sequentially on the same worker and share the same id. So an efficient (but not really good) solution is to cache the task id on the first execution for a fairly long time and block later executions that have the same id. Combining this idea with a fairly high visibility_timeout (I think 12 or 24 hours) would be a good solution.

# celery version 5.2.1
from celery import shared_task
from django.core.cache import cache  # assumption: the Django cache API is in use


def block_multiple_celery_task_execution(task, prefix):
    # The prefix is not strictly necessary, but it is good practice to separate the ids of each task.
    is_task_executed = cache.get(f"{prefix}_{task.request.id}")
    if not is_task_executed:
        cache.set(f"{prefix}_{task.request.id}", True, timeout=60 * 30)  # 30 min
        return False
    return True


@shared_task(bind=True)
def some_task(self):
    prefix = "some_task"
    if block_multiple_celery_task_execution(self, prefix):
        return

    # task code goes here

I tested it in development with just the celery -A my_project worker -E -l INFO command, and it also worked in production with multiple workers.
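
One caveat with a separate get and set is that two workers can both see a missing key before either writes it. The sketch below shows the check-and-claim done as a single step, using an in-process stand-in so that it runs without any services. `OnceGuard` is a hypothetical name. Across several workers the same idea would need a shared store with an atomic add, such as Redis SET with nx=True as in the semaphore snippet earlier in the thread.

```python
import threading
import time


class OnceGuard:
    """In-process illustration of an atomic "first caller wins" claim."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._lock = threading.Lock()
        self._claimed = {}  # task id -> time at which the claim expires

    def claim(self, task_id):
        """Return True only for the first caller per task id within the TTL."""
        now = self._clock()
        with self._lock:  # check and write happen under one lock
            expires_at = self._claimed.get(task_id)
            if expires_at is not None and expires_at > now:
                return False
            self._claimed[task_id] = now + self._ttl
            return True


guard = OnceGuard(ttl_seconds=30 * 60)
task_id = "bd08ab85-28a8-488f-ba03-c2befde10054"  # id taken from the first log excerpt
print(guard.claim(task_id))  # first delivery: run the task body
print(guard.claim(task_id))  # duplicate delivery: skip it
```

The TTL plays the same role as the 30-minute cache timeout above: it bounds how long a finished task id keeps blocking duplicates.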

@auvipy auvipy modified the milestones: 5.2.x, 5.3.x Jun 29, 2022
@beihai0xff

Could Celery create a delay queue using a Redis zset, and use that delay queue to host ETA tasks?

@JanMalte

Are there any updates on this issue?
Does this just not work with Redis, and never will?
Or is this a missing feature not yet implemented?

@Milanoverdose

I was having the same issue, and we managed to solve the problem.

The problem was that we scheduled the task in Django as (m/h/dm/my/d) * 13 * * 1-5.

We did this to run the task at 1 pm (13:00) every weekday, but in Django we need to specify the minutes!

After editing the schedule to (m/h/dm/my/d) 00 13 * * 1-5, the problem was solved!

@craigds

craigds commented Jul 12, 2023

* 13 * * 1-5 means run at every minute, as long as the hour is 13. So run at 13:00, 13:01, 13:02, 13:03, etc... Whereas specifying the minutes as zero means run only at 13:00.
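
The difference can be counted with a small sketch. This is a deliberately minimal matcher that only understands `*` and single numbers in the minute and hour fields, not a full cron parser, and `runs_per_day` is a hypothetical helper.

```python
def field_matches(field, value):
    """Match a cron field that is either '*' or a single number."""
    return field == "*" or int(field) == value


def runs_per_day(minute_field, hour_field):
    """Count the minutes of one day on which the schedule fires."""
    return sum(
        1
        for hour in range(24)
        for minute in range(60)
        if field_matches(minute_field, minute) and field_matches(hour_field, hour)
    )


print(runs_per_day("*", "13"))   # every minute of hour 13
print(runs_per_day("00", "13"))  # only at 13:00
```

The first schedule fires 60 times on each matching day, the second only once.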

@Nusnus Nusnus modified the milestones: 5.3.x, 5.5 Jun 25, 2024
@TheNormalStudent

Interestingly, my fix for this issue was just adding names to the tasks.
I'm using a Django + Redis combination.

For me this was simply

from celery import shared_task


@shared_task(name="yourname")
def py_func():
    pass
