6 changes: 3 additions & 3 deletions pyiceberg/avro/codecs/__init__.py
@@ -26,7 +26,7 @@

from __future__ import annotations

-from typing import Dict, Literal, Type
+from typing import Literal

from typing_extensions import TypeAlias

@@ -40,7 +40,7 @@

AVRO_CODEC_KEY = "avro.codec"

-KNOWN_CODECS: Dict[AvroCompressionCodec, Type[Codec] | None] = {
+KNOWN_CODECS: dict[AvroCompressionCodec, type[Codec] | None] = {
"null": None,
"bzip2": BZip2Codec,
"snappy": SnappyCodec,
@@ -49,4 +49,4 @@
}

# Map to convert the naming from Iceberg to Avro
-CODEC_MAPPING_ICEBERG_TO_AVRO: Dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"}
+CODEC_MAPPING_ICEBERG_TO_AVRO: dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"}
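
A quick aside on the pattern this diff applies in every file below: under PEP 563 (`from __future__ import annotations`) annotations are stored as strings, so the PEP 585 builtins `dict`, `list`, `tuple`, and `type` can replace their `typing` counterparts with no runtime cost, even in annotation positions on interpreters older than 3.10. A minimal sketch with illustrative names, not pyiceberg's own:

```python
from __future__ import annotations


class Codec:  # illustrative stand-in for pyiceberg's Codec base class
    pass


# Annotation-only generics: evaluated lazily under the future import,
# so "type[Codec] | None" parses even on Python < 3.10.
KNOWN: dict[str, type[Codec] | None] = {"null": None}


def lookup(name: str) -> type[Codec] | None:
    return KNOWN.get(name)
```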
13 changes: 5 additions & 8 deletions pyiceberg/avro/decoder.py
@@ -18,9 +18,6 @@
from abc import ABC, abstractmethod
from io import SEEK_CUR
from typing import (
-Dict,
-List,
-Tuple,
cast,
)

@@ -67,11 +64,11 @@ def read_int(self) -> int:
datum = (n >> 1) ^ -(n & 1)
return datum

-def read_ints(self, n: int) -> Tuple[int, ...]:
+def read_ints(self, n: int) -> tuple[int, ...]:
"""Read a list of integers."""
return tuple(self.read_int() for _ in range(n))

-def read_int_bytes_dict(self, n: int, dest: Dict[int, bytes]) -> None:
+def read_int_bytes_dict(self, n: int, dest: dict[int, bytes]) -> None:
"""Read a dictionary of integers for keys and bytes for values into a destination dictionary."""
for _ in range(n):
k = self.read_int()
@@ -85,7 +82,7 @@ def read_float(self) -> float:
The float is converted into a 32-bit integer using a method equivalent to
Java's floatToIntBits and then encoded in little-endian format.
"""
-return float(cast(Tuple[float, ...], STRUCT_FLOAT.unpack(self.read(4)))[0])
+return float(cast(tuple[float, ...], STRUCT_FLOAT.unpack(self.read(4)))[0])

def read_double(self) -> float:
"""Read a value from the stream as a double.
@@ -94,7 +91,7 @@ def read_double(self) -> float:
The double is converted into a 64-bit integer using a method equivalent to
Java's doubleToLongBits and then encoded in little-endian format.
"""
-return float(cast(Tuple[float, ...], STRUCT_DOUBLE.unpack(self.read(8)))[0])
+return float(cast(tuple[float, ...], STRUCT_DOUBLE.unpack(self.read(8)))[0])

def read_bytes(self) -> bytes:
"""Bytes are encoded as a long followed by that many bytes of data."""
@@ -152,7 +149,7 @@ def read(self, n: int) -> bytes:
"""Read n bytes."""
if n < 0:
raise ValueError(f"Requested {n} bytes to read, expected positive integer.")
-data: List[bytes] = []
+data: list[bytes] = []

n_remaining = n
while n_remaining > 0:
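
The `read_int` hunk above decodes Avro's zig-zag varints with `(n >> 1) ^ -(n & 1)`. A self-contained sketch of the round trip; the helper names are mine, not the decoder's API:

```python
def zigzag_encode(datum: int) -> int:
    # Maps 0, -1, 1, -2, 2, ... to 0, 1, 2, 3, 4, ... so values of
    # small magnitude stay small on the wire. Python's arithmetic
    # shift makes (datum >> 63) evaluate to -1 for any negative int.
    return (datum << 1) ^ (datum >> 63)


def zigzag_decode(n: int) -> int:
    # The exact expression used by BinaryDecoder.read_int above.
    return (n >> 1) ^ -(n & 1)


for value in (0, -1, 1, -2, 2, 2**31 - 1, -(2**31)):
    assert zigzag_decode(zigzag_encode(value)) == value
```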
25 changes: 11 additions & 14 deletions pyiceberg/avro/file.py
@@ -27,10 +27,7 @@
from types import TracebackType
from typing import (
Callable,
-Dict,
Generic,
-List,
-Type,
TypeVar,
)

@@ -77,14 +74,14 @@ def magic(self) -> bytes:
return self._data[0]

@property
-def meta(self) -> Dict[str, str]:
+def meta(self) -> dict[str, str]:
return self._data[1]

@property
def sync(self) -> bytes:
return self._data[2]

-def compression_codec(self) -> Type[Codec] | None:
+def compression_codec(self) -> type[Codec] | None:
"""Get the file's compression codec algorithm from the file's metadata.

In the case of a null codec, we return a None indicating that we
@@ -146,8 +143,8 @@ class AvroFile(Generic[D]):
)
input_file: InputFile
read_schema: Schema | None
-read_types: Dict[int, Callable[..., StructProtocol]]
-read_enums: Dict[int, Callable[..., Enum]]
+read_types: dict[int, Callable[..., StructProtocol]]
+read_enums: dict[int, Callable[..., Enum]]
header: AvroFileHeader
schema: Schema
reader: Reader
@@ -159,8 +156,8 @@ def __init__(
self,
input_file: InputFile,
read_schema: Schema | None = None,
-read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
-read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
+read_types: dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
+read_enums: dict[int, Callable[..., Enum]] = EMPTY_DICT,
) -> None:
self.input_file = input_file
self.read_schema = read_schema
@@ -185,7 +182,7 @@ def __enter__(self) -> AvroFile[D]:

return self

-def __exit__(self, exctype: Type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None:
+def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""

def __iter__(self) -> AvroFile[D]:
@@ -240,7 +237,7 @@ def __init__(
file_schema: Schema,
schema_name: str,
record_schema: Schema | None = None,
-metadata: Dict[str, str] = EMPTY_DICT,
+metadata: dict[str, str] = EMPTY_DICT,
) -> None:
self.output_file = output_file
self.file_schema = file_schema
@@ -267,7 +264,7 @@ def __enter__(self) -> AvroOutputFile[D]:

return self

-def __exit__(self, exctype: Type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None:
+def __exit__(self, exctype: type[BaseException] | None, excinst: BaseException | None, exctb: TracebackType | None) -> None:
"""Perform cleanup when exiting the scope of a 'with' statement."""
self.output_stream.close()

@@ -284,7 +281,7 @@ def _write_header(self) -> None:
header = AvroFileHeader(MAGIC, meta, self.sync_bytes)
construct_writer(META_SCHEMA).write(self.encoder, header)

-def compression_codec(self) -> Type[Codec] | None:
+def compression_codec(self) -> type[Codec] | None:
"""Get the file's compression codec algorithm from the file's metadata.

In the case of a null codec, we return a None indicating that we
@@ -302,7 +299,7 @@ def compression_codec(self) -> Type[Codec] | None:

return KNOWN_CODECS[codec_name] # type: ignore

-def write_block(self, objects: List[D]) -> None:
+def write_block(self, objects: list[D]) -> None:
in_memory = io.BytesIO()
block_content_encoder = BinaryEncoder(output_stream=in_memory)
for obj in objects:
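
For context on the two `compression_codec()` hunks: the codec name in the header metadata is keyed by `AVRO_CODEC_KEY` and looked up in `KNOWN_CODECS`, with `CODEC_MAPPING_ICEBERG_TO_AVRO` bridging the naming difference between Iceberg properties and Avro metadata. A hedged sketch of how those pieces could compose; the helper is illustrative and the actual call sites in pyiceberg differ:

```python
from __future__ import annotations

AVRO_CODEC_KEY = "avro.codec"
CODEC_MAPPING_ICEBERG_TO_AVRO: dict[str, str] = {"gzip": "deflate", "zstd": "zstandard"}


def avro_codec_name(meta: dict[str, str]) -> str:
    # A missing key is treated like an explicit "null" codec here;
    # that default is an assumption made for the sketch.
    name = meta.get(AVRO_CODEC_KEY, "null")
    # Iceberg table properties say "gzip"/"zstd"; Avro metadata spells
    # them "deflate"/"zstandard", so normalize before any codec lookup.
    return CODEC_MAPPING_ICEBERG_TO_AVRO.get(name, name)


assert avro_codec_name({}) == "null"
assert avro_codec_name({AVRO_CODEC_KEY: "zstd"}) == "zstandard"
```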
16 changes: 7 additions & 9 deletions pyiceberg/avro/reader.py
@@ -33,9 +33,7 @@
from typing import (
Any,
Callable,
-List,
Mapping,
-Tuple,
)
from uuid import UUID

@@ -319,14 +317,14 @@ class StructReader(Reader):
"_hash",
"_max_pos",
)
-field_readers: Tuple[Tuple[int | None, Reader], ...]
+field_readers: tuple[tuple[int | None, Reader], ...]
create_struct: Callable[..., StructProtocol]
struct: StructType
-field_reader_functions = Tuple[Tuple[str | None, int, Callable[[BinaryDecoder], Any] | None], ...]
+field_reader_functions = tuple[tuple[str | None, int, Callable[[BinaryDecoder], Any] | None], ...]

def __init__(
self,
-field_readers: Tuple[Tuple[int | None, Reader], ...],
+field_readers: tuple[tuple[int | None, Reader], ...],
create_struct: Callable[..., StructProtocol],
struct: StructType,
) -> None:
@@ -338,7 +336,7 @@ def __init__(
if not isinstance(self.create_struct(), StructProtocol):
raise ValueError(f"Incompatible with StructProtocol: {self.create_struct}")

-reading_callbacks: List[Tuple[int | None, Callable[[BinaryDecoder], Any]]] = []
+reading_callbacks: list[tuple[int | None, Callable[[BinaryDecoder], Any]]] = []
max_pos = -1
for pos, field in field_readers:
if pos is not None:
@@ -394,8 +392,8 @@ def __init__(self, element: Reader) -> None:
self._hash = hash(self.element)
self._is_int_list = isinstance(self.element, IntegerReader)

-def read(self, decoder: BinaryDecoder) -> List[Any]:
-read_items: List[Any] = []
+def read(self, decoder: BinaryDecoder) -> list[Any]:
+read_items: list[Any] = []
block_count = decoder.read_int()
while block_count != 0:
if block_count < 0:
@@ -461,7 +459,7 @@ def _read_int_int(self, decoder: BinaryDecoder) -> Mapping[int, int]:
if block_count == 0:
return EMPTY_DICT

-contents_array: List[Tuple[int, ...]] = []
+contents_array: list[tuple[int, ...]] = []

while block_count != 0:
if block_count < 0:
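
The `ListReader.read` hunk above walks Avro's block-encoded arrays: a count, that many items, repeated until a zero count, where a negative count is followed by the block's byte size. A simplified sketch of that loop, with callables standing in for the decoder:

```python
from typing import Any, Callable


def read_array(read_int: Callable[[], int], read_item: Callable[[], Any]) -> list[Any]:
    items: list[Any] = []
    block_count = read_int()
    while block_count != 0:  # a zero count terminates the array
        if block_count < 0:
            # Negative count: the block's size in bytes follows, which
            # lets a reader skip the block without decoding its items.
            block_count = -block_count
            _block_size_bytes = read_int()
        for _ in range(block_count):
            items.append(read_item())
        block_count = read_int()
    return items
```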
33 changes: 15 additions & 18 deletions pyiceberg/avro/resolver.py
@@ -18,9 +18,6 @@
from enum import Enum
from typing import (
Callable,
-Dict,
-List,
-Tuple,
)

from pyiceberg.avro.decoder import BinaryDecoder
@@ -114,7 +111,7 @@


def construct_reader(
-file_schema: Schema | IcebergType, read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT
+file_schema: Schema | IcebergType, read_types: dict[int, Callable[..., StructProtocol]] = EMPTY_DICT
) -> Reader:
"""Construct a reader from a file schema.

@@ -146,7 +143,7 @@ class ConstructWriter(SchemaVisitorPerPrimitiveType[Writer]):
def schema(self, schema: Schema, struct_result: Writer) -> Writer:
return struct_result

-def struct(self, struct: StructType, field_results: List[Writer]) -> Writer:
+def struct(self, struct: StructType, field_results: list[Writer]) -> Writer:
return StructWriter(tuple((pos, result) for pos, result in enumerate(field_results)))

def field(self, field: NestedField, field_result: Writer) -> Writer:
@@ -234,8 +231,8 @@ def resolve_writer(
def resolve_reader(
file_schema: Schema | IcebergType,
read_schema: Schema | IcebergType,
-read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
-read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
+read_types: dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
+read_enums: dict[int, Callable[..., Enum]] = EMPTY_DICT,
) -> Reader:
"""Resolve the file and read schema to produce a reader.

@@ -274,12 +271,12 @@ class WriteSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Writer]):
def schema(self, file_schema: Schema, record_schema: IcebergType | None, result: Writer) -> Writer:
return result

-def struct(self, file_schema: StructType, record_struct: IcebergType | None, file_writers: List[Writer]) -> Writer:
+def struct(self, file_schema: StructType, record_struct: IcebergType | None, file_writers: list[Writer]) -> Writer:
if not isinstance(record_struct, StructType):
raise ResolveError(f"File/write schema are not aligned for struct, got {record_struct}")

-record_struct_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(record_struct.fields)}
-results: List[Tuple[int | None, Writer]] = []
+record_struct_positions: dict[int, int] = {field.field_id: pos for pos, field in enumerate(record_struct.fields)}
+results: list[tuple[int | None, Writer]] = []

for writer, file_field in zip(file_writers, file_schema.fields, strict=True):
if file_field.field_id in record_struct_positions:
@@ -367,14 +364,14 @@ def visit_unknown(self, unknown_type: UnknownType, partner: IcebergType | None)

class ReadSchemaResolver(PrimitiveWithPartnerVisitor[IcebergType, Reader]):
__slots__ = ("read_types", "read_enums", "context")
-read_types: Dict[int, Callable[..., StructProtocol]]
-read_enums: Dict[int, Callable[..., Enum]]
-context: List[int]
+read_types: dict[int, Callable[..., StructProtocol]]
+read_enums: dict[int, Callable[..., Enum]]
+context: list[int]

def __init__(
self,
-read_types: Dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
-read_enums: Dict[int, Callable[..., Enum]] = EMPTY_DICT,
+read_types: dict[int, Callable[..., StructProtocol]] = EMPTY_DICT,
+read_enums: dict[int, Callable[..., Enum]] = EMPTY_DICT,
) -> None:
self.read_types = read_types
self.read_enums = read_enums
@@ -389,7 +386,7 @@ def before_field(self, field: NestedField, field_partner: NestedField | None) ->
def after_field(self, field: NestedField, field_partner: NestedField | None) -> None:
self.context.pop()

-def struct(self, struct: StructType, expected_struct: IcebergType | None, field_readers: List[Reader]) -> Reader:
+def struct(self, struct: StructType, expected_struct: IcebergType | None, field_readers: list[Reader]) -> Reader:
read_struct_id = self.context[STRUCT_ROOT] if len(self.context) > 0 else STRUCT_ROOT
struct_callable = self.read_types.get(read_struct_id, Record)

@@ -399,10 +396,10 @@ def struct(self, struct: StructType, expected_struct: IcebergType | None, field_
if not isinstance(expected_struct, StructType):
raise ResolveError(f"File/read schema are not aligned for struct, got {expected_struct}")

-expected_positions: Dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}
+expected_positions: dict[int, int] = {field.field_id: pos for pos, field in enumerate(expected_struct.fields)}

# first, add readers for the file fields that must be in order
-results: List[Tuple[int | None, Reader]] = [
+results: list[tuple[int | None, Reader]] = [
(
expected_positions.get(field.field_id),
# Check if we need to convert it to an Enum
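
The `struct` resolver above aligns file fields to read-schema positions by field id; ids missing from the read schema get a `None` position, meaning the value is decoded and discarded to keep the stream in sync. A toy illustration of that alignment, where `Field` is a stand-in for pyiceberg's `NestedField`:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class Field:
    field_id: int
    name: str


file_fields = [Field(1, "id"), Field(2, "name"), Field(3, "ts")]
read_fields = [Field(2, "name"), Field(1, "id")]  # reordered subset

# Same shape as expected_positions in the hunk above: id -> position.
positions = {f.field_id: pos for pos, f in enumerate(read_fields)}
aligned = [(positions.get(f.field_id), f.name) for f in file_fields]
# "ts" is absent from the read schema, so its reader gets position None.
assert aligned == [(1, "id"), (0, "name"), (None, "ts")]
```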
9 changes: 3 additions & 6 deletions pyiceberg/avro/writer.py
@@ -28,9 +28,6 @@
from dataclasses import field as dataclassfield
from typing import (
Any,
-Dict,
-List,
-Tuple,
)
from uuid import UUID

@@ -186,7 +183,7 @@ def write(self, encoder: BinaryEncoder, val: Any) -> None:

@dataclass(frozen=True)
class StructWriter(Writer):
-field_writers: Tuple[Tuple[int | None, Writer], ...] = dataclassfield()
+field_writers: tuple[tuple[int | None, Writer], ...] = dataclassfield()

def write(self, encoder: BinaryEncoder, val: Record) -> None:
for pos, writer in self.field_writers:
@@ -210,7 +207,7 @@ def __hash__(self) -> int:
class ListWriter(Writer):
element_writer: Writer

-def write(self, encoder: BinaryEncoder, val: List[Any]) -> None:
+def write(self, encoder: BinaryEncoder, val: list[Any]) -> None:
encoder.write_int(len(val))
for v in val:
self.element_writer.write(encoder, v)
Expand All @@ -223,7 +220,7 @@ class MapWriter(Writer):
key_writer: Writer
value_writer: Writer

-def write(self, encoder: BinaryEncoder, val: Dict[Any, Any]) -> None:
+def write(self, encoder: BinaryEncoder, val: dict[Any, Any]) -> None:
encoder.write_int(len(val))
for k, v in val.items():
self.key_writer.write(encoder, k)
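
The `MapWriter` hunk closes out the same builtin-generics sweep; for context, Avro maps use the same block framing as arrays. A minimal sketch of a single-block map write, assuming the zero-count terminator the Avro spec requires (this hunk is cut off before pyiceberg's own terminator line, so that part is an assumption):

```python
from typing import Any, Callable


def write_map(
    val: dict[Any, Any],
    write_int: Callable[[int], None],
    write_key: Callable[[Any], None],
    write_value: Callable[[Any], None],
) -> None:
    if val:
        # One block holding every entry: a count, then key/value pairs.
        write_int(len(val))
        for k, v in val.items():
            write_key(k)
            write_value(v)
    # A zero count closes the map, mirroring the array framing.
    write_int(0)
```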