Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions pyiceberg/catalog/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

from typing import (
TYPE_CHECKING,
override,
)

from sqlalchemy import (
Expand Down Expand Up @@ -172,6 +173,7 @@ def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table:
catalog=self,
)

@override
def create_table(
self,
identifier: str | Identifier,
Expand Down Expand Up @@ -237,6 +239,7 @@ def create_table(

return self.load_table(identifier=identifier)

@override
def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table:
"""Register a new table using existing metadata.

Expand Down Expand Up @@ -278,6 +281,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o

return self.load_table(identifier=identifier)

@override
def load_table(self, identifier: str | Identifier) -> Table:
"""Load the table's metadata and return the table instance.

Expand Down Expand Up @@ -307,6 +311,7 @@ def load_table(self, identifier: str | Identifier) -> Table:
return self._convert_orm_to_iceberg(result)
raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}")

@override
def drop_table(self, identifier: str | Identifier) -> None:
"""Drop a table.

Expand Down Expand Up @@ -347,6 +352,7 @@ def drop_table(self, identifier: str | Identifier) -> None:
raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") from e
session.commit()

@override
def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table:
"""Rename a fully classified table name.

Expand Down Expand Up @@ -406,6 +412,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I
raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e
return self.load_table(to_identifier)

@override
def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
Expand Down Expand Up @@ -502,6 +509,7 @@ def commit_table(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

@override
def namespace_exists(self, identifier: str | Identifier) -> bool:
namespace_tuple = Catalog.identifier_to_tuple(identifier)
namespace = Catalog.namespace_to_string(namespace_tuple, NoSuchNamespaceError)
Expand Down Expand Up @@ -534,6 +542,7 @@ def namespace_exists(self, identifier: str | Identifier) -> bool:
return True
return False

@override
def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None:
"""Create a namespace in the catalog.

Expand Down Expand Up @@ -562,6 +571,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties =
)
session.commit()

@override
def drop_namespace(self, namespace: str | Identifier) -> None:
"""Drop a namespace.

Expand All @@ -588,6 +598,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None:
)
session.commit()

@override
def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
"""List tables under the given namespace in the catalog.

Expand All @@ -609,6 +620,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]:
result = session.scalars(stmt)
return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result]

@override
def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:
"""List namespaces from the given namespace. If not given, list top-level namespaces from the catalog.

Expand Down Expand Up @@ -650,6 +662,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]:

return namespaces

@override
def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
"""Get properties for a namespace.

Expand All @@ -673,6 +686,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties:
result = session.scalars(stmt)
return {props.property_key: props.property_value for props in result}

@override
def update_namespace_properties(
self, namespace: str | Identifier, removals: set[str] | None = None, updates: Properties = EMPTY_DICT
) -> PropertiesUpdateSummary:
Expand Down
11 changes: 11 additions & 0 deletions pyiceberg/io/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from typing import (
TYPE_CHECKING,
Any,
override,
)
from urllib.parse import ParseResult, urlparse

Expand Down Expand Up @@ -332,6 +333,7 @@ def __init__(self, location: str, fs: AbstractFileSystem):
self._fs = fs
super().__init__(location=location)

@override
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
object_info = self._fs.info(self.location)
Expand All @@ -341,10 +343,12 @@ def __len__(self) -> int:
return size
raise RuntimeError(f"Cannot retrieve object info: {self.location}")

@override
def exists(self) -> bool:
"""Check whether the location exists."""
return self._fs.lexists(self.location)

@override
def open(self, seekable: bool = True) -> InputStream:
"""Create an input stream for reading the contents of the file.

Expand Down Expand Up @@ -376,6 +380,7 @@ def __init__(self, location: str, fs: AbstractFileSystem):
self._fs = fs
super().__init__(location=location)

@override
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
object_info = self._fs.info(self.location)
Expand All @@ -385,10 +390,12 @@ def __len__(self) -> int:
return size
raise RuntimeError(f"Cannot retrieve object info: {self.location}")

@override
def exists(self) -> bool:
"""Check whether the location exists."""
return self._fs.lexists(self.location)

@override
def create(self, overwrite: bool = False) -> OutputStream:
"""Create an output stream for reading the contents of the file.

Expand All @@ -411,6 +418,7 @@ def create(self, overwrite: bool = False) -> OutputStream:
raise FileExistsError(f"Cannot create file, file already exists: {self.location}")
return self._fs.open(self.location, "wb")

@override
def to_input_file(self) -> FsspecInputFile:
    """Build an FsspecInputFile for `self.location`, reusing this file's filesystem object."""
    location, filesystem = self.location, self._fs
    return FsspecInputFile(location=location, fs=filesystem)
Expand All @@ -424,6 +432,7 @@ def __init__(self, properties: Properties):
self._thread_locals = threading.local()
super().__init__(properties=properties)

@override
def new_input(self, location: str) -> FsspecInputFile:
"""Get an FsspecInputFile instance to read bytes from the file at the given location.

Expand All @@ -437,6 +446,7 @@ def new_input(self, location: str) -> FsspecInputFile:
fs = self._get_fs_from_uri(uri)
return FsspecInputFile(location=location, fs=fs)

@override
def new_output(self, location: str) -> FsspecOutputFile:
"""Get an FsspecOutputFile instance to write bytes to the file at the given location.

Expand All @@ -450,6 +460,7 @@ def new_output(self, location: str) -> FsspecOutputFile:
fs = self._get_fs_from_uri(uri)
return FsspecOutputFile(location=location, fs=fs)

@override
def delete(self, location: str | InputFile | OutputFile) -> None:
"""Delete the file at the given location.

Expand Down
9 changes: 9 additions & 0 deletions pyiceberg/io/pyarrow.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
Generic,
TypeVar,
cast,
override,
)
from urllib.parse import urlparse

Expand Down Expand Up @@ -300,11 +301,13 @@ def _file_info(self) -> FileInfo:
raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}")
return file_info

@override
def __len__(self) -> int:
"""Return the total length of the file, in bytes."""
file_info = self._file_info()
return file_info.size

@override
def exists(self) -> bool:
"""Check whether the location exists."""
try:
Expand All @@ -313,6 +316,7 @@ def exists(self) -> bool:
except FileNotFoundError:
return False

@override
def open(self, seekable: bool = True) -> InputStream:
"""Open the location using a PyArrow FileSystem inferred from the location.

Expand Down Expand Up @@ -342,6 +346,7 @@ def open(self, seekable: bool = True) -> InputStream:
raise # pragma: no cover - If some other kind of OSError, raise the raw error
return input_file

@override
def create(self, overwrite: bool = False) -> OutputStream:
"""Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location.

Expand Down Expand Up @@ -373,6 +378,7 @@ def create(self, overwrite: bool = False) -> OutputStream:
raise # pragma: no cover - If some other kind of OSError, raise the raw error
return output_file

@override
def to_input_file(self) -> PyArrowFile:
"""Return a new PyArrowFile for the location of an existing PyArrowFile instance.

Expand Down Expand Up @@ -610,6 +616,7 @@ def _initialize_gcs_fs(self) -> FileSystem:
def _initialize_local_fs(self) -> FileSystem:
return PyArrowLocalFileSystem()

@override
def new_input(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to read bytes from the file at the given location.

Expand All @@ -627,6 +634,7 @@ def new_input(self, location: str) -> PyArrowFile:
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
)

@override
def new_output(self, location: str) -> PyArrowFile:
"""Get a PyArrowFile instance to write bytes to the file at the given location.

Expand All @@ -644,6 +652,7 @@ def new_output(self, location: str) -> PyArrowFile:
buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)),
)

@override
def delete(self, location: str | InputFile | OutputFile) -> None:
"""Delete the file at the given location.

Expand Down