Commit

Fix handling unsupported compression codec (#795) (#798)
* Fix requirements conflict

* Fix handling unsupported compression codec (#795)

* Add changelog entry
ods committed Nov 22, 2021
1 parent fcee7d1 commit 444a1a7
Showing 10 changed files with 179 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGES/795.bugfix
@@ -0,0 +1 @@
Fix handling unsupported compression codec (issue #795)
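
Before the per-file diffs, a note on the user-visible effect: a compressed batch whose codec library is missing now surfaces as UnsupportedCodecError (re-exported from aiokafka.errors below) instead of failing in a less obvious way inside the record code. A minimal sketch of how calling code can react; the helper name is made up for illustration, and `batch` stands for any DefaultRecordBatch / LegacyRecordBatch instance produced by the parsing code changed in this commit:

from aiokafka.errors import UnsupportedCodecError


def decode_records(batch):
    # Hypothetical helper, not part of this commit.
    try:
        # Iterating a batch triggers decompression, which is where the new
        # codec-availability check fires.
        return list(batch)
    except UnsupportedCodecError as exc:
        # e.g. "Libraries for snappy compression codec not found"
        raise RuntimeError(
            "install the matching codec library (python-snappy / lz4)"
        ) from exc
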
2 changes: 2 additions & 0 deletions aiokafka/errors.py
@@ -69,6 +69,7 @@
    KafkaUnavailableError,
    KafkaTimeoutError,
    KafkaConnectionError,
    UnsupportedCodecError,
)

__all__ = [
@@ -144,6 +145,7 @@
    "KafkaUnavailableError",
    "KafkaTimeoutError",
    "KafkaConnectionError",
    "UnsupportedCodecError",
]


37 changes: 28 additions & 9 deletions aiokafka/record/_crecords/default_records.pyx
@@ -55,11 +55,12 @@
# * Timestamp Type (3)
# * Compression Type (0-2)

from aiokafka.errors import CorruptRecordException
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from kafka.codec import (
    gzip_encode, snappy_encode, lz4_encode,
    gzip_decode, snappy_decode, lz4_decode
)
import kafka.codec as codecs

from cpython cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_WRITABLE, \
                     PyBUF_SIMPLE, PyBUF_READ, Py_buffer, \
@@ -108,12 +109,28 @@ DEF NO_PARTITION_LEADER_EPOCH = -1
cutil.crc32c_global_init()


cdef _assert_has_codec(char compression_type):
    if compression_type == _ATTR_CODEC_GZIP:
        checker, name = codecs.has_gzip, "gzip"
    elif compression_type == _ATTR_CODEC_SNAPPY:
        checker, name = codecs.has_snappy, "snappy"
    elif compression_type == _ATTR_CODEC_LZ4:
        checker, name = codecs.has_lz4, "lz4"
    else:
        raise UnsupportedCodecError(
            f"Unknown compression codec {compression_type:#04x}")
    if not checker():
        raise UnsupportedCodecError(
            f"Libraries for {name} compression codec not found")


@cython.no_gc_clear
@cython.final
@cython.freelist(_DEFAULT_RECORD_BATCH_FREELIST_SIZE)
cdef class DefaultRecordBatch:

    CODEC_NONE = _ATTR_CODEC_NONE
    CODEC_MASK = _ATTR_CODEC_MASK
    CODEC_GZIP = _ATTR_CODEC_GZIP
    CODEC_SNAPPY = _ATTR_CODEC_SNAPPY
    CODEC_LZ4 = _ATTR_CODEC_LZ4
@@ -211,18 +228,19 @@ cdef class DefaultRecordBatch:
        if not self._decompressed:
            compression_type = <char> self.attributes & _ATTR_CODEC_MASK
            if compression_type != _ATTR_CODEC_NONE:
                _assert_has_codec(compression_type)
                buf = <char *> self._buffer.buf
                data = PyMemoryView_FromMemory(
                    &buf[self._pos],
                    self._buffer.len - self._pos,
                    PyBUF_READ)
                if compression_type == _ATTR_CODEC_GZIP:
                    uncompressed = gzip_decode(data)
                if compression_type == _ATTR_CODEC_SNAPPY:
                elif compression_type == _ATTR_CODEC_SNAPPY:
                    uncompressed = snappy_decode(data.tobytes())
                if compression_type == _ATTR_CODEC_LZ4:
                elif compression_type == _ATTR_CODEC_LZ4:
                    uncompressed = lz4_decode(data.tobytes())

                PyBuffer_Release(&self._buffer)
                PyObject_GetBuffer(uncompressed, &self._buffer, PyBUF_SIMPLE)
                self._pos = 0
@@ -360,7 +378,7 @@ cdef class DefaultRecordBatch:
                raise CorruptRecordException(
                    "{} unconsumed bytes after all records consumed".format(
                        self._buffer.len - self._pos))
            self._next_record_index = 0
            raise StopIteration

        msg = self._read_msg()
@@ -541,7 +559,7 @@ cdef class DefaultRecordBatchBuilder:
        buf = PyByteArray_AS_STRING(self._buffer)
        self._encode_msg(pos, buf, offset, ts, msg_size, key, value, headers)
        self._pos = pos + size

        return DefaultRecordMetadata.new(offset, size, ts)

    cdef int _encode_msg(
@@ -571,7 +589,7 @@ cdef class DefaultRecordBatchBuilder:

        buf[pos] = 0  # Attributes => Int8
        pos += 1

        cutil.encode_varint64(buf, &pos, timestamp_delta)
        # Base offset is always 0 on Produce
        cutil.encode_varint64(buf, &pos, offset)
@@ -668,6 +686,7 @@ cdef class DefaultRecordBatchBuilder:


        if self._compression_type != _ATTR_CODEC_NONE:
            _assert_has_codec(self._compression_type)
            data = bytes(self._buffer[FIRST_RECORD_OFFSET:self._pos])
            if self._compression_type == _ATTR_CODEC_GZIP:
                compressed = gzip_encode(data)
@@ -747,7 +766,7 @@ cdef inline Py_ssize_t _bytelike_len(object obj) except -2:
    else:
        PyObject_GetBuffer(obj, &buf, PyBUF_SIMPLE)
        obj_len = buf.len
        PyBuffer_Release(&buf)
    return obj_len


@@ -822,4 +841,4 @@ cdef class DefaultRecordMetadata:
        return (
            "DefaultRecordMetadata(offset={!r}, size={!r}, timestamp={!r})"
            .format(self.offset, self.size, self.timestamp)
        )
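
As a side note on the reader path above: the codec lives in the low three bits of the batch attributes, which is why _maybe_uncompress masks the attributes before choosing a decoder and before calling _assert_has_codec. A small illustration with the standard Kafka v2 attribute bits (0x07 codec mask, 0x08 timestamp type, 0x10 transactional; these mirror the CODEC_* constants above):

CODEC_MASK = 0x07                          # low three bits select the codec
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, CODEC_LZ4 = 0x00, 0x01, 0x02, 0x03

attributes = 0x10 | 0x08 | CODEC_SNAPPY    # transactional, log-append-time, snappy
compression_type = attributes & CODEC_MASK
assert compression_type == CODEC_SNAPPY    # -> run the snappy availability check
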
22 changes: 21 additions & 1 deletion aiokafka/record/_crecords/legacy_records.pyx
@@ -4,7 +4,8 @@ from kafka.codec import (
    gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
)
from aiokafka.errors import CorruptRecordException
import kafka.codec as codecs
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from zlib import crc32 as py_crc32  # needed for windows macro

from cpython cimport PyObject_GetBuffer, PyBuffer_Release, PyBUF_WRITABLE, \
@@ -44,13 +45,29 @@ DEF ATTRIBUTES_OFFSET = MAGIC_OFFSET + 1
DEF TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + 1


cdef _assert_has_codec(char compression_type):
    if compression_type == _ATTR_CODEC_GZIP:
        checker, name = codecs.has_gzip, "gzip"
    elif compression_type == _ATTR_CODEC_SNAPPY:
        checker, name = codecs.has_snappy, "snappy"
    elif compression_type == _ATTR_CODEC_LZ4:
        checker, name = codecs.has_lz4, "lz4"
    else:
        raise UnsupportedCodecError(
            f"Unknown compression codec {compression_type:#04x}")
    if not checker():
        raise UnsupportedCodecError(
            f"Libraries for {name} compression codec not found")


@cython.no_gc_clear
@cython.final
@cython.freelist(_LEGACY_RECORD_BATCH_FREELIST_SIZE)
cdef class LegacyRecordBatch:

    RECORD_OVERHEAD_V0 = RECORD_OVERHEAD_V0_DEF
    RECORD_OVERHEAD_V1 = RECORD_OVERHEAD_V1_DEF
    CODEC_MASK = _ATTR_CODEC_MASK
    CODEC_GZIP = _ATTR_CODEC_GZIP
    CODEC_SNAPPY = _ATTR_CODEC_SNAPPY
    CODEC_LZ4 = _ATTR_CODEC_LZ4
@@ -117,6 +134,7 @@ cdef class LegacyRecordBatch:
            raise CorruptRecordException("Value of compressed message is None")
        value = self._main_record.value

        _assert_has_codec(compression_type)
        if compression_type == _ATTR_CODEC_GZIP:
            uncompressed = gzip_decode(value)
        elif compression_type == _ATTR_CODEC_SNAPPY:
@@ -336,6 +354,7 @@ cdef class LegacyRecordBatchBuilder:
        Py_ssize_t _batch_size
        bytearray _buffer

    CODEC_MASK = _ATTR_CODEC_MASK
    CODEC_GZIP = _ATTR_CODEC_GZIP
    CODEC_SNAPPY = _ATTR_CODEC_SNAPPY
    CODEC_LZ4 = _ATTR_CODEC_LZ4
@@ -411,6 +430,7 @@ cdef class LegacyRecordBatchBuilder:
            uint32_t crc

        if self._compression_type != 0:
            _assert_has_codec(self._compression_type)
            if self._compression_type == _ATTR_CODEC_GZIP:
                compressed = gzip_encode(self._buffer)
            elif self._compression_type == _ATTR_CODEC_SNAPPY:
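
The {compression_type:#04x} format spec used by both _assert_has_codec helpers prints the unrecognized codec bits as a zero-padded hex literal, which helps when a batch was produced with a codec these decoders do not handle (zstd, codec id 4, is the obvious candidate). A quick check of the formatting:

unknown_codec = 0x04   # e.g. zstd, not handled by the decoders above
message = f"Unknown compression codec {unknown_codec:#04x}"
assert message == "Unknown compression codec 0x04"
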
25 changes: 22 additions & 3 deletions aiokafka/record/default_records.py
@@ -58,16 +58,19 @@
import time
from .util import decode_varint, encode_varint, calc_crc32c, size_of_varint

from aiokafka.errors import CorruptRecordException
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from aiokafka.util import NO_EXTENSIONS
from kafka.codec import (
    gzip_encode, snappy_encode, lz4_encode,
    gzip_decode, snappy_decode, lz4_decode
)
import kafka.codec as codecs


class DefaultRecordBase:

    __slots__ = ()

    HEADER_STRUCT = struct.Struct(
        ">q"  # BaseOffset => Int64
        "i"  # Length => Int32
@@ -102,6 +105,20 @@ class DefaultRecordBase:

    NO_PARTITION_LEADER_EPOCH = -1

    def _assert_has_codec(self, compression_type):
        if compression_type == self.CODEC_GZIP:
            checker, name = codecs.has_gzip, "gzip"
        elif compression_type == self.CODEC_SNAPPY:
            checker, name = codecs.has_snappy, "snappy"
        elif compression_type == self.CODEC_LZ4:
            checker, name = codecs.has_lz4, "lz4"
        else:
            raise UnsupportedCodecError(
                f"Unknown compression codec {compression_type:#04x}")
        if not checker():
            raise UnsupportedCodecError(
                f"Libraries for {name} compression codec not found")


class _DefaultRecordBatchPy(DefaultRecordBase):

@@ -177,12 +194,13 @@ def _maybe_uncompress(self):
        if not self._decompressed:
            compression_type = self.compression_type
            if compression_type != self.CODEC_NONE:
                self._assert_has_codec(compression_type)
                data = memoryview(self._buffer)[self._pos:]
                if compression_type == self.CODEC_GZIP:
                    uncompressed = gzip_decode(data)
                if compression_type == self.CODEC_SNAPPY:
                elif compression_type == self.CODEC_SNAPPY:
                    uncompressed = snappy_decode(data.tobytes())
                if compression_type == self.CODEC_LZ4:
                elif compression_type == self.CODEC_LZ4:
                    uncompressed = lz4_decode(data.tobytes())
                self._buffer = bytearray(uncompressed)
                self._pos = 0
@@ -504,6 +522,7 @@ def write_header(self, use_compression_type=True):

    def _maybe_compress(self):
        if self._compression_type != self.CODEC_NONE:
            self._assert_has_codec(self._compression_type)
            header_size = self.HEADER_STRUCT.size
            data = bytes(self._buffer[header_size:])
            if self._compression_type == self.CODEC_GZIP:
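
To see the classes above in action, here is a small round trip modeled on the test added at the bottom of this commit (the module-level names resolve to either the Python or Cython implementation depending on NO_EXTENSIONS; behaviour is the same). Gzip ships with the standard library, so this always succeeds, while the same calls raise UnsupportedCodecError when the snappy or lz4 libraries are missing:

from aiokafka.record.default_records import (
    DefaultRecordBatch, DefaultRecordBatchBuilder
)

builder = DefaultRecordBatchBuilder(
    magic=2, compression_type=DefaultRecordBatch.CODEC_GZIP,
    is_transactional=0, producer_id=-1, producer_epoch=-1,
    base_sequence=-1, batch_size=1024)
builder.append(0, timestamp=None, key=None, value=b"M" * 100, headers=[])
buffer = builder.build()

# Iterating the batch decompresses it; the codec check runs both in the
# build() call above and in the iteration below.
records = list(DefaultRecordBatch(bytes(buffer)))
assert records[0].value == b"M" * 100
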
21 changes: 20 additions & 1 deletion aiokafka/record/legacy_records.py
@@ -3,19 +3,22 @@

from binascii import crc32

from aiokafka.errors import CorruptRecordException
from aiokafka.errors import CorruptRecordException, UnsupportedCodecError
from aiokafka.util import NO_EXTENSIONS
from kafka.codec import (
    gzip_encode, snappy_encode, lz4_encode, lz4_encode_old_kafka,
    gzip_decode, snappy_decode, lz4_decode, lz4_decode_old_kafka
)
import kafka.codec as codecs


NoneType = type(None)


class LegacyRecordBase:

    __slots__ = ()

    HEADER_STRUCT_V0 = struct.Struct(
        ">q"  # BaseOffset => Int64
        "i"  # Length => Int32
@@ -73,6 +76,20 @@ class LegacyRecordBase:
    LOG_APPEND_TIME = 1
    CREATE_TIME = 0

    def _assert_has_codec(self, compression_type):
        if compression_type == self.CODEC_GZIP:
            checker, name = codecs.has_gzip, "gzip"
        elif compression_type == self.CODEC_SNAPPY:
            checker, name = codecs.has_snappy, "snappy"
        elif compression_type == self.CODEC_LZ4:
            checker, name = codecs.has_lz4, "lz4"
        else:
            raise UnsupportedCodecError(
                f"Unknown compression codec {compression_type:#04x}")
        if not checker():
            raise UnsupportedCodecError(
                f"Libraries for {name} compression codec not found")


class _LegacyRecordBatchPy(LegacyRecordBase):

@@ -135,6 +152,7 @@ def _decompress(self, key_offset):
        data = self._buffer[pos:pos + value_size]

        compression_type = self.compression_type
        self._assert_has_codec(compression_type)
        if compression_type == self.CODEC_GZIP:
            uncompressed = gzip_decode(data)
        elif compression_type == self.CODEC_SNAPPY:
@@ -389,6 +407,7 @@ def _encode_msg(self, buf, offset, timestamp, key_size, key,

    def _maybe_compress(self):
        if self._compression_type:
            self._assert_has_codec(self._compression_type)
            buf = self._buffer
            if self._compression_type == self.CODEC_GZIP:
                compressed = gzip_encode(buf)
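
All four record modules now rely on the same kafka.codec availability checkers (has_gzip, has_snappy, has_lz4), which simply report whether the optional third-party libraries imported. A quick environment probe, useful when deciding which compression_type a deployment can actually use:

import kafka.codec as codecs

# gzip comes from the standard library and is always available; the other two
# depend on the python-snappy and lz4 packages pinned in requirements-ci.txt.
for name, checker in [("gzip", codecs.has_gzip),
                      ("snappy", codecs.has_snappy),
                      ("lz4", codecs.has_lz4)]:
    print(f"{name:<6} available: {checker()}")
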
2 changes: 1 addition & 1 deletion requirements-ci.txt
@@ -10,7 +10,7 @@ docker==5.0.3
lz4==3.1.3
xxhash==2.0.2
python-snappy==0.6.0
docutils==0.18
docutils==0.17.1
Pygments==2.10.0
gssapi==1.7.2
dataclasses==0.8; python_version<"3.7"
2 changes: 1 addition & 1 deletion requirements-docs.txt
@@ -1,5 +1,5 @@
-r requirements-cython.txt
Sphinx==4.2.0
Sphinx==4.3.0
sphinxcontrib-asyncio==0.3.0
sphinxcontrib-spelling==7.2.1
alabaster==0.7.12
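
The two pins above travel together: the commit bumps Sphinx to 4.3.0 for the docs build, and Sphinx 4.x at that point still capped docutils below 0.18 (as far as I can tell), hence rolling docutils back to 0.17.1 in requirements-ci.txt; this is presumably the "Fix requirements conflict" part of the commit message. A quick sanity check after installing the updated requirements:

import docutils
import sphinx

# Expected with the updated pins: Sphinx 4.3.0 alongside docutils 0.17.1.
print("sphinx", sphinx.__version__, "docutils", docutils.__version__)
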
45 changes: 45 additions & 0 deletions tests/record/test_default_records.py
@@ -1,3 +1,7 @@
from unittest import mock

import kafka.codec
from kafka.errors import UnsupportedCodecError
import pytest
from aiokafka.record.default_records import (
    DefaultRecordBatch, DefaultRecordBatchBuilder
@@ -175,6 +179,47 @@ def test_default_batch_size_limit():
    assert len(builder.build()) < 1000


@pytest.mark.parametrize("compression_type,name,checker_name", [
    (DefaultRecordBatch.CODEC_GZIP, "gzip", "has_gzip"),
    (DefaultRecordBatch.CODEC_SNAPPY, "snappy", "has_snappy"),
    (DefaultRecordBatch.CODEC_LZ4, "lz4", "has_lz4"),
])
def test_unavailable_codec(compression_type, name, checker_name):
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=compression_type, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=1024)
    builder.append(0, timestamp=None, key=None, value=b"M" * 2000, headers=[])
    correct_buffer = builder.build()

    with mock.patch.object(kafka.codec, checker_name, return_value=False):
        # Check that builder raises error
        builder = DefaultRecordBatchBuilder(
            magic=2, compression_type=compression_type, is_transactional=0,
            producer_id=-1, producer_epoch=-1, base_sequence=-1,
            batch_size=1024)
        error_msg = "Libraries for {} compression codec not found".format(name)
        with pytest.raises(UnsupportedCodecError, match=error_msg):
            builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
            builder.build()

        # Check that reader raises same error
        batch = DefaultRecordBatch(bytes(correct_buffer))
        with pytest.raises(UnsupportedCodecError, match=error_msg):
            list(batch)


def test_unsupported_yet_codec():
    compression_type = DefaultRecordBatch.CODEC_MASK  # It doesn't exist
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=compression_type, is_transactional=0,
        producer_id=-1, producer_epoch=-1, base_sequence=-1,
        batch_size=1024)
    with pytest.raises(UnsupportedCodecError):
        builder.append(0, timestamp=None, key=None, value=b"M", headers=[])
        builder.build()


def test_build_without_append():
    builder = DefaultRecordBatchBuilder(
        magic=2, compression_type=0, is_transactional=1,
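
To run only the new tests locally, pytest keyword selection works; the snippet below is the Python equivalent of invoking pytest tests/record/test_default_records.py -k codec from the repository root (the -k expression matches both test names added above):

import pytest

# Selects test_unavailable_codec (all three parametrizations) and
# test_unsupported_yet_codec from the file changed above.
raise SystemExit(pytest.main(
    ["tests/record/test_default_records.py", "-k", "codec"]))
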
