Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,13 @@ public void deletePrincipalRole(PrincipalRole role) {
}
}

/**
 * Issues a DELETE against {@code v1/principal-roles/{name}} for the given role name and asserts
 * that the response status equals {@code NO_CONTENT}.
 *
 * @param roleName name of the principal role to delete
 */
public void deletePrincipalRole(String roleName) {
  try (Response deleteResponse =
      request("v1/principal-roles/{name}", Map.of("name", roleName)).delete()) {
    int actualStatus = deleteResponse.getStatus();
    assertThat(actualStatus).isEqualTo(NO_CONTENT.getStatusCode());
  }
}

public void dropCatalog(String catalogName) {
listCatalogRoles(catalogName).stream()
.filter(cr -> !cr.getName().equals(PolarisEntityConstants.getNameOfCatalogAdminRole()))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@

import static org.apache.polaris.service.it.env.PolarisClient.polarisClient;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.net.URI;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.exceptions.ForbiddenException;
import org.apache.polaris.core.admin.model.AuthenticationParameters;
import org.apache.polaris.core.admin.model.Catalog;
import org.apache.polaris.core.admin.model.CatalogGrant;
Expand All @@ -32,11 +35,17 @@
import org.apache.polaris.core.admin.model.ConnectionConfigInfo;
import org.apache.polaris.core.admin.model.ExternalCatalog;
import org.apache.polaris.core.admin.model.FileStorageConfigInfo;
import org.apache.polaris.core.admin.model.GrantResource;
import org.apache.polaris.core.admin.model.IcebergRestConnectionConfigInfo;
import org.apache.polaris.core.admin.model.NamespaceGrant;
import org.apache.polaris.core.admin.model.NamespacePrivilege;
import org.apache.polaris.core.admin.model.OAuthClientCredentialsParameters;
import org.apache.polaris.core.admin.model.PolarisCatalog;
import org.apache.polaris.core.admin.model.PrincipalWithCredentials;
import org.apache.polaris.core.admin.model.StorageConfigInfo;
import org.apache.polaris.core.admin.model.TableGrant;
import org.apache.polaris.core.admin.model.TablePrivilege;
import org.apache.polaris.service.it.env.CatalogApi;
import org.apache.polaris.service.it.env.ClientCredentials;
import org.apache.polaris.service.it.env.ManagementApi;
import org.apache.polaris.service.it.env.PolarisApiEndpoints;
Expand All @@ -61,17 +70,22 @@
public class CatalogFederationIntegrationTest {

private static PolarisClient client;
private static CatalogApi catalogApi;
private static ManagementApi managementApi;
private static PolarisApiEndpoints endpoints;
private static SparkSession spark;
private static String sparkToken;
private static String adminToken;
private static String localCatalogName;
private static String federatedCatalogName;
private static String localCatalogRoleName;
private static String federatedCatalogRoleName;

private static final String PRINCIPAL_NAME = "test-catalog-federation-user";
private static final String LOCAL_CATALOG_NAME = "test_catalog_local";
private static final String EXTERNAL_CATALOG_NAME = "test_catalog_external";
private static final String CATALOG_ROLE_NAME = "catalog_admin";
private static final String PRINCIPAL_ROLE_NAME = "service_admin";
private static final String PRINCIPAL_ROLE_NAME = "test-catalog-federation-user-role";
private static final CatalogGrant defaultCatalogGrant =
CatalogGrant.builder()
.setType(GrantResource.TypeEnum.CATALOG)
.setPrivilege(CatalogPrivilege.CATALOG_MANAGE_CONTENT)
.build();

@TempDir static java.nio.file.Path warehouseDir;

Expand All @@ -82,9 +96,9 @@ public class CatalogFederationIntegrationTest {
static void setup(PolarisApiEndpoints apiEndpoints, ClientCredentials credentials) {
endpoints = apiEndpoints;
client = polarisClient(endpoints);
adminToken = client.obtainToken(credentials);
String adminToken = client.obtainToken(credentials);
managementApi = client.managementApi(adminToken);
sparkToken = client.obtainToken(credentials);
catalogApi = client.catalogApi(adminToken);
}

@AfterAll
Expand All @@ -96,7 +110,8 @@ static void close() throws Exception {

@BeforeEach
void before() {
this.baseLocation = URI.create("file:///tmp/warehouse");
setupCatalogs();
setupExampleNamespacesAndTables();
}

@AfterEach
Expand All @@ -106,14 +121,16 @@ void after() {
SparkSession.clearActiveSession();
spark.close();
}
managementApi.dropCatalog(EXTERNAL_CATALOG_NAME);
managementApi.dropCatalog(LOCAL_CATALOG_NAME);
catalogApi.purge(localCatalogName);
managementApi.dropCatalog(federatedCatalogName);
managementApi.dropCatalog(localCatalogName);
managementApi.deletePrincipalRole(PRINCIPAL_ROLE_NAME);
managementApi.deletePrincipal(PRINCIPAL_NAME);
}

@Test
void testCatalogFederation() {
newUserCredentials = managementApi.createPrincipal(PRINCIPAL_NAME);
private void setupCatalogs() {
baseLocation = URI.create("file:///tmp/warehouse");
newUserCredentials = managementApi.createPrincipalWithRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME);

FileStorageConfigInfo storageConfig =
FileStorageConfigInfo.builder()
Expand All @@ -123,26 +140,26 @@ void testCatalogFederation() {

CatalogProperties catalogProperties = new CatalogProperties(baseLocation.toString());

localCatalogName = "test_catalog_local_" + UUID.randomUUID().toString().replace("-", "");
localCatalogRoleName = "test-catalog-role_" + UUID.randomUUID().toString().replace("-", "");
federatedCatalogName = "test_catalog_external_" + UUID.randomUUID().toString().replace("-", "");
federatedCatalogRoleName = "test-catalog-role_" + UUID.randomUUID().toString().replace("-", "");

Catalog localCatalog =
PolarisCatalog.builder()
.setType(Catalog.TypeEnum.INTERNAL)
.setName(LOCAL_CATALOG_NAME)
.setName(localCatalogName)
.setProperties(catalogProperties)
.setStorageConfigInfo(storageConfig)
.build();
managementApi.createCatalog(localCatalog);
managementApi.createCatalogRole(localCatalogName, localCatalogRoleName);

CatalogGrant catalogGrant =
CatalogGrant.builder()
.setType(CatalogGrant.TypeEnum.CATALOG)
.setPrivilege(CatalogPrivilege.TABLE_WRITE_DATA)
.build();
managementApi.addGrant(LOCAL_CATALOG_NAME, CATALOG_ROLE_NAME, catalogGrant);
managementApi.assignPrincipalRole(PRINCIPAL_NAME, PRINCIPAL_ROLE_NAME);
CatalogRole localCatalogAdminRole =
managementApi.getCatalogRole(LOCAL_CATALOG_NAME, CATALOG_ROLE_NAME);
managementApi.addGrant(localCatalogName, localCatalogRoleName, defaultCatalogGrant);
CatalogRole localCatalogRole =
managementApi.getCatalogRole(localCatalogName, localCatalogRoleName);
managementApi.grantCatalogRoleToPrincipalRole(
PRINCIPAL_ROLE_NAME, LOCAL_CATALOG_NAME, localCatalogAdminRole);
PRINCIPAL_ROLE_NAME, localCatalogName, localCatalogRole);

AuthenticationParameters authParams =
OAuthClientCredentialsParameters.builder()
Expand All @@ -156,38 +173,42 @@ void testCatalogFederation() {
IcebergRestConnectionConfigInfo.builder()
.setConnectionType(ConnectionConfigInfo.ConnectionTypeEnum.ICEBERG_REST)
.setUri(endpoints.catalogApiEndpoint().toString())
.setRemoteCatalogName(LOCAL_CATALOG_NAME)
.setRemoteCatalogName(localCatalogName)
.setAuthenticationParameters(authParams)
.build();
ExternalCatalog externalCatalog =
ExternalCatalog.builder()
.setType(Catalog.TypeEnum.EXTERNAL)
.setName(EXTERNAL_CATALOG_NAME)
.setName(federatedCatalogName)
.setConnectionConfigInfo(connectionConfig)
.setProperties(catalogProperties)
.setStorageConfigInfo(storageConfig)
.build();
managementApi.createCatalog(externalCatalog);
managementApi.createCatalogRole(federatedCatalogName, federatedCatalogRoleName);

managementApi.addGrant(EXTERNAL_CATALOG_NAME, CATALOG_ROLE_NAME, catalogGrant);
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
CatalogRole externalCatalogAdminRole =
managementApi.getCatalogRole(EXTERNAL_CATALOG_NAME, CATALOG_ROLE_NAME);
managementApi.getCatalogRole(federatedCatalogName, federatedCatalogRoleName);
managementApi.grantCatalogRoleToPrincipalRole(
PRINCIPAL_ROLE_NAME, EXTERNAL_CATALOG_NAME, externalCatalogAdminRole);
PRINCIPAL_ROLE_NAME, federatedCatalogName, externalCatalogAdminRole);

String sparkToken = client.obtainToken(newUserCredentials);
spark =
SparkSessionBuilder.buildWithTestDefaults()
.withWarehouse(warehouseDir.toUri())
.addCatalog(
LOCAL_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
localCatalogName, "org.apache.iceberg.spark.SparkCatalog", endpoints, sparkToken)
.addCatalog(
EXTERNAL_CATALOG_NAME,
federatedCatalogName,
"org.apache.iceberg.spark.SparkCatalog",
endpoints,
sparkToken)
.getOrCreate();
}

spark.sql("USE " + LOCAL_CATALOG_NAME);
private void setupExampleNamespacesAndTables() {
spark.sql("USE " + localCatalogName);
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1");
spark.sql("CREATE TABLE IF NOT EXISTS ns1.test_table (id int, name string)");
spark.sql("INSERT INTO ns1.test_table VALUES (1, 'Alice')");
Expand All @@ -198,42 +219,125 @@ void testCatalogFederation() {
spark.sql("INSERT INTO ns2.test_table VALUES (1, 'Apache Spark')");
spark.sql("INSERT INTO ns2.test_table VALUES (2, 'Apache Iceberg')");

spark.sql("USE " + EXTERNAL_CATALOG_NAME);
spark.sql("CREATE NAMESPACE IF NOT EXISTS ns1.ns1a");
spark.sql("CREATE TABLE IF NOT EXISTS ns1.ns1a.test_table (id int, name string)");
spark.sql("INSERT INTO ns1.ns1a.test_table VALUES (1, 'Alice')");

spark.sql("CREATE TABLE IF NOT EXISTS ns1.ns1a.test_table2 (id int, name string)");
spark.sql("INSERT INTO ns1.ns1a.test_table2 VALUES (1, 'Apache Iceberg')");
}

@Test
void testFederatedCatalogBasicReadWriteOperations() {
spark.sql("USE " + federatedCatalogName);
List<Row> namespaces = spark.sql("SHOW NAMESPACES").collectAsList();
assertThat(namespaces).hasSize(2);

List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
assertThat(ns1Data).hasSize(2);
assertThat(ns1Data.get(0).getInt(0)).isEqualTo(1);
assertThat(ns1Data.get(0).getString(1)).isEqualTo("Alice");
assertThat(ns1Data.get(1).getInt(0)).isEqualTo(2);
assertThat(ns1Data.get(1).getString(1)).isEqualTo("Bob");
List<Row> refNs1Data =
spark
.sql(String.format("SELECT * FROM %s.ns1.test_table ORDER BY id", localCatalogName))
.collectAsList();
assertThat(ns1Data).isEqualTo(refNs1Data);
spark.sql("INSERT INTO ns1.test_table VALUES (3, 'Charlie')");

List<Row> ns2Data = spark.sql("SELECT * FROM ns2.test_table ORDER BY id").collectAsList();
assertThat(ns2Data).hasSize(2);
assertThat(ns2Data.get(0).getInt(0)).isEqualTo(1);
assertThat(ns2Data.get(0).getString(1)).isEqualTo("Apache Spark");
assertThat(ns2Data.get(1).getInt(0)).isEqualTo(2);
assertThat(ns2Data.get(1).getString(1)).isEqualTo("Apache Iceberg");
List<Row> refNs2Data =
spark
.sql(String.format("SELECT * FROM %s.ns2.test_table ORDER BY id", localCatalogName))
.collectAsList();
assertThat(ns2Data).isEqualTo(refNs2Data);
spark.sql("INSERT INTO ns2.test_table VALUES (3, 'Apache Polaris')");

spark.sql("USE " + LOCAL_CATALOG_NAME);
spark.sql("REFRESH TABLE ns1.test_table");
spark.sql("REFRESH TABLE ns2.test_table");
spark.sql(String.format("REFRESH TABLE %s.ns1.test_table", localCatalogName));
spark.sql(String.format("REFRESH TABLE %s.ns2.test_table", localCatalogName));

List<Row> updatedNs1Data =
spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
spark
.sql(String.format("SELECT * FROM %s.ns1.test_table ORDER BY id", localCatalogName))
.collectAsList();
assertThat(updatedNs1Data).hasSize(3);
assertThat(updatedNs1Data.get(2).getInt(0)).isEqualTo(3);
assertThat(updatedNs1Data.get(2).getString(1)).isEqualTo("Charlie");
List<Row> updatedNs2Data =
spark.sql("SELECT * FROM ns2.test_table ORDER BY id").collectAsList();
spark
.sql(String.format("SELECT * FROM %s.ns2.test_table ORDER BY id", localCatalogName))
.collectAsList();
assertThat(updatedNs2Data).hasSize(3);
assertThat(updatedNs2Data.get(2).getInt(0)).isEqualTo(3);
assertThat(updatedNs2Data.get(2).getString(1)).isEqualTo("Apache Polaris");
}

/**
 * Checks namespace-scoped RBAC on the federated catalog: the catalog-level default grant is
 * swapped for a TABLE_READ_DATA grant on namespace ns1, reads are then asserted to succeed for
 * ns1 and ns1.ns1a, to be forbidden for ns2, and to keep working through the local catalog.
 */
@Test
void testFederatedCatalogWithNamespaceRBAC() {
  // Read privilege scoped to namespace ns1
  NamespaceGrant ns1ReadGrant =
      NamespaceGrant.builder()
          .setType(GrantResource.TypeEnum.NAMESPACE)
          .setPrivilege(NamespacePrivilege.TABLE_READ_DATA)
          .setNamespace(List.of("ns1"))
          .build();

  // Replace the catalog-level default grant with the namespace-scoped one
  managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
  managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, ns1ReadGrant);

  spark.sql("USE " + federatedCatalogName);

  // Tables under ns1 and its child namespace ns1.ns1a are readable
  assertThat(spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList()).hasSize(2);
  assertThat(spark.sql("SELECT * FROM ns1.ns1a.test_table ORDER BY id").collectAsList())
      .hasSize(1);

  // Tables under ns2 are not covered by the grant, so the read must be rejected
  String ns2Query = "SELECT * FROM ns2.test_table ORDER BY id";
  assertThatThrownBy(() -> spark.sql(ns2Query).collectAsList())
      .isInstanceOf(ForbiddenException.class);

  // The same ns2 table is still readable when addressed through the local catalog
  String localNs2Query = "SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id";
  List<Row> rowsFromLocalNs2 = spark.sql(localNs2Query).collectAsList();
  assertThat(rowsFromLocalNs2).hasSize(2);

  // Put the original catalog-level grant back
  managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, ns1ReadGrant);
  managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
}

/**
 * Checks table-scoped RBAC on the federated catalog: the catalog-level default grant is revoked
 * and replaced by a TABLE_READ_DATA grant on the single table ns1.test_table. Reads are then
 * asserted to succeed for that table, to be forbidden for ns1.ns1a.test_table and
 * ns2.test_table, and to keep working through the local catalog.
 */
@Test
void testFederatedCatalogWithTableRBAC() {
// Revoke the catalog-level default grant before adding the narrower table grant
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
TableGrant tableGrant =
TableGrant.builder()
.setType(GrantResource.TypeEnum.TABLE)
.setPrivilege(TablePrivilege.TABLE_READ_DATA)
.setNamespace(List.of("ns1"))
.setTableName("test_table")
.build();
// Grant read on the single table ns1.test_table only
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant);

spark.sql("USE " + federatedCatalogName);
// Read should work for ns1.test_table, the only table covered by the grant
List<Row> ns1Data = spark.sql("SELECT * FROM ns1.test_table ORDER BY id").collectAsList();
assertThat(ns1Data).hasSize(2);

// Read should fail for tables under ns1.ns1a
assertThatThrownBy(
() -> spark.sql("SELECT * FROM ns1.ns1a.test_table ORDER BY id").collectAsList())
.isInstanceOf(ForbiddenException.class);

// Read should fail for tables under ns2
assertThatThrownBy(() -> spark.sql("SELECT * FROM ns2.test_table ORDER BY id").collectAsList())
.isInstanceOf(ForbiddenException.class);

// Read should work for tables in local catalog
List<Row> localNs2Data =
spark
.sql("SELECT * FROM " + localCatalogName + ".ns2.test_table ORDER BY id")
.collectAsList();
assertThat(localNs2Data).hasSize(2);

// NOTE(review): the four DROP statements below look like removed-side lines of the rendered
// diff (cleanup from the pre-change test) rather than part of this test - confirm against the
// actual file before relying on them.
spark.sql("DROP TABLE ns1.test_table");
spark.sql("DROP TABLE ns2.test_table");
spark.sql("DROP NAMESPACE ns1");
spark.sql("DROP NAMESPACE ns2");
// Restore the grant
managementApi.revokeGrant(federatedCatalogName, federatedCatalogRoleName, tableGrant);
managementApi.addGrant(federatedCatalogName, federatedCatalogRoleName, defaultCatalogGrant);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -407,4 +407,16 @@ public static void enforceFeatureEnabledOrThrow(
+ "If set to false, Polaris will disallow setting or changing the above catalog property")
.defaultValue(true)
.buildFeatureConfiguration();

/**
 * Feature flag, disabled by default, registered under the key
 * {@code ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG} and the catalog config name
 * {@code polaris.config.allow-dropping-non-empty-passthrough-facade-catalog}. Per its
 * description, enabling it allows a passthrough-facade catalog to be dropped even if it still
 * contains namespaces or tables.
 */
public static final FeatureConfiguration<Boolean>
    ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG =
        PolarisConfiguration.<Boolean>builder()
            .key("ALLOW_DROPPING_NON_EMPTY_PASSTHROUGH_FACADE_CATALOG")
            .description(
                // Each fragment ends with a space so the concatenated sentences stay separated.
                "If enabled, allow dropping a passthrough-facade catalog even if it contains namespaces or tables. "
                    + "passthrough-facade catalogs may contain leftover entities when syncing with source catalog. "
                    + "In the short term these entities will be ignored, in the long term there will be method/background job to clean them up.")
            .catalogConfig("polaris.config.allow-dropping-non-empty-passthrough-facade-catalog")
            .defaultValue(false)
            .buildFeatureConfiguration();
}
Loading