
Fix redis connection leak with app.send_task #7685

Closed
wochinge wants to merge 3 commits into master from redis-connection-leak

Conversation

wochinge
Contributor

@wochinge wochinge commented Aug 5, 2022

Description

This test shows that there is a Redis connection leak when using app.send_task.
The connection leak doesn't happen if task.delay() is used.

Todos:

  • find solution
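
For reference, a minimal sketch of the integration test this PR adds, reconstructed from the diff excerpts further down; the helpers get_redis_connection and get_active_redis_channels are assumed to come from the existing Redis integration test module, and the final assertions may differ slightly from the real diff:

    def test_redis_connection_cleanup_with_send_task(self, manager):
        # Count the connections the Redis server currently sees.
        redis = get_redis_connection()
        connections_before = len(redis.client_list())

        # Dispatch by name via send_task instead of calling add.delay();
        # this is the code path that appears to leak.
        result = manager.app.send_task('t.integration.tasks.add', args=(1, 2))
        assert result.get(timeout=3) == 3

        # After the result has been fetched, no extra connections or
        # pub/sub channels should remain.
        assert len(redis.client_list()) == connections_before
        assert get_active_redis_channels() == []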

@wochinge wochinge changed the title redis connection leak Fix redis connection leak with app.send_task Aug 5, 2022
redis = get_redis_connection()
connections_before = len(redis.client_list())

result = manager.app.send_task('t.integration.tasks.add', args=(1, 2))
Contributor Author

If we use add.delay() instead, everything works.

]
assert result.get(timeout=3) == 3
assert get_active_redis_channels() == []

def test_redis_connection_cleanup_with_send_task(self, manager):
Contributor Author

@auvipy Do you have any ideas why this test would fail? Happy to do a fix, but I'd need some pointers.

Contributor Author

It might be related to #6963.

Contributor Author

It definitely seems that the connection always stays in pending_unsubscribe_channels within the Redis client when using app.send_task. I tried the solution from the linked issue, but that doesn't resolve things.
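
For anyone trying to reproduce this, a rough diagnostic sketch of how the pub/sub state can be inspected; result_consumer and _pubsub are internals of Celery's Redis result backend and redis-py's PubSub object, so treat the attribute names as assumptions that may differ between versions:

    # Hypothetical diagnostic; assumes a configured Celery `app` with the Redis result backend.
    result = app.send_task('t.integration.tasks.add', args=(1, 2))
    result.get(timeout=3)

    pubsub = app.backend.result_consumer._pubsub  # internal redis-py PubSub object
    print(pubsub.channels)                        # channels still subscribed
    print(pubsub.pending_unsubscribe_channels)    # channels waiting for an UNSUBSCRIBE to be confirmed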

Contributor Author

@auvipy Can you please give me some pointers as to where the issue might be?

@lgtm-com

lgtm-com bot commented Aug 5, 2022

This pull request fixes 1 alert when merging 6c3378e into bdbf6d6 - view on LGTM.com

fixed alerts:

  • 1 for Non-exception in 'except' clause

@codecov

codecov bot commented Aug 5, 2022

Codecov Report

Merging #7685 (6c5d575) into master (3db7c9d) will decrease coverage by 1.01%.
The diff coverage is 76.92%.

❗ Current head 6c5d575 differs from the pull request's most recent head b1cfd8c. Consider uploading reports for the commit b1cfd8c to get more accurate results.

@@            Coverage Diff             @@
##           master    #7685      +/-   ##
==========================================
- Coverage   89.75%   88.74%   -1.02%     
==========================================
  Files         138      138              
  Lines       17000    17007       +7     
  Branches     2510     2493      -17     
==========================================
- Hits        15259    15093     -166     
- Misses       1491     1673     +182     
+ Partials      250      241       -9     
| Flag | Coverage Δ |
| --- | --- |
| unittests | 88.73% <76.92%> (-1.01%) ⬇️ |

Flags with carried forward coverage won't be shown. Click here to find out more.

| Impacted Files | Coverage Δ |
| --- | --- |
| `celery/backends/base.py` | 92.53% <57.14%> (-0.41%) ⬇️ |
| `celery/app/base.py` | 96.45% <100.00%> (ø) |
| `celery/concurrency/eventlet.py` | 0.00% <0.00%> (-99.15%) ⬇️ |
| `celery/backends/asynchronous.py` | 48.91% <0.00%> (-16.46%) ⬇️ |
| `celery/__init__.py` | 75.86% <0.00%> (-8.63%) ⬇️ |
| `celery/app/autoretry.py` | 90.32% <0.00%> (-3.23%) ⬇️ |
| `celery/utils/imports.py` | 87.50% <0.00%> (-2.28%) ⬇️ |
| `celery/bin/base.py` | 47.84% <0.00%> (-2.16%) ⬇️ |
| `celery/bin/celery.py` | 70.43% <0.00%> (-1.74%) ⬇️ |
| `celery/backends/redis.py` | 91.98% <0.00%> (-0.28%) ⬇️ |
| ... and 4 more | |


@wochinge wochinge mentioned this pull request Aug 9, 2022
Member

@auvipy auvipy left a comment


Can you please rebase this?

@wochinge
Contributor Author

Done, @auvipy.

@@ -328,3 +329,16 @@ def test_asyncresult_get_cancels_subscription(self, manager):

new_channels = [channel for channel in get_active_redis_channels() if channel not in channels_before_test]
assert new_channels == []

def test_redis_connection_cleanup_with_send_task(self, manager):
Member

I suggest we mark the test as xfail and merge this PR.
There should be a matching issue for this PR so that we know this isn't fixed yet.
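
For illustration, marking the test as expected-to-fail could look roughly like this (a sketch using pytest's xfail marker; the reason text is only an example):

    import pytest

    @pytest.mark.xfail(reason="Redis connections leak when fetching results for app.send_task, see #7685")
    def test_redis_connection_cleanup_with_send_task(self, manager):
        ...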

Contributor Author

@thedrow I don't think that merging this before fixing it is helpful. There are already multiple issues pointing towards a connection leak:

* [Redis result consumer is not drained periodically (memory leak and possible connection reset) #6963](https://github.com/celery/celery/issues/6963)

* [Redis result backend connections leak #6819](https://github.com/celery/celery/issues/6819)

What I would need is some pointers / help. Beyond that, I'd be more than happy to investigate and help with a fix.

Member

> @thedrow I don't think that merging this before fixing it is helpful. There are already multiple issues pointing towards a connection leak:
>
> * [Redis result consumer is not drained periodically (memory leak and possible connection reset) #6963](https://github.com/celery/celery/issues/6963)
>
> * [Redis result backend connections leak #6819](https://github.com/celery/celery/issues/6819)
>
> What I would need is some pointers / help. Beyond that, I'd be more than happy to investigate and help with a fix.

That should be the spirit.

@auvipy auvipy added this to the 5.3 milestone Aug 14, 2022
@wochinge
Copy link
Contributor Author

@auvipy I spent some time debugging it and here's the situation:

  • the local cache here doesn't work with eventlet, as each greenthread has a unique ID and hence gets its own local() instance. You can see this in the code snippet below, as well as in the comment here:
    import threading

    import eventlet
    eventlet.monkey_patch()


    def f(idx: int):
        # Each greenthread reports a different identity via get_ident(),
        # so thread-local storage is never shared between tasks.
        print(threading.get_ident(), idx)


    pool = eventlet.GreenPool(size=1)

    threads = []
    for i in range(10):
        t = pool.spawn(f, i)
        threads.append(t)

    for t in threads:
        t.wait()
  • this means that with eventlet we create one backend connection per task, and these caches are not always cleaned up correctly. I've added a piece of code which cleans up the cache for each task in case it's an eventlet cache. Note that we can't test this, as the integration tests run in prefork mode. If you want to reproduce it, you need to:
    • Change prefork to eventlet here
    • Monkeypatch the eventlet stuff here by adding these lines:
      import eventlet
      eventlet.monkey_patch()
  • The above should help with the leaking connections. The other question is whether we can fix the cache for eventlet pools. I don't have a good solution for this at the moment; I'll explore using cached_property for Redis. (See the sketch after this list for why the current cache never takes effect under eventlet.)
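
A minimal sketch (not Celery code) of why a local()-based cache never takes effect under eventlet: after monkey-patching, threading.local becomes greenlet-local, every spawned greenthread starts with an empty cache, and each cache miss creates another backend (and another connection) that is never reused. The cache and backend names here are hypothetical stand-ins:

    import eventlet
    eventlet.monkey_patch()

    import threading

    cache = threading.local()  # after monkey_patch this is greenlet-local


    def get_backend():
        # Hypothetical stand-in for a per-thread backend cache lookup.
        if not hasattr(cache, 'backend'):
            cache.backend = object()  # imagine this opening a Redis connection
            print('created new backend')
        return cache.backend


    pool = eventlet.GreenPool(size=2)
    for _ in range(5):
        pool.spawn(get_backend)
    pool.waitall()
    # Prints 'created new backend' once per spawned greenthread: the cache
    # never hits, and each orphaned backend would keep its connection open.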

@wochinge
Contributor Author

Added a potential fix for the caching issues with eventlet. Any idea why the CI is failing? It seems a bit weird, as it's failing in somewhat unrelated parts.

@lgtm-com

lgtm-com bot commented Aug 19, 2022

This pull request introduces 1 alert when merging b1cfd8c into 3db7c9d - view on LGTM.com

new alerts:

  • 1 for Unused local variable

Comment on lines +673 to +683
try:
    from eventlet import corolocal
    task_backend_cache = self.app.local
    # Whenever an eventlet pool spawns a coroutine, it has a different identifier and hence acquires
    # new coroutine-local storage. To avoid leaking connections, we explicitly clear out the
    # cache once the coroutine is done.
    if isinstance(task_backend_cache, corolocal.local):
        task_backend_cache.__dict__.clear()
except ImportError:
    # Eventlet is not installed and hence not used.
    pass
Member

I'm not sure I am content with coupling eventlet or any other concurrency mode with the backend. Is there another way to resolve this issue?

@auvipy
Member

auvipy commented Feb 7, 2023

@wochinge would you mind trying #8058?

@wochinge
Contributor Author

wochinge commented Feb 7, 2023

We're unfortunately no longer running into this (we put ignore_result wherever possible), sorry 🤷🏻
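
For context, the workaround mentioned above relies on Celery's standard ignore_result option, which skips the result backend entirely for tasks whose return value isn't needed; a minimal example (the task itself is hypothetical):

    @app.task(ignore_result=True)
    def add(x, y):
        # No result is stored, so no result-backend (Redis) traffic is generated for this task.
        return x + y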

@wochinge wochinge closed this Feb 7, 2023
@wochinge wochinge deleted the redis-connection-leak branch February 7, 2023 12:30
@hzc989

hzc989 commented Jul 12, 2023

> @wochinge would you mind trying #8058?

After upgrading to 5.3.1 and setting result_backend_thread_safe=true, it works for us.

P.S. We tried 4.4.7 and 5.2.7; both failed with the same problem.
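
For reference, a minimal sketch of the configuration described above (the result_backend_thread_safe setting was introduced in Celery 5.3; shown here as a Python config assignment on an existing app):

    # Assumes an existing Celery `app` configured with a Redis result backend.
    app.conf.result_backend_thread_safe = True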

@auvipy
Member

auvipy commented Jul 12, 2023

Thanks for the feedback, Wong!
