
Redeliver same tasks repeatedly with redis broker and gevent #1098

Closed
wuqiallen opened this issue Sep 10, 2019 · 14 comments

@wuqiallen

Hi,

kombu==4.6.4
celery==4.3.0
redis==3.2.1

The visibility_timeout is ignored when using the redis broker with gevent. As a result, when launching multiple workers with the redis broker and gevent, tasks are re-delivered repeatedly.
The reason may be #905.

    def restore_visible(self, start=0, num=10, interval=10):
        ......
        ceil = time() - self.visibility_timeout
        ......
        env = _detect_environment()
        if env == 'gevent':
            ceil = time()
        visible = client.zrevrangebyscore(
            self.unacked_index_key, ceil, 0,
            start=num and start, num=num, withscores=True)
        for tag, score in visible or []:
            self.restore_by_tag(tag, client)

When env == 'gevent', ceil is changed from time() - self.visibility_timeout to time().
As a result, all tasks in the unacked set, even newly added ones, are fetched into visible and then re-delivered by restore_by_tag, defeating the purpose of visibility_timeout. The function restore_visible in QoS is called by maybe_restore_messages:

    def maybe_restore_messages(self):
        for channel in self._channels:
            if channel.active_queues:
                # only need to do this once, as they are not local to channel.
                return channel.qos.restore_visible(
                    num=channel.unacked_restore_limit,
                )

But maybe_restore_messages is itself called from several other methods, in particular from register_with_event_loop, which schedules it every 10 seconds:

    def register_with_event_loop(self, connection, loop):
        ...
        loop.call_repeatedly(10, cycle.maybe_restore_messages)

So launching multiple workers with the redis broker and gevent re-delivers all tasks in the unacked set repeatedly.
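
To make the effect concrete, here is a toy illustration in plain Python (not kombu code; the task names and timestamps are invented for the example) of how the choice of ceil decides which unacked entries get restored:

    from time import time

    visibility_timeout = 3600  # seconds; the redis transport default is 1 hour

    now = time()
    # the unacked index is a sorted set scored by delivery time
    unacked = {
        'old-task': now - 7200,  # delivered 2 hours ago -> past the timeout
        'new-task': now - 5,     # delivered 5 seconds ago -> still in flight
    }

    ceil_normal = now - visibility_timeout  # the non-gevent code path
    ceil_gevent = now                       # the gevent code path

    print([tag for tag, ts in unacked.items() if ts <= ceil_normal])
    # ['old-task'] -- only messages past the visibility timeout are restored
    print([tag for tag, ts in unacked.items() if ts <= ceil_gevent])
    # ['old-task', 'new-task'] -- everything still in flight is restored too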

@matteius
Contributor

@wuqiallen I will ask that you also try this with the master branch of kombu in place of 4.6.4 -- I did a bunch of work with Redis recently that was merged: https://github.com/celery/kombu/pull/1089/files

While your issue report may be different, we need to know whether it is still an issue for you on the master branch, as we are prepping for an upcoming release. If we know it is still an issue, then a possible fix may be discovered.

@matteius
Contributor

matteius commented Sep 10, 2019

Also try the master celery branch with that please, or at least the latest release. Also, for what it's worth, the latest redis on PyPI is redis==3.3.8.

@wuqiallen
Author

@matteius Thanks for your reply. I will try to reproduce this issue with the environment you mentioned. I will let you know the results as soon as I can.

@arikfr

arikfr commented Oct 6, 2019

This still reproduces on latest versions, as the relevant code is still there:

    env = _detect_environment()
    if env == 'gevent':
        ceil = time()

Whenever restore_visible is called in a gevent environment, it restores all unacked messages, regardless of their visibility timeout. While #905 tried to fix an issue with messages not being restored when a worker starts, it broke this functionality in all other cases (restore_visible is called periodically, not only at worker start).

In our case, we don't even use gevent-based workers, but our API services do use gevent. When we use celery.control to monitor the status of Celery (we have an API endpoint that returns queue status), it triggers restore_visible with this broken code. It took us a while to find this 😢
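
A hypothetical sketch of the kind of monitoring code described above (not arikfr's actual API; the app name and broker URL are assumptions): even a read-only status check like this opens a broker connection, which is enough to hit the restore_visible path on the redis transport.

    from celery import Celery

    # assumed app/broker configuration for the example
    app = Celery('tasks', broker='redis://localhost:6379/0')


    def queue_status():
        # inspect() opens a connection to the broker and consumes replies;
        # with the redis transport this is where restore_visible can fire
        inspect = app.control.inspect(timeout=1.0)
        return {
            'active': inspect.active(),
            'reserved': inspect.reserved(),
        }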

@deepanjan-nsk

Can confirm @arikfr's statement; we are still seeing this problem as of 3rd Feb 2020, as the code in question is still present.

@arikfr

arikfr commented Feb 3, 2020

@deepanjan-nsk we ended up patching this in our codebase (and later replacing Celery with RQ). If you want, I can try to find the patch.

@deepanjan-nsk

Thanks @arikfr, we ended up doing the same with a patch in our codebase.

@calvin620707

We also applied a patch that removes the ceil = time() line to avoid duplicated tasks, since we don't care too much about losing tasks after workers restart.
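
For reference, a minimal sketch of one way to get a similar effect from application code without editing kombu itself (this is not the actual patch used by anyone in this thread; it assumes the module-level _detect_environment name in kombu.transport.redis can be overridden for the duration of the call):

    # Hypothetical workaround sketch: force restore_visible down the non-gevent
    # code path so ceil stays at time() - visibility_timeout.
    from unittest import mock

    import kombu.transport.redis as redis_transport

    _original_restore_visible = redis_transport.QoS.restore_visible


    def _restore_visible_with_timeout(self, *args, **kwargs):
        # Pretend we are not running under gevent while restoring messages.
        with mock.patch.object(redis_transport, '_detect_environment',
                               return_value='default'):
            return _original_restore_visible(self, *args, **kwargs)


    redis_transport.QoS.restore_visible = _restore_visible_with_timeout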

@auvipy
Member

auvipy commented Mar 16, 2020

celery==4.4.*?

@calvin620707

I reproduced this with celery==4.4.0 and kombu==4.6.7. Actually, it seems to be a kombu issue rather than a celery one.

@auvipy
Member

auvipy commented Mar 18, 2020

try celery==4.4.2 and report again please

@mayouzi

mayouzi commented May 15, 2020

replace gevent with eventlet

@mayouzi

mayouzi commented May 15, 2020

replace gevent with eventlet

It's worth a reminder to be cautious when making HTTP requests with eventlet + requests, which requires installing pyOpenSSL with pip install pyOpenSSL.
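
A short sketch of the caveat above (the URL is a placeholder; whether pyOpenSSL is actually required depends on your Python/eventlet/requests versions, so treat this as the commenter's advice rather than a hard rule):

    # eventlet must monkey-patch the standard library before requests is
    # imported; the comment above also recommends installing pyOpenSSL
    # (pip install pyOpenSSL) so HTTPS behaves under green threads.
    import eventlet
    eventlet.monkey_patch()

    import requests  # imported after monkey_patch on purpose

    resp = requests.get('https://example.com', timeout=5)
    print(resp.status_code)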

@wuqiallen
Author

Fixed by #1259.
