From 944f5c34aa49373db546ce90f5a32b0e674a2438 Mon Sep 17 00:00:00 2001 From: Honah J Date: Thu, 25 Sep 2025 17:42:25 -0500 Subject: [PATCH 1/4] enable credential vending for rest catalog and add tests --- integration-tests/build.gradle.kts | 1 + .../CatalogFederationIntegrationTest.java | 109 +++++++++++++++++- .../iceberg/IcebergCatalogHandler.java | 6 +- .../service/spark/it/CatalogFederationIT.java | 23 ++-- 4 files changed, 121 insertions(+), 18 deletions(-) diff --git a/integration-tests/build.gradle.kts b/integration-tests/build.gradle.kts index 6c48edae65..7429c53718 100644 --- a/integration-tests/build.gradle.kts +++ b/integration-tests/build.gradle.kts @@ -67,6 +67,7 @@ dependencies { implementation(libs.awaitility) implementation(libs.s3mock.testcontainers) implementation(project(":polaris-runtime-test-common")) + implementation(project(":polaris-minio-testcontainer")) } copiedCodeChecks { diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java index 42032dc7f5..363721c769 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.polaris.core.admin.model.AuthenticationParameters; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; import org.apache.polaris.core.admin.model.Catalog; import org.apache.polaris.core.admin.model.CatalogGrant; import org.apache.polaris.core.admin.model.CatalogPrivilege; @@ -34,7 +35,6 @@ import org.apache.polaris.core.admin.model.CatalogRole; import org.apache.polaris.core.admin.model.ConnectionConfigInfo; import org.apache.polaris.core.admin.model.ExternalCatalog; -import org.apache.polaris.core.admin.model.FileStorageConfigInfo; import org.apache.polaris.core.admin.model.GrantResource; import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo; import org.apache.polaris.core.admin.model.NamespaceGrant; @@ -52,6 +52,9 @@ import org.apache.polaris.service.it.env.PolarisClient; import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; import org.apache.polaris.service.it.ext.SparkSessionBuilder; +import org.apache.polaris.test.minio.Minio; +import org.apache.polaris.test.minio.MinioAccess; +import org.apache.polaris.test.minio.MinioExtension; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; @@ -66,9 +69,14 @@ * Integration test for catalog federation functionality. This test verifies that an external * catalog can be created that federates with an internal catalog. */ +@ExtendWith(MinioExtension.class) @ExtendWith(PolarisIntegrationTestExtension.class) public class CatalogFederationIntegrationTest { + public static final String BUCKET_URI_PREFIX = "/minio-test-catalog-federation"; + public static final String MINIO_ACCESS_KEY = "test-ak-123-catalog-federation"; + public static final String MINIO_SECRET_KEY = "test-sk-123-catalog-federation"; + private static PolarisClient client; private static CatalogApi catalogApi; private static ManagementApi managementApi; @@ -78,6 +86,8 @@ public class CatalogFederationIntegrationTest { private static String federatedCatalogName; private static String localCatalogRoleName; private static String federatedCatalogRoleName; + private static URI storageBase; + private static String endpoint; private static final String PRINCIPAL_NAME = "test-catalog-federation-user"; private static final String PRINCIPAL_ROLE_NAME = "test-catalog-federation-user-role"; @@ -93,12 +103,17 @@ public class CatalogFederationIntegrationTest { private PrincipalWithCredentials newUserCredentials; @BeforeAll - static void setup(PolarisApiEndpoints apiEndpoints, ClientCredentials credentials) { + static void setup( + PolarisApiEndpoints apiEndpoints, + ClientCredentials credentials, + @Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess) { endpoints = apiEndpoints; client = polarisClient(endpoints); String adminToken = client.obtainToken(credentials); managementApi = client.managementApi(adminToken); catalogApi = client.catalogApi(adminToken); + storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX); + endpoint = minioAccess.s3endpoint(); } @AfterAll @@ -129,12 +144,14 @@ void after() { } private void setupCatalogs() { - baseLocation = URI.create("file:///tmp/warehouse"); + baseLocation = storageBase; newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME); - FileStorageConfigInfo storageConfig = - FileStorageConfigInfo.builder() - .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE) + AwsStorageConfigInfo storageConfig = + AwsStorageConfigInfo.builder() + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setPathStyleAccess(true) + .setEndpoint(endpoint) .setAllowedLocations(List.of(baseLocation.toString())) .build(); @@ -197,6 +214,14 @@ private void setupCatalogs() { spark = SparkSessionBuilder.buildWithTestDefaults() .withWarehouse(warehouseDir.toUri()) + .withConfig( + "spark.sql.catalog." + localCatalogName + ".header.X-Iceberg-Access-Delegation", + "vended-credentials") + .withConfig( + "spark.sql.catalog." + federatedCatalogName + ".header.X-Iceberg-Access-Delegation", + "vended-credentials") + .withConfig("spark.sql.catalog." + localCatalogName + ".cache-enabled", "false") + .withConfig("spark.sql.catalog." + federatedCatalogName + ".cache-enabled", "false") .addCatalog( localCatalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken) .addCatalog( @@ -340,4 +365,76 @@ void testFederatedCatalogWithTableRBAC() { managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); } + + @Test + void testFederatedCatalogWithCredentialVending() { + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); + + // Case 1: Only have TABLE_READ_PROPERTIES privilege, should not be able to read data + TableGrant tableGrant = + TableGrant.builder() + .setType(GrantResource.TypeEnum.TABLE) + .setPrivilege(TablePrivilege.TABLE_READ_PROPERTIES) + .setNamespace(List.of("ns1")) + .setTableName("test_table") + .build(); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + spark.sql("USE " + federatedCatalogName); + + // Read table data should fail since TABLE_READ_PROPERTIES does not allow reading data + assertThatThrownBy(() -> spark.sql("SELECT * FROM ns1.test_table ORDER BY id")) + .isInstanceOf(ForbiddenException.class); + + // Case 2: Only have TABLE_READ_DATA privilege, should be able to read data but not write + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + tableGrant = + TableGrant.builder() + .setType(GrantResource.TypeEnum.TABLE) + .setPrivilege(TablePrivilege.TABLE_READ_DATA) + .setNamespace(List.of("ns1")) + .setTableName("test_table") + .build(); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + + // Verify that the vended credential allows reading the data + List ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList(); + assertThat(ns1Data).hasSize(2); + assertThat(ns1Data.get(0).getInt(0)).isEqualTo(1); + assertThat(ns1Data.get(0).getString(1)).isEqualTo("Alice"); + + // Verify that write is blocked since the vended credential should only have read permission + assertThatThrownBy(() -> spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')")) + .hasMessageContaining("Access Denied. (Service: S3, Status Code: 403"); + + // Case 3: TABLE_WRITE_DATA should + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + tableGrant = + TableGrant.builder() + .setType(GrantResource.TypeEnum.TABLE) + .setPrivilege(TablePrivilege.TABLE_WRITE_DATA) + .setNamespace(List.of("ns1")) + .setTableName("test_table") + .build(); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + + spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')"); + + // Verify the write was successful by reading back + List updatedData = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList(); + assertThat(updatedData).hasSize(3); + assertThat(updatedData.get(2).getInt(0)).isEqualTo(3); + assertThat(updatedData.get(2).getString(1)).isEqualTo("Charlie"); + + // Verify the data is also visible from the local catalog (both point to same storage) + spark.sql(String.format("REFRESH TABLE %s.ns1.test_table", localCatalogName)); + List localData = + spark + .sql(String.format("SELECT * FROM %s.ns1.test_table ORDER BY id", localCatalogName)) + .collectAsList(); + assertThat(localData).hasSize(3); + assertThat(localData.get(2).getInt(0)).isEqualTo(3); + assertThat(localData.get(2).getString(1)).isEqualTo("Charlie"); + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); + } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 280c98d469..85c1da7292 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.catalog.iceberg; +import static org.apache.polaris.core.config.FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING; import static org.apache.polaris.core.config.FeatureConfiguration.LIST_PAGINATION_ENABLED; import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS; @@ -808,7 +809,10 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential PolarisResolvedPathWrapper resolvedStoragePath = CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier); - if (baseCatalog instanceof IcebergCatalog && resolvedStoragePath != null) { + if ((baseCatalog instanceof IcebergCatalog + || realmConfig.getConfig( + ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, getResolvedCatalogEntity())) + && resolvedStoragePath != null) { AccessConfig accessConfig = accessConfigProvider.getAccessConfig( diff --git a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java index cdd9a2a904..70a13290c8 100644 --- a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java +++ b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.spark.it; +import com.google.common.collect.ImmutableMap; import io.quarkus.test.junit.QuarkusIntegrationTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; @@ -31,17 +32,17 @@ public class CatalogFederationIT extends CatalogFederationIntegrationTest { public static class CatalogFederationProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { - return Map.of( - "polaris.features.\"ENABLE_CATALOG_FEDERATION\"", - "true", - "polaris.features.\"SUPPORTED_CATALOG_CONNECTION_TYPES\"", - "[\"ICEBERG_REST\"]", - "polaris.features.\"ALLOW_OVERLAPPING_CATALOG_URLS\"", - "true", - "polaris.features.\"ENABLE_SUB_CATALOG_RBAC_FOR_FEDERATED_CATALOGS\"", - "true", - "polaris.features.\"ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG\"", - "true"); + return ImmutableMap.builder() + .put("polaris.features.\"ENABLE_CATALOG_FEDERATION\"", "true") + .put("polaris.features.\"SUPPORTED_CATALOG_CONNECTION_TYPES\"", "[\"ICEBERG_REST\"]") + .put("polaris.features.\"ALLOW_OVERLAPPING_CATALOG_URLS\"", "true") + .put("polaris.features.\"ENABLE_SUB_CATALOG_RBAC_FOR_FEDERATED_CATALOGS\"", "true") + .put("polaris.features.\"ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG\"", "true") + .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", "false") + .put("polaris.features.\"ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING\"", "true") + .put("polaris.storage.aws.access-key", CatalogFederationIntegrationTest.MINIO_ACCESS_KEY) + .put("polaris.storage.aws.secret-key", CatalogFederationIntegrationTest.MINIO_SECRET_KEY) + .build(); } } } From 34d5edd2127b1eeb9fca7b398d5b8ad211a74bf2 Mon Sep 17 00:00:00 2001 From: Honah J Date: Thu, 9 Oct 2025 13:57:17 -0500 Subject: [PATCH 2/4] Add a note about current limitation --- .../service/catalog/iceberg/IcebergCatalogHandler.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 85c1da7292..98c9bebde6 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -266,6 +266,10 @@ protected void initializeCatalog() { throw new UnsupportedOperationException( "External catalog factory for type '" + connectionType + "' is unavailable."); } + // TODO: if the remote catalog is not RestCatalog, the corresponding table operation will use + // environment to load the table metadata, the env may not contain credentials to access the + // storage. In the future, we could leverage PolarisCredentialManager to inject storage + // credentials for non-rest remote catalog this.baseCatalog = federatedCatalog; } else { LOGGER.atInfo().log("Initializing non-federated catalog"); From 06062925b9377ff94a8836306f8ee24c9a6319f5 Mon Sep 17 00:00:00 2001 From: Honah J Date: Thu, 9 Oct 2025 17:11:31 -0500 Subject: [PATCH 3/4] Refactor --- .../CatalogFederationIntegrationTest.java | 29 +++++++------------ .../iceberg/IcebergCatalogHandler.java | 12 +++++--- 2 files changed, 18 insertions(+), 23 deletions(-) diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java index 363721c769..af9937b3d1 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java @@ -321,10 +321,6 @@ void testFederatedCatalogWithNamespaceRBAC() { .sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id") .collectAsList(); assertThat(localNs2Data).hasSize(2); - - // Restore the grant - managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, namespaceGrant); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); } @Test @@ -360,10 +356,6 @@ void testFederatedCatalogWithTableRBAC() { .sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id") .collectAsList(); assertThat(localNs2Data).hasSize(2); - - // Restore the grant - managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); } @Test @@ -371,14 +363,14 @@ void testFederatedCatalogWithCredentialVending() { managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); // Case 1: Only have TABLE_READ_PROPERTIES privilege, should not be able to read data - TableGrant tableGrant = + TableGrant tablePropertiesGrant = TableGrant.builder() .setType(GrantResource.TypeEnum.TABLE) .setPrivilege(TablePrivilege.TABLE_READ_PROPERTIES) .setNamespace(List.of("ns1")) .setTableName("test_table") .build(); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant); spark.sql("USE " + federatedCatalogName); // Read table data should fail since TABLE_READ_PROPERTIES does not allow reading data @@ -386,15 +378,15 @@ void testFederatedCatalogWithCredentialVending() { .isInstanceOf(ForbiddenException.class); // Case 2: Only have TABLE_READ_DATA privilege, should be able to read data but not write - managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); - tableGrant = + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant); + TableGrant tableReadDataGrant = TableGrant.builder() .setType(GrantResource.TypeEnum.TABLE) .setPrivilege(TablePrivilege.TABLE_READ_DATA) .setNamespace(List.of("ns1")) .setTableName("test_table") .build(); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant); // Verify that the vended credential allows reading the data List ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList(); @@ -404,18 +396,19 @@ void testFederatedCatalogWithCredentialVending() { // Verify that write is blocked since the vended credential should only have read permission assertThatThrownBy(() -> spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')")) - .hasMessageContaining("Access Denied. (Service: S3, Status Code: 403"); + .hasMessageContaining( + "software.amazon.awssdk.services.s3.model.S3Exception: Access Denied. (Service: S3, Status Code: 403,"); // Case 3: TABLE_WRITE_DATA should - managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); - tableGrant = + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant); + TableGrant tableWriteDataGrant = TableGrant.builder() .setType(GrantResource.TypeEnum.TABLE) .setPrivilege(TablePrivilege.TABLE_WRITE_DATA) .setNamespace(List.of("ns1")) .setTableName("test_table") .build(); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableWriteDataGrant); spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')"); @@ -434,7 +427,5 @@ void testFederatedCatalogWithCredentialVending() { assertThat(localData).hasSize(3); assertThat(localData.get(2).getInt(0)).isEqualTo(3); assertThat(localData.get(2).getString(1)).isEqualTo("Charlie"); - managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); } } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 98c9bebde6..de3324699a 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -813,11 +813,14 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential PolarisResolvedPathWrapper resolvedStoragePath = CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier); - if ((baseCatalog instanceof IcebergCatalog - || realmConfig.getConfig( - ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, getResolvedCatalogEntity())) - && resolvedStoragePath != null) { + if (resolvedStoragePath == null) { + LOGGER.debug("Unable to find storage configuration information for table {}", tableIdentifier); + return responseBuilder; + } + if (baseCatalog instanceof IcebergCatalog + || realmConfig.getConfig( + ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { AccessConfig accessConfig = accessConfigProvider.getAccessConfig( callContext, @@ -846,6 +849,7 @@ ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, getResolvedCatalogEntity())) } responseBuilder.addAllConfig(accessConfig.extraProperties()); } + return responseBuilder; } From 6428b9e1c3f92a545bccfd0d956826da14b4971a Mon Sep 17 00:00:00 2001 From: Honah J Date: Thu, 9 Oct 2025 17:47:15 -0500 Subject: [PATCH 4/4] Add change log and a new feature flag --- CHANGELOG.md | 2 + .../core/config/FeatureConfiguration.java | 9 +++ .../iceberg/IcebergCatalogHandler.java | 70 +++++++++++-------- .../service/spark/it/CatalogFederationIT.java | 1 + 4 files changed, 54 insertions(+), 28 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 995f9891b8..257fbedb78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### New Features +- Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature. + ### Changes ### Deprecations diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 5d81c79f1e..1772f47256 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -429,4 +429,13 @@ public static void enforceFeatureEnabledOrThrow( "When true, enables finer grained update table privileges which are passed to the authorizer for update table operations") .defaultValue(true) .buildFeatureConfiguration(); + + public static final FeatureConfiguration ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING = + PolarisConfiguration.builder() + .key("ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING") + .catalogConfig("polaris.config.allow-federated-catalogs-credential-vending") + .description( + "If set to true (default), allow credential vending for external catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be true first.") + .defaultValue(true) + .buildFeatureConfiguration(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index de3324699a..833c0ab118 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -18,7 +18,7 @@ */ package org.apache.polaris.service.catalog.iceberg; -import static org.apache.polaris.core.config.FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING; +import static org.apache.polaris.core.config.FeatureConfiguration.ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING; import static org.apache.polaris.core.config.FeatureConfiguration.LIST_PAGINATION_ENABLED; import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS; @@ -426,6 +426,12 @@ public void authorizeCreateTableDirect( PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION, TableIdentifier.of(namespace, request.name())); } + + CatalogEntity catalog = getResolvedCatalogEntity(); + if (catalog.isStaticFacade()) { + throw new BadRequestException("Cannot create table on static-facade external catalogs."); + } + checkAllowExternalCatalogCredentialVending(delegationModes); } public LoadTableResponse createTableDirect( @@ -436,10 +442,6 @@ public LoadTableResponse createTableDirect( authorizeCreateTableDirect(namespace, request, delegationModes); - CatalogEntity catalog = getResolvedCatalogEntity(); - if (catalog.isStaticFacade()) { - throw new BadRequestException("Cannot create table on static-facade external catalogs."); - } request.validate(); TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); @@ -550,6 +552,12 @@ private void authorizeCreateTableStaged( PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION, TableIdentifier.of(namespace, request.name())); } + + CatalogEntity catalog = getResolvedCatalogEntity(); + if (catalog.isStaticFacade()) { + throw new BadRequestException("Cannot create table on static-facade external catalogs."); + } + checkAllowExternalCatalogCredentialVending(delegationModes); } public LoadTableResponse createTableStaged( @@ -560,10 +568,6 @@ public LoadTableResponse createTableStaged( authorizeCreateTableStaged(namespace, request, delegationModes); - CatalogEntity catalog = getResolvedCatalogEntity(); - if (catalog.isStaticFacade()) { - throw new BadRequestException("Cannot create table on static-facade external catalogs."); - } TableIdentifier ident = TableIdentifier.of(namespace, request.name()); TableMetadata metadata = stageTableCreateHelper(namespace, request); @@ -728,23 +732,7 @@ private Set authorizeLoadTable( read, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier); } - CatalogEntity catalogEntity = getResolvedCatalogEntity(); - - LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType()); - LOGGER.info( - "allow external catalog credential vending: {}", - realmConfig.getConfig( - FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)); - if (catalogEntity - .getCatalogType() - .equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL) - && !realmConfig.getConfig( - FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) { - throw new ForbiddenException( - "Access Delegation is not enabled for this catalog. Please consult applicable " - + "documentation for the catalog config property '%s' to enable this feature", - FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig()); - } + checkAllowExternalCatalogCredentialVending(delegationModes); return actionsRequested; } @@ -814,13 +802,14 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier); if (resolvedStoragePath == null) { - LOGGER.debug("Unable to find storage configuration information for table {}", tableIdentifier); + LOGGER.debug( + "Unable to find storage configuration information for table {}", tableIdentifier); return responseBuilder; } if (baseCatalog instanceof IcebergCatalog || realmConfig.getConfig( - ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { + ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { AccessConfig accessConfig = accessConfigProvider.getAccessConfig( callContext, @@ -1223,6 +1212,31 @@ private EnumSet getUpdateTableAuthorizableOperatio } } + private void checkAllowExternalCatalogCredentialVending( + EnumSet delegationModes) { + + if (delegationModes.isEmpty()) { + return; + } + CatalogEntity catalogEntity = getResolvedCatalogEntity(); + + LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType()); + LOGGER.info( + "allow external catalog credential vending: {}", + realmConfig.getConfig( + FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)); + if (catalogEntity + .getCatalogType() + .equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL) + && !realmConfig.getConfig( + FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) { + throw new ForbiddenException( + "Access Delegation is not enabled for this catalog. Please consult applicable " + + "documentation for the catalog config property '%s' to enable this feature", + FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig()); + } + } + @Override public void close() throws Exception { if (baseCatalog instanceof Closeable closeable) { diff --git a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java index 70a13290c8..60e01f9a08 100644 --- a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java +++ b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java @@ -40,6 +40,7 @@ public Map getConfigOverrides() { .put("polaris.features.\"ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG\"", "true") .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", "false") .put("polaris.features.\"ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING\"", "true") + .put("polaris.features.\"ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING\"", "true") .put("polaris.storage.aws.access-key", CatalogFederationIntegrationTest.MINIO_ACCESS_KEY) .put("polaris.storage.aws.secret-key", CatalogFederationIntegrationTest.MINIO_SECRET_KEY) .build();