Skip to content
This repository has been archived by the owner on Mar 24, 2021. It is now read-only.

Commit

Permalink
Merge pull request #915 from carsonip/fix-lz4-dependent-blocks
Browse files Browse the repository at this point in the history
Disable Kafka-unsupported lz4 dependent blocks
  • Loading branch information
Emmett J. Butler committed Mar 6, 2019
2 parents c816efa + b207a80 commit 7781e55
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 5 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ install:
fi
- pip install -U pip setuptools
- pip install codecov kazoo tox testinstances
- wget https://github.com/edenhill/librdkafka/archive/0.9.1.tar.gz
- tar -xzf 0.9.1.tar.gz
- cd librdkafka-0.9.1/ && ./configure --prefix=$HOME
- wget https://github.com/edenhill/librdkafka/archive/v0.9.5.tar.gz
- tar -xzf v0.9.5.tar.gz
- cd librdkafka-0.9.5/ && ./configure --prefix=$HOME
- make -j 2 && make -j 2 install && cd -

before_script:
Expand Down
14 changes: 13 additions & 1 deletion pykafka/utils/compression.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,18 @@

try:
import lz4.frame as lz4

def _lz4_compress(payload, **kwargs):
    """Compress *payload* with LZ4 frames Kafka can actually decode.

    Kafka does not support LZ4 dependent (linked) blocks, so independent
    blocks are forced regardless of any caller-supplied option. The option
    name differs across python-lz4 releases, hence the two code paths.
    """
    # Drop any caller-supplied value so ours cannot conflict with it.
    kwargs.pop('block_linked', None)
    try:
        # lz4 >= 0.12.0 spells the option ``block_linked``.
        return lz4.compress(payload, block_linked=False, **kwargs)
    except TypeError:
        # Older lz4 releases reject ``block_linked`` and use
        # ``block_mode`` instead (1 == independent blocks).
        kwargs.pop('block_mode', None)
        return lz4.compress(payload, block_mode=1, **kwargs)

except ImportError:
lz4 = None

Expand Down Expand Up @@ -184,7 +196,7 @@ def _detect_xerial_stream(buff):


if lz4:
encode_lz4 = lz4.compress # pylint: disable-msg=no-member
encode_lz4 = _lz4_compress # pylint: disable-msg=no-member
elif lz4f:
encode_lz4 = lz4f.compressFrame # pylint: disable-msg=no-member
else:
Expand Down
1 change: 1 addition & 0 deletions test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ python-snappy
mock
unittest2
xxhash==1.3.0
parameterized==0.7.0
-e git+https://github.com/Parsely/testinstances.git@0.3.0#egg=testinstances
27 changes: 26 additions & 1 deletion tests/pykafka/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import platform
import pytest
from parameterized import parameterized
import random
import time
import types
Expand Down Expand Up @@ -147,7 +148,8 @@ def test_async_produce(self):
message = consumer.consume()
assert message.value == payload

def test_recover_disconnected(self):
# FIXME: add xxx prefix to move to last because this test causes subsequent tests to fail with rdkafka
def test_xxx_recover_disconnected(self):
"""Test our retry-loop with a recoverable error"""
payload = uuid4().bytes
prod = self._get_producer(min_queued_messages=1, delivery_reports=True)
Expand Down Expand Up @@ -379,6 +381,29 @@ def ensure_all_messages_consumed():
assert len(msgs) == 10
retry(ensure_all_messages_consumed, retry_time=15)

@parameterized.expand([
    [CompressionType.NONE],
    [CompressionType.GZIP],
    [CompressionType.SNAPPY],
    [CompressionType.LZ4],
])
def test_sync_produce_compression_large_message(self, compression_type):
    """Round-trip a large payload through a sync producer for each codec.

    Regression test for Kafka-incompatible LZ4 dependent blocks: only a
    payload spanning multiple LZ4 blocks exercises the dependent-block
    framing, so the message must exceed one block.
    """
    # NOTE(review): LZ4 is skipped on PyPy — presumably a known interop
    # problem; confirm against this project's CI history.
    if platform.python_implementation() == 'PyPy' and compression_type == CompressionType.LZ4:
        pytest.skip("PyPy doesn't work well with LZ4")

    consumer = self._get_consumer()

    # sync=True makes produce() block until delivery, so the consume()
    # below sees the message without extra synchronization.
    prod = self._get_producer(sync=True, min_queued_messages=1, compression=compression_type)

    # 16B * 1024 * 10 = 160KB
    # Default lz4 block size is 64KB
    # 160KB message size will catch Kafka-incompatible lz4 dependent blocks error
    payload = b''.join([uuid4().bytes for _ in range(10 * 1024)])
    prod.produce(payload)

    message = consumer.consume()
    assert message.value == payload


@pytest.mark.skipif(not RDKAFKA, reason="rdkafka")
class TestRdKafkaProducer(ProducerIntegrationTests):
Expand Down

0 comments on commit 7781e55

Please sign in to comment.