From 9ad9976551e8697d60ed7e0169ba0776ff1cffe5 Mon Sep 17 00:00:00 2001 From: Fokko Driesprong Date: Sun, 10 Sep 2023 23:08:36 +0200 Subject: [PATCH] Python: Non-Cython fallback Avro parser (#8545) * Python: Non-Cython fallback Avro parser * Python: Non-Cython fallback Avro parser --- python/pyiceberg/avro/decoder.py | 13 +++++ python/pyiceberg/avro/decoder_fast.pyi | 4 +- python/pyiceberg/avro/file.py | 19 ++----- python/pyiceberg/avro/reader.py | 76 ++++++++++++-------------- python/pyiceberg/avro/resolver.py | 6 +- python/tests/avro/test_decoder.py | 47 +++++++++------- python/tests/avro/test_reader.py | 11 ++-- 7 files changed, 94 insertions(+), 82 deletions(-) diff --git a/python/pyiceberg/avro/decoder.py b/python/pyiceberg/avro/decoder.py index b6349f38a7ef..ab7813670f09 100644 --- a/python/pyiceberg/avro/decoder.py +++ b/python/pyiceberg/avro/decoder.py @@ -171,3 +171,16 @@ def read(self, n: int) -> bytes: def skip(self, n: int) -> None: self._input_stream.seek(n, SEEK_CUR) + + +def new_decoder(b: bytes) -> BinaryDecoder: + try: + from pyiceberg.avro.decoder_fast import CythonBinaryDecoder + + return CythonBinaryDecoder(b) + except ModuleNotFoundError: + import warnings + + warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation") + + return StreamingBinaryDecoder(b) diff --git a/python/pyiceberg/avro/decoder_fast.pyi b/python/pyiceberg/avro/decoder_fast.pyi index 5d68fa262833..989ad8c5f8ec 100644 --- a/python/pyiceberg/avro/decoder_fast.pyi +++ b/python/pyiceberg/avro/decoder_fast.pyi @@ -17,7 +17,9 @@ from typing import Tuple, Dict -class CythonBinaryDecoder: +from pyiceberg.avro.decoder import BinaryDecoder + +class CythonBinaryDecoder(BinaryDecoder): def __init__(self, input_contents: bytes) -> None: pass def tell(self) -> int: diff --git a/python/pyiceberg/avro/file.py b/python/pyiceberg/avro/file.py index fb9f32b405d4..dc843f6dc0d2 100644 --- a/python/pyiceberg/avro/file.py +++ b/python/pyiceberg/avro/file.py @@ -35,16 +35,9 @@ ) from pyiceberg.avro.codecs import KNOWN_CODECS, Codec - -try: - from pyiceberg.avro.decoder_fast import CythonBinaryDecoder as AvroDecoder -except ModuleNotFoundError: - import warnings - - warnings.warn("Falling back to pure Python Avro decoder, missing Cython extension") - from pyiceberg.avro.decoder import StreamingBinaryDecoder as AvroDecoder # type: ignore +from pyiceberg.avro.decoder import BinaryDecoder, new_decoder from pyiceberg.avro.encoder import BinaryEncoder -from pyiceberg.avro.reader import ReadableDecoder, Reader +from pyiceberg.avro.reader import Reader from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve from pyiceberg.avro.writer import Writer from pyiceberg.io import InputFile, OutputFile, OutputStream @@ -112,7 +105,7 @@ def get_schema(self) -> Schema: class Block(Generic[D]): reader: Reader block_records: int - block_decoder: ReadableDecoder + block_decoder: BinaryDecoder position: int = 0 def __iter__(self) -> Block[D]: @@ -150,7 +143,7 @@ class AvroFile(Generic[D]): schema: Schema reader: Reader - decoder: ReadableDecoder + decoder: BinaryDecoder block: Optional[Block[D]] def __init__( @@ -173,7 +166,7 @@ def __enter__(self) -> AvroFile[D]: A generator returning the AvroStructs. """ with self.input_file.open() as f: - self.decoder = AvroDecoder(f.read()) + self.decoder = new_decoder(f.read()) self.header = self._read_header() self.schema = self.header.get_schema() if not self.read_schema: @@ -205,7 +198,7 @@ def _read_block(self) -> int: if codec := self.header.compression_codec(): block_bytes = codec.decompress(block_bytes) - self.block = Block(reader=self.reader, block_records=block_records, block_decoder=AvroDecoder(block_bytes)) + self.block = Block(reader=self.reader, block_records=block_records, block_decoder=new_decoder(block_bytes)) return block_records def __next__(self) -> D: diff --git a/python/pyiceberg/avro/reader.py b/python/pyiceberg/avro/reader.py index 741a103e860f..2b87e4b06f33 100644 --- a/python/pyiceberg/avro/reader.py +++ b/python/pyiceberg/avro/reader.py @@ -36,22 +36,18 @@ Mapping, Optional, Tuple, - Union, ) from uuid import UUID from pyiceberg.avro.decoder import BinaryDecoder -from pyiceberg.avro.decoder_fast import CythonBinaryDecoder from pyiceberg.typedef import StructProtocol from pyiceberg.types import StructType from pyiceberg.utils.decimal import bytes_to_decimal, decimal_required_bytes from pyiceberg.utils.lazydict import LazyDict from pyiceberg.utils.singleton import Singleton -ReadableDecoder = Union[BinaryDecoder, CythonBinaryDecoder] - -def _skip_map_array(decoder: ReadableDecoder, skip_entry: Callable[[], None]) -> None: +def _skip_map_array(decoder: BinaryDecoder, skip_entry: Callable[[], None]) -> None: """Skips over an array or map. Both the array and map are encoded similar, and we can re-use @@ -89,11 +85,11 @@ def _skip_map_array(decoder: ReadableDecoder, skip_entry: Callable[[], None]) -> class Reader(Singleton): @abstractmethod - def read(self, decoder: ReadableDecoder) -> Any: + def read(self, decoder: BinaryDecoder) -> Any: ... @abstractmethod - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: ... def __repr__(self) -> str: @@ -102,10 +98,10 @@ def __repr__(self) -> str: class NoneReader(Reader): - def read(self, _: ReadableDecoder) -> None: + def read(self, _: BinaryDecoder) -> None: return None - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: return None @@ -116,44 +112,44 @@ class DefaultReader(Reader): def __init__(self, default_value: Any) -> None: self.default_value = default_value - def read(self, _: ReadableDecoder) -> Any: + def read(self, _: BinaryDecoder) -> Any: return self.default_value - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: pass class BooleanReader(Reader): - def read(self, decoder: ReadableDecoder) -> bool: + def read(self, decoder: BinaryDecoder) -> bool: return decoder.read_boolean() - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_boolean() class IntegerReader(Reader): """Longs and ints are encoded the same way, and there is no long in Python.""" - def read(self, decoder: ReadableDecoder) -> int: + def read(self, decoder: BinaryDecoder) -> int: return decoder.read_int() - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_int() class FloatReader(Reader): - def read(self, decoder: ReadableDecoder) -> float: + def read(self, decoder: BinaryDecoder) -> float: return decoder.read_float() - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_float() class DoubleReader(Reader): - def read(self, decoder: ReadableDecoder) -> float: + def read(self, decoder: BinaryDecoder) -> float: return decoder.read_double() - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_double() @@ -191,18 +187,18 @@ class TimestamptzReader(IntegerReader): class StringReader(Reader): - def read(self, decoder: ReadableDecoder) -> str: + def read(self, decoder: BinaryDecoder) -> str: return decoder.read_utf8() - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_utf8() class UUIDReader(Reader): - def read(self, decoder: ReadableDecoder) -> UUID: + def read(self, decoder: BinaryDecoder) -> UUID: return UUID(bytes=decoder.read(16)) - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip(16) @@ -210,10 +206,10 @@ def skip(self, decoder: ReadableDecoder) -> None: class FixedReader(Reader): _len: int = dataclassfield() - def read(self, decoder: ReadableDecoder) -> bytes: + def read(self, decoder: BinaryDecoder) -> bytes: return decoder.read(len(self)) - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip(len(self)) def __len__(self) -> int: @@ -232,10 +228,10 @@ class BinaryReader(Reader): then reads the binary field itself. """ - def read(self, decoder: ReadableDecoder) -> bytes: + def read(self, decoder: BinaryDecoder) -> bytes: return decoder.read_bytes() - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_bytes() @@ -256,10 +252,10 @@ def __init__(self, precision: int, scale: int): object.__setattr__(self, "scale", scale) object.__setattr__(self, "_length", decimal_required_bytes(precision)) - def read(self, decoder: ReadableDecoder) -> Decimal: + def read(self, decoder: BinaryDecoder) -> Decimal: return bytes_to_decimal(decoder.read(self._length), self.scale) - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: decoder.skip_bytes() def __repr__(self) -> str: @@ -271,7 +267,7 @@ def __repr__(self) -> str: class OptionReader(Reader): option: Reader = dataclassfield() - def read(self, decoder: ReadableDecoder) -> Optional[Any]: + def read(self, decoder: BinaryDecoder) -> Optional[Any]: # For the Iceberg spec it is required to set the default value to null # From https://iceberg.apache.org/spec/#avro # Optional fields must always set the Avro field default value to null. @@ -285,7 +281,7 @@ def read(self, decoder: ReadableDecoder) -> Optional[Any]: return self.option.read(decoder) return None - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: if decoder.read_int() > 0: return self.option.skip(decoder) @@ -295,7 +291,7 @@ class StructReader(Reader): field_readers: Tuple[Tuple[Optional[int], Reader], ...] create_struct: Callable[..., StructProtocol] struct: StructType - field_reader_functions = Tuple[Tuple[Optional[str], int, Optional[Callable[[ReadableDecoder], Any]]], ...] + field_reader_functions = Tuple[Tuple[Optional[str], int, Optional[Callable[[BinaryDecoder], Any]]], ...] def __init__( self, @@ -321,7 +317,7 @@ def __init__( if not isinstance(created_struct, StructProtocol): raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}") - reading_callbacks: List[Tuple[Optional[int], Callable[[ReadableDecoder], Any]]] = [] + reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = [] for pos, field in field_readers: if pos is not None: reading_callbacks.append((pos, field.read)) @@ -331,7 +327,7 @@ def __init__( self._field_reader_functions = tuple(reading_callbacks) self._hash = hash(self._field_reader_functions) - def read(self, decoder: ReadableDecoder) -> StructProtocol: + def read(self, decoder: BinaryDecoder) -> StructProtocol: struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct() for pos, field_reader in self._field_reader_functions: if pos is not None: @@ -341,7 +337,7 @@ def read(self, decoder: ReadableDecoder) -> StructProtocol: return struct - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: for _, field in self.field_readers: field.skip(decoder) @@ -373,7 +369,7 @@ def __init__(self, element: Reader) -> None: self._hash = hash(self.element) self._is_int_list = isinstance(self.element, IntegerReader) - def read(self, decoder: ReadableDecoder) -> List[Any]: + def read(self, decoder: BinaryDecoder) -> List[Any]: read_items: List[Any] = [] block_count = decoder.read_int() while block_count != 0: @@ -388,7 +384,7 @@ def read(self, decoder: ReadableDecoder) -> List[Any]: block_count = decoder.read_int() return read_items - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: _skip_map_array(decoder, lambda: self.element.skip(decoder)) def __hash__(self) -> int: @@ -420,7 +416,7 @@ def __init__(self, key: Reader, value: Reader) -> None: self._value_reader = self.value.read self._hash = hash((self.key, self.value)) - def _read_int_int(self, decoder: ReadableDecoder) -> Mapping[int, int]: + def _read_int_int(self, decoder: BinaryDecoder) -> Mapping[int, int]: """Read a mapping from int to int from the decoder. Read a map of ints to ints from the decoder, since this is such a common @@ -455,7 +451,7 @@ def _read_int_int(self, decoder: ReadableDecoder) -> Mapping[int, int]: return LazyDict(contents_array) - def read(self, decoder: ReadableDecoder) -> Mapping[Any, Any]: + def read(self, decoder: BinaryDecoder) -> Mapping[Any, Any]: read_items: dict[Any, Any] = {} if self._is_int_int or self._is_int_bytes: @@ -484,7 +480,7 @@ def read(self, decoder: ReadableDecoder) -> Mapping[Any, Any]: return read_items - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: def skip() -> None: self.key.skip(decoder) self.value.skip(decoder) diff --git a/python/pyiceberg/avro/resolver.py b/python/pyiceberg/avro/resolver.py index eb08bf7ff64a..8b2daeb7c7fe 100644 --- a/python/pyiceberg/avro/resolver.py +++ b/python/pyiceberg/avro/resolver.py @@ -25,6 +25,7 @@ Union, ) +from pyiceberg.avro.decoder import BinaryDecoder from pyiceberg.avro.reader import ( BinaryReader, BooleanReader, @@ -39,7 +40,6 @@ MapReader, NoneReader, OptionReader, - ReadableDecoder, Reader, StringReader, StructReader, @@ -226,10 +226,10 @@ def __init__(self, enum: Callable[..., Enum], reader: Reader) -> None: self.enum = enum self.reader = reader - def read(self, decoder: ReadableDecoder) -> Enum: + def read(self, decoder: BinaryDecoder) -> Enum: return self.enum(self.reader.read(decoder)) - def skip(self, decoder: ReadableDecoder) -> None: + def skip(self, decoder: BinaryDecoder) -> None: pass diff --git a/python/tests/avro/test_decoder.py b/python/tests/avro/test_decoder.py index 2eaf20cd4af4..fd660247cd24 100644 --- a/python/tests/avro/test_decoder.py +++ b/python/tests/avro/test_decoder.py @@ -21,12 +21,12 @@ from io import SEEK_SET from types import TracebackType from typing import Callable, Optional, Type +from unittest.mock import MagicMock, patch import pytest -from pyiceberg.avro.decoder import StreamingBinaryDecoder +from pyiceberg.avro.decoder import BinaryDecoder, StreamingBinaryDecoder, new_decoder from pyiceberg.avro.decoder_fast import CythonBinaryDecoder -from pyiceberg.avro.reader import ReadableDecoder from pyiceberg.avro.resolver import resolve from pyiceberg.io import InputStream from pyiceberg.types import DoubleType, FloatType @@ -35,19 +35,19 @@ @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_boolean_true(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_boolean_true(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x01") assert decoder.read_boolean() is True @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_boolean_false(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_boolean_false(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x00") assert decoder.read_boolean() is False @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_boolean(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_skip_boolean(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x00") assert decoder.tell() == 0 decoder.skip_boolean() @@ -55,13 +55,13 @@ def test_skip_boolean(decoder_class: Callable[[bytes], ReadableDecoder]) -> None @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_int(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") assert decoder.read_int() == 12 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_int_longer(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_int_longer(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x8e\xd1\x87\x01") assert decoder.read_int() == 1111111 @@ -80,7 +80,7 @@ def zigzag_encode(datum: int) -> bytes: "decoder_class, expected_value", list(itertools.product(AVAILABLE_DECODERS, [0, -1, 2**32, -(2**32), (2**63 - 1), -(2**63)])), ) -def test_read_int_custom_encode(decoder_class: Callable[[bytes], ReadableDecoder], expected_value: int) -> None: +def test_read_int_custom_encode(decoder_class: Callable[[bytes], BinaryDecoder], expected_value: int) -> None: encoded = zigzag_encode(expected_value) decoder = decoder_class(encoded) decoded = decoder.read_int() @@ -88,7 +88,7 @@ def test_read_int_custom_encode(decoder_class: Callable[[bytes], ReadableDecoder @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_skip_int(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") assert decoder.tell() == 0 decoder.skip_int() @@ -96,7 +96,7 @@ def test_skip_int(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_negative_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_negative_bytes(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"") with pytest.raises(ValueError) as exc_info: @@ -137,19 +137,19 @@ def __exit__( # InMemoryBinaryDecoder doesn't work for a byte at a time reading @pytest.mark.parametrize("decoder_class", [StreamingBinaryDecoder]) -def test_read_single_byte_at_the_time(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_single_byte_at_the_time(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(OneByteAtATimeInputStream()) # type: ignore assert decoder.read(2) == b"\x01\x02" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x00\x00\x9A\x41") assert decoder.read_float() == 19.25 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_skip_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x00\x00\x9A\x41") assert decoder.tell() == 0 decoder.skip_float() @@ -157,13 +157,13 @@ def test_skip_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_double(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40") assert decoder.read_double() == 19.25 @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_skip_double(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x00\x00\x00\x00\x00\x40\x33\x40") assert decoder.tell() == 0 decoder.skip_double() @@ -171,20 +171,20 @@ def test_skip_double(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_bytes(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_bytes(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x08\x01\x02\x03\x04") actual = decoder.read_bytes() assert actual == b"\x01\x02\x03\x04" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_utf8(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x04\x76\x6F") assert decoder.read_utf8() == "vo" @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_skip_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_skip_utf8(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x04\x76\x6F") assert decoder.tell() == 0 decoder.skip_utf8() @@ -192,7 +192,16 @@ def test_skip_utf8(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_int_as_float(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_int_as_float(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x00\x00\x9A\x41") reader = resolve(FloatType(), DoubleType()) assert reader.read(decoder) == 19.25 + + +@patch("pyiceberg.avro.decoder_fast.CythonBinaryDecoder") +def test_fallback_to_pure_python_decoder(cython_decoder: MagicMock) -> None: + cython_decoder.side_effect = ModuleNotFoundError + + with pytest.warns(UserWarning, match="Falling back to pure Python Avro decoder, missing Cython implementation"): + dec = new_decoder(b"") + assert isinstance(dec, StreamingBinaryDecoder) diff --git a/python/tests/avro/test_reader.py b/python/tests/avro/test_reader.py index 9da8d04593f5..a3a502bcff1c 100644 --- a/python/tests/avro/test_reader.py +++ b/python/tests/avro/test_reader.py @@ -20,7 +20,7 @@ import pytest -from pyiceberg.avro.decoder import StreamingBinaryDecoder +from pyiceberg.avro.decoder import BinaryDecoder, StreamingBinaryDecoder from pyiceberg.avro.decoder_fast import CythonBinaryDecoder from pyiceberg.avro.file import AvroFile from pyiceberg.avro.reader import ( @@ -32,7 +32,6 @@ FixedReader, FloatReader, IntegerReader, - ReadableDecoder, StringReader, StructReader, TimeReader, @@ -340,7 +339,7 @@ def test_uuid_reader() -> None: @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_struct(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) result = StructReader(((0, IntegerReader()),), Record, struct).read(decoder) @@ -348,7 +347,7 @@ def test_read_struct(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct_lambda(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_struct_lambda(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) @@ -360,7 +359,7 @@ def test_read_struct_lambda(decoder_class: Callable[[bytes], ReadableDecoder]) - @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_not_struct_type(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_not_struct_type(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") struct = StructType(NestedField(1, "id", IntegerType(), required=True)) @@ -371,7 +370,7 @@ def test_read_not_struct_type(decoder_class: Callable[[bytes], ReadableDecoder]) @pytest.mark.parametrize("decoder_class", AVAILABLE_DECODERS) -def test_read_struct_exception_handling(decoder_class: Callable[[bytes], ReadableDecoder]) -> None: +def test_read_struct_exception_handling(decoder_class: Callable[[bytes], BinaryDecoder]) -> None: decoder = decoder_class(b"\x18") def raise_err(struct: StructType) -> None: