Celery rapidly leaking memory with Celery .get() on RPC backend #5344

Open · 3 tasks done
monstermac77 opened this issue Feb 17, 2019 · 32 comments

monstermac77 commented Feb 17, 2019

Checklist

  • I have included the output of celery -A proj report in the issue.
  • I have included the contents of pip freeze in the issue.
  • I have verified that the issue exists against the master branch of Celery.

Environment & Settings

Celery version: 4.2.0

Report:
software -> celery:4.2.0 (windowlicker) kombu:4.3.0 py:2.7.3
billiard:3.6.0.0 py-amqp:2.4.1
platform -> system:Linux arch:64bit, ELF
kernel version:3.2.0-4-amd64 imp:CPython
loader -> celery.loaders.app.AppLoader
settings -> transport:pyamqp results:rpc://[ip]/

task_queues: <generator object at 0x28ce780>
broker_url: u'amqp://guest:********@[ip]:5672//'
result_backend: u'rpc://[ip]/'

Steps to Reproduce

The script that I've written to add tasks to my Celery queue is leaking memory (to the point where the kernel kills the process after 20 minutes). In this script, I'm just executing the same 300 tasks repeatedly, every 60 seconds (inside a while True loop).

The parameters passed to the task, makeGroupRequest(), are dictionaries containing strings, and according to hpy and objgraph, dicts and strings are also what's growing uncontrollably in memory (specifically, they grow after the .get() call). I've included the outputs of hpy below on successive iterations of the loop.

I've spent days on this, and I can't understand why memory would grow uncontrollably, considering nothing is re-used between loops and everything is overwritten. If I skip the sending/retrieval of tasks, the memory doesn't appear to leak (so the leak appears to be in Celery, or in some combination of my code/objects and Celery). An issue like this was mentioned in #3813, but I'm seeing the memory leak even though I'm not polling the results and am just calling .get(). Is there something that could be causing the task results to stick around in memory, even though I no longer have references to them?

Here is an outline of the code that's executing.

while True:

	# preparation is done here to set up the arguments for the tasks (processedChains)

	chains = []

	for processedChain in processedChains:

		# shorthanding
		supportingData = processedChain["supportingDataAndCheckedGroups"]

		# init the first element, which includes the supportingData and the first group
		argsList = [(supportingData, processedChain["groups"][0])]

		# add in the rest of the groups
		argsList.extend([(groupInChain,) for groupInChain in processedChain["groups"][1:]])

		# actually create the chain
		chain = celery.chain(*[makeGroupRequest.signature(params, options={'queue':queue}) for params in argsList])

		# add this to the list of chains
		chains.append(chain)

	groupSignature = celery.group(*chains).apply_async()
    
	# this line appears to cause a large increase in memory each cycle
	results = groupSignature.get(timeout = 2 * acceptableLoopTime)
	
	time.sleep(60)
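
For context, a hypothetical worker-side counterpart to the outline above might look like the following sketch (the real makeGroupRequest body isn't shown in this report, and app is assumed to be the Celery application instance):

# In a chain, each link receives the previous link's return value as its
# first argument, which is why only the first signature above is given
# supportingData explicitly.
@app.task
def makeGroupRequest(supportingData, group):
    # ... make the request for `group` using `supportingData` ...
    return supportingData  # passed on to the next link in the chain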

Here is the output of hpy on successive runs:

Loop 2:

Partition of a set of 366560 objects. Total size = 57136824 bytes.
 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0  27065   7 17665112  31  17665112  31 dict (no owner)
     1 122390  33 11966720  21  29631832  52 unicode
     2  89133  24  8291952  15  37923784  66 str
     3  45448  12  3802968   7  41726752  73 tuple
     4    548   0  1631072   3  43357824  76 dict of module
     5  11195   3  1432960   3  44790784  78 types.CodeType
     6   9224   3  1343296   2  46134080  81 list
     7  11123   3  1334760   2  47468840  83 function
     8   1414   0  1274552   2  48743392  85 type
     9   1414   0  1240336   2  49983728  87 dict of type

Loop 3:

 Index  Count   %     Size   % Cumulative  % Kind (class / dict of class)
     0  44754   9 29240496  37  29240496  37 dict (no owner)
     1 224883  44 20946280  26  50186776  63 unicode
     2  89104  18  8290248  10  58477024  74 str
     3  45455   9  3803288   5  62280312  79 tuple
     4  14955   3  2149784   3  64430096  81 list
     5    548   0  1631072   2  66061168  83 dict of module
     6  11195   2  1432960   2  67494128  85 types.CodeType
     7  11122   2  1334640   2  68828768  87 function
     8   1402   0  1263704   2  70092472  88 type
     9   1402   0  1236976   2  71329448  90 dict of type

Python Packages

amqp==2.4.1
apns==2.0.1
appdirs==1.4.3
asn1crypto==0.24.0
beautifulsoup4==4.5.1
billiard==3.6.0.0
cached-property==1.3.0
celery==4.2.0
cffi==1.11.5
chardet==2.0.1
cryptography==2.1.4
defusedxml==0.5.0
enum34==1.1.6
eventlet==0.22.1
fake-useragent==0.1.10
fpconst==0.7.2
graphviz==0.10.1
greenlet==0.4.13
guppy==0.1.10
html5lib==0.95
idna==2.6
ipaddress==1.0.19
IPy==0.75
isodate==0.5.4
kombu==4.3.0
lxml==3.7.3
mem-top==0.1.7
MySQL-python==1.2.3
ndg-httpsclient==0.4.2
objgraph==3.4.0
pyasn1==0.1.9
pycparser==2.18
pyfcm==1.2.1
pyOpenSSL==16.2.0
python-apt==0.8.8.2
python-debian==0.1.21
python-debianbts==1.11
python-http-client==2.2.1
pytz==2018.9
reportbug==6.4.4
requests==2.10.0
requests-toolbelt==0.7.1
sendgrid==1.5.12
six==1.11.0
smtpapi==0.2.0
SOAPpy==0.12.0
vine==1.2.0
zeep==1.3.0

thedrow (Member) commented Feb 20, 2019

This may already be resolved in #5332.

I see that you already tried master, so it may be something else.

monstermac77 (Author) commented:

@thedrow Yeah, must be something else since I have tested against master.

monstermac77 (Author) commented Mar 6, 2019

I've done some further investigation. objgraph shows that these are the objects that are growing without bound:

dict                           4942      +147
list                           3545      +154
builtin_function_or_method     2607       +69
deque                           101       +23
method-wrapper                   98       +23
Messagebuffer                    92       +23

Update: these are the objects that grow without bound when I switch from rpc:// to memcache:

promise            6760      +614
dict              14156      +612
instancemethod     6984      +610
AsyncResult        6688      +608
GroupResult          22        +2
barrier              22        +2
tuple              4284        +2
list               3915        +2

monstermac77 (Author) commented:

Also, if I add ignore_result=True to the task, the memory leak disappears. So, Celery must be holding onto results and not letting go (I have also found that the rate at which memory leaks scales with the number of tasks being executed, supporting the hypothesis that Celery is holding onto results when it shouldn't be).
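
For reference, a minimal sketch of what that looks like on the task (app and makeGroupRequest as in the outline above):

# Sketch only: with ignore_result=True the worker never publishes a result,
# so nothing accumulates on the client side -- but .get() can then no longer
# be used to collect return values.
@app.task(ignore_result=True)
def makeGroupRequest(supportingData, group):
    ...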

thedrow (Member) commented Mar 6, 2019

Yes, using dictionaries to hold results isn't a good idea.
Celery 5 is going to resolve this by creating a dedicated object for results.

monstermac77 (Author) commented:

Great, that’ll certainly make diagnosing the memory leaks easier.

In the meantime, how can I force Celery to purge the results from memory once I’ve handled them?

I’m putting a bunch of Chains into a Group, so I need to force Celery to remove all references from all the tasks in all the Chains in the Group.

monstermac77 (Author) commented Mar 6, 2019

I just ran on master (4.3.0rc2) with memcache as the backend and no leak! Then I switched back to master with rpc:// as the backend to confirm the leak was still there, and it was.

Perhaps #5332 resolved the memory leak for some backends but for some reason not rpc? I did see AsyncResult objects were growing without bound when running on Celery 4.2.1 with memcache, so upgrading to master resolved the leak for the memcache backend but not for the rpc backend.

Edit: I tested on another version of Celery (which I'm running in production). I wanted to give a summary of what I've found so far, in case it helps track down the issue:

Celery 4.1.0: rpc leaks, memcache no leak
Celery 4.2.1: rpc leaks, memcache leaks (#5332 probably fixed this for memcache)
Celery 4.3.0rc2: rpc leaks, memcache no leak

Sewci0 commented Mar 7, 2019

I am using the Redis Sentinel backend with Celery 4.3.0rc2, and the issue seems to be caused by a constantly growing _pending_messages BufferMap (see https://i.imgur.com/0z86v8Z.png). If I run app.backend._pending_messages.clear() I am able to clean up the memory.

Strangely enough, calling app.backend._pending_messages.take(task.id) does decrease the reported size of the <BufferMap: 0/8192> but doesn't deallocate the object itself:

builtin_function_or_method     1543        +3
dict                           6505        +1
method-wrapper                    4        +1
deque                            37        +1
Messagebuffer                     2        +1

I managed to circumvent the issue by running the following after task.get():

app.backend._pending_messages.take(task.id)
del app.backend._pending_messages[task.id]

monstermac77 (Author) commented Mar 7, 2019

@Sewci0 I just tried adding app.backend._pending_messages.clear() and it doesn't appear to have any effect on the memory leak I've been experiencing using rpc://, so our leaks may be different.

I added the following code:

print app.backend._pending_messages
app.backend._pending_messages.clear()
print app.backend._pending_messages

And I get the following across successive loop iterations (edit: and top still shows that memory is leaking):

<BufferMap: 1728/8192>
<BufferMap: 1728/8192>
...
<BufferMap: 2016/8192>
<BufferMap: 2016/8192>
...
<BufferMap: 2304/8192>
<BufferMap: 2304/8192>

Are you polling for the status of a task before calling .get()? If so, issue #4830 may be applicable to you.

Sewci0 commented Mar 7, 2019

@monstermac77 Can you try running the following inside your task, let it run for a while, and then upload the generated image here?

import random
import objgraph

objgraph.show_chain(
    objgraph.find_backref_chain(
        random.choice(objgraph.by_type('celery.utils.collections.Messagebuffer')),
        objgraph.is_proper_module),
    filename='chain.png')

monstermac77 (Author) commented Mar 7, 2019

@Sewci0 Absolutely. Just did so.

Here's the output near the start of program execution (after first loop iteration): https://imgur.com/a/3RNSZPZ

Here's the output after a while of execution (by this point, the process' memory had about quadrupled in size): https://imgur.com/a/I6QRcDu

Indeed, it seems like BufferMap is growing, but app.backend._pending_messages.clear() doesn't stop the memory leak for me like it did for you.

Sewci0 commented Mar 7, 2019

@monstermac77 clear() won't change the underlying counter for the BufferMap but it should be able to free up resources. Can you please verify that by running top or atop?

monstermac77 (Author) commented:

@Sewci0 Yeah, I was looking at top when I made the determination that memory was still leaking (I also have been running objgraph.show_growth() on each loop iteration, to see which objects are multiplying in number).

Early iteration:
7722 server ... 60m ... 0:01.37 myScript.py
10 minutes later:
7722 server ... 100m ... 0:17.40 myScript.py

When memory isn't leaking (such as when I use memcached as the backend), it executes for hours without going above 60m.

Also, when I try your other suggestion after groupSignature.get():

for task in groupSignature:
    app.backend._pending_messages.take(task.id)
    del app.backend._pending_messages[task.id]

I get the following error:

Traceback (most recent call last):
  File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "./script.py", line 467, in function
    app.backend._pending_messages.take(task.id)
  File "/usr/local/lib/python2.7/dist-packages/celery/utils/collections.py", line 877, in take
    raise self.Empty()
Empty

Sewci0 commented Mar 7, 2019

@monstermac77

    from celery.five import Empty

    try:
        app.backend._pending_messages.take(task.id)
        del app.backend._pending_messages[task.id]
    except Empty:
        pass
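
Applied to the group case from earlier in the thread, the combined version would look roughly like this (a sketch; groupSignature and app are the objects from the earlier snippets):

from celery.five import Empty

for task in groupSignature:
    try:
        app.backend._pending_messages.take(task.id)
        del app.backend._pending_messages[task.id]
    except Empty:
        pass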

Sewci0 commented Mar 7, 2019

@monstermac77 Also, all of those methods in your case should be run against the main Celery instance, which I guess is celery, so instead of app.backend._pending_messages.take(task.id) it should be celery.backend._pending_messages.take(task.id).

monstermac77 (Author) commented Mar 7, 2019

@Sewci0 I added the import and the try/except, but it hits that exception every time.

Just tried swapping app for celery and running first your .clear() code and then your .take()/del code, and I'm getting AttributeError: 'celery' object has no attribute 'backend'.

I'm doing the following at the top of my file:

from app import *
import celery

Sewci0 commented Mar 7, 2019

@monstermac77 How exactly are you initializing celery and specifying the backend?

monstermac77 (Author) commented Mar 7, 2019

@Sewci0 In my producer, I run:

from app import *
import celery

And in app.py I have:

from celery import Celery
from kombu import Queue, Exchange
app = Celery('app', broker='pyamqp://[ip]', backend='[ip]')
app.conf.task_queues = (
	Queue(...) for queue in queues
)

@app.task()
def taskFunction():
   ...

When I run your suggestions on capital C Celery instead of app or celery, I get 'cached_property' object has no attribute '_pending_messages'.

Sewci0 commented Mar 7, 2019

@monstermac77 app should be right

monstermac77 (Author) commented Mar 7, 2019

@Sewci0 Yeah, so to summarize: running app.backend._pending_messages.clear() yields no error but does not stop the memory from leaking, and looping through each task in the group signature and calling app.backend._pending_messages.take(task.id) just raises Empty every time.

We were able to see that _pending_messages is growing without bound, as you suspected. It's just that your workaround doesn't appear to work with rpc as the backend? I am using chains and groups, so perhaps that's the complicating factor.

auvipy added this to the 4.3.x Maintenance milestone Mar 12, 2019
auvipy modified the milestones: 4.4.0, 4.5 May 7, 2019
Korijn (Contributor) commented Nov 16, 2020

It's been quite a while since the last post in this issue; was it ever resolved for you, @monstermac77?

We appear to be hitting the same issue at the moment.

monstermac77 (Author) commented:

@Korijn Appreciate the check-in. Celery still sits at the core of a lot of what we do, but we've resorted to using memcached instead of RPC due to this memory leak issue. That said, we haven't done any testing since these comments back in 2019, so I suppose it's possible that this was resolved in a later version of Celery.

Hoping you were able to find this thread before you had torn out too much hair...

Korijn (Contributor) commented Nov 17, 2020

We tried this morning with Celery 5.0.2, and the leak in the RPC backend is still present. The Redis backend also leaks, unless we apply the workaround from #3813 (to call .forget() after every .get() call). So that's what we settled for (even though it's much slower than the RPC backend, sadly).

Our tasks work on big chunks of data (medical imaging data) so memory grows out of control very quickly (~20 tasks is enough to reach a couple gigabytes). It's unworkable for us.
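
For reference, the #3813-style workaround mentioned above looks roughly like this sketch (result_sig stands in for whatever signature is being waited on; the timeout is illustrative):

# Collect the value first, then explicitly drop the stored result so the
# backend releases it (the workaround we apply with the Redis backend).
async_result = result_sig.apply_async()
try:
    value = async_result.get(timeout=60)
finally:
    async_result.forget()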

monstermac77 (Author) commented:

@Korijn Unfortunately, the only workaround we've found is using memcached, which seems almost as fast as RPC but doesn't have the leak. We also found that Redis was slower than RPC for our use case, which is why we had been using RPC. Doesn't calling .forget() after the .get() call delete all of the results? Or I suppose you're calling .get(), pulling the data into whatever you need (some non-Celery data structure), and then calling .forget()?

We're actually at the opposite end of the spectrum, interestingly. We have hundreds of thousands of tasks running every minute that are very small, but still within an hour the kernel kills our process because of the memory leak.

Korijn (Contributor) commented Nov 18, 2020

Interestingly/frustratingly enough, I am so far unable to reproduce this problem in a toy example with celery 5.0.2, where I pass around huge strings (3MB each).

monstermac77 (Author) commented:

> Interestingly/frustratingly enough, I am so far unable to reproduce this problem in a toy example with celery 5.0.2, where I pass around huge strings (3MB each).

I will say this: we had been using RPC for about 2 years with almost exactly the same workload; then, when implementing Celery again for a very similar use case, we started seeing the memory leak occur. I spent no less than 30 hours trying to figure out what the difference was between the scripts that leaked and those that didn't (this was back when I opened this issue), even though both were using RPC.

Korijn (Contributor) commented Nov 19, 2020

Well, in the end I managed to reproduce the issue. It's due to our usage of ThreadPoolExecutor. I didn't realize Celery isn't thread safe. See here for a minimal example: https://github.com/Korijn/celeryrepro

The reason we use ThreadPoolExecutor is to keep our main thread free of blocking IO in a Starlette/asyncio setup. Sadly, that isn't going to work, I suppose.

With LEAK=False and no usage of thread pool executor:

memory usage: 25161728
function     6626     +6626
dict         3289     +3289
tuple        3130     +3130
memory usage: 193564672
function     8289     +1663
list         2069     +1230
dict         4421     +1132
memory usage: 218431488
memory usage: 213295104
memory usage: 171044864
memory usage: 122679296
memory usage: 209305600
memory usage: 126676992
memory usage: 209412096
memory usage: 158679040
memory usage: 216305664

With LEAK=True, using thread pool executor:

memory usage: 25346048
function     6626     +6626
dict         3289     +3289
tuple        3130     +3130
memory usage: 537124864
function     8306     +1680
list         2111     +1272
dict         4511     +1222
memory usage: 959385600
dict                           4561       +50
list                           2161       +50
builtin_function_or_method     1071       +30
memory usage: 1213181952
dict                           4611       +50
list                           2211       +50
builtin_function_or_method     1101       +30
memory usage: 1463549952
dict                           4661       +50
list                           2261       +50
builtin_function_or_method     1131       +30
memory usage: 1655377920
dict                           4711       +50
list                           2311       +50
builtin_function_or_method     1161       +30
memory usage: 1864581120
dict                           4761       +50
list                           2361       +50
builtin_function_or_method     1191       +30
memory usage: 2038120448
dict                           4811       +50
list                           2411       +50
builtin_function_or_method     1221       +30
memory usage: 2265395200
dict                           4861       +50
list                           2461       +50
builtin_function_or_method     1251       +30
memory usage: 2450505728
dict                           4911       +50
list                           2511       +50
builtin_function_or_method     1281       +30
memory usage: 2714198016
dict                           4961       +50
list                           2561       +50
builtin_function_or_method     1311       +30

Korijn (Contributor) commented Nov 19, 2020

Thanks for labeling @auvipy, but I guess I wasn't clear: if you look at the reproduction README, I'm not using Celery's thread worker pool but the Python built-in one, to try to avoid Celery's blocking IO in the main thread.

In the reproduction I'm using the solo pool and concurrency 1 to isolate the leak from other factors. In this case, I couldn't point to any specific component; both app.send_task and result.ready() are leaky when run via executor.submit.

I was using the RabbitMQ (amqp) broker and the rpc result backend.
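
A rough sketch of the pattern described above (illustrative only; the actual minimal example lives in the linked celeryrepro repository, and the task name tasks.echo is a placeholder):

from concurrent.futures import ThreadPoolExecutor
import time

from celery import Celery

app = Celery('repro', broker='pyamqp://guest@localhost//', backend='rpc://')
executor = ThreadPoolExecutor(max_workers=1)

def submit_and_wait(payload):
    # Both send_task and result.ready()/get() run on the executor thread,
    # which is the usage that showed the unbounded growth logged above.
    result = app.send_task('tasks.echo', args=(payload,))
    while not result.ready():
        time.sleep(0.1)
    return result.get()

future = executor.submit(submit_and_wait, 'x' * 3000000)  # large payload, as in the repro
print(len(future.result()))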

monstermac77 (Author) commented Nov 23, 2020

@Korijn ah! It seems like thread safety could be the cause of our memory leak as well? Our use case that results in a runaway memory leak looks like this:

from multiprocessing import Process

def myProcess(data):
    while True:
        chains = makeChains(data)
        sig = celery.group(*chains).apply_async()
        result = sig.get()

for data in datas:
    p = Process(target=myProcess, args=(data,))  # pass data through to the process
    p.start()

Where "datas" is length ~30 or so. We're using multiprocessing though, not multithreading, but I talked one of my coworkers who understands this stuff much better than me and he said that even though conventional wisdom has it that the memory space wouldn't be shared, the memory leak could still be happening with multiprocessing if Celery has a bug. Here's what he said:

Regarding multiprocessing vs. multithreading: in theory, if the process is created by forking, all memory leaks from the main thread from which it is forked will be duplicated (when leaks get out of hand, the process will crash and re-fork). That would be a consistent leak of M for N processes, resulting in a memory deficit of M*N.
The transport code in Celery may have a leak, and that's the main thread being forked. That means every time either a process or a thread sends information, M memory gets leaked at rate X. As processes die due to the forking leak at time T (with starting rate A), the memory deficit would be roughly A + (M * X * T).
I'm sure I forgot a coefficient with the processes/threads in the second one, but you get the point.

Korijn (Contributor) commented Nov 24, 2020

Well, sadly, returning to main-thread Celery calls resolved part of our leakage but not all of it. Also, there is now IO happening in our main thread, which negatively affects our maximum number of concurrent users per server instance. So for us it's not yet in an acceptable state, and we're continuing to search for more root causes...

thedrow (Member) commented Feb 24, 2021

Please keep investigating this.

thedrow modified the milestones: 5.1.0, Future Feb 24, 2021
Korijn (Contributor) commented Feb 24, 2021

> Please keep investigating this.

I never was able to resolve the issue, sadly. I ran out of time on the project.
