From df1c5e94ceb90d2645f050acd0129d3bf6b2807c Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Mon, 16 Mar 2020 00:14:52 +0100
Subject: [PATCH 1/7] Add zstd relevant code to a separate PR

---
 kafka/codec.py                  | 21 +++++++++++++++++++++
 kafka/producer/kafka.py         |  8 ++++++--
 kafka/record/default_records.py | 11 +++++++++--
 kafka/record/memory_records.py  |  2 +-
 test/test_codec.py              | 11 ++++++++++-
 test/test_producer.py           | 15 +++++++++++++--
 tox.ini                         |  1 +
 7 files changed, 61 insertions(+), 8 deletions(-)

diff --git a/kafka/codec.py b/kafka/codec.py
index aa9fc8291..a63bbdcb9 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -16,6 +16,11 @@
 except ImportError:
     snappy = None
 
+try:
+    import zstandard as zstd
+except ImportError:
+    zstd = None
+
 try:
     import lz4.frame as lz4
 
@@ -58,6 +63,10 @@ def has_snappy():
     return snappy is not None
 
 
+def has_zstd():
+    return zstd is not None
+
+
 def has_lz4():
     if lz4 is not None:
         return True
@@ -299,3 +308,15 @@ def lz4_decode_old_kafka(payload):
         payload[header_size:]
     ])
     return lz4_decode(munged_payload)
+
+
+def zstd_encode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdCompressor().compress(payload)
+
+
+def zstd_decode(payload):
+    if not zstd:
+        raise NotImplementedError("Zstd codec is not available")
+    return zstd.ZstdDecompressor().decompress(payload)
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 9509ab940..dba18015a 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -12,7 +12,7 @@
 import kafka.errors as Errors
 from kafka.client_async import KafkaClient, selectors
-from kafka.codec import has_gzip, has_snappy, has_lz4
+from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd
 from kafka.metrics import MetricConfig, Metrics
 from kafka.partitioner.default import DefaultPartitioner
 from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
@@ -119,7 +119,7 @@ class KafkaProducer(object):
             available guarantee.
             If unset, defaults to acks=1.
         compression_type (str): The compression type for all data generated by
-            the producer. Valid values are 'gzip', 'snappy', 'lz4', or None.
+            the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None.
            Compression is of full batches of data, so the efficacy of batching
            will also impact the compression ratio (more batching means better
            compression). Default: None.
@@ -339,6 +339,7 @@ class KafkaProducer(object):
         'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
         'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
         'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
+        'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD),
         None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
     }
 
@@ -388,6 +389,9 @@ def __init__(self, **configs):
         if self.config['compression_type'] == 'lz4':
             assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers'
 
+        if self.config['compression_type'] == 'zstd':
+            assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers'
+
         # Check compression_type for library support
         ct = self.config['compression_type']
         if ct not in self._COMPRESSORS:
diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 07368bba9..917c81cb8 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -62,8 +62,8 @@
 )
 from kafka.errors import CorruptRecordException, UnsupportedCodecError
 from kafka.codec import (
-    gzip_encode, snappy_encode, lz4_encode,
-    gzip_decode, snappy_decode, lz4_decode
+    gzip_encode, snappy_encode, lz4_encode, zstd_encode,
+    gzip_decode, snappy_decode, lz4_decode, zstd_decode
 )
 import kafka.codec as codecs
@@ -97,6 +97,7 @@ class DefaultRecordBase(object):
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
+    CODEC_ZSTD = 0x04
     TIMESTAMP_TYPE_MASK = 0x08
     TRANSACTIONAL_MASK = 0x10
     CONTROL_MASK = 0x20
@@ -111,6 +112,8 @@ def _assert_has_codec(self, compression_type):
             checker, name = codecs.has_snappy, "snappy"
         elif compression_type == self.CODEC_LZ4:
             checker, name = codecs.has_lz4, "lz4"
+        elif compression_type == self.CODEC_ZSTD:
+            checker, name = codecs.has_zstd, "zstd"
         if not checker():
             raise UnsupportedCodecError(
                 "Libraries for {} compression codec not found".format(name))
@@ -185,6 +188,8 @@ def _maybe_uncompress(self):
                     uncompressed = snappy_decode(data.tobytes())
                 if compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
+                if compression_type == self.CODEC_ZSTD:
+                    uncompressed = zstd_decode(data)
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
@@ -517,6 +522,8 @@ def _maybe_compress(self):
                 compressed = snappy_encode(data)
             elif self._compression_type == self.CODEC_LZ4:
                 compressed = lz4_encode(data)
+            elif self._compression_type == self.CODEC_ZSTD:
+                compressed = zstd_encode(data)
             compressed_size = len(compressed)
             if len(data) <= compressed_size:
                 # We did not get any benefit from compression, lets send
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py
index a6c4b51f7..fc2ef2d6b 100644
--- a/kafka/record/memory_records.py
+++ b/kafka/record/memory_records.py
@@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object):
 
     def __init__(self, magic, compression_type, batch_size):
         assert magic in [0, 1, 2], "Not supported magic"
-        assert compression_type in [0, 1, 2, 3], "Not valid compression type"
+        assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type"
         if magic >= 2:
             self._builder = DefaultRecordBatchBuilder(
                 magic=magic, compression_type=compression_type,
diff --git a/test/test_codec.py b/test/test_codec.py
index 9eff888fe..e05707451 100644
--- a/test/test_codec.py
+++ b/test/test_codec.py
@@ -7,11 +7,12 @@
 from kafka.vendor.six.moves import range
 
 from kafka.codec import (
-    has_snappy, has_lz4,
+    has_snappy, has_lz4, has_zstd,
     gzip_encode, gzip_decode,
     snappy_encode, snappy_decode,
     lz4_encode, lz4_decode,
     lz4_encode_old_kafka, lz4_decode_old_kafka,
+    zstd_encode, zstd_decode,
 )
 
 from test.testutil import random_string
@@ -113,3 +114,11 @@ def test_lz4_incremental():
         b2 = lz4_decode(lz4_encode(b1))
         assert len(b1) == len(b2)
         assert b1 == b2
+
+
+@pytest.mark.skipif(not has_zstd(), reason="Zstd not available")
+def test_zstd():
+    for _ in range(1000):
+        b1 = random_string(100).encode('utf-8')
+        b2 = zstd_decode(zstd_encode(b1))
+        assert b1 == b2
diff --git a/test/test_producer.py b/test/test_producer.py
index 9605adf58..793ad660c 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -23,7 +23,7 @@ def test_buffer_pool():
 
 
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
 
     if compression == 'lz4':
@@ -34,10 +34,15 @@ def test_end_to_end(kafka_broker, compression):
         elif platform.python_implementation() == 'PyPy':
             return
 
+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        return
+    env_version = env_kafka_version()
+    api_version = env_version if env_version >= (2, 1, 0) else None
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
+                             api_version=api_version,
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
@@ -81,16 +86,22 @@ def test_kafka_producer_gc_cleanup():
 
 
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
-@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4'])
+@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
+    if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
+        return
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
+    env_version = env_kafka_version()
+    api_version = env_version if env_version >= (2, 1, 0) else None
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
+                             api_version=api_version,
                              max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()
 
     # record headers are supported in 0.11.0
+
     if env_kafka_version() < (0, 11, 0):
         headers = None
     else:
diff --git a/tox.ini b/tox.ini
index 06403d6ed..596a9f211 100644
--- a/tox.ini
+++ b/tox.ini
@@ -15,6 +15,7 @@ deps =
     pytest-mock
     mock
     python-snappy
+    zstandard
     lz4
     xxhash
     crc32c

From 0ce489e878433a2c4f1d78e027ad51d71ff05a01 Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Mon, 16 Mar 2020 00:26:11 +0100
Subject: [PATCH 2/7] Remove extra newline

---
 test/test_producer.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/test/test_producer.py b/test/test_producer.py
index 793ad660c..5a470179f 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -101,7 +101,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     magic = producer._max_usable_produce_magic()
 
     # record headers are supported in 0.11.0
-
     if env_kafka_version() < (0, 11, 0):
         headers = None
     else:

From 60a3b79a26a6723a24d5fb38723eb5080010428b Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Mon, 16 Mar 2020 19:14:01 +0100
Subject: [PATCH 3/7] Update docstring, always force api version to be in
 sync with actual broker running when building producer

---
 docs/index.rst        | 10 +++++-----
 test/test_producer.py | 10 +++-------
 2 files changed, 8 insertions(+), 12 deletions(-)

diff --git a/docs/index.rst b/docs/index.rst
index fa6f93c50..242f9eb8d 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -122,11 +122,11 @@ multiprocessing is recommended.
 Compression
 ***********
 
-kafka-python supports gzip compression/decompression natively. To produce or
-consume lz4 compressed messages, you should install python-lz4 (pip install lz4).
-To enable snappy, install python-snappy (also requires snappy library).
-See `Installation `_ for more information.
-
+kafka-python supports multiple compression types. To produce or
+ - gzip : supported natively
+ - lz4 : requires `python-lz4 `_ installed
+ - snappy : requires the `python-snappy `_ package (which requires the snappy C library)
+ - zstd : requires the `python-zstandard `_ package installed
 
 Protocol
 ********
diff --git a/test/test_producer.py b/test/test_producer.py
index 5a470179f..94b2faa9e 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -25,7 +25,6 @@ def test_buffer_pool():
 @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
-
     if compression == 'lz4':
         # LZ4 requires 0.8.2
         if env_kafka_version() < (0, 8, 2):
@@ -36,13 +35,12 @@ def test_end_to_end(kafka_broker, compression):
         elif platform.python_implementation() == 'PyPy':
             return
 
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
         return
-    env_version = env_kafka_version()
-    api_version = env_version if env_version >= (2, 1, 0) else None
+
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
                              max_block_ms=30000,
-                             api_version=api_version,
+                             api_version=env_kafka_version(),
                              compression_type=compression,
                              value_serializer=str.encode)
     consumer = KafkaConsumer(bootstrap_servers=connect_str,
@@ -91,11 +89,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
         return
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
-    env_version = env_kafka_version()
-    api_version = env_version if env_version >= (2, 1, 0) else None
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
-                             api_version=api_version,
+                             api_version=env_kafka_version(),
                              max_block_ms=30000,
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()

From b0e6934cb861d982c2660f5c3c6be8e15e56e06e Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Tue, 17 Mar 2020 10:44:19 +0100
Subject: [PATCH 4/7] Update producer test to use pytest.skip instead of
 return statements

Harden zstd decompression for missing frame size information (can happen
when the sender is not under our control)
---
 kafka/codec.py        |  6 +++++-
 test/test_producer.py | 14 +++++---------
 2 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/kafka/codec.py b/kafka/codec.py
index a63bbdcb9..8ca0728f6 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -10,6 +10,7 @@
 
 _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1)
 _XERIAL_V1_FORMAT = 'bccccccBii'
+ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3
 
 try:
     import snappy
@@ -319,4 +320,7 @@ def zstd_encode(payload):
 def zstd_decode(payload):
     if not zstd:
         raise NotImplementedError("Zstd codec is not available")
-    return zstd.ZstdDecompressor().decompress(payload)
+    try:
+        return zstd.ZstdDecompressor().decompress(payload)
+    except zstd.ZstdError:
+        return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
diff --git a/test/test_producer.py b/test/test_producer.py
index 94b2faa9e..af8fc26f3 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -26,15 +26,13 @@ def test_buffer_pool():
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_end_to_end(kafka_broker, compression):
     if compression == 'lz4':
-        # LZ4 requires 0.8.2
         if env_kafka_version() < (0, 8, 2):
-            return
-        # python-lz4 crashes on older versions of pypy
+            pytest.skip('LZ4 requires 0.8.2')
         elif platform.python_implementation() == 'PyPy':
-            return
+            pytest.skip('python-lz4 crashes on older versions of pypy')
 
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
-        return
+        pytest.skip('zstd requires kafka 2.1.0 or newer')
 
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
@@ -87,7 +85,7 @@ def test_kafka_producer_gc_cleanup():
 @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd'])
 def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     if compression == 'zstd' and env_kafka_version() < (2, 1, 0):
-        return
+        pytest.skip('zstd requires 2.1.0 or more')
     connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)])
     producer = KafkaProducer(bootstrap_servers=connect_str,
                              retries=5,
@@ -130,10 +128,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
     if headers:
         assert record.serialized_header_size == 22
 
-    # generated timestamp case is skipped for broker 0.9 and below
     if magic == 0:
-        return
-
+        pytest.skip('generated timestamp case is skipped for broker 0.9 and below')
     send_time = time.time() * 1000
     future = producer.send(
         topic,

From 22dce4289944d4d507622a9f0070d09c2c22329c Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Sat, 21 Mar 2020 00:31:50 +0100
Subject: [PATCH 5/7] Update travis config with native zstd lib package
 install directive

---
 .travis.yml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/.travis.yml b/.travis.yml
index 8e2fdfedf..b731337cd 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -20,6 +20,7 @@ addons:
   apt:
     packages:
       - libsnappy-dev
+      - libzstd-dev
       - openjdk-8-jdk
 
 cache:

From 337ca5665ca207e82cba342ce9d2b3144891dd5f Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Sat, 21 Mar 2020 00:54:40 +0100
Subject: [PATCH 6/7] Update readme

---
 docs/index.rst | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/docs/index.rst b/docs/index.rst
index 242f9eb8d..9c46e3313 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -122,7 +122,8 @@ multiprocessing is recommended.
 Compression
 ***********
-kafka-python supports multiple compression types. To produce or
+kafka-python supports multiple compression types:
+
  - gzip : supported natively
  - lz4 : requires `python-lz4 `_ installed
  - snappy : requires the `python-snappy `_ package (which requires the snappy C library)
  - zstd : requires the `python-zstandard `_ package installed

From c82b6e09dadad289148661f9b1a5faf4e4092208 Mon Sep 17 00:00:00 2001
From: Gabriel Tincu
Date: Wed, 25 Mar 2020 17:29:32 +0100
Subject: [PATCH 7/7] zstd decompress will transform memoryview object to
 bytes before decompressing, to keep the same behavior as other decompression
 strategies

---
 kafka/record/default_records.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py
index 917c81cb8..a098c42a9 100644
--- a/kafka/record/default_records.py
+++ b/kafka/record/default_records.py
@@ -189,7 +189,7 @@ def _maybe_uncompress(self):
                 if compression_type == self.CODEC_LZ4:
                     uncompressed = lz4_decode(data.tobytes())
                 if compression_type == self.CODEC_ZSTD:
-                    uncompressed = zstd_decode(data)
+                    uncompressed = zstd_decode(data.tobytes())
                 self._buffer = bytearray(uncompressed)
                 self._pos = 0
         self._decompressed = True
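
Appendix, not part of the patch series above: the ZstdError fallback and the 1 GiB ZSTD_MAX_OUTPUT_SIZE cap introduced in PATCH 4 exist because a zstd frame only records its decompressed size in the header when the compressor knew the input size up front. A broker or third-party producer that streams a batch out can emit a frame without that field, and python-zstandard then refuses a plain one-shot decompress(). The sketch below reproduces both paths; it assumes the zstandard package is installed, and whether the one-shot path embeds the content size can vary with zstandard version and compressor settings, so treat it as illustrative rather than authoritative.

    # Illustrative sketch only; requires `pip install zstandard`.
    import zstandard as zstd

    ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3  # same 1 GiB cap that PATCH 4 adds to kafka/codec.py

    payload = b"kafka record batch payload " * 64

    # One-shot compression knows the input size, so (with recent zstandard
    # versions) the frame header carries the content size and decompress() works.
    framed = zstd.ZstdCompressor().compress(payload)
    assert zstd.ZstdDecompressor().decompress(framed) == payload

    # A streaming compressor (e.g. a producer we do not control) emits a frame
    # whose header does not state the decompressed size.
    cobj = zstd.ZstdCompressor().compressobj()
    streamed = cobj.compress(payload) + cobj.flush()

    # Without a size hint, decompress() rejects such frames with ZstdError; this
    # is the error that zstd_decode() now catches before retrying with an
    # explicit output-size bound.
    try:
        restored = zstd.ZstdDecompressor().decompress(streamed)
    except zstd.ZstdError:
        restored = zstd.ZstdDecompressor().decompress(
            streamed, max_output_size=ZSTD_MAX_OUTPUT_SIZE)
    assert restored == payload

PATCH 7's data.tobytes() call is related housekeeping: the record parser hands zstd_decode() a memoryview slice of the batch buffer, and converting it to bytes first keeps the zstd path consistent with how the snappy and lz4 branches already materialize their input.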