Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions pyiceberg/catalog/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ def __init__(self, name: str, **properties: str):
def create_table(
self,
identifier: str | Identifier,
schema: Schema | "pa.Schema",
schema: Schema | pa.Schema,
location: str | None = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand All @@ -388,7 +388,7 @@ def create_table(
def create_table_transaction(
self,
identifier: str | Identifier,
schema: Schema | "pa.Schema",
schema: Schema | pa.Schema,
location: str | None = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand All @@ -411,7 +411,7 @@ def create_table_transaction(
def create_table_if_not_exists(
self,
identifier: str | Identifier,
schema: Schema | "pa.Schema",
schema: Schema | pa.Schema,
location: str | None = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand Down Expand Up @@ -753,7 +753,7 @@ def _load_file_io(self, properties: Properties = EMPTY_DICT, location: str | Non

@staticmethod
def _convert_schema_if_needed(
schema: Schema | "pa.Schema", format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
schema: Schema | pa.Schema, format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION
) -> Schema:
if isinstance(schema, Schema):
return schema
Expand Down Expand Up @@ -799,7 +799,7 @@ def close(self) -> None: # noqa: B027
Default implementation does nothing. Override in subclasses that need cleanup.
"""

def __enter__(self) -> "Catalog":
def __enter__(self) -> Catalog:
"""Enter the context manager.

Returns:
Expand Down Expand Up @@ -829,7 +829,7 @@ def __init__(self, name: str, **properties: str):
def create_table_transaction(
self,
identifier: str | Identifier,
schema: Schema | "pa.Schema",
schema: Schema | pa.Schema,
location: str | None = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand Down Expand Up @@ -869,7 +869,7 @@ def purge_table(self, identifier: str | Identifier) -> None:
def _create_staged_table(
self,
identifier: str | Identifier,
schema: Schema | "pa.Schema",
schema: Schema | pa.Schema,
location: str | None = None,
partition_spec: PartitionSpec = UNPARTITIONED_PARTITION_SPEC,
sort_order: SortOrder = UNSORTED_SORT_ORDER,
Expand Down
14 changes: 7 additions & 7 deletions pyiceberg/catalog/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ def primitive(self, primitive: PrimitiveType) -> str:


def _to_columns(metadata: TableMetadata) -> List["ColumnTypeDef"]:
results: Dict[str, "ColumnTypeDef"] = {}
results: Dict[str, ColumnTypeDef] = {}

def _append_to_results(field: NestedField, is_current: bool) -> None:
if field.name in results:
Expand Down Expand Up @@ -241,7 +241,7 @@ def _construct_table_input(
glue_table: Optional["TableTypeDef"] = None,
prev_metadata_location: str | None = None,
) -> "TableInputTypeDef":
table_input: "TableInputTypeDef" = {
table_input: TableInputTypeDef = {
"Name": table_name,
"TableType": EXTERNAL_TABLE,
"Parameters": _construct_parameters(metadata_location, glue_table, prev_metadata_location, properties),
Expand All @@ -258,7 +258,7 @@ def _construct_table_input(


def _construct_rename_table_input(to_table_name: str, glue_table: "TableTypeDef") -> "TableInputTypeDef":
rename_table_input: "TableInputTypeDef" = {"Name": to_table_name}
rename_table_input: TableInputTypeDef = {"Name": to_table_name}
# use the same Glue info to create the new table, pointing to the old metadata
if not glue_table["TableType"]:
raise ValueError("Glue table type is missing, cannot rename table")
Expand All @@ -283,7 +283,7 @@ def _construct_rename_table_input(to_table_name: str, glue_table: "TableTypeDef"


def _construct_database_input(database_name: str, properties: Properties) -> "DatabaseInputTypeDef":
database_input: "DatabaseInputTypeDef" = {"Name": database_name}
database_input: DatabaseInputTypeDef = {"Name": database_name}
parameters = {}
for k, v in properties.items():
if k == "Description":
Expand Down Expand Up @@ -506,7 +506,7 @@ def commit_table(
table_identifier = table.name()
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)

current_glue_table: "TableTypeDef" | None
current_glue_table: TableTypeDef | None
glue_table_version_id: str | None
current_table: Table | None
try:
Expand Down Expand Up @@ -718,7 +718,7 @@ def list_tables(self, namespace: str | Identifier) -> List[Identifier]:
NoSuchNamespaceError: If a namespace with the given name does not exist, or the identifier is invalid.
"""
database_name = self.identifier_to_database(namespace, NoSuchNamespaceError)
table_list: List["TableTypeDef"] = []
table_list: List[TableTypeDef] = []
next_token: str | None = None
try:
while True:
Expand Down Expand Up @@ -746,7 +746,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> List[Identifier]:
if namespace:
return []

database_list: List["DatabaseTypeDef"] = []
database_list: List[DatabaseTypeDef] = []
next_token: str | None = None

while True:
Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def as_struct(self) -> StructType:
"""Return the schema as a struct."""
return StructType(*self.fields)

def as_arrow(self) -> "pa.Schema":
def as_arrow(self) -> pa.Schema:
"""Return the schema as an Arrow schema."""
from pyiceberg.io.pyarrow import schema_to_pyarrow

Expand Down
2 changes: 1 addition & 1 deletion pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ def to_polars(self) -> pl.LazyFrame:

return pl.scan_iceberg(self)

def __datafusion_table_provider__(self) -> "IcebergDataFusionTable":
def __datafusion_table_provider__(self) -> IcebergDataFusionTable:
"""Return the DataFusion table provider PyCapsule interface.

To support DataFusion features such as push down filtering, this function will return a PyCapsule
Expand Down
44 changes: 22 additions & 22 deletions pyiceberg/table/inspect.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def _get_snapshot(self, snapshot_id: int | None = None) -> Snapshot:
else:
raise ValueError("Cannot get a snapshot as the table does not have any.")

def snapshots(self) -> "pa.Table":
def snapshots(self) -> pa.Table:
import pyarrow as pa

snapshots_schema = pa.schema(
Expand Down Expand Up @@ -98,7 +98,7 @@ def snapshots(self) -> "pa.Table":
schema=snapshots_schema,
)

def entries(self, snapshot_id: int | None = None) -> "pa.Table":
def entries(self, snapshot_id: int | None = None) -> pa.Table:
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
Expand Down Expand Up @@ -229,7 +229,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
schema=entries_schema,
)

def refs(self) -> "pa.Table":
def refs(self) -> pa.Table:
import pyarrow as pa

ref_schema = pa.schema(
Expand Down Expand Up @@ -264,7 +264,7 @@ def partitions(
snapshot_id: int | None = None,
row_filter: str | BooleanExpression = ALWAYS_TRUE,
case_sensitive: bool = True,
) -> "pa.Table":
) -> pa.Table:
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
Expand Down Expand Up @@ -368,7 +368,7 @@ def _update_partitions_map_from_manifest_entry(
else:
raise ValueError(f"Unknown DataFileContent ({file.content})")

def _get_manifests_schema(self) -> "pa.Schema":
def _get_manifests_schema(self) -> pa.Schema:
import pyarrow as pa

partition_summary_schema = pa.struct(
Expand Down Expand Up @@ -398,14 +398,14 @@ def _get_manifests_schema(self) -> "pa.Schema":
)
return manifest_schema

def _get_all_manifests_schema(self) -> "pa.Schema":
def _get_all_manifests_schema(self) -> pa.Schema:
import pyarrow as pa

all_manifests_schema = self._get_manifests_schema()
all_manifests_schema = all_manifests_schema.append(pa.field("reference_snapshot_id", pa.int64(), nullable=False))
return all_manifests_schema

def _generate_manifests_table(self, snapshot: Snapshot | None, is_all_manifests_table: bool = False) -> "pa.Table":
def _generate_manifests_table(self, snapshot: Snapshot | None, is_all_manifests_table: bool = False) -> pa.Table:
import pyarrow as pa

def _partition_summaries_to_rows(
Expand Down Expand Up @@ -474,10 +474,10 @@ def _partition_summaries_to_rows(
schema=self._get_all_manifests_schema() if is_all_manifests_table else self._get_manifests_schema(),
)

def manifests(self) -> "pa.Table":
def manifests(self) -> pa.Table:
return self._generate_manifests_table(self.tbl.current_snapshot())

def metadata_log_entries(self) -> "pa.Table":
def metadata_log_entries(self) -> pa.Table:
import pyarrow as pa

from pyiceberg.table.snapshots import MetadataLogEntry
Expand Down Expand Up @@ -513,7 +513,7 @@ def metadata_log_entry_to_row(metadata_entry: MetadataLogEntry) -> Dict[str, Any
schema=table_schema,
)

def history(self) -> "pa.Table":
def history(self) -> pa.Table:
import pyarrow as pa

history_schema = pa.schema(
Expand Down Expand Up @@ -546,7 +546,7 @@ def history(self) -> "pa.Table":

def _get_files_from_manifest(
self, manifest_list: ManifestFile, data_file_filter: Set[DataFileContent] | None = None
) -> "pa.Table":
) -> pa.Table:
import pyarrow as pa

files: list[dict[str, Any]] = []
Expand Down Expand Up @@ -610,7 +610,7 @@ def _get_files_from_manifest(
schema=self._get_files_schema(),
)

def _get_files_schema(self) -> "pa.Schema":
def _get_files_schema(self) -> pa.Schema:
import pyarrow as pa

from pyiceberg.io.pyarrow import schema_to_pyarrow
Expand Down Expand Up @@ -663,7 +663,7 @@ def _readable_metrics_struct(bound_type: PrimitiveType) -> pa.StructType:
)
return files_schema

def _files(self, snapshot_id: int | None = None, data_file_filter: Set[DataFileContent] | None = None) -> "pa.Table":
def _files(self, snapshot_id: int | None = None, data_file_filter: Set[DataFileContent] | None = None) -> pa.Table:
import pyarrow as pa

if not snapshot_id and not self.tbl.metadata.current_snapshot():
Expand All @@ -680,29 +680,29 @@ def _files(self, snapshot_id: int | None = None, data_file_filter: Set[DataFileC
)
return pa.concat_tables(results)

def files(self, snapshot_id: int | None = None) -> "pa.Table":
def files(self, snapshot_id: int | None = None) -> pa.Table:
return self._files(snapshot_id)

def data_files(self, snapshot_id: int | None = None) -> "pa.Table":
def data_files(self, snapshot_id: int | None = None) -> pa.Table:
return self._files(snapshot_id, {DataFileContent.DATA})

def delete_files(self, snapshot_id: int | None = None) -> "pa.Table":
def delete_files(self, snapshot_id: int | None = None) -> pa.Table:
return self._files(snapshot_id, {DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})

def all_manifests(self) -> "pa.Table":
def all_manifests(self) -> pa.Table:
import pyarrow as pa

snapshots = self.tbl.snapshots()
if not snapshots:
return pa.Table.from_pylist([], schema=self._get_all_manifests_schema())

executor = ExecutorFactory.get_or_create()
manifests_by_snapshots: Iterator["pa.Table"] = executor.map(
manifests_by_snapshots: Iterator[pa.Table] = executor.map(
lambda args: self._generate_manifests_table(*args), [(snapshot, True) for snapshot in snapshots]
)
return pa.concat_tables(manifests_by_snapshots)

def _all_files(self, data_file_filter: Set[DataFileContent] | None = None) -> "pa.Table":
def _all_files(self, data_file_filter: Set[DataFileContent] | None = None) -> pa.Table:
import pyarrow as pa

snapshots = self.tbl.snapshots()
Expand All @@ -720,11 +720,11 @@ def _all_files(self, data_file_filter: Set[DataFileContent] | None = None) -> "p

return pa.concat_tables(file_lists)

def all_files(self) -> "pa.Table":
def all_files(self) -> pa.Table:
return self._all_files()

def all_data_files(self) -> "pa.Table":
def all_data_files(self) -> pa.Table:
return self._all_files({DataFileContent.DATA})

def all_delete_files(self) -> "pa.Table":
def all_delete_files(self) -> pa.Table:
return self._all_files({DataFileContent.POSITION_DELETES, DataFileContent.EQUALITY_DELETES})
2 changes: 1 addition & 1 deletion pyiceberg/table/update/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def case_sensitive(self, case_sensitive: bool) -> UpdateSchema:
def union_by_name(
# TODO: Move TableProperties.DEFAULT_FORMAT_VERSION to separate file and set that as format_version default.
self,
new_schema: Schema | "pa.Schema",
new_schema: Schema | pa.Schema,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if this is safe, since we have a guard to avoid circular dependencies:

if TYPE_CHECKING:
    import pyarrow as pa

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't our linter catch this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with this in theory. The fact that CI passes (and our linter) suggests that it might not be the case

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From my LLM:

Yes, it's safe to remove the quotes from "pa.Schema" and change it to just pa.Schema. Here's why:

  • TYPE_CHECKING guard: Since pyarrow is imported within the TYPE_CHECKING block, it's only available during static type checking, not at runtime.
  • PEP 563 compatibility: With `from __future__ import annotations` at the top of the file (line 17), all annotations are automatically stringified at runtime anyway, so the quotes are redundant.

Makes sense to me 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP 563 compatibility: With `from __future__ import annotations` at the top of the file (line 17), all annotations are automatically stringified at runtime anyway, so the quotes are redundant.

Ah nice, thanks 👍

format_version: TableVersion = 2,
) -> UpdateSchema:
from pyiceberg.catalog import Catalog
Expand Down
4 changes: 2 additions & 2 deletions pyiceberg/table/update/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -1012,7 +1012,7 @@ def by_id(self, snapshot_id: int) -> ExpireSnapshots:

return self

def by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
def by_ids(self, snapshot_ids: List[int]) -> ExpireSnapshots:
"""
Expire multiple snapshots by their IDs.

Expand All @@ -1027,7 +1027,7 @@ def by_ids(self, snapshot_ids: List[int]) -> "ExpireSnapshots":
self.by_id(snapshot_id)
return self

def older_than(self, dt: datetime) -> "ExpireSnapshots":
def older_than(self, dt: datetime) -> ExpireSnapshots:
"""
Expire all unprotected snapshots with a timestamp older than a given value.

Expand Down
1 change: 0 additions & 1 deletion ruff.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ select = [
]
ignore = [
"E501",
"UP037",
"UP035",
"UP006"
]
Expand Down