Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Free lz4 decompression context to avoid leak #1024

Merged
merged 4 commits into from
Mar 14, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ Compression
***********

kafka-python supports gzip compression/decompression natively. To produce or
consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
To enable snappy, install python-snappy (also requires snappy library).
See `Installation <install.html#optional-snappy-install>`_ for more information.


Expand Down
6 changes: 2 additions & 4 deletions docs/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ Bleeding-Edge
Optional LZ4 install
********************

To enable LZ4 compression/decompression, install python-lz4:

>>> pip install lz4

Optional Snappy install
***********************
Expand Down
60 changes: 47 additions & 13 deletions kafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@
except ImportError:
snappy = None

try:
import lz4.frame as lz4
except ImportError:
lz4 = None

try:
import lz4f
import xxhash
except ImportError:
lz4f = None

try:
import xxhash
except ImportError:
xxhash = None

PYPY = bool(platform.python_implementation() == 'PyPy')

def has_gzip():
Expand All @@ -33,7 +42,11 @@ def has_snappy():


def has_lz4():
    """Return True if at least one LZ4 backend is importable.

    Prefers the python-lz4 frame module (``lz4.frame``); the legacy
    ``lz4f`` binding is accepted as a fallback. Both module references
    are set to None at import time when unavailable.
    """
    # Collapse the duplicated old/new diff lines into a single expression;
    # behavior matches the post-change code (True if either backend loaded).
    return lz4 is not None or lz4f is not None


def gzip_encode(payload, compresslevel=None):
Expand Down Expand Up @@ -181,17 +194,20 @@ def snappy_decode(payload):
return snappy.decompress(payload)


# Bind the module-level lz4_encode hook at import time: prefer the
# python-lz4 frame encoder, fall back to the legacy lz4f binding, and
# leave it as None when neither library is installed (callers must check
# has_lz4() first). The old `def lz4_encode` from the pre-change diff is
# removed — it was dead residue interleaved with this replacement.
if lz4:
    lz4_encode = lz4.compress  # pylint: disable-msg=no-member
elif lz4f:
    lz4_encode = lz4f.compressFrame  # pylint: disable-msg=no-member
else:
    lz4_encode = None


def lz4_decode(payload):
def lz4f_decode(payload):
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
# pylint: disable-msg=no-member
ctx = lz4f.createDecompContext()
data = lz4f.decompressFrame(payload, ctx)
lz4f.freeDecompContext(ctx)

# lz4f python module does not expose how much of the payload was
# actually read if the decompression was only partial.
Expand All @@ -200,29 +216,47 @@ def lz4_decode(payload):
return data['decomp']


# Pick the decompression backend that matches the encoder selection above:
# python-lz4's frame decoder when present, otherwise the lz4f wrapper,
# otherwise None (LZ4 unsupported in this environment).
if lz4 is not None:
    lz4_decode = lz4.decompress  # pylint: disable-msg=no-member
elif lz4f is not None:
    lz4_decode = lz4f_decode
else:
    lz4_decode = None


def lz4_encode_old_kafka(payload):
    """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum.

    Old Kafka brokers expect an LZ4 frame whose header checksum (HC) byte
    was computed over the wrong byte range, and they reject frames that
    carry the optional content-size field. This re-frames the output of
    lz4_encode accordingly.
    """
    assert xxhash is not None
    data = lz4_encode(payload)
    # Fixed-size part of the LZ4 frame header: magic (4) + FLG + BD + HC.
    header_size = 7
    # data[4] is the FLG byte; on py2 indexing bytes yields a str of len 1.
    flg = data[4]
    if not isinstance(flg, int):
        flg = ord(flg)

    content_size_bit = ((flg >> 3) & 1)
    if content_size_bit:
        # Old kafka does not accept the content-size field,
        # so we need to discard it and reset the header flag.
        flg -= 8
        data = bytearray(data)
        data[4] = flg
        data = bytes(data)
        # Skip the 8-byte content-size field that followed the header.
        payload = data[header_size + 8:]
    else:
        payload = data[header_size:]

    # This is the incorrect hc: xxh32 over the header *including* the magic
    # bytes (the spec excludes them), matching old Kafka's buggy check.
    hc = xxhash.xxh32(data[0:header_size - 1]).digest()[-2:-1]  # pylint: disable-msg=no-member

    return b''.join([
        data[0:header_size - 1],
        hc,
        payload
    ])


def lz4_decode_old_kafka(payload):
assert xxhash is not None
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):
Expand Down
4 changes: 3 additions & 1 deletion test/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import absolute_import

import io
import platform

import pytest

Expand Down Expand Up @@ -34,7 +35,8 @@ def test_buffer_close():
@pytest.mark.parametrize('compression', [
'gzip',
'snappy',
pytest.mark.skipif("sys.version_info < (2,7)")('lz4'), # lz4tools does not work on py26
pytest.mark.skipif(platform.python_implementation() == 'PyPy',
reason='python-lz4 crashes on older versions of pypy')('lz4'),
])
def test_compressed_buffer_close(compression):
records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
Expand Down
13 changes: 9 additions & 4 deletions test/test_codec.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
from __future__ import absolute_import

import platform
import struct

import pytest
Expand Down Expand Up @@ -80,7 +83,8 @@ def test_snappy_encode_xerial():
assert compressed == to_ensure


@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4():
for i in xrange(1000):
b1 = random_string(100).encode('utf-8')
Expand All @@ -89,7 +93,8 @@ def test_lz4():
assert b1 == b2


@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4_old():
for i in xrange(1000):
b1 = random_string(100).encode('utf-8')
Expand All @@ -98,8 +103,8 @@ def test_lz4_old():
assert b1 == b2


@pytest.mark.xfail(reason="lz4tools library doesnt support incremental decompression")
@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available")
@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy',
reason="python-lz4 crashes on old versions of pypy")
def test_lz4_incremental():
for i in xrange(1000):
# lz4 max single block size is 4MB
Expand Down
4 changes: 2 additions & 2 deletions test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def test_end_to_end(kafka_broker, compression):
# LZ4 requires 0.8.2
if version() < (0, 8, 2):
return
# LZ4 python libs don't work on python2.6
elif sys.version_info < (2, 7):
# python-lz4 crashes on older versions of pypy
elif platform.python_implementation() == 'PyPy':
return

connect_str = 'localhost:' + str(kafka_broker.port)
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ deps =
pytest-mock
mock
python-snappy
lz4tools
lz4
xxhash
py26: unittest2
commands =
Expand Down