Memory leak in consumer when using lz4 compression #1021

Closed
declantraynor opened this issue Mar 9, 2017 · 13 comments

Comments

@declantraynor

I've noticed a problem with unbounded memory usage in the consumer when handling lz4-compressed payloads. The symptoms point to a memory leak of some description. I've created a test case which should allow easy reproduction of the issue.

I've tried experimenting with various consumer settings such as fetch_max_bytes, max_partition_fetch_bytes and max_in_flight_requests_per_connection in an attempt to lessen the buffering requirements of each individual request to Kafka. In all cases, memory usage of the consumer processes has continued to rise until they are killed by the host system.
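For anyone following along, a minimal sketch of the kind of tuning described above. The three keyword names are the ones mentioned in this report; the values, the `CONSUMER_LIMITS` dict, the `make_consumer` helper and the broker address are illustrative assumptions, not the settings from the test case.

```python
# Hypothetical limits: the values are placeholders chosen for illustration.
CONSUMER_LIMITS = {
    "fetch_max_bytes": 1 * 1024 * 1024,          # cap on a whole fetch response
    "max_partition_fetch_bytes": 256 * 1024,     # cap per partition
    "max_in_flight_requests_per_connection": 1,  # one outstanding request at a time
}


def make_consumer(topic, bootstrap_servers="localhost:9092"):
    """Build a consumer with the reduced buffering limits (hypothetical helper)."""
    # Imported lazily so the settings can be inspected without kafka-python
    # installed or a broker running.
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap_servers,
        **CONSUMER_LIMITS
    )
```

Lowering these limits only reduces how much data is buffered per request, so it would not be expected to stop growth caused by a genuine leak.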

I can additionally confirm that this issue is present in the latest PyPI release (1.3.2) as well as master and that the issue only manifests when using lz4 compression specifically. gzip appears to be working fine, for example.

Any help is greatly appreciated. Thanks!

@dpkp
Owner

dpkp commented Mar 9, 2017

Thanks for the great bug report! I found the issue and will put up a PR to fix.

@dpkp dpkp changed the title Suspected memory leak in consumer when using lz4 compression Memory leak in consumer when using lz4 compression Mar 9, 2017
@dpkp
Owner

dpkp commented Mar 9, 2017

It looks like there may also be a bug in lz4tools that could cause a crash during lz4 decompression (assuming #1024 fix is applied to kafka-python): darkdragn/lz4tools#8

@dpkp
Owner

dpkp commented Mar 9, 2017

We might consider switching to https://github.com/python-lz4/python-lz4 for primary lz4 support going forward. It seems to be more active.

@declantraynor
Author

Thanks for the amazingly fast turnaround @dpkp!

@dpkp
Owner

dpkp commented Mar 13, 2017

I added python-lz4 support to #1024 -- can you take a look and see if it seems ok?

@dpkp
Owner

dpkp commented Mar 14, 2017

A few issues popping up w/ python-lz4: it does not work with pypy, and its frame encoding does not work with the older "broken" lz4 code used by kafka brokers prior to 0.10. These aren't devastating problems, and certainly better than random segfaults on decode, but not as straightforward as I'd like.

@dpkp
Owner

dpkp commented Mar 14, 2017

Given the complexity here w/ both options, I think I'm going to defer this until after the next release (which I'm hoping to push out in the next few days).

@declantraynor
Author

Thanks for digging in @dpkp. I'll check out #1024 and get back to you.

@declantraynor
Author

#1024 appears to be working as expected. Thanks again!

@dpkp
Owner

dpkp commented Mar 14, 2017

Well, I take it back -- I managed to get the lz4 patch passing tests again and I'm going to merge it for the release. Hooray!

@bt-wil

bt-wil commented Jan 18, 2018

I encountered an issue where using lz4 compression is causing a memory leak. Looking at the output from tracemalloc, I've narrowed it down to the lz4_decode function, which internally uses lz4.frame.decompress. (I also filed a bug: python-lz4/python-lz4#96)

tracemalloc output:

[2018-01-17 23:08:43+0000] INFO:  /.../kafka/protocol/message.py:133: size=110 MiB (+6255 KiB), count=99792 (+5515), average=1155 B
....
[2018-01-17 23:21:46+0000] INFO:  /.../kafka/protocol/message.py:133: size=260 MiB (+4116 KiB), count=235781 (+3668), average=1154 B
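A minimal sketch of how per-line growth figures like these can be produced with the standard-library `tracemalloc` module. The `retained` list only stands in for a leak, and `top_growth` is a hypothetical helper; neither comes from kafka-python or aiokafka.

```python
import tracemalloc


def top_growth(before, after, limit=5):
    """Return the source lines whose allocated size changed the most."""
    # compare_to() returns StatisticDiff entries, largest change first.
    return after.compare_to(before, "lineno")[:limit]


tracemalloc.start()
before = tracemalloc.take_snapshot()

retained = []  # simulated leak: buffers that are never released
for _ in range(1000):
    retained.append(bytearray(1024))

after = tracemalloc.take_snapshot()
growth = top_growth(before, after)
for stat in growth:
    # Each entry prints as "file:line: size=... (+...), count=... (+...), average=..."
    print(stat)

tracemalloc.stop()
```

Taking snapshots at intervals in a long-running consumer and comparing each one with the first shows whether a single line keeps growing, as `message.py:133` does above.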

File in question:

128          elif codec == self.CODEC_LZ4:
129              assert has_lz4(), 'LZ4 decompression unsupported'
130              if self.magic == 0:
131                  raw_bytes = lz4_decode_old_kafka(self.value)
132              else:
133                  raw_bytes = lz4_decode(self.value)
134          else:
135              raise Exception('This should be impossible')
136
137          return MessageSet.decode(raw_bytes, bytes_to_read=len(raw_bytes))

I'm using Python v3.6.4 with kafka-python v1.3.3 via aiokafka v0.3. Is anyone running into the same issue?

@jeffwidman
Collaborator

You might get more traction opening this as a new ticket... comments on closed issues/PRs typically get lost.

@jonathanunderwood

The memory leak reported in lz4 0.18.1 has now been fixed, and the fix is in the new release 0.18.2. Thanks to @bt-wil for reporting this and giving a test case. I hope you guys will continue to reach out if you see any issues with lz4 - am keen to work with you, as I am a big fan, and user, of kafka :)
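A small sketch for checking whether a given lz4 version string is at or past the fixed release. `parse_version` and `has_leak_fix` are hypothetical helpers, and treating every release before 0.18.2 as lacking the fix is an assumption; the comment above only names 0.18.1 as affected.

```python
import re

FIXED_IN = (0, 18, 2)  # release named above as containing the fix


def parse_version(text):
    """Turn '0.18.2' into (0, 18, 2); non-numeric suffixes are ignored."""
    parts = []
    for piece in text.split(".")[:3]:
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    while len(parts) < 3:
        parts.append(0)  # pad short versions such as '0.19'
    return tuple(parts)


def has_leak_fix(version_text):
    """True if the version is 0.18.2 or later (assumption: earlier ones lack the fix)."""
    return parse_version(version_text) >= FIXED_IN
```

The version string can come from wherever the installed version is reported, for example the output of `pip show lz4`.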

jeffwidman added a commit that referenced this issue Jan 26, 2018
Opening a PR to check if tests pass with the new version. If so, we'll want to bump `requirements-dev.txt` as well.

Many thanks to @jonathanunderwood for his diligent work here: #1021 (comment)