Skip to content

Commit 5ec91e7

Browse files
authored
Fix type annotations for compression (#1119)
1 parent c496692 commit 5ec91e7

File tree

8 files changed

+59
-109
lines changed

8 files changed

+59
-109
lines changed

aiokafka/record/_crecords/default_records.pyi

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,6 @@ from aiokafka.record._protocols import (
88
DefaultRecordMetadataProtocol,
99
DefaultRecordProtocol,
1010
)
11-
from aiokafka.record._types import (
12-
CodecGzipT,
13-
CodecLz4T,
14-
CodecMaskT,
15-
CodecNoneT,
16-
CodecSnappyT,
17-
CodecZstdT,
18-
DefaultCompressionTypeT,
19-
)
2011

2112
@final
2213
class DefaultRecord(DefaultRecordProtocol):
@@ -46,12 +37,12 @@ class DefaultRecord(DefaultRecordProtocol):
4637

4738
@final
4839
class DefaultRecordBatch(DefaultRecordBatchProtocol):
49-
CODEC_MASK: ClassVar[CodecMaskT]
50-
CODEC_NONE: ClassVar[CodecNoneT]
51-
CODEC_GZIP: ClassVar[CodecGzipT]
52-
CODEC_SNAPPY: ClassVar[CodecSnappyT]
53-
CODEC_LZ4: ClassVar[CodecLz4T]
54-
CODEC_ZSTD: ClassVar[CodecZstdT]
40+
CODEC_MASK: ClassVar[int]
41+
CODEC_NONE: ClassVar[int]
42+
CODEC_GZIP: ClassVar[int]
43+
CODEC_SNAPPY: ClassVar[int]
44+
CODEC_LZ4: ClassVar[int]
45+
CODEC_ZSTD: ClassVar[int]
5546

5647
def __init__(self, buffer: bytes): ...
5748
@property
@@ -96,7 +87,7 @@ class DefaultRecordBatchBuilder(DefaultRecordBatchBuilderProtocol):
9687
def __init__(
9788
self,
9889
magic: int,
99-
compression_type: DefaultCompressionTypeT,
90+
compression_type: int,
10091
is_transactional: int,
10192
producer_id: int,
10293
producer_epoch: int,

aiokafka/record/_crecords/legacy_records.pyi

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,6 @@ from aiokafka.record._protocols import (
99
LegacyRecordMetadataProtocol,
1010
LegacyRecordProtocol,
1111
)
12-
from aiokafka.record._types import (
13-
CodecGzipT,
14-
CodecLz4T,
15-
CodecMaskT,
16-
CodecSnappyT,
17-
LegacyCompressionTypeT,
18-
)
1912

2013
@final
2114
class LegacyRecord(LegacyRecordProtocol):
@@ -47,10 +40,10 @@ class LegacyRecord(LegacyRecordProtocol):
4740
class LegacyRecordBatch(LegacyRecordBatchProtocol):
4841
RECORD_OVERHEAD_V0: ClassVar[int]
4942
RECORD_OVERHEAD_V1: ClassVar[int]
50-
CODEC_MASK: ClassVar[CodecMaskT]
51-
CODEC_GZIP: ClassVar[CodecGzipT]
52-
CODEC_SNAPPY: ClassVar[CodecSnappyT]
53-
CODEC_LZ4: ClassVar[CodecLz4T]
43+
CODEC_MASK: ClassVar[int]
44+
CODEC_GZIP: ClassVar[int]
45+
CODEC_SNAPPY: ClassVar[int]
46+
CODEC_LZ4: ClassVar[int]
5447

5548
is_control_batch: bool
5649
is_transactional: bool
@@ -63,14 +56,12 @@ class LegacyRecordBatch(LegacyRecordBatchProtocol):
6356

6457
@final
6558
class LegacyRecordBatchBuilder(LegacyRecordBatchBuilderProtocol):
66-
CODEC_MASK: ClassVar[CodecMaskT]
67-
CODEC_GZIP: ClassVar[CodecGzipT]
68-
CODEC_SNAPPY: ClassVar[CodecSnappyT]
69-
CODEC_LZ4: ClassVar[CodecLz4T]
59+
CODEC_MASK: ClassVar[int]
60+
CODEC_GZIP: ClassVar[int]
61+
CODEC_SNAPPY: ClassVar[int]
62+
CODEC_LZ4: ClassVar[int]
7063

71-
def __init__(
72-
self, magic: int, compression_type: LegacyCompressionTypeT, batch_size: int
73-
) -> None: ...
64+
def __init__(self, magic: int, compression_type: int, batch_size: int) -> None: ...
7465
def append(
7566
self,
7667
offset: int,

aiokafka/record/_protocols.py

Lines changed: 12 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,23 +11,12 @@
1111

1212
from typing_extensions import Never
1313

14-
from ._types import (
15-
CodecGzipT,
16-
CodecLz4T,
17-
CodecMaskT,
18-
CodecNoneT,
19-
CodecSnappyT,
20-
CodecZstdT,
21-
DefaultCompressionTypeT,
22-
LegacyCompressionTypeT,
23-
)
24-
2514

2615
class DefaultRecordBatchBuilderProtocol(Protocol):
2716
def __init__(
2817
self,
2918
magic: int,
30-
compression_type: DefaultCompressionTypeT,
19+
compression_type: int,
3120
is_transactional: int,
3221
producer_id: int,
3322
producer_epoch: int,
@@ -90,12 +79,12 @@ def timestamp(self) -> int: ...
9079

9180

9281
class DefaultRecordBatchProtocol(Iterator["DefaultRecordProtocol"], Protocol):
93-
CODEC_MASK: ClassVar[CodecMaskT]
94-
CODEC_NONE: ClassVar[CodecNoneT]
95-
CODEC_GZIP: ClassVar[CodecGzipT]
96-
CODEC_SNAPPY: ClassVar[CodecSnappyT]
97-
CODEC_LZ4: ClassVar[CodecLz4T]
98-
CODEC_ZSTD: ClassVar[CodecZstdT]
82+
CODEC_MASK: ClassVar[int]
83+
CODEC_NONE: ClassVar[int]
84+
CODEC_GZIP: ClassVar[int]
85+
CODEC_SNAPPY: ClassVar[int]
86+
CODEC_LZ4: ClassVar[int]
87+
CODEC_ZSTD: ClassVar[int]
9988

10089
def __init__(self, buffer: bytes | bytearray | memoryview) -> None: ...
10190
@property
@@ -170,7 +159,7 @@ class LegacyRecordBatchBuilderProtocol(Protocol):
170159
def __init__(
171160
self,
172161
magic: Literal[0, 1],
173-
compression_type: LegacyCompressionTypeT,
162+
compression_type: int,
174163
batch_size: int,
175164
) -> None: ...
176165
def append(
@@ -213,10 +202,10 @@ def timestamp(self) -> int: ...
213202

214203

215204
class LegacyRecordBatchProtocol(Iterable["LegacyRecordProtocol"], Protocol):
216-
CODEC_MASK: ClassVar[CodecMaskT]
217-
CODEC_GZIP: ClassVar[CodecGzipT]
218-
CODEC_SNAPPY: ClassVar[CodecSnappyT]
219-
CODEC_LZ4: ClassVar[CodecLz4T]
205+
CODEC_MASK: ClassVar[int]
206+
CODEC_GZIP: ClassVar[int]
207+
CODEC_SNAPPY: ClassVar[int]
208+
CODEC_LZ4: ClassVar[int]
220209

221210
is_control_batch: bool
222211
is_transactional: bool

aiokafka/record/_types.py

Lines changed: 0 additions & 12 deletions
This file was deleted.

aiokafka/record/default_records.py

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@
6060
from dataclasses import dataclass
6161
from typing import Any, final
6262

63-
from typing_extensions import Self, TypeIs, assert_never
63+
from typing_extensions import Self
6464

6565
import aiokafka.codec as codecs
6666
from aiokafka.codec import (
@@ -82,14 +82,6 @@
8282
DefaultRecordMetadataProtocol,
8383
DefaultRecordProtocol,
8484
)
85-
from ._types import (
86-
CodecGzipT,
87-
CodecLz4T,
88-
CodecMaskT,
89-
CodecNoneT,
90-
CodecSnappyT,
91-
CodecZstdT,
92-
)
9385
from .util import calc_crc32c, decode_varint, encode_varint, size_of_varint
9486

9587

@@ -116,12 +108,12 @@ class DefaultRecordBase:
116108
CRC_OFFSET = struct.calcsize(">qiib")
117109
AFTER_LEN_OFFSET = struct.calcsize(">qi")
118110

119-
CODEC_MASK: CodecMaskT = 0x07
120-
CODEC_NONE: CodecNoneT = 0x00
121-
CODEC_GZIP: CodecGzipT = 0x01
122-
CODEC_SNAPPY: CodecSnappyT = 0x02
123-
CODEC_LZ4: CodecLz4T = 0x03
124-
CODEC_ZSTD: CodecZstdT = 0x04
111+
CODEC_MASK = 0x07
112+
CODEC_NONE = 0x00
113+
CODEC_GZIP = 0x01
114+
CODEC_SNAPPY = 0x02
115+
CODEC_LZ4 = 0x03
116+
CODEC_ZSTD = 0x04
125117
TIMESTAMP_TYPE_MASK = 0x08
126118
TRANSACTIONAL_MASK = 0x10
127119
CONTROL_MASK = 0x20
@@ -131,9 +123,7 @@ class DefaultRecordBase:
131123

132124
NO_PARTITION_LEADER_EPOCH = -1
133125

134-
def _assert_has_codec(
135-
self, compression_type: int
136-
) -> TypeIs[CodecGzipT | CodecSnappyT | CodecLz4T | CodecZstdT]:
126+
def _assert_has_codec(self, compression_type: int) -> bool:
137127
if compression_type == self.CODEC_GZIP:
138128
checker, name = codecs.has_gzip, "gzip"
139129
elif compression_type == self.CODEC_SNAPPY:
@@ -240,7 +230,10 @@ def _maybe_uncompress(self) -> None:
240230
elif compression_type == self.CODEC_ZSTD:
241231
uncompressed = zstd_decode(data.tobytes())
242232
else:
243-
assert_never(compression_type)
233+
# Must not be possible
234+
raise RuntimeError(
235+
f"Invalid compression codec {compression_type:#04x}"
236+
)
244237
self._buffer = bytearray(uncompressed)
245238
self._pos = 0
246239
self._decompressed = True
@@ -560,7 +553,10 @@ def _maybe_compress(self) -> bool:
560553
elif self._compression_type == self.CODEC_ZSTD:
561554
compressed = zstd_encode(data)
562555
else:
563-
assert_never(self._compression_type)
556+
# Must not be possible
557+
raise RuntimeError(
558+
f"Invalid compression codec {self._compression_type:#04x}"
559+
)
564560
compressed_size = len(compressed)
565561
if len(data) <= compressed_size:
566562
# We did not get any benefit from compression, lets send

aiokafka/record/legacy_records.py

Lines changed: 13 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from dataclasses import dataclass
88
from typing import Any, Literal, final
99

10-
from typing_extensions import Never, TypeIs, assert_never
10+
from typing_extensions import Never
1111

1212
import aiokafka.codec as codecs
1313
from aiokafka.codec import (
@@ -27,13 +27,6 @@
2727
LegacyRecordMetadataProtocol,
2828
LegacyRecordProtocol,
2929
)
30-
from ._types import (
31-
CodecGzipT,
32-
CodecLz4T,
33-
CodecMaskT,
34-
CodecSnappyT,
35-
LegacyCompressionTypeT,
36-
)
3730

3831

3932
class LegacyRecordBase:
@@ -85,18 +78,16 @@ class LegacyRecordBase:
8578
KEY_OFFSET_V1 = HEADER_STRUCT_V1.size
8679
KEY_LENGTH = VALUE_LENGTH = struct.calcsize(">i") # Bytes length is Int32
8780

88-
CODEC_MASK: CodecMaskT = 0x07
89-
CODEC_GZIP: CodecGzipT = 0x01
90-
CODEC_SNAPPY: CodecSnappyT = 0x02
91-
CODEC_LZ4: CodecLz4T = 0x03
81+
CODEC_MASK = 0x07
82+
CODEC_GZIP = 0x01
83+
CODEC_SNAPPY = 0x02
84+
CODEC_LZ4 = 0x03
9285
TIMESTAMP_TYPE_MASK = 0x08
9386

9487
LOG_APPEND_TIME = 1
9588
CREATE_TIME = 0
9689

97-
def _assert_has_codec(
98-
self, compression_type: int
99-
) -> TypeIs[CodecGzipT | CodecSnappyT | CodecLz4T]:
90+
def _assert_has_codec(self, compression_type: int) -> bool:
10091
if compression_type == self.CODEC_GZIP:
10192
checker, name = codecs.has_gzip, "gzip"
10293
elif compression_type == self.CODEC_SNAPPY:
@@ -189,7 +180,8 @@ def _decompress(self, key_offset: int) -> bytes:
189180
else:
190181
uncompressed = lz4_decode(data.tobytes())
191182
else:
192-
assert_never(compression_type)
183+
# Must not be possible
184+
raise RuntimeError(f"Invalid compression codec {compression_type:#04x}")
193185
return uncompressed
194186

195187
def _read_header(self, pos: int) -> tuple[int, int, int, int, int, int | None]:
@@ -323,7 +315,7 @@ class _LegacyRecordBatchBuilderPy(LegacyRecordBase, LegacyRecordBatchBuilderProt
323315
def __init__(
324316
self,
325317
magic: Literal[0, 1],
326-
compression_type: LegacyCompressionTypeT,
318+
compression_type: int,
327319
batch_size: int,
328320
) -> None:
329321
assert magic in [0, 1]
@@ -477,7 +469,10 @@ def _maybe_compress(self) -> bool:
477469
compressed = lz4_encode(bytes(buf))
478470

479471
else:
480-
assert_never(self._compression_type)
472+
# Must not be possible
473+
raise RuntimeError(
474+
f"Invalid compression codec {self._compression_type:#04x}"
475+
)
481476
compressed_size = len(compressed)
482477
size = self._size_in_bytes(key_size=0, value_size=compressed_size)
483478
if size > len(self._buffer):

tests/record/test_default_records.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -282,7 +282,7 @@ def test_unsupported_yet_codec() -> None:
282282
compression_type = DefaultRecordBatch.CODEC_MASK # It doesn't exist
283283
builder = DefaultRecordBatchBuilder(
284284
magic=2,
285-
compression_type=compression_type, # type: ignore[arg-type]
285+
compression_type=compression_type,
286286
is_transactional=0,
287287
producer_id=-1,
288288
producer_epoch=-1,

tests/record/test_legacy.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ def test_unsupported_yet_codec() -> None:
255255
compression_type = LegacyRecordBatch.CODEC_MASK # It doesn't exist
256256
builder = LegacyRecordBatchBuilder(
257257
magic=0,
258-
compression_type=compression_type, # type: ignore[arg-type]
258+
compression_type=compression_type,
259259
batch_size=1024,
260260
)
261261
with pytest.raises(UnsupportedCodecError):

0 commit comments

Comments
 (0)