Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti

### New Features

- Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature.

### Changes

### Deprecations
Expand Down
1 change: 1 addition & 0 deletions integration-tests/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ dependencies {
implementation(libs.awaitility)
implementation(libs.s3mock.testcontainers)
implementation(project(":polaris-runtime-test-common"))
implementation(project(":polaris-minio-testcontainer"))
}

copiedCodeChecks {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,14 @@
import java.util.UUID;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.AwsStorageConfigInfo;
import org.apache.polaris.core.admin.model.Catalog;
import org.apache.polaris.core.admin.model.CatalogGrant;
import org.apache.polaris.core.admin.model.CatalogPrivilege;
import org.apache.polaris.core.admin.model.CatalogProperties;
import org.apache.polaris.core.admin.model.CatalogRole;
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
import org.apache.polaris.core.admin.model.ExternalCatalog;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.GrantResource;
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
import org.apache.polaris.core.admin.model.NamespaceGrant;
Expand All @@ -52,6 +52,9 @@
import org.apache.polaris.service.it.env.PolarisClient;
import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension;
import org.apache.polaris.service.it.ext.SparkSessionBuilder;
import org.apache.polaris.test.minio.Minio;
import org.apache.polaris.test.minio.MinioAccess;
import org.apache.polaris.test.minio.MinioExtension;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -66,9 +69,14 @@
* Integration test for catalog federation functionality. This test verifies that an external
* catalog can be created that federates with an internal catalog.
*/
@ExtendWith(MinioExtension.class)
@ExtendWith(PolarisIntegrationTestExtension.class)
public class CatalogFederationIntegrationTest {

public static final String BUCKET_URI_PREFIX = "/minio-test-catalog-federation";
public static final String MINIO_ACCESS_KEY = "test-ak-123-catalog-federation";
public static final String MINIO_SECRET_KEY = "test-sk-123-catalog-federation";

private static PolarisClient client;
private static CatalogApi catalogApi;
private static ManagementApi managementApi;
Expand All @@ -78,6 +86,8 @@ public class CatalogFederationIntegrationTest {
private static String federatedCatalogName;
private static String localCatalogRoleName;
private static String federatedCatalogRoleName;
private static URI storageBase;
private static String endpoint;

private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
private static final String PRINCIPAL_ROLE_NAME = "test-catalog-federation-user-role";
Expand All @@ -93,12 +103,17 @@ public class CatalogFederationIntegrationTest {
private PrincipalWithCredentials newUserCredentials;

@BeforeAll
static void setup(PolarisApiEndpoints apiEndpoints, ClientCredentials credentials) {
static void setup(
PolarisApiEndpoints apiEndpoints,
ClientCredentials credentials,
@Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess) {
endpoints = apiEndpoints;
client = polarisClient(endpoints);
String adminToken = client.obtainToken(credentials);
managementApi = client.managementApi(adminToken);
catalogApi = client.catalogApi(adminToken);
storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX);
endpoint = minioAccess.s3endpoint();
}

@AfterAll
Expand Down Expand Up @@ -129,12 +144,14 @@ void after() {
}

private void setupCatalogs() {
baseLocation = URI.create("file:///tmp/warehouse");
baseLocation = storageBase;
newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME);

FileStorageConfigInfo storageConfig =
FileStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.FILE)
AwsStorageConfigInfo storageConfig =
AwsStorageConfigInfo.builder()
.setStorageType(StorageConfigInfo.StorageTypeEnum.S3)
.setPathStyleAccess(true)
.setEndpoint(endpoint)
.setAllowedLocations(List.of(baseLocation.toString()))
.build();

Expand Down Expand Up @@ -197,6 +214,14 @@ private void setupCatalogs() {
spark =
SparkSessionBuilder.buildWithTestDefaults()
.withWarehouse(warehouseDir.toUri())
.withConfig(
"spark.sql.catalog." + localCatalogName + ".header.X-Iceberg-Access-Delegation",
"vended-credentials")
.withConfig(
"spark.sql.catalog." + federatedCatalogName + ".header.X-Iceberg-Access-Delegation",
"vended-credentials")
.withConfig("spark.sql.catalog." + localCatalogName + ".cache-enabled", "false")
.withConfig("spark.sql.catalog." + federatedCatalogName + ".cache-enabled", "false")
.addCatalog(
localCatalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
.addCatalog(
Expand Down Expand Up @@ -296,10 +321,6 @@ void testFederatedCatalogWithNamespaceRBAC() {
.sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id")
.collectAsList();
assertThat(localNs2Data).hasSize(2);

// Restore the grant
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, namespaceGrant);
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
}

@Test
Expand Down Expand Up @@ -335,9 +356,76 @@ void testFederatedCatalogWithTableRBAC() {
.sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id")
.collectAsList();
assertThat(localNs2Data).hasSize(2);
}

// Restore the grant
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant);
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
@Test
void testFederatedCatalogWithCredentialVending() {
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);

// Case 1: Only have TABLE_READ_PROPERTIES privilege, should not be able to read data
TableGrant tablePropertiesGrant =
TableGrant.builder()
.setType(GrantResource.TypeEnum.TABLE)
.setPrivilege(TablePrivilege.TABLE_READ_PROPERTIES)
.setNamespace(List.of("ns1"))
.setTableName("test_table")
.build();
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant);
spark.sql("USE " + federatedCatalogName);

// Read table data should fail since TABLE_READ_PROPERTIES does not allow reading data
assertThatThrownBy(() -> spark.sql("SELECT * FROM ns1.test_table ORDER BY id"))
.isInstanceOf(ForbiddenException.class);

// Case 2: Only have TABLE_READ_DATA privilege, should be able to read data but not write
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant);
TableGrant tableReadDataGrant =
TableGrant.builder()
.setType(GrantResource.TypeEnum.TABLE)
.setPrivilege(TablePrivilege.TABLE_READ_DATA)
.setNamespace(List.of("ns1"))
.setTableName("test_table")
.build();
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);

// Verify that the vended credential allows reading the data
List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
assertThat(ns1Data).hasSize(2);
assertThat(ns1Data.get(0).getInt(0)).isEqualTo(1);
assertThat(ns1Data.get(0).getString(1)).isEqualTo("Alice");

// Verify that write is blocked since the vended credential should only have read permission
assertThatThrownBy(() -> spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')"))
.hasMessageContaining(
"software.amazon.awssdk.services.s3.model.S3Exception: Access Denied. (Service: S3, Status Code: 403,");

// Case 3: TABLE_WRITE_DATA should
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant);
TableGrant tableWriteDataGrant =
TableGrant.builder()
.setType(GrantResource.TypeEnum.TABLE)
.setPrivilege(TablePrivilege.TABLE_WRITE_DATA)
.setNamespace(List.of("ns1"))
.setTableName("test_table")
.build();
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableWriteDataGrant);

spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')");

// Verify the write was successful by reading back
List<Row> updatedData = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
assertThat(updatedData).hasSize(3);
assertThat(updatedData.get(2).getInt(0)).isEqualTo(3);
assertThat(updatedData.get(2).getString(1)).isEqualTo("Charlie");

// Verify the data is also visible from the local catalog (both point to same storage)
spark.sql(String.format("REFRESH TABLE %s.ns1.test_table", localCatalogName));
List<Row> localData =
spark
.sql(String.format("SELECT * FROM %s.ns1.test_table ORDER BY id", localCatalogName))
.collectAsList();
assertThat(localData).hasSize(3);
assertThat(localData.get(2).getInt(0)).isEqualTo(3);
assertThat(localData.get(2).getString(1)).isEqualTo("Charlie");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -429,4 +429,13 @@ public static void enforceFeatureEnabledOrThrow(
"When true, enables finer grained update table privileges which are passed to the authorizer for update table operations")
.defaultValue(true)
.buildFeatureConfiguration();

public static final FeatureConfiguration<Boolean> ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's reasonable to introduce the config now. It may not be necessary for long-term. We might think of a way to remove these configs. They makes system unnecessary complex. Can we mark it to be removed at certain versions, e.g., version 1.3/1.4? and file an issue to remove these in case we forgot it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good suggestions! We may still want this new config but may need to scope it differently. Ideally we should let ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING only controls the behavior of static-facade EXTERNAL catalog and let ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING controls the behavior of passthrough-facade EXTERNAL catalog, since the implication of credential vending is different in these 2 cases. But that will be a behavior change and we'd better address that in a follow-up. I will drive that to make sure we stabilize this before 1.3

Copy link
Contributor

@flyrain flyrain Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still a bit confused by the long-term need of this config. Taking passthrough-facade EXTERNAL catalog as an example, why do we need a config to control the behavior? It should be the default behavior. Or is there any use case that user don't need credential vending, but still config the storage info.?

Copy link
Contributor Author

@HonahX HonahX Oct 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree this should be the default behavior. But I think it may be still good to preserve the ability to toggle the behavior until the new feature stabilized. And yes, may be several versions afterwards, we could remove this flag completely and make this a fundamental feature : ) I will have PR/issue to track this, thanks!

PolarisConfiguration.<Boolean>builder()
.key("ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING")
.catalogConfig("polaris.config.allow-federated-catalogs-credential-vending")
.description(
"If set to true (default), allow credential vending for external catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be true first.")
.defaultValue(true)
.buildFeatureConfiguration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.polaris.service.catalog.iceberg;

import static org.apache.polaris.core.config.FeatureConfiguration.ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING;
import static org.apache.polaris.core.config.FeatureConfiguration.LIST_PAGINATION_ENABLED;
import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS;

Expand Down Expand Up @@ -265,6 +266,10 @@ protected void initializeCatalog() {
throw new UnsupportedOperationException(
"External catalog factory for type '" + connectionType + "' is unavailable.");
}
// TODO: if the remote catalog is not RestCatalog, the corresponding table operation will use
// environment to load the table metadata, the env may not contain credentials to access the
// storage. In the future, we could leverage PolarisCredentialManager to inject storage
// credentials for non-rest remote catalog
this.baseCatalog = federatedCatalog;
} else {
LOGGER.atInfo().log("Initializing non-federated catalog");
Expand Down Expand Up @@ -421,6 +426,12 @@ public void authorizeCreateTableDirect(
PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION,
TableIdentifier.of(namespace, request.name()));
}

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
}
checkAllowExternalCatalogCredentialVending(delegationModes);
}

public LoadTableResponse createTableDirect(
Expand All @@ -431,10 +442,6 @@ public LoadTableResponse createTableDirect(

authorizeCreateTableDirect(namespace, request, delegationModes);

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
}
request.validate();

TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name());
Expand Down Expand Up @@ -545,6 +552,12 @@ private void authorizeCreateTableStaged(
PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION,
TableIdentifier.of(namespace, request.name()));
}

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
}
checkAllowExternalCatalogCredentialVending(delegationModes);
}

public LoadTableResponse createTableStaged(
Expand All @@ -555,10 +568,6 @@ public LoadTableResponse createTableStaged(

authorizeCreateTableStaged(namespace, request, delegationModes);

CatalogEntity catalog = getResolvedCatalogEntity();
if (catalog.isStaticFacade()) {
throw new BadRequestException("Cannot create table on static-facade external catalogs.");
}
TableIdentifier ident = TableIdentifier.of(namespace, request.name());
TableMetadata metadata = stageTableCreateHelper(namespace, request);

Expand Down Expand Up @@ -723,23 +732,7 @@ private Set<PolarisStorageActions> authorizeLoadTable(
read, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier);
}

CatalogEntity catalogEntity = getResolvedCatalogEntity();

LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType());
LOGGER.info(
"allow external catalog credential vending: {}",
realmConfig.getConfig(
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity));
if (catalogEntity
.getCatalogType()
.equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL)
&& !realmConfig.getConfig(
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) {
throw new ForbiddenException(
"Access Delegation is not enabled for this catalog. Please consult applicable "
+ "documentation for the catalog config property '%s' to enable this feature",
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig());
}
checkAllowExternalCatalogCredentialVending(delegationModes);

return actionsRequested;
}
Expand Down Expand Up @@ -808,8 +801,15 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
PolarisResolvedPathWrapper resolvedStoragePath =
CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier);

if (baseCatalog instanceof IcebergCatalog && resolvedStoragePath != null) {
if (resolvedStoragePath == null) {
LOGGER.debug(
"Unable to find storage configuration information for table {}", tableIdentifier);
return responseBuilder;
}

if (baseCatalog instanceof IcebergCatalog
|| realmConfig.getConfig(
ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) {
AccessConfig accessConfig =
accessConfigProvider.getAccessConfig(
callContext,
Expand Down Expand Up @@ -838,6 +838,7 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential
}
responseBuilder.addAllConfig(accessConfig.extraProperties());
}

return responseBuilder;
}

Expand Down Expand Up @@ -1211,6 +1212,31 @@ private EnumSet<PolarisAuthorizableOperation> getUpdateTableAuthorizableOperatio
}
}

private void checkAllowExternalCatalogCredentialVending(
EnumSet<AccessDelegationMode> delegationModes) {

if (delegationModes.isEmpty()) {
return;
}
CatalogEntity catalogEntity = getResolvedCatalogEntity();

LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType());
LOGGER.info(
"allow external catalog credential vending: {}",
realmConfig.getConfig(
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity));
if (catalogEntity
.getCatalogType()
.equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL)
&& !realmConfig.getConfig(
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) {
throw new ForbiddenException(
"Access Delegation is not enabled for this catalog. Please consult applicable "
+ "documentation for the catalog config property '%s' to enable this feature",
FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig());
}
}

@Override
public void close() throws Exception {
if (baseCatalog instanceof Closeable closeable) {
Expand Down
Loading