Skip to content

Commit

Permalink
Merge cf4ac27 into e775e05
Browse files: browse the repository at this point in the history
  • Loading branch information
dpkp committed Mar 14, 2017
2 parents e775e05 + cf4ac27 commit 6074521
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 24 deletions.
5 changes: 2 additions & 3 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,8 @@ Compression
***********

kafka-python supports gzip compression/decompression natively. To produce or
consume lz4 compressed messages, you must install lz4tools and xxhash (modules
may not work on python2.6). To enable snappy, install python-snappy (also
requires snappy library).
consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
To enable snappy, install python-snappy (also requires snappy library).
See `Installation <install.html#optional-snappy-install>`_ for more information.


Expand Down
6 changes: 2 additions & 4 deletions docs/install.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@ Bleeding-Edge
Optional LZ4 install
********************

To enable LZ4 compression/decompression, install lz4tools and xxhash:
To enable LZ4 compression/decompression, install python-lz4:

>>> pip install lz4tools
>>> pip install xxhash
>>> pip install lz4

*Note*: these modules do not support python2.6

Optional Snappy install
***********************
Expand Down
60 changes: 47 additions & 13 deletions kafka/codec.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@
except ImportError:
snappy = None

try:
import lz4.frame as lz4
except ImportError:
lz4 = None

try:
import lz4f
import xxhash
except ImportError:
lz4f = None

try:
import xxhash
except ImportError:
xxhash = None

PYPY = bool(platform.python_implementation() == 'PyPy')

def has_gzip():
Expand All @@ -33,7 +42,11 @@ def has_snappy():


def has_lz4():
return lz4f is not None
if lz4 is not None:
return True
if lz4f is not None:
return True
return False


def gzip_encode(payload, compresslevel=None):
Expand Down Expand Up @@ -181,17 +194,20 @@ def snappy_decode(payload):
return snappy.decompress(payload)


def lz4_encode(payload):
"""Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
# pylint: disable-msg=no-member
return lz4f.compressFrame(payload)
if lz4:
lz4_encode = lz4.compress # pylint: disable-msg=no-member
elif lz4f:
lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
else:
lz4_encode = None


def lz4_decode(payload):
def lz4f_decode(payload):
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
# pylint: disable-msg=no-member
ctx = lz4f.createDecompContext()
data = lz4f.decompressFrame(payload, ctx)
lz4f.freeDecompContext(ctx)

# lz4f python module does not expose how much of the payload was
# actually read if the decompression was only partial.
Expand All @@ -200,29 +216,47 @@ def lz4_decode(payload):
return data['decomp']


if lz4:
lz4_decode = lz4.decompress # pylint: disable-msg=no-member
elif lz4f:
lz4_decode = lz4f_decode
else:
lz4_decode = None


def lz4_encode_old_kafka(payload):
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
if isinstance(data[4], int):
flg = data[4]
else:
flg = ord(data[4])
flg = data[4]
if not isinstance(flg, int):
flg = ord(flg)

content_size_bit = ((flg >> 3) & 1)
if content_size_bit:
header_size += 8
# Old kafka does not accept the content-size field
# so we need to discard it and reset the header flag
flg -= 8
data = bytearray(data)
data[4] = flg
data = bytes(data)
payload = data[header_size+8:]
else:
payload = data[header_size:]

# This is the incorrect hc
hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member

return b''.join([
data[0:header_size-1],
hc,
data[header_size:]
payload
])


def lz4_decode_old_kafka(payload):
assert xxhash is not None
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):
Expand Down
4 changes: 3 additions & 1 deletion test/test_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from __future__ import absolute_import

import io
import platform

import pytest

Expand Down Expand Up @@ -34,7 +35,8 @@ def test_buffer_close():
@pytest.mark.parametrize('compression', [
'gzip',
'snappy',
pytest.mark.skipif("sys.version_info < (2,7)")('lz4'), # lz4tools does not work on py26
pytest.mark.skipif(platform.python_implementation() == 'PyPy',
reason='lz4 does not work on pypy')('lz4'),
])
def test_compressed_buffer_close(compression):
records = MessageSetBuffer(io.BytesIO(), 100000, compression_type=compression)
Expand Down
4 changes: 2 additions & 2 deletions test/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def test_end_to_end(kafka_broker, compression):
# LZ4 requires 0.8.2
if version() < (0, 8, 2):
return
# LZ4 python libs don't work on python2.6
elif sys.version_info < (2, 7):
# python-lz4 doesn't work on pypy yet
elif platform.python_implementation() == 'PyPy':
return

connect_str = 'localhost:' + str(kafka_broker.port)
Expand Down
2 changes: 1 addition & 1 deletion tox.ini
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ deps =
pytest-mock
mock
python-snappy
lz4tools
lz4
xxhash
py26: unittest2
commands =
Expand Down

0 comments on commit 6074521

Please sign in to comment.