
Task is executed twice when the worker restarts #4426

Closed
canassa opened this issue Dec 8, 2017 · 9 comments

Comments


canassa commented Dec 8, 2017

Currently using Celery 4.1.0

Steps to reproduce

Start a new project using RabbitMQ and register the following task:

from celery import shared_task
from django.core.cache import cache

@shared_task(bind=True)
def test_1(self):
    if not cache.add(self.request.id, 1):
        raise Exception('Duplicated task {}'.format(self.request.id))

Now start 2 workers. I used gevent with a concurrency of 25 for this test:

celery worker -A my_proj -Q my_queue -P gevent -c 25

Open a Python shell and fire a bunch of tasks:

from myproj.tasks import test_1

for i in range(10000):
    test_1.apply_async()

Now quickly do a warm shutdown (Ctrl+C) in one of the workers while it's still processing tasks. You should see errors popping up in the second worker:

ERROR    Task my_proj.tasks.test_1[e28e6760-1371-49c9-af87-d196c59375e9] raised unexpected: Exception('Duplicated task e28e6760-1371-49c9-af87-d196c59375e9',)
Traceback (most recent call last):
  File "/code/virtualenv/CURRENT/lib/python3.5/site-packages/celery/app/trace.py", line 374, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/code/virtualenv/CURRENT/lib/python3.5/site-packages/celery/app/trace.py", line 629, in __protected_call__
    return self.run(*args, **kwargs)
  File "/code/scp/python/my_proj/tasks.py", line 33, in test_1
    raise Exception('Duplicated task {}'.format(self.request.id))
Exception: Duplicated task e28e6760-1371-49c9-af87-d196c59375e9

Expected behavior

Since I am not using late acknowledgment and I am not killing the workers, I wasn't expecting the tasks to execute again.
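
For reference, a minimal sketch of the acknowledgment settings in play here; this assumes the defaults, which is what I'm running:

from celery import Celery

app = Celery('my_proj')

# Defaults: messages are acknowledged when the task is received,
# before it runs, so a warm shutdown should not cause redelivery.
app.conf.task_acks_late = False          # early ack (the default)
app.conf.worker_prefetch_multiplier = 4  # prefetch window (the default)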

Actual behavior

The tasks are being executed twice. This is causing problems on our servers because we restart our workers every 15 minutes or so to avoid memory leaks.

auvipy (Member) commented Aug 12, 2018

Not sure if it's an issue with Celery. Try the newer release and report it again if it still seems to be a Celery issue.

auvipy closed this as completed Aug 12, 2018
imomaliev (Contributor) commented

Having the same issue: tasks with an eta get executed multiple times after a restart of the worker. I've increased the number of processes on my virtual machine, and Celery created a worker with a concurrency of 2 (it had 1 before).

komuw (Contributor) commented Jul 10, 2019

This bug should be re-opened. At $DAY_JOB we are seeing this error every time we deploy (restart).
This happens with both Celery v4.1.1 and v4.3.0.

cc @auvipy

komuw (Contributor) commented Jul 10, 2019

Hi @canassa and @imomaliev did either of you manage to fix this issue on your end?

I would appreciate any workarounds that you may have.

canassa (Author) commented Jul 10, 2019

@komuw My workaround was similar to what I posted in the issue description: I used a cache key based on the request id to detect duplicated tasks and drop them.

We never found a proper fix for the issue, but it was very easy to reproduce by following the steps I provided. I have long since left the company where this was occurring, so I am no longer tracking the problem.
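
In case it helps, a minimal sketch of that workaround (the key prefix and timeout are arbitrary choices; this assumes Django's cache backend supports an atomic add):

from celery import shared_task
from django.core.cache import cache

@shared_task(bind=True)
def my_task(self):
    # cache.add is atomic: it returns False if the key already exists,
    # so a redelivered copy of the same task id is detected and dropped.
    if not cache.add('task-seen-{}'.format(self.request.id), 1, timeout=86400):
        return  # duplicate delivery; drop it
    # ... actual task body goes here ...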

komuw (Contributor) commented Jul 10, 2019

thanks

mwaaas commented Jul 12, 2019

Found the issue: it seems Celery is acknowledging via a promise.
https://github.com/celery/celery/blob/v4.3.0/celery/worker/consumer/consumer.py#L566

This means that during a restart, if the RabbitMQ connection fails, the worker won't ack but will still process the task; the message is then returned to the ready state and processed again later.

mwaaas commented Jul 12, 2019

To fix, patch the Request object with this in proj/celery.py:

from celery.worker.request import Request

# Make acknowledgment synchronous instead of using a promise.
def new_acknowledge(self):
    """Acknowledge the task."""
    if not self.acknowledged:
        self.message.ack()
        self.acknowledged = True

Request.acknowledge = new_acknowledge
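
If you would rather not monkeypatch, the same override can be expressed through Celery's documented custom request class hook. A sketch (SyncAckRequest and SyncAckTask are placeholder names):

from celery import Celery, Task
from celery.worker.request import Request

app = Celery('my_proj')

class SyncAckRequest(Request):
    """Request that acks synchronously instead of via a promise."""

    def acknowledge(self):
        if not self.acknowledged:
            self.message.ack()
            self.acknowledged = True

class SyncAckTask(Task):
    # The worker uses this class when building the request for each message.
    Request = SyncAckRequest

@app.task(base=SyncAckTask, bind=True)
def test_1(self):
    ...  # task body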

komuw (Contributor) commented Jul 12, 2019 via email
