diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index 92ac53751f..60848337f0 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -19,6 +19,7 @@ from typing import ( TYPE_CHECKING, + override, ) from sqlalchemy import ( @@ -172,6 +173,7 @@ def _convert_orm_to_iceberg(self, orm_table: IcebergTables) -> Table: catalog=self, ) + @override def create_table( self, identifier: str | Identifier, @@ -237,6 +239,7 @@ def create_table( return self.load_table(identifier=identifier) + @override def register_table(self, identifier: str | Identifier, metadata_location: str, overwrite: bool = False) -> Table: """Register a new table using existing metadata. @@ -278,6 +281,7 @@ def register_table(self, identifier: str | Identifier, metadata_location: str, o return self.load_table(identifier=identifier) + @override def load_table(self, identifier: str | Identifier) -> Table: """Load the table's metadata and return the table instance. @@ -307,6 +311,7 @@ def load_table(self, identifier: str | Identifier) -> Table: return self._convert_orm_to_iceberg(result) raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") + @override def drop_table(self, identifier: str | Identifier) -> None: """Drop a table. @@ -347,6 +352,7 @@ def drop_table(self, identifier: str | Identifier) -> None: raise NoSuchTableError(f"Table does not exist: {namespace}.{table_name}") from e session.commit() + @override def rename_table(self, from_identifier: str | Identifier, to_identifier: str | Identifier) -> Table: """Rename a fully classified table name. @@ -406,6 +412,7 @@ def rename_table(self, from_identifier: str | Identifier, to_identifier: str | I raise TableAlreadyExistsError(f"Table {to_namespace}.{to_table_name} already exists") from e return self.load_table(to_identifier) + @override def commit_table( self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -502,6 +509,7 @@ def commit_table( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) + @override def namespace_exists(self, identifier: str | Identifier) -> bool: namespace_tuple = Catalog.identifier_to_tuple(identifier) namespace = Catalog.namespace_to_string(namespace_tuple, NoSuchNamespaceError) @@ -534,6 +542,7 @@ def namespace_exists(self, identifier: str | Identifier) -> bool: return True return False + @override def create_namespace(self, namespace: str | Identifier, properties: Properties = EMPTY_DICT) -> None: """Create a namespace in the catalog. @@ -562,6 +571,7 @@ def create_namespace(self, namespace: str | Identifier, properties: Properties = ) session.commit() + @override def drop_namespace(self, namespace: str | Identifier) -> None: """Drop a namespace. @@ -588,6 +598,7 @@ def drop_namespace(self, namespace: str | Identifier) -> None: ) session.commit() + @override def list_tables(self, namespace: str | Identifier) -> list[Identifier]: """List tables under the given namespace in the catalog. @@ -609,6 +620,7 @@ def list_tables(self, namespace: str | Identifier) -> list[Identifier]: result = session.scalars(stmt) return [(Catalog.identifier_to_tuple(table.table_namespace) + (table.table_name,)) for table in result] + @override def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: """List namespaces from the given namespace. If not given, list top-level namespaces from the catalog. @@ -650,6 +662,7 @@ def list_namespaces(self, namespace: str | Identifier = ()) -> list[Identifier]: return namespaces + @override def load_namespace_properties(self, namespace: str | Identifier) -> Properties: """Get properties for a namespace. @@ -673,6 +686,7 @@ def load_namespace_properties(self, namespace: str | Identifier) -> Properties: result = session.scalars(stmt) return {props.property_key: props.property_value for props in result} + @override def update_namespace_properties( self, namespace: str | Identifier, removals: set[str] | None = None, updates: Properties = EMPTY_DICT ) -> PropertiesUpdateSummary: diff --git a/pyiceberg/io/fsspec.py b/pyiceberg/io/fsspec.py index 63ec55bab4..6ca5d52b4d 100644 --- a/pyiceberg/io/fsspec.py +++ b/pyiceberg/io/fsspec.py @@ -28,6 +28,7 @@ from typing import ( TYPE_CHECKING, Any, + override, ) from urllib.parse import ParseResult, urlparse @@ -332,6 +333,7 @@ def __init__(self, location: str, fs: AbstractFileSystem): self._fs = fs super().__init__(location=location) + @override def __len__(self) -> int: """Return the total length of the file, in bytes.""" object_info = self._fs.info(self.location) @@ -341,10 +343,12 @@ def __len__(self) -> int: return size raise RuntimeError(f"Cannot retrieve object info: {self.location}") + @override def exists(self) -> bool: """Check whether the location exists.""" return self._fs.lexists(self.location) + @override def open(self, seekable: bool = True) -> InputStream: """Create an input stream for reading the contents of the file. @@ -376,6 +380,7 @@ def __init__(self, location: str, fs: AbstractFileSystem): self._fs = fs super().__init__(location=location) + @override def __len__(self) -> int: """Return the total length of the file, in bytes.""" object_info = self._fs.info(self.location) @@ -385,10 +390,12 @@ def __len__(self) -> int: return size raise RuntimeError(f"Cannot retrieve object info: {self.location}") + @override def exists(self) -> bool: """Check whether the location exists.""" return self._fs.lexists(self.location) + @override def create(self, overwrite: bool = False) -> OutputStream: """Create an output stream for reading the contents of the file. @@ -411,6 +418,7 @@ def create(self, overwrite: bool = False) -> OutputStream: raise FileExistsError(f"Cannot create file, file already exists: {self.location}") return self._fs.open(self.location, "wb") + @override def to_input_file(self) -> FsspecInputFile: """Return a new FsspecInputFile for the location at `self.location`.""" return FsspecInputFile(location=self.location, fs=self._fs) @@ -424,6 +432,7 @@ def __init__(self, properties: Properties): self._thread_locals = threading.local() super().__init__(properties=properties) + @override def new_input(self, location: str) -> FsspecInputFile: """Get an FsspecInputFile instance to read bytes from the file at the given location. @@ -437,6 +446,7 @@ def new_input(self, location: str) -> FsspecInputFile: fs = self._get_fs_from_uri(uri) return FsspecInputFile(location=location, fs=fs) + @override def new_output(self, location: str) -> FsspecOutputFile: """Get an FsspecOutputFile instance to write bytes to the file at the given location. @@ -450,6 +460,7 @@ def new_output(self, location: str) -> FsspecOutputFile: fs = self._get_fs_from_uri(uri) return FsspecOutputFile(location=location, fs=fs) + @override def delete(self, location: str | InputFile | OutputFile) -> None: """Delete the file at the given location. diff --git a/pyiceberg/io/pyarrow.py b/pyiceberg/io/pyarrow.py index 4517ae7327..43b2a9cd61 100644 --- a/pyiceberg/io/pyarrow.py +++ b/pyiceberg/io/pyarrow.py @@ -48,6 +48,7 @@ Generic, TypeVar, cast, + override, ) from urllib.parse import urlparse @@ -300,11 +301,13 @@ def _file_info(self) -> FileInfo: raise FileNotFoundError(f"Cannot get file info, file not found: {self.location}") return file_info + @override def __len__(self) -> int: """Return the total length of the file, in bytes.""" file_info = self._file_info() return file_info.size + @override def exists(self) -> bool: """Check whether the location exists.""" try: @@ -313,6 +316,7 @@ def exists(self) -> bool: except FileNotFoundError: return False + @override def open(self, seekable: bool = True) -> InputStream: """Open the location using a PyArrow FileSystem inferred from the location. @@ -342,6 +346,7 @@ def open(self, seekable: bool = True) -> InputStream: raise # pragma: no cover - If some other kind of OSError, raise the raw error return input_file + @override def create(self, overwrite: bool = False) -> OutputStream: """Create a writable pyarrow.lib.NativeFile for this PyArrowFile's location. @@ -373,6 +378,7 @@ def create(self, overwrite: bool = False) -> OutputStream: raise # pragma: no cover - If some other kind of OSError, raise the raw error return output_file + @override def to_input_file(self) -> PyArrowFile: """Return a new PyArrowFile for the location of an existing PyArrowFile instance. @@ -610,6 +616,7 @@ def _initialize_gcs_fs(self) -> FileSystem: def _initialize_local_fs(self) -> FileSystem: return PyArrowLocalFileSystem() + @override def new_input(self, location: str) -> PyArrowFile: """Get a PyArrowFile instance to read bytes from the file at the given location. @@ -627,6 +634,7 @@ def new_input(self, location: str) -> PyArrowFile: buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)), ) + @override def new_output(self, location: str) -> PyArrowFile: """Get a PyArrowFile instance to write bytes to the file at the given location. @@ -644,6 +652,7 @@ def new_output(self, location: str) -> PyArrowFile: buffer_size=int(self.properties.get(BUFFER_SIZE, ONE_MEGABYTE)), ) + @override def delete(self, location: str | InputFile | OutputFile) -> None: """Delete the file at the given location.