Continuous memory leak #4843

Open
marvelph opened this issue Jun 23, 2018 · 157 comments · Fixed by #5870

Comments

@marvelph commented Jun 23, 2018

There is a memory leak in the parent process of Celery's worker, not in the child processes that execute tasks.
It starts suddenly every few days.
Unless Celery is stopped, it consumes all of the server's memory within tens of hours.

This problem happens at least in Celery 4.1, and it also occurs in Celery 4.2.
Celery is running on Ubuntu 16 and the broker is RabbitMQ.

[attached: memory usage graph]

@georgepsarakis (Member) commented Jun 23, 2018

Are you using Canvas workflows? Maybe #4839 is related.

Also I assume you are using prefork pool for worker concurrency?

@marvelph (Author) commented Jun 23, 2018

Thanks georgepsarakis.

I am not using Canvas workflows.
I use the prefork pool with concurrency 1 on a single server.

@georgepsarakis (Member) commented Jun 23, 2018

The increase rate seems quite linear, quite weird. Is the worker processing tasks during this time period? Also, can you add a note with the complete command you are using to start the worker?

@marvelph (Author) commented Jun 23, 2018

Yes. The worker continues to process tasks normally.

The worker is started with the following command:

/xxxxxxxx/bin/celery worker --app=xxxxxxxx --loglevel=INFO --pidfile=/var/run/xxxxxxxx.pid

@marvelph (Author) commented Jun 23, 2018

This problem occurs in both the production environment and the test environment.
I can add memory profiling and test output in the test environment.
If there is anything I can do, please let me know.

@georgepsarakis (Member) commented Jun 23, 2018

We need to understand what the worker is running during the time that the memory increase is observed. Any information and details you can possibly provide would definitely help. It is also good that you can reproduce this.

@marvelph (Author) commented Jun 23, 2018

Although this happened at a different time than the graph above, the following log was output at the moment the memory leak started.

[2018-02-24 07:50:52,953: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
File "/xxxxxxxx/lib/python3.5/site-packages/celery/worker/consumer/consumer.py", line 320, in start
blueprint.start(self)
File "/xxxxxxxx/lib/python3.5/site-packages/celery/bootsteps.py", line 119, in start
step.start(parent)
File "/xxxxxxxx/lib/python3.5/site-packages/celery/worker/consumer/consumer.py", line 596, in start
c.loop(*c.loop_args())
File "/xxxxxxxx/lib/python3.5/site-packages/celery/worker/loops.py", line 88, in asynloop
next(loop)
File "/xxxxxxxx/lib/python3.5/site-packages/kombu/async/hub.py", line 293, in create_loop
poll_timeout = fire_timers(propagate=propagate) if scheduled else 1
File "/xxxxxxxx/lib/python3.5/site-packages/kombu/async/hub.py", line 136, in fire_timers
entry()
File "/xxxxxxxx/lib/python3.5/site-packages/kombu/async/timer.py", line 68, in __call__
return self.fun(*self.args, **self.kwargs)
File "/xxxxxxxx/lib/python3.5/site-packages/kombu/async/timer.py", line 127, in _reschedules
return fun(*args, **kwargs)
File "/xxxxxxxx/lib/python3.5/site-packages/kombu/connection.py", line 290, in heartbeat_check
return self.transport.heartbeat_check(self.connection, rate=rate)
File "/xxxxxxxx/lib/python3.5/site-packages/kombu/transport/pyamqp.py", line 149, in heartbeat_check
return connection.heartbeat_tick(rate=rate)
File "/xxxxxxxx/lib/python3.5/site-packages/amqp/connection.py", line 696, in heartbeat_tick
self.send_heartbeat()
File "/xxxxxxxx/lib/python3.5/site-packages/amqp/connection.py", line 647, in send_heartbeat
self.frame_writer(8, 0, None, None, None)
File "/xxxxxxxx/lib/python3.5/site-packages/amqp/method_framing.py", line 166, in write_frame
write(view[:offset])
File "/xxxxxxxx/lib/python3.5/site-packages/amqp/transport.py", line 258, in write
self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-02-24 08:49:12,016: INFO/MainProcess] Connected to amqp://xxxxxxxx:**@xxx.xxx.xxx.xxx:5672/xxxxxxxx

It seems that it occurred when the connection with RabbitMQ was temporarily cut off.

@georgepsarakis (Member) commented Jun 24, 2018

@marvelph so it occurs during RabbitMQ reconnections? Perhaps these issues are related:

@marvelph (Author) commented Jun 24, 2018

Yes.
It seems that reconnection triggers it.

@jxltom commented Jun 25, 2018

It looks like I'm having the same issue... It has been very hard for me to find out what triggers it and why there is a memory leak. It has annoyed me for at least a month. I fell back to Celery 3 and everything is fine.

For the memory leak issue, I'm using Ubuntu 16 and Celery 4.1.0 with RabbitMQ. I deployed it via Docker.

The memory leak is in MainProcess, not ForkPoolWorker. The memory usage of ForkPoolWorker is normal, but the memory usage of MainProcess is always increasing; roughly 0.1 MB leaks every five seconds. The memory leak doesn't start immediately after the worker starts, but maybe after one or two days.

I used gdb and pyrasite to inject into the running process and tried gc.collect(), but nothing was collected.

I checked the log; the "consumer: Connection to broker lost. Trying to re-establish the connection..." message did happen, but for now I'm not sure this is when the memory leak happens.

Any hints for debugging this issue and finding out what really happens? Thanks.
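For reference, a minimal sketch of that kind of check, assuming pyrasite is installed and the MainProcess PID is known (the commands and numbers here are illustrative, not from the report above):

# Run `pyrasite-shell <mainprocess-pid>` to get a Python prompt inside the
# running worker, then see whether the cycle collector can free anything:
import gc

print(gc.collect())      # number of objects the collector freed
print(len(gc.garbage))   # objects the collector found but could not free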

@jxltom commented Jun 25, 2018

Since @marvelph mentioned it may be related to RabbitMQ reconnection, I tried stopping my RabbitMQ server. Memory usage did increase after each reconnection; the log follows. So I can confirm the celery/kombu#843 issue.

But after the connection is re-established, the memory usage stops gradually increasing. So I'm not sure this is the reason for the memory leak.

I will try using Redis to figure out whether this memory leak is related to RabbitMQ or not.

[2018-06-25 02:43:33,456: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 316, in start
    blueprint.start(self)
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 592, in start
    c.loop(*c.loop_args())
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/worker/loops.py", line 91, in asynloop
    next(loop)
  File "/app/.heroku/python/lib/python3.6/site-packages/kombu/asynchronous/hub.py", line 354, in create_loop
    cb(*cbargs)
  File "/app/.heroku/python/lib/python3.6/site-packages/kombu/transport/base.py", line 236, in on_readable
    reader(loop)
  File "/app/.heroku/python/lib/python3.6/site-packages/kombu/transport/base.py", line 218, in _read
    drain_events(timeout=0)
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/connection.py", line 491, in drain_events
    while not self.blocking_read(timeout):
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/connection.py", line 496, in blocking_read
    frame = self.transport.read_frame()
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/transport.py", line 243, in read_frame
    frame_header = read(7, True)
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/transport.py", line 418, in _read
    s = recv(n - len(rbuf))
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-06-25 02:43:33,497: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 2.00 seconds...

[2018-06-25 02:43:35,526: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 4.00 seconds...

[2018-06-25 02:43:39,560: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 6.00 seconds...

[2018-06-25 02:43:45,599: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 8.00 seconds...

[2018-06-25 02:43:53,639: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 10.00 seconds...

[2018-06-25 02:44:03,680: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 12.00 seconds...

[2018-06-25 02:44:15,743: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 14.00 seconds...

[2018-06-25 02:44:29,790: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 16.00 seconds...

[2018-06-25 02:44:45,839: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 18.00 seconds...

[2018-06-25 02:45:03,890: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 20.00 seconds...

[2018-06-25 02:45:23,943: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 22.00 seconds...

[2018-06-25 02:45:46,002: ERROR/MainProcess] consumer: Cannot connect to amqp://***:**@***:***/***: [Errno 111] Connection refused.
Trying again in 24.00 seconds...

[2018-06-25 02:46:10,109: INFO/MainProcess] Connected to amqp://***:**@***:***/***
[2018-06-25 02:46:10,212: INFO/MainProcess] mingle: searching for neighbors
[2018-06-25 02:46:10,291: WARNING/MainProcess] consumer: Connection to broker lost. Trying to re-establish the connection...
Traceback (most recent call last):
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 316, in start
    blueprint.start(self)
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/bootsteps.py", line 119, in start
    step.start(parent)
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 40, in start
    self.sync(c)
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 44, in sync
    replies = self.send_hello(c)
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/worker/consumer/mingle.py", line 57, in send_hello
    replies = inspect.hello(c.hostname, our_revoked._data) or {}
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/app/control.py", line 132, in hello
    return self._request('hello', from_node=from_node, revoked=revoked)
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/app/control.py", line 84, in _request
    timeout=self.timeout, reply=True,
  File "/app/.heroku/python/lib/python3.6/site-packages/celery/app/control.py", line 439, in broadcast
    limit, callback, channel=channel,
  File "/app/.heroku/python/lib/python3.6/site-packages/kombu/pidbox.py", line 315, in _broadcast
    serializer=serializer)
  File "/app/.heroku/python/lib/python3.6/site-packages/kombu/pidbox.py", line 290, in _publish
    serializer=serializer,
  File "/app/.heroku/python/lib/python3.6/site-packages/kombu/messaging.py", line 181, in publish
    exchange_name, declare,
  File "/app/.heroku/python/lib/python3.6/site-packages/kombu/messaging.py", line 203, in _publish
    mandatory=mandatory, immediate=immediate,
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/channel.py", line 1732, in _basic_publish
    (0, exchange, routing_key, mandatory, immediate), msg
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/abstract_channel.py", line 50, in send_method
    conn.frame_writer(1, self.channel_id, sig, args, content)
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/method_framing.py", line 166, in write_frame
    write(view[:offset])
  File "/app/.heroku/python/lib/python3.6/site-packages/amqp/transport.py", line 275, in write
    self._write(s)
ConnectionResetError: [Errno 104] Connection reset by peer
[2018-06-25 02:46:10,375: INFO/MainProcess] Connected to amqp://***:**@***:***/***
[2018-06-25 02:46:10,526: INFO/MainProcess] mingle: searching for neighbors
[2018-06-25 02:46:11,764: INFO/MainProcess] mingle: all alone

@marvelph (Author) commented Jun 25, 2018

When I checked the logs, I found a reconnection entry at the time a memory leak started, but there was also a case where a memory leak started without any reconnection occurring.
I agree with jxltom's idea.

Also, when I was using Celery 3.x, I did not encounter this problem.

@dmitry-kostin commented Jun 25, 2018

Same problem here.
[attached: screenshot 2018-06-25 11 09 22]
Every few days I have to restart workers due to this problem.
There are no significant clues in the logs, but I suspect that reconnects are a factor, since I have reconnect log entries around the time memory starts constantly growing.
My setup is Ubuntu 17, 1 server with 1 worker at concurrency 3; RabbitMQ as broker and Redis as backend; all packages are the latest versions.

@georgepsarakis (Member) commented Jun 25, 2018

@marvelph @dmitry-kostin could you please provide your exact configuration (omitting sensitive information of course) and possibly a task, or sample, that reproduces the issue? Also, do you have any estimate of how long the worker is typically up before the memory increase starts appearing?

@dmitry-kostin commented Jun 25, 2018

The config is close to default:

imports = ('app.tasks',)
result_persistent = True
task_ignore_result = False
task_acks_late = True
worker_concurrency = 3
worker_prefetch_multiplier = 4
enable_utc = True
timezone = 'Europe/Moscow'
broker_transport_options = {'visibility_timeout': 3600, 'confirm_publish': True, 'fanout_prefix': True, 'fanout_patterns': True}

[attached: screenshot 2018-06-25 11 35 17]

Basically this is a newly deployed node; it was deployed on 06/21 at 18:50, memory started to grow on 6/23 around 05:00, and it finally crashed on 6/23 around 23:00.

The task is pretty simple and there is no special logic in it. I think I can reproduce the whole situation in a clean temporary project, but I have no free time for now; if I'm lucky I will try to put together a full example on the weekend.

UPDATE
As you can see, the task itself consumes some memory (visible as spikes on the graph), but at the time memory started to leak there were no tasks being produced and no other activity.

@georgepsarakis (Member) commented Jun 25, 2018

@marvelph @dmitry-kostin @jxltom I noticed you use Python 3. Would you mind enabling tracemalloc for the process? You may need to patch the worker process to log memory allocation traces; let me know if you need help with that.

@jxltom commented Jun 25, 2018

@georgepsarakis You mean enabling tracemalloc in the worker and logging stats, such as the top 10 files by memory usage, at a specific interval such as every 5 minutes?

@georgepsarakis (Member) commented Jun 25, 2018

@jxltom I think something like that would help locate the part of code that is responsible. What do you think?
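A minimal sketch of what such a worker patch could look like; this is not a Celery feature, and the signal hook, interval, frame depth, and top-10 limit are illustrative choices only:

import threading
import time
import tracemalloc

from celery.signals import worker_ready


def _dump_top_allocations(interval=300, limit=10):
    tracemalloc.start(25)  # keep up to 25 frames per allocation
    while True:
        time.sleep(interval)
        snapshot = tracemalloc.take_snapshot()
        print('--- tracemalloc top allocations ---')
        for stat in snapshot.statistics('lineno')[:limit]:
            print(stat)


@worker_ready.connect
def start_tracemalloc_logger(**kwargs):
    # worker_ready fires in the MainProcess, which is where the leak is observed.
    threading.Thread(target=_dump_top_allocations, daemon=True).start()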

@jxltom commented Jun 25, 2018

@georgepsarakis I've tried using gdb and https://github.com/lmacken/pyrasite to inject into the leaking process and start debugging via tracemalloc. Here are the top 10 files with the highest memory usage.

I checked resource.getrusage(resource.RUSAGE_SELF).ru_maxrss / 1024 and the memory usage is indeed gradually increasing.

>>> import tracemalloc
>>> 
>>> tracemalloc.start()
>>> snapshot = tracemalloc.take_snapshot()
>>> top_stats = snapshot.statistics('lineno')
>>> for stat in top_stats[:10]:
...     print(stat)
... 
/app/.heroku/python/lib/python3.6/site-packages/kombu/utils/eventio.py:84: size=12.0 KiB, count=1, average=12.0 KiB
/app/.heroku/python/lib/python3.6/site-packages/celery/worker/heartbeat.py:47: size=3520 B, count=8, average=440 B
/app/.heroku/python/lib/python3.6/site-packages/amqp/method_framing.py:166: size=3264 B, count=12, average=272 B
/app/.heroku/python/lib/python3.6/site-packages/celery/events/dispatcher.py:142: size=3060 B, count=10, average=306 B
/app/.heroku/python/lib/python3.6/site-packages/celery/events/dispatcher.py:157: size=2912 B, count=8, average=364 B
/app/.heroku/python/lib/python3.6/site-packages/amqp/abstract_channel.py:50: size=2912 B, count=8, average=364 B
/app/.heroku/python/lib/python3.6/site-packages/kombu/messaging.py:181: size=2816 B, count=12, average=235 B
/app/.heroku/python/lib/python3.6/site-packages/kombu/messaging.py:203: size=2816 B, count=8, average=352 B
/app/.heroku/python/lib/python3.6/site-packages/celery/events/dispatcher.py:199: size=2672 B, count=6, average=445 B
/app/.heroku/python/lib/python3.6/site-packages/amqp/channel.py:1734: size=2592 B, count=8, average=324 B

Here is the difference between two snapshots after around 5 minutes.

>>> snapshot2 = tracemalloc.take_snapshot()
>>> top_stats = snapshot2.compare_to(snapshot, 'lineno')
>>> print("[ Top 10 differences ]")
[ Top 10 differences ]

>>> for stat in top_stats[:10]:
...     print(stat)
... 
/app/.heroku/python/lib/python3.6/site-packages/celery/worker/heartbeat.py:47: size=220 KiB (+216 KiB), count=513 (+505), average=439 B
/app/.heroku/python/lib/python3.6/site-packages/celery/events/dispatcher.py:142: size=211 KiB (+208 KiB), count=758 (+748), average=285 B
/app/.heroku/python/lib/python3.6/site-packages/amqp/method_framing.py:166: size=210 KiB (+206 KiB), count=789 (+777), average=272 B
/app/.heroku/python/lib/python3.6/site-packages/celery/events/dispatcher.py:157: size=190 KiB (+187 KiB), count=530 (+522), average=366 B
/app/.heroku/python/lib/python3.6/site-packages/amqp/abstract_channel.py:50: size=186 KiB (+183 KiB), count=524 (+516), average=363 B
/app/.heroku/python/lib/python3.6/site-packages/celery/events/dispatcher.py:199: size=185 KiB (+182 KiB), count=490 (+484), average=386 B
/app/.heroku/python/lib/python3.6/site-packages/kombu/messaging.py:203: size=182 KiB (+179 KiB), count=528 (+520), average=353 B
/app/.heroku/python/lib/python3.6/site-packages/kombu/messaging.py:181: size=179 KiB (+176 KiB), count=786 (+774), average=233 B
/app/.heroku/python/lib/python3.6/site-packages/amqp/channel.py:1734: size=165 KiB (+163 KiB), count=525 (+517), average=323 B
/app/.heroku/python/lib/python3.6/site-packages/kombu/async/hub.py:293: size=157 KiB (+155 KiB), count=255 (+251), average=632 B

@jxltom commented Jun 25, 2018

Any suggestions for how to continue debugging this? I have no clue how to proceed. Thanks.

@marvelph (Author) commented Jun 26, 2018

@georgepsarakis

I need a little time to extract a minimal project for reproduction.

This is the Celery configuration:

BROKER_URL = [
    'amqp://xxxxxxxx:yyyyyyyy@aaa.bbb.ccc.ddd:5672/zzzzzzzz'
]
BROKER_TRANSPORT_OPTIONS = {}

The scheduler has the following settings.

CELERYBEAT_SCHEDULE = {
    'aaaaaaaa_bbbbbbbb': {
        'task': 'aaaa.bbbbbbbb_cccccccc',
        'schedule': celery.schedules.crontab(minute=0),
    },
    'dddddddd_eeeeeeee': {
        'task': 'dddd.eeeeeeee_ffffffff',
        'schedule': celery.schedules.crontab(minute=0),
    },
}

On EC2, I use supervisord to run it.

@marvelph (Author) commented Jun 26, 2018

@georgepsarakis
Since my test environment can tolerate performance degradation, tracemalloc can be used there.
Can you provide a patched Celery that dumps memory usage?

@dmitry-kostin commented Jun 26, 2018

@jxltom I bet tracemalloc with a 5-minute interval won't help to locate the problem.
For example, I have 5 nodes and only 3 of them have had this problem over the last 4 days, while 2 worked fine the whole time, so it will be very tricky to locate the problem.
I feel like there is some toggle that switches on, and then memory starts to grow; until that switch, memory consumption looks perfectly fine.

@marvelph (Author) commented Jun 26, 2018

I tried to find out whether similar problems occurred in other running systems.
The frequency of occurrence varies, but a memory leak has occurred on three systems using Celery 4.x, and it has not happened on one system.
The systems that have the memory leak run Python 3.5.x, and the system with no memory leak runs Python 2.7.x.

@jxltom commented Jun 26, 2018

@dmitry-kostin What's the difference from the other two normal nodes? Are they all using the same RabbitMQ broker?

Since our discussion suggested it may be related to RabbitMQ, I started another new node with the same configuration except using Redis instead. So far, this node has no memory leak after running for 24 hours. I will post here if it develops a memory leak later.

@jxltom commented Jun 26, 2018

@marvelph So do you mean that the three systems with the memory leak are using Python 3, while the one that is fine is using Python 2?

@dmitry-kostin commented Jun 26, 2018

@jxltom No difference at all, and yes, they are on Python 3 with RabbitMQ as broker and Redis as backend.
I made a test setup to reproduce this; if it succeeds in a couple of days I will give credentials to these servers to somebody who knows how to locate this bug.

@marvelph (Author) commented Jun 26, 2018

@jxltom
Yes.
As far as my environment is concerned, problems do not occur in Python 2.

@auvipy added this to the 5.2.x milestone Oct 30, 2021
@pawl (Contributor) commented Dec 8, 2021

I was able to reproduce this issue with pyamqp 5.0.6 + celery 5.2.0. This issue seems very similar to the one described in #5047, but the issue with re-connection has been fixed. However, the memory leak remains.

I created this repo with a docker-compose file and readme to make it easier for others to reproduce the issue: https://github.com/pawl/celery_pyamqp_memory_leak

I think the temporary fix for this issue involves turning off pyamqp's heartbeat functionality with broker_heartbeat=0. Also, @rgeens says he was able to fix this issue by using librabbitmq (because it doesn't implement amqp heartbeat functionality): #3916 (comment)

This also seems to explain why some people have said they've fixed the issue by moving to redis as a broker.
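For reference, the suggested workaround looks like this in a Celery configuration; broker_heartbeat is a real Celery setting, while the app name and broker URL below are placeholders:

from celery import Celery

# Placeholder app name and broker URL; only broker_heartbeat matters here.
app = Celery('myapp', broker='amqp://guest:guest@localhost:5672//')

# Disable AMQP heartbeats so the worker never enters the heartbeat-triggered
# disconnect/reconnect path that leaks old connection objects.
app.conf.broker_heartbeat = 0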

@michael-lazar commented Dec 11, 2021

@pawl, @VinayGValsaraj, @cmontanaro, and I looked into this bug for a hackathon at our company.

The key to reproducing this leak was the docker container that @pawl posted. Because the heartbeat is set so low, the celery worker does not have time to respond to AMQP heartbeats, and RabbitMQ closes the connection.
This results in a loop where, every time the connection is closed on the broker side, the celery worker's memory continues to grow.

We used the pympler profiler tool and found a good spot in the celery code to take a snapshot of the memory diff.

# Patch applied to celery's Consumer class (celery/worker/consumer/consumer.py)
class Consumer:
    """Consumer blueprint."""
    ...

    # Track a summary of all live objects in the worker's MainProcess.
    from pympler import tracker
    tr = tracker.SummaryTracker()

    def on_connection_error_after_connected(self, exc):
        # On every broker disconnect, print what has accumulated since the
        # previous snapshot.
        from pympler import summary
        diff = self.tr.diff()
        summary.print_(diff, limit=1000)
        ...  # Continue normal code after this point

                               types |   # objects |   total size
 =================================== | =========== | ============
                         array.array |           1 |    130.39 KB
                           bytearray |           1 |    128.05 KB
                                dict |          33 |      8.41 KB
                                code |           0 |      3.71 KB
                   collections.deque |           4 |      2.44 KB
                              method |          23 |      1.44 KB
                                 str |          15 |      1.07 KB
                                 set |           4 |    864     B
                                cell |          19 |    760     B
                               tuple |          10 |    640     B
                                list |           7 |    616     B
          builtin_function_or_method |           8 |    576     B
             collections.defaultdict |           2 |    480     B
               vine.promises.promise |           3 |    432     B
                          memoryview |           1 |    184     B
                 threading.Condition |           3 |    144     B
                 function (on_frame) |           1 |    136     B
               function (on_message) |           1 |    136     B
              function (write_frame) |           1 |    136     B
                 function (<lambda>) |           1 |    136     B
                       managedbuffer |           1 |    128     B
                                 int |           4 |    112     B
                       socket.socket |           1 |     88     B
                               bytes |           2 |     79     B
      kombu.transport.pyamqp.Channel |           1 |     48     B
                     amqp.sasl.PLAIN |           1 |     48     B
                         queue.Queue |           1 |     48     B
                  kombu.entity.Queue |           1 |     48     B
          amqp.method_framing.Buffer |           1 |     48     B
   kombu.transport.pyamqp.Connection |           1 |     48     B
         amqp.transport.TCPTransport |           1 |     48     B
            kombu.messaging.Consumer |           1 |     48     B
    kombu.transport.pyamqp.Transport |           1 |     48     B
         kombu.connection.Connection |           1 |     48     B
               kombu.entity.Exchange |           1 |     48     B
                        _thread.lock |           1 |     40     B
                               float |           1 |     24     B

This summary shows the net difference in memory from the last snapshot that was taken. We want this table to be empty, because as connections are created and cleaned up we expect the memory profile for the celery worker to be more or less constant. All of this stuff shown is related to one Connection/Transport/Channel object, and this essentially tells us that the dropped connection object is persisting in memory and can't be garbage collected.

We also ran tracemalloc which shows a different view of the same issue.

[attached: tracemalloc output]

(If you're curious about the two large arrays at the top, they're avail_channel_ids and buffer_store.)

Bug 1

Next, we were able to track down the source of the memory leak to the main kombu event loop. New connections are registered to the event loop by calling conn.transport.register_with_event_loop(conn.connection, self.hub). However, connections that are dropped by the broker are never removed from the event loop. I took a snapshot in my debugger and you can see there are a bunch of old, disconnected transport objects hanging around with different file descriptors. This is the root cause of the memory leak. We added some custom code to periodically purge the hub of disconnected transports, and the leak went away (a rough sketch of the idea follows the screenshot below).

[attached: Screen-Shot-2021-12-09-at-5 54 01-PM]
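A rough sketch of that "periodically purge the hub" idea, not the actual patch; it assumes the application keeps its own fd-to-connection registry, and only hub.remove(), Connection.connected, and register_with_event_loop() are existing kombu APIs:

registered = {}  # fd -> kombu Connection currently registered with the hub


def register(conn, hub):
    conn.transport.register_with_event_loop(conn.connection, hub)
    # py-amqp exposes the raw socket as connection.sock; other transports differ.
    registered[conn.connection.sock.fileno()] = conn


def purge_disconnected(hub):
    """Drop event-loop entries whose broker connection has gone away."""
    for fd, conn in list(registered.items()):
        if not conn.connected:
            hub.remove(fd)        # remove stale reader/writer callbacks for this fd
            del registered[fd]    # the dead Transport/Connection can now be GC'd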

The curious thing about this is that we failed to find any code that even attempts to remove disconnected transports from the event loop. There's a method stub unregister_from_event_loop() on the base transport class, but it's never implemented or called from anywhere. The Redis transport registers a callback _on_disconnect() to do it, which is probably why people have claimed their memory leaks disappear when they switch to Redis. However, none of the other transports, including transport.pyamqp, do anything like this from what I can tell.

Bug 2

The other related bug we found was that py-amqp is not correctly handling sockets that are closed on the server side.

https://github.com/celery/py-amqp/blob/master/amqp/transport.py#L273-L282

    def close(self):
        if self.sock is not None:
            self._shutdown_transport()
            # Call shutdown first to make sure that pending messages
            # reach the AMQP broker if the program exits after
            # calling this method.
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            self.sock = None
        self.connected = False

If the connection is closed on the remote side, calling sock.shutdown() will raise an OSError. This is caught further up the stack in a blanket except Exception: clause. But the side-effect is that socket.close() is never reached, and the socket is never appropriately cleaned up and released by python.

This is why you see in the above screenshot that the socket file descriptors keep incrementing (81, 82, 83, ...) each time the connection is dropped and re-established. Fixing this (by adding a separate try...except... clause around socket.shutdown()) incidentally also fixed bug 1 for us. This is because when the file descriptor is properly released, the next time py-amqp calls socket.socket() the OS will recycle the same file descriptor. And when celery goes to register the new transport with the kombu event loop (which stores transports in a dict indexed by fd), it will overwrite the disconnected transport and allow it to be garbage collected.

I don't believe that this is a dependable fix for bug 1 though, because there is no guarantee that celery will be allocated back the same file descriptor the next time that amqp calls socket.socket(). This just seems to be what happens in my OS & environment.
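A sketch of that fix (a separate try/except around shutdown so close() always runs); the patch actually merged into py-amqp may differ in detail:

import socket

    def close(self):
        if self.sock is not None:
            self._shutdown_transport()
            try:
                # The peer may already have closed the connection, in which case
                # shutdown() raises OSError; that must not prevent close() below.
                self.sock.shutdown(socket.SHUT_RDWR)
            except OSError:
                pass
            self.sock.close()
            self.sock = None
        self.connected = False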

@auvipy (Member) commented Dec 14, 2021

We have two related PRs merged on py-amqp; one is already on PyPI and the other is in master and will be released soon.

michael-lazar added a commit to michael-lazar/celery that referenced this issue Dec 14, 2021
michael-lazar added a commit to michael-lazar/celery that referenced this issue Dec 14, 2021
michael-lazar added a commit to michael-lazar/celery that referenced this issue Dec 14, 2021
@jaysoffian commented Dec 20, 2021

If the connection is closed on the remote side, calling sock.shutdown() will raise an OSError. This is caught further up the stack in a blanket except Exception: clause. But the side-effect is that socket.close() is never reached, and the socket is never appropriately cleaned up and released by python.

Actually, I think the issue isn't that socket.close() is never called, but rather that self.sock is never set to None when an exception is raised by shutdown(). Of course, you should always close your sockets explicitly, but if you fail to do so, Python will do that on your behalf when the reference to the socket is GC'd. Since self.sock was never set to None in the case where socket.shutdown() raised an exception, the socket object itself never got GC'd by Python.

@pawl (Contributor) commented Dec 20, 2021

@jaysoffian I made a branch on py-amqp to test your theory: pawl/py-amqp@45f5eef (and used that branch in a submodule here)

Here's what I see after running it for a short time:

Top 10 lines
#1: <frozen importlib._bootstrap_external>:525: 7104.2 KiB
#2: /celery_app/py-amqp/amqp/connection.py:270: 5085.4 KiB
    self._avail_channel_ids = array('H', range(self.channel_max, 0, -1))
#3: /celery_app/py-amqp/amqp/method_framing.py:106: 4996.7 KiB
    buffer_store = Buffer(bytearray(connection.frame_max - 8))
#4: /usr/local/lib/python3.7/linecache.py:137: 880.4 KiB
    lines = fp.readlines()
#5: /usr/local/lib/python3.7/abc.py:126: 318.6 KiB
    cls = super().__new__(mcls, name, bases, namespace, **kwargs)
#6: /usr/local/lib/python3.7/tracemalloc.py:65: 156.6 KiB
    return (self.size, self.count, self.traceback)
#7: /usr/local/lib/python3.7/typing.py:711: 141.7 KiB
    super().__setattr__(attr, val)
#8: /usr/local/lib/python3.7/tracemalloc.py:185: 135.2 KiB
    self._frames = tuple(reversed(frames))
#9: /usr/local/lib/python3.7/abc.py:127: 92.7 KiB
    _abc_init(cls)
#10: /celery_app/py-amqp/amqp/connection.py:428: 84.1 KiB
    self.server_heartbeat = server_heartbeat or 0
9371 other: 6224.4 KiB
Total allocated size: 25219.9 KiB
Closed channel #1

Based on that _avail_channel_ids line, Connection isn't getting cleaned up unless socket.close() runs.

This might be evidence that this is an issue on redis-py too and the fix I made there might help with this issue: redis/redis-py#1797

Although, like Michael was saying, there's a lot more clean-up logic for the redis transport. It may still be possible that it's not an issue when using redis.

@jaysoffian commented Dec 20, 2021

Based on that _avail_channel_ids line, Connection isn't getting cleaned up unless socket.close() runs.

@pawl, I appreciate you running this test. Maybe the GC is delayed. Python is documented as closing a socket when the object is GC'd, and I just empirically tested that this is the case using Python 3.9.5 and nc -l 9999 in a pair of terminal windows. I'm not familiar enough with the celery/amqp code to follow what's going on with _avail_channel_ids.
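A minimal version of that kind of empirical check, assuming nc -l 9999 is listening in another terminal:

import gc
import socket

s = socket.create_connection(("127.0.0.1", 9999))
del s          # drop the last reference; CPython closes the fd when the socket object is deallocated
gc.collect()   # only needed if the object were trapped in a reference cycle
# The nc side then sees the connection close, without close() ever being called explicitly.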

@michael-lazar commented Dec 20, 2021

I think even if you set self.sock = None, there are still other references to the socket object that will hang around and prevent GC.

https://github.com/celery/py-amqp/blob/81d4072825b341c797beffae8e6c4e7ecf8ad71d/amqp/transport.py#L417
https://github.com/celery/py-amqp/blob/81d4072825b341c797beffae8e6c4e7ecf8ad71d/amqp/transport.py#L630

>>> self.sock
<ssl.SSLSocket fd=79, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 56934), raddr=('127.0.0.1', 5671)>
>>> self.sock = None
>>> self._quick_recv.__self__
<ssl.SSLSocket fd=79, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 56934), raddr=('127.0.0.1', 5671)>

@jaysoffian commented Dec 20, 2021

Yes, I agree, that's what's going on, combined with Connection.collect() not destroying the Transport object when Transport.close() raises an exception:

https://github.com/celery/py-amqp/blob/81d4072825b341c797beffae8e6c4e7ecf8ad71d/amqp/connection.py#L468

Thanks, glad to understand this. Again, my point was only that this isn't some sort of bug in Python leaking sockets ("the socket is never appropriately cleaned up and released by python"), but rather a design issue with py-amqp/celery retaining a reference to it due to bad design/exception handling.

@thedrow (Member) commented Dec 21, 2021

Did we finally mitigate all known memory leaks?

@auvipy (Member) commented Dec 21, 2021

Did we finally mitigate all known memory leaks?

I think this is still a work in progress.

@pawl (Contributor) commented Dec 21, 2021

I was able to reproduce the memory leak issue with celery 5.2.0 + redis-py from the master branch (after 4.1.0rc2). It has this socket closing fix (which doesn't seem to fix the issue): redis/redis-py#1797.

I created this branch with a docker-compose file and readme to make it easier for others to reproduce the issue: https://github.com/pawl/celery_pyamqp_memory_leak/tree/test_redis

The issue might be similar to py-amqp's #374, which was caused by failing to run socket.close() or clean up all the references to the socket.

The output from tracemalloc looks similar to the py-amqp issue, which shows a kombu Transport that didn't get cleaned up by garbage collection. Probably because the Transport still has a lingering reference to the unclosed socket. This top line's memory usage will continue to grow each time you restart redis:

Top 10 lines
#1: /celery_app/kombu/kombu/transport/virtual/base.py:911: 2999.1 KiB
    ARRAY_TYPE_H, range(self.channel_max, 0, -1),
<truncated>

Maybe we're missing some socket closing logic at some part of this stack trace when a ConnectionError is encountered.

I also opened this PR to kombu which should reduce the severity of the leaks: celery/kombu#1470 However, we still need to figure out the root cause.

@pawl (Contributor) commented Dec 24, 2021

The pull request I made today with the fix for the Redis broker leaking memory (when connections to the broker fail) was just merged.

I'm not aware of any other ways to reproduce memory leaks for #4843 at the moment.

Here's a summary of the fixes so far:

These fixes should completely prevent leaks due to disconnected connections to the broker:

And, if there are still some scenarios where that doesn't work... There's also these fixes that make Connections and Transports use ~150kb less memory each (making some potential leaks much less severe):

Thank you @auvipy for all the feedback and help with getting this stuff reviewed and merged.

@auvipy (Member) commented Dec 24, 2021

@pawl Thanks to you and your teammates for the great collaboration and contributions. I will push point releases with the other merged changes next Sunday if not swallowed by family/holiday vibes, but next week for sure.

@caleb15 commented Jan 6, 2022

@auvipy Just to double-check, version 5.2.3 of celery that you pushed recently has the memory leak fixes, right?

@pawl (Contributor) commented Jan 6, 2022

@caleb15 Celery 5.2.3 does have a minor leak fix I didn't mention in my comment above: #7187. But I'm not sure that one is the main one generating the complaints in this thread.

I think the main leak fixes are going to come from upgrading kombu to 5.2.3 (if you're using the redis broker) and py-amqp to 5.0.9 (if you're using py-amqp for connecting to rabbitmq).

For more details, see: #4843 (comment)

You may also want to check out this new section of the docs about handling memory leaks: https://docs.celeryproject.org/en/stable/userguide/optimizing.html#memory-usage
