Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,16 @@ def _retry_hook(retry_state: RetryCallState) -> None:
}


class StorageCredential(IcebergBaseModel):
prefix: str = Field()
config: Properties = Field()


class TableResponse(IcebergBaseModel):
metadata_location: Optional[str] = Field(alias="metadata-location", default=None)
metadata: TableMetadata
config: Properties = Field(default_factory=dict)
storage_credentials: Optional[List[StorageCredential]] = Field(alias="storage-credentials", default=None)


class CreateTableRequest(IcebergBaseModel):
Expand Down Expand Up @@ -454,7 +460,16 @@ def _response_to_table(self, identifier_tuple: Tuple[str, ...], table_response:
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
{**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
{
**table_response.metadata.properties,
**self._get_credentials(
table_response.storage_credentials,
table_response.config,
table_response.metadata_location,
getattr(table_response.metadata, "location", None),
),
},
table_response.metadata_location,
),
catalog=self,
config=table_response.config,
Expand All @@ -466,11 +481,42 @@ def _response_to_staged_table(self, identifier_tuple: Tuple[str, ...], table_res
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
{**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
{
**table_response.metadata.properties,
**self._get_credentials(
table_response.storage_credentials,
table_response.config,
table_response.metadata_location,
getattr(table_response.metadata, "location", None),
),
},
table_response.metadata_location,
),
catalog=self,
)

@staticmethod
def _get_credentials(
storage_credentials: Optional[List[StorageCredential]],
config: Properties,
metadata_location: Optional[str],
table_location: Optional[str],
) -> Properties:
if not storage_credentials:
return config

target = metadata_location or table_location
if not target:
return config

# Choose the most specific (longest) matching prefix
matching: List[StorageCredential] = [sc for sc in storage_credentials if target.startswith(sc.prefix)]
if not matching:
return config

selected = max(matching, key=lambda sc: len(sc.prefix))
return selected.config

def _refresh_token(self) -> None:
# Reactive token refresh is atypical - we should proactively refresh tokens in a separate thread
# instead of retrying on Auth Exceptions. Keeping refresh behavior for the LegacyOAuth2AuthManager
Expand Down
53 changes: 52 additions & 1 deletion tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
ServerError,
TableAlreadyExistsError,
)
from pyiceberg.io import load_file_io
from pyiceberg.io import AWS_ACCESS_KEY_ID, load_file_io
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import Table
Expand Down Expand Up @@ -858,6 +858,57 @@ def test_load_table_200(rest_mock: Mocker, example_table_metadata_with_snapshot_
assert actual == expected


def test_storage_credentials_over_config(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
response = {
"metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
"metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"],
"config": {
AWS_ACCESS_KEY_ID: "from_config",
},
"storage-credentials": [
{
"prefix": "s3://warehouse/database/",
"config": {
AWS_ACCESS_KEY_ID: "from_storage_credentials",
},
}
],
}

rest_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table",
json=response,
status_code=200,
request_headers=TEST_HEADERS,
)

catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
table = catalog.load_table(("fokko", "table"))

assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_storage_credentials"


def test_config_when_no_storage_credentials(rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]) -> None:
response = {
"metadata-location": example_table_metadata_with_snapshot_v1_rest_json["metadata-location"],
"metadata": example_table_metadata_with_snapshot_v1_rest_json["metadata"],
"config": {
AWS_ACCESS_KEY_ID: "from_config",
},
}

rest_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table",
json=response,
status_code=200,
request_headers=TEST_HEADERS,
)

catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
table = catalog.load_table(("fokko", "table"))

assert table.io.properties[AWS_ACCESS_KEY_ID] == "from_config"

def test_load_table_200_loading_mode(
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: Dict[str, Any]
) -> None:
Expand Down
Loading