diff --git a/pyiceberg/catalog/rest/__init__.py b/pyiceberg/catalog/rest/__init__.py index f7d8eec960..9027cc63a5 100644 --- a/pyiceberg/catalog/rest/__init__.py +++ b/pyiceberg/catalog/rest/__init__.py @@ -152,10 +152,16 @@ def _retry_hook(retry_state: RetryCallState) -> None: } +class StorageCredential(IcebergBaseModel): + prefix: str = Field() + config: Properties = Field() + + class TableResponse(IcebergBaseModel): metadata_location: Optional[str] = Field(alias="metadata-location", default=None) metadata: TableMetadata config: Properties = Field(default_factory=dict) + storage_credentials: Optional[List[StorageCredential]] = Field(alias="storage-credentials", default=None) class CreateTableRequest(IcebergBaseModel): @@ -454,7 +460,16 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response: metadata_location=table_response.metadata_location, # type: ignore metadata=table_response.metadata, io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location + { + **table_response.metadata.properties, + **self._get_credentials( + table_response.storage_credentials, + table_response.config, + table_response.metadata_location, + getattr(table_response.metadata, "location", None), + ), + }, + table_response.metadata_location, ), catalog=self, config=table_response.config, @@ -466,11 +481,42 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_res metadata_location=table_response.metadata_location, # type: ignore metadata=table_response.metadata, io=self._load_file_io( - {**table_response.metadata.properties, **table_response.config}, table_response.metadata_location + { + **table_response.metadata.properties, + **self._get_credentials( + table_response.storage_credentials, + table_response.config, + table_response.metadata_location, + getattr(table_response.metadata, "location", None), + ), + }, + table_response.metadata_location, ), catalog=self, ) + @staticmethod + def _get_credentials( + storage_credentials: Optional[List[StorageCredential]], + config: Properties, + metadata_location: Optional[str], + table_location: Optional[str], + ) -> Properties: + if not storage_credentials: + return config + + target = metadata_location or table_location + if not target: + return config + + # Choose the most specific (longest) matching prefix + matching: List[StorageCredential] = [sc for sc in storage_credentials if target.startswith(sc.prefix)] + if not matching: + return config + + selected = max(matching, key=lambda sc: len(sc.prefix)) + return selected.config + def _refresh_token(self) -> None: # Reactive token refresh is atypical - we should proactively refresh tokens in a separate thread # instead of retrying on Auth Exceptions. Keeping refresh behavior for the LegacyOAuth2AuthManager diff --git a/tests/catalog/test_rest.py b/tests/catalog/test_rest.py index 223c6d2f9e..01bae68734 100644 --- a/tests/catalog/test_rest.py +++ b/tests/catalog/test_rest.py @@ -39,7 +39,7 @@ ServerError, TableAlreadyExistsError, ) -from pyiceberg.io import load_file_io +from pyiceberg.io import AWS_ACCESS_KEY_ID, load_file_io from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema from pyiceberg.table import Table @@ -858,6 +858,57 @@ def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_ assert actual == expected +def test_storage_credentials_over_config(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None: + response = { + "metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + "metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"], + "config": { + AWS_ACCESS_KEY_ID: "from_config", + }, + "storage-credentials": [ + { + "prefix": "s3://warehouse/database/", + "config": { + AWS_ACCESS_KEY_ID: "from_storage_credentials", + }, + } + ], + } + + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json=response, + status_code=200, + request_headers=TEST_HEADERS, + ) + + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("fokko", "table")) + + assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_storage_credentials" + + +def test_config_when_no_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None: + response = { + "metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"], + "metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"], + "config": { + AWS_ACCESS_KEY_ID: "from_config", + }, + } + + rest_mock.get( + f"{TEST_URI}v1/namespaces/fokko/tables/table", + json=response, + status_code=200, + request_headers=TEST_HEADERS, + ) + + catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN) + table = catalog.load_table(("fokko", "table")) + + assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_config" + def test_load_table_200_loading_mode( rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any] ) -> None: