diff --git a/CHANGELOG.md b/CHANGELOG.md index 995f9891b8..257fbedb78 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -35,6 +35,8 @@ request adding CHANGELOG notes for breaking (!) changes and possibly other secti ### New Features +- Support credential vending for federated catalogs. `ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING` (default: true) was added to toggle this feature. + ### Changes ### Deprecations diff --git a/integration-tests/build.gradle.kts b/integration-tests/build.gradle.kts index 6c48edae65..7429c53718 100644 --- a/integration-tests/build.gradle.kts +++ b/integration-tests/build.gradle.kts @@ -67,6 +67,7 @@ dependencies { implementation(libs.awaitility) implementation(libs.s3mock.testcontainers) implementation(project(":polaris-runtime-test-common")) + implementation(project(":polaris-minio-testcontainer")) } copiedCodeChecks { diff --git a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java index 42032dc7f5..af9937b3d1 100644 --- a/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java +++ b/integration-tests/src/main/java/org/apache/polaris/service/it/test/CatalogFederationIntegrationTest.java @@ -27,6 +27,7 @@ import java.util.UUID; import org.apache.iceberg.exceptions.ForbiddenException; import org.apache.polaris.core.admin.model.AuthenticationParameters; +import org.apache.polaris.core.admin.model.AwsStorageConfigInfo; import org.apache.polaris.core.admin.model.Catalog; import org.apache.polaris.core.admin.model.CatalogGrant; import org.apache.polaris.core.admin.model.CatalogPrivilege; @@ -34,7 +35,6 @@ import org.apache.polaris.core.admin.model.CatalogRole; import org.apache.polaris.core.admin.model.ConnectionConfigInfo; import org.apache.polaris.core.admin.model.ExternalCatalog; -import org.apache.polaris.core.admin.model.FileStorageConfigInfo; import org.apache.polaris.core.admin.model.GrantResource; import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo; import org.apache.polaris.core.admin.model.NamespaceGrant; @@ -52,6 +52,9 @@ import org.apache.polaris.service.it.env.PolarisClient; import org.apache.polaris.service.it.ext.PolarisIntegrationTestExtension; import org.apache.polaris.service.it.ext.SparkSessionBuilder; +import org.apache.polaris.test.minio.Minio; +import org.apache.polaris.test.minio.MinioAccess; +import org.apache.polaris.test.minio.MinioExtension; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.junit.jupiter.api.AfterAll; @@ -66,9 +69,14 @@ * Integration test for catalog federation functionality. This test verifies that an external * catalog can be created that federates with an internal catalog. */ +@ExtendWith(MinioExtension.class) @ExtendWith(PolarisIntegrationTestExtension.class) public class CatalogFederationIntegrationTest { + public static final String BUCKET_URI_PREFIX = "/minio-test-catalog-federation"; + public static final String MINIO_ACCESS_KEY = "test-ak-123-catalog-federation"; + public static final String MINIO_SECRET_KEY = "test-sk-123-catalog-federation"; + private static PolarisClient client; private static CatalogApi catalogApi; private static ManagementApi managementApi; @@ -78,6 +86,8 @@ public class CatalogFederationIntegrationTest { private static String federatedCatalogName; private static String localCatalogRoleName; private static String federatedCatalogRoleName; + private static URI storageBase; + private static String endpoint; private static final String PRINCIPAL_NAME = "test-catalog-federation-user"; private static final String PRINCIPAL_ROLE_NAME = "test-catalog-federation-user-role"; @@ -93,12 +103,17 @@ public class CatalogFederationIntegrationTest { private PrincipalWithCredentials newUserCredentials; @BeforeAll - static void setup(PolarisApiEndpoints apiEndpoints, ClientCredentials credentials) { + static void setup( + PolarisApiEndpoints apiEndpoints, + ClientCredentials credentials, + @Minio(accessKey = MINIO_ACCESS_KEY, secretKey = MINIO_SECRET_KEY) MinioAccess minioAccess) { endpoints = apiEndpoints; client = polarisClient(endpoints); String adminToken = client.obtainToken(credentials); managementApi = client.managementApi(adminToken); catalogApi = client.catalogApi(adminToken); + storageBase = minioAccess.s3BucketUri(BUCKET_URI_PREFIX); + endpoint = minioAccess.s3endpoint(); } @AfterAll @@ -129,12 +144,14 @@ void after() { } private void setupCatalogs() { - baseLocation = URI.create("file:///tmp/warehouse"); + baseLocation = storageBase; newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME); - FileStorageConfigInfo storageConfig = - FileStorageConfigInfo.builder() - .setStorageType(StorageConfigInfo.StorageTypeEnum.FILE) + AwsStorageConfigInfo storageConfig = + AwsStorageConfigInfo.builder() + .setStorageType(StorageConfigInfo.StorageTypeEnum.S3) + .setPathStyleAccess(true) + .setEndpoint(endpoint) .setAllowedLocations(List.of(baseLocation.toString())) .build(); @@ -197,6 +214,14 @@ private void setupCatalogs() { spark = SparkSessionBuilder.buildWithTestDefaults() .withWarehouse(warehouseDir.toUri()) + .withConfig( + "spark.sql.catalog." + localCatalogName + ".header.X-Iceberg-Access-Delegation", + "vended-credentials") + .withConfig( + "spark.sql.catalog." + federatedCatalogName + ".header.X-Iceberg-Access-Delegation", + "vended-credentials") + .withConfig("spark.sql.catalog." + localCatalogName + ".cache-enabled", "false") + .withConfig("spark.sql.catalog." + federatedCatalogName + ".cache-enabled", "false") .addCatalog( localCatalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken) .addCatalog( @@ -296,10 +321,6 @@ void testFederatedCatalogWithNamespaceRBAC() { .sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id") .collectAsList(); assertThat(localNs2Data).hasSize(2); - - // Restore the grant - managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, namespaceGrant); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); } @Test @@ -335,9 +356,76 @@ void testFederatedCatalogWithTableRBAC() { .sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id") .collectAsList(); assertThat(localNs2Data).hasSize(2); + } - // Restore the grant - managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant); - managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); + @Test + void testFederatedCatalogWithCredentialVending() { + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant); + + // Case 1: Only have TABLE_READ_PROPERTIES privilege, should not be able to read data + TableGrant tablePropertiesGrant = + TableGrant.builder() + .setType(GrantResource.TypeEnum.TABLE) + .setPrivilege(TablePrivilege.TABLE_READ_PROPERTIES) + .setNamespace(List.of("ns1")) + .setTableName("test_table") + .build(); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant); + spark.sql("USE " + federatedCatalogName); + + // Read table data should fail since TABLE_READ_PROPERTIES does not allow reading data + assertThatThrownBy(() -> spark.sql("SELECT * FROM ns1.test_table ORDER BY id")) + .isInstanceOf(ForbiddenException.class); + + // Case 2: Only have TABLE_READ_DATA privilege, should be able to read data but not write + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tablePropertiesGrant); + TableGrant tableReadDataGrant = + TableGrant.builder() + .setType(GrantResource.TypeEnum.TABLE) + .setPrivilege(TablePrivilege.TABLE_READ_DATA) + .setNamespace(List.of("ns1")) + .setTableName("test_table") + .build(); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant); + + // Verify that the vended credential allows reading the data + List ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList(); + assertThat(ns1Data).hasSize(2); + assertThat(ns1Data.get(0).getInt(0)).isEqualTo(1); + assertThat(ns1Data.get(0).getString(1)).isEqualTo("Alice"); + + // Verify that write is blocked since the vended credential should only have read permission + assertThatThrownBy(() -> spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')")) + .hasMessageContaining( + "software.amazon.awssdk.services.s3.model.S3Exception: Access Denied. (Service: S3, Status Code: 403,"); + + // Case 3: TABLE_WRITE_DATA should + managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableReadDataGrant); + TableGrant tableWriteDataGrant = + TableGrant.builder() + .setType(GrantResource.TypeEnum.TABLE) + .setPrivilege(TablePrivilege.TABLE_WRITE_DATA) + .setNamespace(List.of("ns1")) + .setTableName("test_table") + .build(); + managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableWriteDataGrant); + + spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')"); + + // Verify the write was successful by reading back + List updatedData = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList(); + assertThat(updatedData).hasSize(3); + assertThat(updatedData.get(2).getInt(0)).isEqualTo(3); + assertThat(updatedData.get(2).getString(1)).isEqualTo("Charlie"); + + // Verify the data is also visible from the local catalog (both point to same storage) + spark.sql(String.format("REFRESH TABLE %s.ns1.test_table", localCatalogName)); + List localData = + spark + .sql(String.format("SELECT * FROM %s.ns1.test_table ORDER BY id", localCatalogName)) + .collectAsList(); + assertThat(localData).hasSize(3); + assertThat(localData.get(2).getInt(0)).isEqualTo(3); + assertThat(localData.get(2).getString(1)).isEqualTo("Charlie"); } } diff --git a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java index 5d81c79f1e..1772f47256 100644 --- a/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java +++ b/polaris-core/src/main/java/org/apache/polaris/core/config/FeatureConfiguration.java @@ -429,4 +429,13 @@ public static void enforceFeatureEnabledOrThrow( "When true, enables finer grained update table privileges which are passed to the authorizer for update table operations") .defaultValue(true) .buildFeatureConfiguration(); + + public static final FeatureConfiguration ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING = + PolarisConfiguration.builder() + .key("ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING") + .catalogConfig("polaris.config.allow-federated-catalogs-credential-vending") + .description( + "If set to true (default), allow credential vending for external catalogs. Note this requires ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING to be true first.") + .defaultValue(true) + .buildFeatureConfiguration(); } diff --git a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java index 280c98d469..833c0ab118 100644 --- a/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java +++ b/runtime/service/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.catalog.iceberg; +import static org.apache.polaris.core.config.FeatureConfiguration.ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING; import static org.apache.polaris.core.config.FeatureConfiguration.LIST_PAGINATION_ENABLED; import static org.apache.polaris.service.catalog.AccessDelegationMode.VENDED_CREDENTIALS; @@ -265,6 +266,10 @@ protected void initializeCatalog() { throw new UnsupportedOperationException( "External catalog factory for type '" + connectionType + "' is unavailable."); } + // TODO: if the remote catalog is not RestCatalog, the corresponding table operation will use + // environment to load the table metadata, the env may not contain credentials to access the + // storage. In the future, we could leverage PolarisCredentialManager to inject storage + // credentials for non-rest remote catalog this.baseCatalog = federatedCatalog; } else { LOGGER.atInfo().log("Initializing non-federated catalog"); @@ -421,6 +426,12 @@ public void authorizeCreateTableDirect( PolarisAuthorizableOperation.CREATE_TABLE_DIRECT_WITH_WRITE_DELEGATION, TableIdentifier.of(namespace, request.name())); } + + CatalogEntity catalog = getResolvedCatalogEntity(); + if (catalog.isStaticFacade()) { + throw new BadRequestException("Cannot create table on static-facade external catalogs."); + } + checkAllowExternalCatalogCredentialVending(delegationModes); } public LoadTableResponse createTableDirect( @@ -431,10 +442,6 @@ public LoadTableResponse createTableDirect( authorizeCreateTableDirect(namespace, request, delegationModes); - CatalogEntity catalog = getResolvedCatalogEntity(); - if (catalog.isStaticFacade()) { - throw new BadRequestException("Cannot create table on static-facade external catalogs."); - } request.validate(); TableIdentifier tableIdentifier = TableIdentifier.of(namespace, request.name()); @@ -545,6 +552,12 @@ private void authorizeCreateTableStaged( PolarisAuthorizableOperation.CREATE_TABLE_STAGED_WITH_WRITE_DELEGATION, TableIdentifier.of(namespace, request.name())); } + + CatalogEntity catalog = getResolvedCatalogEntity(); + if (catalog.isStaticFacade()) { + throw new BadRequestException("Cannot create table on static-facade external catalogs."); + } + checkAllowExternalCatalogCredentialVending(delegationModes); } public LoadTableResponse createTableStaged( @@ -555,10 +568,6 @@ public LoadTableResponse createTableStaged( authorizeCreateTableStaged(namespace, request, delegationModes); - CatalogEntity catalog = getResolvedCatalogEntity(); - if (catalog.isStaticFacade()) { - throw new BadRequestException("Cannot create table on static-facade external catalogs."); - } TableIdentifier ident = TableIdentifier.of(namespace, request.name()); TableMetadata metadata = stageTableCreateHelper(namespace, request); @@ -723,23 +732,7 @@ private Set authorizeLoadTable( read, PolarisEntitySubType.ICEBERG_TABLE, tableIdentifier); } - CatalogEntity catalogEntity = getResolvedCatalogEntity(); - - LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType()); - LOGGER.info( - "allow external catalog credential vending: {}", - realmConfig.getConfig( - FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)); - if (catalogEntity - .getCatalogType() - .equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL) - && !realmConfig.getConfig( - FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) { - throw new ForbiddenException( - "Access Delegation is not enabled for this catalog. Please consult applicable " - + "documentation for the catalog config property '%s' to enable this feature", - FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig()); - } + checkAllowExternalCatalogCredentialVending(delegationModes); return actionsRequested; } @@ -808,8 +801,15 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential PolarisResolvedPathWrapper resolvedStoragePath = CatalogUtils.findResolvedStorageEntity(resolutionManifest, tableIdentifier); - if (baseCatalog instanceof IcebergCatalog && resolvedStoragePath != null) { + if (resolvedStoragePath == null) { + LOGGER.debug( + "Unable to find storage configuration information for table {}", tableIdentifier); + return responseBuilder; + } + if (baseCatalog instanceof IcebergCatalog + || realmConfig.getConfig( + ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING, getResolvedCatalogEntity())) { AccessConfig accessConfig = accessConfigProvider.getAccessConfig( callContext, @@ -838,6 +838,7 @@ private LoadTableResponse.Builder buildLoadTableResponseWithDelegationCredential } responseBuilder.addAllConfig(accessConfig.extraProperties()); } + return responseBuilder; } @@ -1211,6 +1212,31 @@ private EnumSet getUpdateTableAuthorizableOperatio } } + private void checkAllowExternalCatalogCredentialVending( + EnumSet delegationModes) { + + if (delegationModes.isEmpty()) { + return; + } + CatalogEntity catalogEntity = getResolvedCatalogEntity(); + + LOGGER.info("Catalog type: {}", catalogEntity.getCatalogType()); + LOGGER.info( + "allow external catalog credential vending: {}", + realmConfig.getConfig( + FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)); + if (catalogEntity + .getCatalogType() + .equals(org.apache.polaris.core.admin.model.Catalog.TypeEnum.EXTERNAL) + && !realmConfig.getConfig( + FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING, catalogEntity)) { + throw new ForbiddenException( + "Access Delegation is not enabled for this catalog. Please consult applicable " + + "documentation for the catalog config property '%s' to enable this feature", + FeatureConfiguration.ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING.catalogConfig()); + } + } + @Override public void close() throws Exception { if (baseCatalog instanceof Closeable closeable) { diff --git a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java index cdd9a2a904..60e01f9a08 100644 --- a/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java +++ b/runtime/spark-tests/src/intTest/java/org/apache/polaris/service/spark/it/CatalogFederationIT.java @@ -18,6 +18,7 @@ */ package org.apache.polaris.service.spark.it; +import com.google.common.collect.ImmutableMap; import io.quarkus.test.junit.QuarkusIntegrationTest; import io.quarkus.test.junit.QuarkusTestProfile; import io.quarkus.test.junit.TestProfile; @@ -31,17 +32,18 @@ public class CatalogFederationIT extends CatalogFederationIntegrationTest { public static class CatalogFederationProfile implements QuarkusTestProfile { @Override public Map getConfigOverrides() { - return Map.of( - "polaris.features.\"ENABLE_CATALOG_FEDERATION\"", - "true", - "polaris.features.\"SUPPORTED_CATALOG_CONNECTION_TYPES\"", - "[\"ICEBERG_REST\"]", - "polaris.features.\"ALLOW_OVERLAPPING_CATALOG_URLS\"", - "true", - "polaris.features.\"ENABLE_SUB_CATALOG_RBAC_FOR_FEDERATED_CATALOGS\"", - "true", - "polaris.features.\"ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG\"", - "true"); + return ImmutableMap.builder() + .put("polaris.features.\"ENABLE_CATALOG_FEDERATION\"", "true") + .put("polaris.features.\"SUPPORTED_CATALOG_CONNECTION_TYPES\"", "[\"ICEBERG_REST\"]") + .put("polaris.features.\"ALLOW_OVERLAPPING_CATALOG_URLS\"", "true") + .put("polaris.features.\"ENABLE_SUB_CATALOG_RBAC_FOR_FEDERATED_CATALOGS\"", "true") + .put("polaris.features.\"ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG\"", "true") + .put("polaris.features.\"SKIP_CREDENTIAL_SUBSCOPING_INDIRECTION\"", "false") + .put("polaris.features.\"ALLOW_EXTERNAL_CATALOG_CREDENTIAL_VENDING\"", "true") + .put("polaris.features.\"ALLOW_FEDERATED_CATALOGS_CREDENTIAL_VENDING\"", "true") + .put("polaris.storage.aws.access-key", CatalogFederationIntegrationTest.MINIO_ACCESS_KEY) + .put("polaris.storage.aws.secret-key", CatalogFederationIntegrationTest.MINIO_SECRET_KEY) + .build(); } } }