From f83fb5e5a2cc0320278fd4105962320c628803ff Mon Sep 17 00:00:00 2001 From: Stanley Law <19900516+dingo4dev@users.noreply.github.com> Date: Tue, 29 Jul 2025 09:41:30 +0000 Subject: [PATCH 1/2] Adds multi-part namespace support to catalogs Enables catalogs to support hierarchical organization of tables through multi-part namespaces. Introduces changes to the Catalog class and its implementations (DynamoDB, REST, SQL) to handle multi-part namespaces. A new document is added to explain the usage of the multi-part namespace feature. enable multipart namespace Multipart namespace refers to a namespace that is structured with multiple levels or parts, separated by default by a dot delimiter (`.`) --- mkdocs/docs/multi-part-namespace.md | 31 +++++++++++++++++++++++++ pyiceberg/catalog/__init__.py | 36 ++++++++++++++++++++++------- pyiceberg/catalog/dynamodb.py | 33 +++++++++++++++----------- pyiceberg/catalog/rest/__init__.py | 1 + pyiceberg/catalog/sql.py | 2 ++ tests/catalog/test_rest.py | 17 ++++++++++++++ 6 files changed, 98 insertions(+), 22 deletions(-) create mode 100644 mkdocs/docs/multi-part-namespace.md diff --git a/mkdocs/docs/multi-part-namespace.md b/mkdocs/docs/multi-part-namespace.md new file mode 100644 index 0000000000..f4301bdb0d --- /dev/null +++ b/mkdocs/docs/multi-part-namespace.md @@ -0,0 +1,31 @@ +# Multi-Part Namespace Support + +Some catalog implementations support multi-part namespaces, which allows for hierarchical organization of tables. The following table summarizes the support for multi-part namespaces across different catalog implementations in Iceberg Python. + +| Catalog Implementation | Multi-Part Namespace Support | Notes | +|------------------------|------------------------------|-------| +| REST Catalog | ✅ Yes | Fully supports multi-part namespace as defined by the REST catalog specification. | +| Hive Catalog | ❌ No | Spark does not support multi-part namespace. 
| +| DynamoDB Catalog | ✅ Yes | Namespace is represented as a composite key in DynamoDB. | +| Glue Catalog | ❌ No | Uses AWS Glue databases which don't support multi-part namespace. | +| File Catalog | ✅ Yes | Namespace parts are represented as directory hierarchies in the file system. | +| In-Memory Catalog | ✅ Yes | Supports multi-part namespace for testing purposes. | + +## Usage Example + +```python +from pyiceberg.catalog import load_catalog + +# Using a catalog with multi-part namespace support +catalog = load_catalog("my_catalog") + +# Creating a table with a multi-part namespace +catalog.create_table("default.multi.table_name", schema, spec) + +# Listing tables in a multi-part namespace +tables = catalog.list_tables("default.multi") +``` + +## Configuration + +When using catalogs that support multi-part namespaces, make sure to use the appropriate delimiter (typically `.`) when referencing namespaces in your code. \ No newline at end of file diff --git a/pyiceberg/catalog/__init__.py b/pyiceberg/catalog/__init__.py index a434193573..c590c6446e 100644 --- a/pyiceberg/catalog/__init__.py +++ b/pyiceberg/catalog/__init__.py @@ -354,6 +354,7 @@ class Catalog(ABC): name: str properties: Properties + _support_namespaces: bool = False def __init__(self, name: str, **properties: str): self.name = name @@ -732,25 +733,44 @@ def namespace_to_string( return ".".join(segment.strip() for segment in tuple_identifier) @staticmethod + def namespace_level(identifier: Union[str, Identifier]) -> int: + """Get the level of a namespace identifier. + + Args: + identifier (Union[str, Identifier]): a namespace identifier. + + Returns: + int: The level of the namespace. 
+ """ + if not identifier: + return 1 + tuple_identifier = Catalog.identifier_to_tuple(identifier) + return len(tuple_identifier) + 1 + + @classmethod def identifier_to_database( - identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError + cls, identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchNamespaceError]] = ValueError ) -> str: tuple_identifier = Catalog.identifier_to_tuple(identifier) - if len(tuple_identifier) != 1: - raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}") - return tuple_identifier[0] + if not cls._support_namespaces: + if len(tuple_identifier) != 1: + raise err(f"Invalid database, hierarchical namespaces are not supported: {identifier}") + else: + return tuple_identifier[0] + + return ".".join(tuple_identifier) - @staticmethod + @classmethod def identifier_to_database_and_table( + cls, identifier: Union[str, Identifier], err: Union[Type[ValueError], Type[NoSuchTableError], Type[NoSuchNamespaceError]] = ValueError, ) -> Tuple[str, str]: tuple_identifier = Catalog.identifier_to_tuple(identifier) - if len(tuple_identifier) != 2: + if not cls._support_namespaces and len(tuple_identifier) != 2: raise err(f"Invalid path, hierarchical namespaces are not supported: {identifier}") - - return tuple_identifier[0], tuple_identifier[1] + return ".".join(tuple_identifier[:-1]), tuple_identifier[-1] def _load_file_io(self, properties: Properties = EMPTY_DICT, location: Optional[str] = None) -> FileIO: return load_file_io({**self.properties, **properties}, location) diff --git a/pyiceberg/catalog/dynamodb.py b/pyiceberg/catalog/dynamodb.py index 420fa5b523..2462674a4c 100644 --- a/pyiceberg/catalog/dynamodb.py +++ b/pyiceberg/catalog/dynamodb.py @@ -96,6 +96,8 @@ class DynamoDbCatalog(MetastoreCatalog): + _support_namespaces: bool = True + def __init__(self, name: str, client: Optional["DynamoDBClient"] = None, **properties: str): """Dynamodb catalog. 
@@ -445,16 +447,23 @@ def list_tables(self, namespace: Union[str, Identifier]) -> List[Identifier]: return table_identifiers def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identifier]: - """List top-level namespaces from the catalog. - - We do not support hierarchical namespace. + """List namespaces from the catalog. Returns: List[Identifier]: a List of namespace identifiers. """ - # Hierarchical namespace is not supported. Return an empty list + level = self.namespace_level(namespace) + conditions = f"{DYNAMODB_COL_IDENTIFIER} = :identifier" + expression_attribute_values = { + ":identifier": { + "S": DYNAMODB_NAMESPACE, + } + } if namespace: - return [] + conditions += f" AND begins_with({DYNAMODB_COL_NAMESPACE},:ns)" + expression_attribute_values[":ns"] = { + "S": self.namespace_to_string(namespace) + ".", + } paginator = self.dynamodb.get_paginator("query") @@ -462,12 +471,8 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi page_iterator = paginator.paginate( TableName=self.dynamodb_table_name, ConsistentRead=True, - KeyConditionExpression=f"{DYNAMODB_COL_IDENTIFIER} = :identifier", - ExpressionAttributeValues={ - ":identifier": { - "S": DYNAMODB_NAMESPACE, - } - }, + KeyConditionExpression=conditions, + ExpressionAttributeValues=expression_attribute_values, ) except ( self.dynamodb.exceptions.ProvisionedThroughputExceededException, @@ -477,14 +482,14 @@ def list_namespaces(self, namespace: Union[str, Identifier] = ()) -> List[Identi ) as e: raise GenericDynamoDbError(e.message) from e - database_identifiers = [] + database_identifiers = set() for page in page_iterator: for item in page["Items"]: _dict = _convert_dynamo_item_to_regular_dict(item) namespace_col = _dict[DYNAMODB_COL_NAMESPACE] - database_identifiers.append(self.identifier_to_tuple(namespace_col)) + database_identifiers.add(self.identifier_to_tuple(namespace_col)[:level]) - return database_identifiers + return list(database_identifiers) 
def load_namespace_properties(self, namespace: Union[str, Identifier]) -> Properties: """ diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index 913dc85bea..f24ebd97d8 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -219,6 +219,7 @@ class ListViewsResponse(IcebergBaseModel): class RestCatalog(Catalog): uri: str _session: Session + _support_namespaces: bool = True def __init__(self, name: str, **properties: str): """Rest Catalog. diff --git a/pyiceberg/catalog/sql.py b/pyiceberg/catalog/sql.py index dfa573bc13..6e1f4ba252 100644 --- a/pyiceberg/catalog/sql.py +++ b/pyiceberg/catalog/sql.py @@ -117,6 +117,8 @@ class SqlCatalog(MetastoreCatalog): The `SqlCatalog` has a different convention where a `TableIdentifier` requires a `Namespace`. """ + _support_namespaces: bool = True + def __init__(self, name: str, **properties: str): super().__init__(name, **properties) diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 5aee65d8b5..b7ec7deb31 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -544,6 +544,23 @@ def test_list_namespaces_200(rest_mock: Mocker) -> None: ] +def test_list_multipart_namespaces_200(rest_mock: Mocker) -> None: + rest_mock.get( + f"{TEST_URI}v1/namespaces", + json={"namespaces": [["default"], ["multipart"]]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + assert RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN).list_namespaces() == [("default",), ("multipart",)] + + rest_mock.get( + f"{TEST_URI}v1/namespaces?parent=multipart", + json={"namespaces": [["multipart", "namespace1"], ["multipart", "namespace2"]]}, + status_code=200, + request_headers=TEST_HEADERS, + ) + + def test_list_namespace_with_parent_200(rest_mock: Mocker) -> None: rest_mock.get( f"{TEST_URI}v1/namespaces?parent=accounting", From 52d7283c2e001944dafd553bbdf55098fe39e526 Mon Sep 17 00:00:00 2001 From: DinGo4DEV 
<19900516+dingo4dev@users.noreply.github.com> Date: Wed, 8 Oct 2025 18:37:13 +0800 Subject: [PATCH 2/2] Refactor Namespace and TableIdentifier classes improved type handling and added utility methods --- mkdocs/docs/multi-part-namespace.md | 2 +- pyiceberg/table/__init__.py | 57 ++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/mkdocs/docs/multi-part-namespace.md b/mkdocs/docs/multi-part-namespace.md index f4301bdb0d..e5f7e14f93 100644 --- a/mkdocs/docs/multi-part-namespace.md +++ b/mkdocs/docs/multi-part-namespace.md @@ -28,4 +28,4 @@ tables = catalog.list_tables("default.multi") ## Configuration -When using catalogs that support multi-part namespaces, make sure to use the appropriate delimiter (typically `.`) when referencing namespaces in your code. \ No newline at end of file +When using catalogs that support multi-part namespaces, make sure to use the appropriate delimiter (typically `.`) when referencing namespaces in your code. diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index 972efc8c47..7e050e05f5 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -29,10 +29,13 @@ TYPE_CHECKING, Any, Callable, + ClassVar, Dict, Iterable, + Iterator, List, Optional, + Sequence, Set, Tuple, Type, @@ -1028,20 +1031,70 @@ def commit_transaction(self) -> Table: return self._table -class Namespace(IcebergRootModel[List[str]]): +class Namespace(IcebergRootModel[Tuple[str, ...]]): """Reference to one or more levels of a namespace.""" - root: List[str] = Field( + root: Tuple[str, ...] 
= Field( ..., description="Reference to one or more levels of a namespace", ) + def __len__(self) -> int: + """Fetch the size of Namespace.""" + return len(self.root) + + def __getitem__(self, index: int) -> str: + """Fetch a value from a Namespace.""" + return self.root[index] + + def __iter__(self) -> Iterator[str]: + """Return an iterator over the parts of the namespace.""" + return iter(self.root) + + def levels(self) -> int: + """Return the number of levels in this namespace.""" + return len(self.root) + + def __repr__(self) -> str: + """Return a string representation of the namespace.""" + return f"Namespace({self.root})" + class TableIdentifier(IcebergBaseModel): """Fully Qualified identifier to a table.""" namespace: Namespace name: str + _separator: ClassVar[str] = "." + + @classmethod + def from_string(cls, identifier: str) -> TableIdentifier: + """Create a TableIdentifier from a string. + + Args: + identifier: A string representing the table identifier, e.g., "db.schema.table". + + Returns: + A TableIdentifier instance. + """ + parts = identifier.split(cls._separator) + return cls.from_tuple(parts) + + @classmethod + def from_tuple(cls, identifier: Sequence[str]) -> TableIdentifier: + """Create a TableIdentifier from a tuple. + + Args: + identifier: A tuple representing the table identifier, e.g., ("db", "schema", "table"). + + Returns: + A TableIdentifier instance. + """ + if len(identifier) < 2: + raise ValueError("Identifier must include at least a namespace and a table name.") + namespace = Namespace(root=tuple(identifier[:-1])) + name = identifier[-1] + return cls(namespace=namespace, name=name) class CommitTableRequest(IcebergBaseModel):