From 23d1cc444d16ddd58c9fec82f8a6f4331fed46dc Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 9 Mar 2017 11:08:48 -0800 Subject: [PATCH 1/4] Free lz4 decompression context to avoid leak --- kafka/codec.py | 1 + 1 file changed, 1 insertion(+) diff --git a/kafka/codec.py b/kafka/codec.py index 1e5710791..4deec49da 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -192,6 +192,7 @@ def lz4_decode(payload): # pylint: disable-msg=no-member ctx = lz4f.createDecompContext() data = lz4f.decompressFrame(payload, ctx) + lz4f.freeDecompContext(ctx) # lz4f python module does not expose how much of the payload was # actually read if the decompression was only partial. From 43820dfb93712b5721505c4e6dee542cd73bfca9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 13 Mar 2017 14:22:57 -0700 Subject: [PATCH 2/4] Prefer python-lz4 over lz4f if available --- docs/index.rst | 5 ++--- docs/install.rst | 6 ++---- kafka/codec.py | 39 ++++++++++++++++++++++++++++++++------- tox.ini | 2 +- 4 files changed, 37 insertions(+), 15 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 2cef7fe06..21cb3b9b8 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -113,9 +113,8 @@ Compression *********** kafka-python supports gzip compression/decompression natively. To produce or -consume lz4 compressed messages, you must install lz4tools and xxhash (modules -may not work on python2.6). To enable snappy, install python-snappy (also -requires snappy library). +consume lz4 compressed messages, you should install python-lz4 (pip install lz4). +To enable snappy, install python-snappy (also requires snappy library). See `Installation `_ for more information. diff --git a/docs/install.rst b/docs/install.rst index 9720d65a1..cc0e82d68 100644 --- a/docs/install.rst +++ b/docs/install.rst @@ -26,12 +26,10 @@ Bleeding-Edge Optional LZ4 install ******************** -To enable LZ4 compression/decompression, install lz4tools and xxhash: +To enable LZ4 compression/decompression, install python-lz4: ->>> pip install lz4tools ->>> pip install xxhash +>>> pip install lz4 -*Note*: these modules do not support python2.6 Optional Snappy install *********************** diff --git a/kafka/codec.py b/kafka/codec.py index 4deec49da..29db48e48 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -16,12 +16,21 @@ except ImportError: snappy = None +try: + import lz4.frame as lz4 +except ImportError: + lz4 = None + try: import lz4f - import xxhash except ImportError: lz4f = None +try: + import xxhash +except ImportError: + xxhash = None + PYPY = bool(platform.python_implementation() == 'PyPy') def has_gzip(): @@ -33,7 +42,11 @@ def has_snappy(): def has_lz4(): - return lz4f is not None + if lz4 is not None: + return True + if lz4f is not None: + return True + return False def gzip_encode(payload, compresslevel=None): @@ -181,13 +194,15 @@ def snappy_decode(payload): return snappy.decompress(payload) -def lz4_encode(payload): - """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" - # pylint: disable-msg=no-member - return lz4f.compressFrame(payload) +if lz4: + lz4_encode = lz4.compress # pylint: disable-msg=no-member +elif lz4f: + lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member +else: + lz4_encode = None -def lz4_decode(payload): +def lz4f_decode(payload): """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" # pylint: disable-msg=no-member ctx = lz4f.createDecompContext() @@ -201,8 +216,17 @@ def lz4_decode(payload): return data['decomp'] +if lz4: + lz4_decode = lz4.decompress # pylint: disable-msg=no-member +elif lz4f: + lz4_decode = lz4f_decode +else: + lz4_decode = None + + def lz4_encode_old_kafka(payload): """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum.""" + assert xxhash is not None data = lz4_encode(payload) header_size = 7 if isinstance(data[4], int): @@ -224,6 +248,7 @@ def lz4_encode_old_kafka(payload): def lz4_decode_old_kafka(payload): + assert xxhash is not None # Kafka's LZ4 code has a bug in its header checksum implementation header_size = 7 if isinstance(payload[4], int): diff --git a/tox.ini b/tox.ini index 23ca385ba..03a6893ad 100644 --- a/tox.ini +++ b/tox.ini @@ -17,7 +17,7 @@ deps = pytest-mock mock python-snappy - lz4tools + lz4 xxhash py26: unittest2 commands = From 9ba21b836731af258afb8c400f40110b3ff58ebc Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 14 Mar 2017 11:01:58 -0700 Subject: [PATCH 3/4] LZ4 support in kafka 0.8/0.9 does not accept a ContentSize header --- kafka/codec.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/kafka/codec.py b/kafka/codec.py index 29db48e48..a527b4273 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -229,13 +229,21 @@ def lz4_encode_old_kafka(payload): assert xxhash is not None data = lz4_encode(payload) header_size = 7 - if isinstance(data[4], int): - flg = data[4] - else: - flg = ord(data[4]) + flg = data[4] + if not isinstance(flg, int): + flg = ord(flg) + content_size_bit = ((flg >> 3) & 1) if content_size_bit: - header_size += 8 + # Old kafka does not accept the content-size field + # so we need to discard it and reset the header flag + flg -= 8 + data = bytearray(data) + data[4] = flg + data = bytes(data) + payload = data[header_size+8:] + else: + payload = data[header_size:] # This is the incorrect hc hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member @@ -243,7 +251,7 @@ def lz4_encode_old_kafka(payload): return b''.join([ data[0:header_size-1], hc, - data[header_size:] + payload ]) From fd054aaea7b3e9993c6a02c6b9155dc66d69e2d3 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 14 Mar 2017 11:38:49 -0700 Subject: [PATCH 4/4] Alter test skips: python-lz4 works on python26, but not pypy --- test/test_buffer.py | 4 +++- test/test_codec.py | 13 +++++++++---- test/test_producer.py | 4 ++-- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/test/test_buffer.py b/test/test_buffer.py index c8e283d25..db6cbb37c 100644 --- a/test/test_buffer.py +++ b/test/test_buffer.py @@ -2,6 +2,7 @@ from __future__ import absolute_import import io +import platform import pytest @@ -34,7 +35,8 @@ def test_buffer_close(): @pytest.mark.parametrize('compression', [ 'gzip', 'snappy', - pytest.mark.skipif("sys.version_info < (2,7)")('lz4'), # lz4tools does not work on py26 + pytest.mark.skipif(platform.python_implementation() == 'PyPy', + reason='python-lz4 crashes on older versions of pypy')('lz4'), ]) def test_compressed_buffer_close(compression): records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression) diff --git a/test/test_codec.py b/test/test_codec.py index 906b53c33..d31fc8674 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -1,3 +1,6 @@ +from __future__ import absolute_import + +import platform import struct import pytest @@ -80,7 +83,8 @@ def test_snappy_encode_xerial(): assert compressed == to_ensure -@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") +@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy', + reason="python-lz4 crashes on old versions of pypy") def test_lz4(): for i in xrange(1000): b1 = random_string(100).encode('utf-8') @@ -89,7 +93,8 @@ def test_lz4(): assert b1 == b2 -@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") +@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy', + reason="python-lz4 crashes on old versions of pypy") def test_lz4_old(): for i in xrange(1000): b1 = random_string(100).encode('utf-8') @@ -98,8 +103,8 @@ def test_lz4_old(): assert b1 == b2 -@pytest.mark.xfail(reason="lz4tools library doesnt support incremental decompression") -@pytest.mark.skipif(not has_lz4(), reason="LZ4 not available") +@pytest.mark.skipif(not has_lz4() or platform.python_implementation() == 'PyPy', + reason="python-lz4 crashes on old versions of pypy") def test_lz4_incremental(): for i in xrange(1000): # lz4 max single block size is 4MB diff --git a/test/test_producer.py b/test/test_producer.py index 136d85f81..54b9db230 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -31,8 +31,8 @@ def test_end_to_end(kafka_broker, compression): # LZ4 requires 0.8.2 if version() < (0, 8, 2): return - # LZ4 python libs don't work on python2.6 - elif sys.version_info < (2, 7): + # python-lz4 crashes on older versions of pypy + elif platform.python_implementation() == 'PyPy': return connect_str = 'localhost:' + str(kafka_broker.port)