Skip to content

Commit

Permalink
Python: Non-Cython fallback Avro parser (apache#8545)
Browse files Browse the repository at this point in the history
* Python: Non-Cython fallback Avro parser

* Python: Non-Cython fallback Avro parser
  • Loading branch information
Fokko committed Sep 10, 2023
1 parent 7d513f4 commit 9ad9976
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 82 deletions.
13 changes: 13 additions & 0 deletions python/pyiceberg/avro/decoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,16 @@ def read(self, n: int) -> bytes:

def skip(self, n: int) -> None:
    """Move the underlying input stream by ``n`` bytes without reading them.

    Args:
        n: Offset passed to ``seek``; presumably a byte count relative to the
            current position (assumes ``SEEK_CUR`` is the standard ``io``/``os``
            whence constant -- TODO confirm against the module imports).
    """
    self._input_stream.seek(n, SEEK_CUR)


def new_decoder(b: bytes) -> BinaryDecoder:
    """Build an Avro decoder for ``b``, preferring the Cython implementation.

    The Cython-backed ``CythonBinaryDecoder`` is used when its extension module
    can be imported. Otherwise a warning is emitted and the pure-Python
    ``StreamingBinaryDecoder`` is returned instead.

    Args:
        b: The bytes handed to the decoder's constructor.

    Returns:
        A ``CythonBinaryDecoder`` when the extension is importable, else a
        ``StreamingBinaryDecoder``.
    """
    try:
        # Keep the try body to the import alone so that an error raised while
        # constructing the decoder is never mistaken for a missing extension.
        from pyiceberg.avro.decoder_fast import CythonBinaryDecoder
    except ImportError:
        # ImportError is the parent of ModuleNotFoundError, so this still covers
        # the "extension not built" case, and additionally covers an extension
        # that is present but cannot be loaded (e.g. built for another
        # interpreter), where falling back is preferable to crashing.
        import warnings

        warnings.warn("Falling back to pure Python Avro decoder, missing Cython implementation")

        return StreamingBinaryDecoder(b)

    return CythonBinaryDecoder(b)
4 changes: 3 additions & 1 deletion python/pyiceberg/avro/decoder_fast.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

from typing import Tuple, Dict

class CythonBinaryDecoder:
from pyiceberg.avro.decoder import BinaryDecoder

class CythonBinaryDecoder(BinaryDecoder):
def __init__(self, input_contents: bytes) -> None:
pass
def tell(self) -> int:
Expand Down
19 changes: 6 additions & 13 deletions python/pyiceberg/avro/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,9 @@
)

from pyiceberg.avro.codecs import KNOWN_CODECS, Codec

try:
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder as AvroDecoder
except ModuleNotFoundError:
import warnings

warnings.warn("Falling back to pure Python Avro decoder, missing Cython extension")
from pyiceberg.avro.decoder import StreamingBinaryDecoder as AvroDecoder # type: ignore
from pyiceberg.avro.decoder import BinaryDecoder, new_decoder
from pyiceberg.avro.encoder import BinaryEncoder
from pyiceberg.avro.reader import ReadableDecoder, Reader
from pyiceberg.avro.reader import Reader
from pyiceberg.avro.resolver import construct_reader, construct_writer, resolve
from pyiceberg.avro.writer import Writer
from pyiceberg.io import InputFile, OutputFile, OutputStream
Expand Down Expand Up @@ -112,7 +105,7 @@ def get_schema(self) -> Schema:
class Block(Generic[D]):
reader: Reader
block_records: int
block_decoder: ReadableDecoder
block_decoder: BinaryDecoder
position: int = 0

def __iter__(self) -> Block[D]:
Expand Down Expand Up @@ -150,7 +143,7 @@ class AvroFile(Generic[D]):
schema: Schema
reader: Reader

decoder: ReadableDecoder
decoder: BinaryDecoder
block: Optional[Block[D]]

def __init__(
Expand All @@ -173,7 +166,7 @@ def __enter__(self) -> AvroFile[D]:
A generator returning the AvroStructs.
"""
with self.input_file.open() as f:
self.decoder = AvroDecoder(f.read())
self.decoder = new_decoder(f.read())
self.header = self._read_header()
self.schema = self.header.get_schema()
if not self.read_schema:
Expand Down Expand Up @@ -205,7 +198,7 @@ def _read_block(self) -> int:
if codec := self.header.compression_codec():
block_bytes = codec.decompress(block_bytes)

self.block = Block(reader=self.reader, block_records=block_records, block_decoder=AvroDecoder(block_bytes))
self.block = Block(reader=self.reader, block_records=block_records, block_decoder=new_decoder(block_bytes))
return block_records

def __next__(self) -> D:
Expand Down
76 changes: 36 additions & 40 deletions python/pyiceberg/avro/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,22 +36,18 @@
Mapping,
Optional,
Tuple,
Union,
)
from uuid import UUID

from pyiceberg.avro.decoder import BinaryDecoder
from pyiceberg.avro.decoder_fast import CythonBinaryDecoder
from pyiceberg.typedef import StructProtocol
from pyiceberg.types import StructType
from pyiceberg.utils.decimal import bytes_to_decimal, decimal_required_bytes
from pyiceberg.utils.lazydict import LazyDict
from pyiceberg.utils.singleton import Singleton

ReadableDecoder = Union[BinaryDecoder, CythonBinaryDecoder]


def _skip_map_array(decoder: ReadableDecoder, skip_entry: Callable[[], None]) -> None:
def _skip_map_array(decoder: BinaryDecoder, skip_entry: Callable[[], None]) -> None:
"""Skips over an array or map.
Both the array and map are encoded similarly, and we can re-use
Expand Down Expand Up @@ -89,11 +85,11 @@ def _skip_map_array(decoder: ReadableDecoder, skip_entry: Callable[[], None]) ->

class Reader(Singleton):
@abstractmethod
def read(self, decoder: ReadableDecoder) -> Any:
def read(self, decoder: BinaryDecoder) -> Any:
...

@abstractmethod
def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
...

def __repr__(self) -> str:
Expand All @@ -102,10 +98,10 @@ def __repr__(self) -> str:


class NoneReader(Reader):
def read(self, _: ReadableDecoder) -> None:
def read(self, _: BinaryDecoder) -> None:
return None

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
return None


Expand All @@ -116,44 +112,44 @@ class DefaultReader(Reader):
def __init__(self, default_value: Any) -> None:
self.default_value = default_value

def read(self, _: ReadableDecoder) -> Any:
def read(self, _: BinaryDecoder) -> Any:
return self.default_value

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
pass


class BooleanReader(Reader):
def read(self, decoder: ReadableDecoder) -> bool:
def read(self, decoder: BinaryDecoder) -> bool:
return decoder.read_boolean()

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_boolean()


class IntegerReader(Reader):
"""Longs and ints are encoded the same way, and there is no long in Python."""

def read(self, decoder: ReadableDecoder) -> int:
def read(self, decoder: BinaryDecoder) -> int:
return decoder.read_int()

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_int()


class FloatReader(Reader):
def read(self, decoder: ReadableDecoder) -> float:
def read(self, decoder: BinaryDecoder) -> float:
return decoder.read_float()

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_float()


class DoubleReader(Reader):
def read(self, decoder: ReadableDecoder) -> float:
def read(self, decoder: BinaryDecoder) -> float:
return decoder.read_double()

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_double()


Expand Down Expand Up @@ -191,29 +187,29 @@ class TimestamptzReader(IntegerReader):


class StringReader(Reader):
def read(self, decoder: ReadableDecoder) -> str:
def read(self, decoder: BinaryDecoder) -> str:
return decoder.read_utf8()

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_utf8()


class UUIDReader(Reader):
def read(self, decoder: ReadableDecoder) -> UUID:
def read(self, decoder: BinaryDecoder) -> UUID:
return UUID(bytes=decoder.read(16))

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip(16)


@dataclass(frozen=True)
class FixedReader(Reader):
_len: int = dataclassfield()

def read(self, decoder: ReadableDecoder) -> bytes:
def read(self, decoder: BinaryDecoder) -> bytes:
return decoder.read(len(self))

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip(len(self))

def __len__(self) -> int:
Expand All @@ -232,10 +228,10 @@ class BinaryReader(Reader):
then reads the binary field itself.
"""

def read(self, decoder: ReadableDecoder) -> bytes:
def read(self, decoder: BinaryDecoder) -> bytes:
return decoder.read_bytes()

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_bytes()


Expand All @@ -256,10 +252,10 @@ def __init__(self, precision: int, scale: int):
object.__setattr__(self, "scale", scale)
object.__setattr__(self, "_length", decimal_required_bytes(precision))

def read(self, decoder: ReadableDecoder) -> Decimal:
def read(self, decoder: BinaryDecoder) -> Decimal:
return bytes_to_decimal(decoder.read(self._length), self.scale)

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
decoder.skip_bytes()

def __repr__(self) -> str:
Expand All @@ -271,7 +267,7 @@ def __repr__(self) -> str:
class OptionReader(Reader):
option: Reader = dataclassfield()

def read(self, decoder: ReadableDecoder) -> Optional[Any]:
def read(self, decoder: BinaryDecoder) -> Optional[Any]:
# For the Iceberg spec it is required to set the default value to null
# From https://iceberg.apache.org/spec/#avro
# Optional fields must always set the Avro field default value to null.
Expand All @@ -285,7 +281,7 @@ def read(self, decoder: ReadableDecoder) -> Optional[Any]:
return self.option.read(decoder)
return None

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
if decoder.read_int() > 0:
return self.option.skip(decoder)

Expand All @@ -295,7 +291,7 @@ class StructReader(Reader):
field_readers: Tuple[Tuple[Optional[int], Reader], ...]
create_struct: Callable[..., StructProtocol]
struct: StructType
field_reader_functions = Tuple[Tuple[Optional[str], int, Optional[Callable[[ReadableDecoder], Any]]], ...]
field_reader_functions = Tuple[Tuple[Optional[str], int, Optional[Callable[[BinaryDecoder], Any]]], ...]

def __init__(
self,
Expand All @@ -321,7 +317,7 @@ def __init__(
if not isinstance(created_struct, StructProtocol):
raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")

reading_callbacks: List[Tuple[Optional[int], Callable[[ReadableDecoder], Any]]] = []
reading_callbacks: List[Tuple[Optional[int], Callable[[BinaryDecoder], Any]]] = []
for pos, field in field_readers:
if pos is not None:
reading_callbacks.append((pos, field.read))
Expand All @@ -331,7 +327,7 @@ def __init__(
self._field_reader_functions = tuple(reading_callbacks)
self._hash = hash(self._field_reader_functions)

def read(self, decoder: ReadableDecoder) -> StructProtocol:
def read(self, decoder: BinaryDecoder) -> StructProtocol:
struct = self.create_struct(struct=self.struct) if self._create_with_keyword else self.create_struct()
for pos, field_reader in self._field_reader_functions:
if pos is not None:
Expand All @@ -341,7 +337,7 @@ def read(self, decoder: ReadableDecoder) -> StructProtocol:

return struct

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
for _, field in self.field_readers:
field.skip(decoder)

Expand Down Expand Up @@ -373,7 +369,7 @@ def __init__(self, element: Reader) -> None:
self._hash = hash(self.element)
self._is_int_list = isinstance(self.element, IntegerReader)

def read(self, decoder: ReadableDecoder) -> List[Any]:
def read(self, decoder: BinaryDecoder) -> List[Any]:
read_items: List[Any] = []
block_count = decoder.read_int()
while block_count != 0:
Expand All @@ -388,7 +384,7 @@ def read(self, decoder: ReadableDecoder) -> List[Any]:
block_count = decoder.read_int()
return read_items

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
_skip_map_array(decoder, lambda: self.element.skip(decoder))

def __hash__(self) -> int:
Expand Down Expand Up @@ -420,7 +416,7 @@ def __init__(self, key: Reader, value: Reader) -> None:
self._value_reader = self.value.read
self._hash = hash((self.key, self.value))

def _read_int_int(self, decoder: ReadableDecoder) -> Mapping[int, int]:
def _read_int_int(self, decoder: BinaryDecoder) -> Mapping[int, int]:
"""Read a mapping from int to int from the decoder.
Read a map of ints to ints from the decoder, since this is such a common
Expand Down Expand Up @@ -455,7 +451,7 @@ def _read_int_int(self, decoder: ReadableDecoder) -> Mapping[int, int]:

return LazyDict(contents_array)

def read(self, decoder: ReadableDecoder) -> Mapping[Any, Any]:
def read(self, decoder: BinaryDecoder) -> Mapping[Any, Any]:
read_items: dict[Any, Any] = {}

if self._is_int_int or self._is_int_bytes:
Expand Down Expand Up @@ -484,7 +480,7 @@ def read(self, decoder: ReadableDecoder) -> Mapping[Any, Any]:

return read_items

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
def skip() -> None:
self.key.skip(decoder)
self.value.skip(decoder)
Expand Down
6 changes: 3 additions & 3 deletions python/pyiceberg/avro/resolver.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
Union,
)

from pyiceberg.avro.decoder import BinaryDecoder
from pyiceberg.avro.reader import (
BinaryReader,
BooleanReader,
Expand All @@ -39,7 +40,6 @@
MapReader,
NoneReader,
OptionReader,
ReadableDecoder,
Reader,
StringReader,
StructReader,
Expand Down Expand Up @@ -226,10 +226,10 @@ def __init__(self, enum: Callable[..., Enum], reader: Reader) -> None:
self.enum = enum
self.reader = reader

def read(self, decoder: ReadableDecoder) -> Enum:
def read(self, decoder: BinaryDecoder) -> Enum:
return self.enum(self.reader.read(decoder))

def skip(self, decoder: ReadableDecoder) -> None:
def skip(self, decoder: BinaryDecoder) -> None:
pass


Expand Down
Loading

0 comments on commit 9ad9976

Please sign in to comment.