From 9769db0c0cb0c50b30a907c3d96121d84b68b417 Mon Sep 17 00:00:00 2001 From: caican Date: Tue, 18 Jun 2024 17:07:36 +0800 Subject: [PATCH] [#3891] feat(catalog-lakehouse-paimon): Support schema operations for Paimon Catalog --- .../catalog-lakehouse-paimon/build.gradle.kts | 77 ++++--- .../lakehouse/paimon/PaimonCatalog.java | 12 +- .../paimon/PaimonCatalogBackend.java | 10 + .../paimon/PaimonCatalogOperations.java | 104 +++++++++- .../PaimonCatalogPropertiesMetadata.java | 68 +++++++ .../lakehouse/paimon/PaimonConfig.java | 48 +++++ .../lakehouse/paimon/PaimonSchema.java | 67 ++++++ .../PaimonSchemaPropertiesMetadata.java | 36 ++++ .../lakehouse/paimon/ops/PaimonTableOps.java | 51 +++++ .../lakehouse/paimon/utils/CatalogUtils.java | 48 +++++ .../lakehouse/paimon/TestPaimonConfig.java | 30 +++ .../lakehouse/paimon/TestPaimonSchema.java | 136 +++++++++++++ .../integration/test/CatalogPaimonBaseIT.java | 192 ++++++++++++++++++ .../test/CatalogPaimonFileSystemIT.java | 31 +++ .../paimon/utils/TestCatalogUtils.java | 50 +++++ .../src/test/resources/log4j2.properties | 33 +++ 16 files changed, 940 insertions(+), 53 deletions(-) create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogBackend.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonConfig.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonTableOps.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/CatalogUtils.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonConfig.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java create mode 100644 catalogs/catalog-lakehouse-paimon/src/test/resources/log4j2.properties diff --git a/catalogs/catalog-lakehouse-paimon/build.gradle.kts b/catalogs/catalog-lakehouse-paimon/build.gradle.kts index da3b6301ce..860fa7303b 100644 --- a/catalogs/catalog-lakehouse-paimon/build.gradle.kts +++ b/catalogs/catalog-lakehouse-paimon/build.gradle.kts @@ -20,51 +20,21 @@ dependencies { exclude("com.github.spotbugs") } implementation(libs.hadoop2.hdfs) - implementation(libs.hive2.exec) { - artifact { - classifier = "core" - } - exclude("com.google.code.findbugs", "jsr305") - exclude("com.google.protobuf") - exclude("org.apache.avro") - exclude("org.apache.calcite") - exclude("org.apache.calcite.avatica") - exclude("org.apache.curator") - exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") - exclude("org.apache.logging.log4j") - exclude("org.apache.zookeeper") - exclude("org.eclipse.jetty.aggregate", "jetty-all") - exclude("org.eclipse.jetty.orbit", "javax.servlet") - exclude("org.openjdk.jol") - exclude("org.pentaho") - exclude("org.slf4j") - } - implementation(libs.hive2.metastore) { - exclude("co.cask.tephra") - exclude("com.github.spotbugs") - exclude("com.google.code.findbugs", "jsr305") - exclude("com.tdunning", "json") - exclude("javax.transaction", "transaction-api") - exclude("org.apache.avro", "avro") - exclude("org.apache.hbase") - exclude("org.apache.hadoop", "hadoop-yarn-api") - exclude("org.apache.hadoop", "hadoop-yarn-server-applicationhistoryservice") - exclude("org.apache.hadoop", "hadoop-yarn-server-common") - exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager") - exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy") - exclude("org.apache.logging.log4j") - exclude("org.apache.parquet", "parquet-hadoop-bundle") - exclude("org.apache.zookeeper") - exclude("org.eclipse.jetty.aggregate", "jetty-all") - exclude("org.eclipse.jetty.orbit", "javax.servlet") - exclude("org.pentaho") // missing dependency - exclude("org.slf4j", "slf4j-log4j12") - exclude("com.zaxxer", "HikariCP") - exclude("com.sun.jersey", "jersey-server") - } + implementation(libs.hadoop2.mapreduce.client.core) annotationProcessor(libs.lombok) compileOnly(libs.lombok) + + testImplementation(project(":clients:client-java")) + testImplementation(project(":integration-test-common", "testArtifacts")) + testImplementation(project(":server")) + testImplementation(project(":server-common")) + testImplementation(libs.junit.jupiter.api) + testImplementation(libs.junit.jupiter.params) + + testImplementation(libs.testcontainers) + + testRuntimeOnly(libs.junit.jupiter.engine) } tasks { @@ -105,6 +75,29 @@ tasks { } } +tasks.test { + val skipUTs = project.hasProperty("skipTests") + if (skipUTs) { + // Only run integration tests + include("**/integration/**") + } + + val skipITs = project.hasProperty("skipITs") + if (skipITs) { + // Exclude integration tests + exclude("**/integration/**") + } else { + dependsOn(tasks.jar) + + doFirst { + environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12") + } + + val init = project.extra.get("initIntegrationTest") as (Test) -> Unit + init(this) + } +} + tasks.getByName("generateMetadataFileForMavenJavaPublication") { dependsOn("runtimeJars") } diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java index 17059bd896..6c74076df1 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalog.java @@ -14,6 +14,12 @@ /** Implementation of {@link Catalog} that represents a Paimon catalog in Gravitino. */ public class PaimonCatalog extends BaseCatalog { + static final PaimonCatalogPropertiesMetadata CATALOG_PROPERTIES_META = + new PaimonCatalogPropertiesMetadata(); + + static final PaimonSchemaPropertiesMetadata SCHEMA_PROPERTIES_META = + new PaimonSchemaPropertiesMetadata(); + /** @return The short name of the catalog. */ @Override public String shortName() { @@ -44,13 +50,11 @@ public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationE @Override public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException { - throw new UnsupportedOperationException( - "The catalog does not support catalog properties metadata"); + return CATALOG_PROPERTIES_META; } @Override public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException { - throw new UnsupportedOperationException( - "The catalog does not support schema properties metadata"); + return SCHEMA_PROPERTIES_META; } } diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogBackend.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogBackend.java new file mode 100644 index 0000000000..8a1f742ced --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogBackend.java @@ -0,0 +1,10 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +/** The type of Paimon catalog backend. */ +public enum PaimonCatalogBackend { + FILESYSTEM +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java index 1e18bd9003..f29dadb8de 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogOperations.java @@ -4,9 +4,13 @@ */ package com.datastrato.gravitino.catalog.lakehouse.paimon; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonSchema.fromPaimonSchema; +import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX; + import com.datastrato.gravitino.NameIdentifier; import com.datastrato.gravitino.Namespace; import com.datastrato.gravitino.SchemaChange; +import com.datastrato.gravitino.catalog.lakehouse.paimon.ops.PaimonTableOps; import com.datastrato.gravitino.connector.CatalogInfo; import com.datastrato.gravitino.connector.CatalogOperations; import com.datastrato.gravitino.connector.HasPropertyMetadata; @@ -17,6 +21,7 @@ import com.datastrato.gravitino.exceptions.NonEmptySchemaException; import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; import com.datastrato.gravitino.exceptions.TableAlreadyExistsException; +import com.datastrato.gravitino.meta.AuditInfo; import com.datastrato.gravitino.rel.Column; import com.datastrato.gravitino.rel.TableCatalog; import com.datastrato.gravitino.rel.TableChange; @@ -24,8 +29,15 @@ import com.datastrato.gravitino.rel.expressions.sorts.SortOrder; import com.datastrato.gravitino.rel.expressions.transforms.Transform; import com.datastrato.gravitino.rel.indexes.Index; +import com.datastrato.gravitino.utils.MapUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Maps; import java.io.IOException; +import java.time.Instant; import java.util.Map; +import org.apache.paimon.catalog.Catalog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Implementation of {@link CatalogOperations} that represents operations for interacting with the @@ -33,6 +45,17 @@ */ public class PaimonCatalogOperations implements CatalogOperations, SupportsSchemas, TableCatalog { + public static final Logger LOG = LoggerFactory.getLogger(PaimonCatalogOperations.class); + + @VisibleForTesting public PaimonTableOps paimonTableOps; + + private static final String NO_SUCH_SCHEMA_EXCEPTION = + "Paimon schema (database) %s does not exist."; + private static final String NON_EMPTY_SCHEMA_EXCEPTION = + "Paimon schema (database) %s is not empty. One or more tables exist."; + private static final String SCHEMA_ALREADY_EXISTS_EXCEPTION = + "Paimon schema (database) %s already exists."; + /** * Initializes the Paimon catalog operations with the provided configuration. * @@ -43,7 +66,20 @@ public class PaimonCatalogOperations implements CatalogOperations, SupportsSchem @Override public void initialize( Map conf, CatalogInfo info, HasPropertyMetadata propertiesMetadata) - throws RuntimeException {} + throws RuntimeException { + // Key format like gravitino.bypass.a.b + Map prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX); + + // Hold keys that lie in GRAVITINO_CONFIG_TO_PAIMON + Map gravitinoConfig = + ((PaimonCatalogPropertiesMetadata) propertiesMetadata.catalogPropertiesMetadata()) + .transformProperties(conf); + + Map resultConf = Maps.newHashMap(prefixMap); + resultConf.putAll(gravitinoConfig); + + this.paimonTableOps = new PaimonTableOps(new PaimonConfig(resultConf)); + } /** * Lists the schemas under the specified namespace. @@ -54,7 +90,9 @@ public void initialize( */ @Override public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException { - throw new UnsupportedOperationException(); + return paimonTableOps.listDatabases().stream() + .map(NameIdentifier::of) + .toArray(NameIdentifier[]::new); } /** @@ -71,7 +109,29 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc public PaimonSchema createSchema( NameIdentifier identifier, String comment, Map properties) throws NoSuchCatalogException, SchemaAlreadyExistsException { - throw new UnsupportedOperationException(); + String currentUser = currentUser(); + PaimonSchema createdSchema = + PaimonSchema.builder() + .withName(identifier.name()) + .withComment(comment) + .withProperties(properties) + .withAuditInfo( + AuditInfo.builder().withCreator(currentUser).withCreateTime(Instant.now()).build()) + .build(); + try { + paimonTableOps.createDatabase(createdSchema.toPaimonSchema()); + } catch (Catalog.DatabaseAlreadyExistException e) { + throw new SchemaAlreadyExistsException(e, SCHEMA_ALREADY_EXISTS_EXCEPTION, identifier); + } catch (Exception e) { + throw new RuntimeException(e); + } + LOG.info( + "Created Paimon schema (database): {}.\nCurrent user: {} \nComment: {}.\nMetadata: {}.", + identifier, + currentUser, + comment, + properties); + return createdSchema; } /** @@ -83,7 +143,14 @@ public PaimonSchema createSchema( */ @Override public PaimonSchema loadSchema(NameIdentifier identifier) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + Map properties; + try { + properties = paimonTableOps.loadDatabase(identifier.name()); + } catch (Catalog.DatabaseNotExistException e) { + throw new NoSuchSchemaException(e, NO_SUCH_SCHEMA_EXCEPTION, identifier); + } + LOG.info("Loaded Paimon schema (database) {}.", identifier); + return fromPaimonSchema(identifier.name(), properties); } /** @@ -98,7 +165,7 @@ public PaimonSchema loadSchema(NameIdentifier identifier) throws NoSuchSchemaExc @Override public PaimonSchema alterSchema(NameIdentifier identifier, SchemaChange... changes) throws NoSuchSchemaException { - throw new UnsupportedOperationException(); + throw new UnsupportedOperationException("Alter schema is not supported in Paimon Catalog."); } /** @@ -112,7 +179,18 @@ public PaimonSchema alterSchema(NameIdentifier identifier, SchemaChange... chang @Override public boolean dropSchema(NameIdentifier identifier, boolean cascade) throws NonEmptySchemaException { - throw new UnsupportedOperationException(); + try { + paimonTableOps.dropDatabase(identifier.name(), cascade); + } catch (Catalog.DatabaseNotExistException e) { + LOG.warn("Paimon schema (database) {} does not exist.", identifier); + return false; + } catch (Catalog.DatabaseNotEmptyException e) { + throw new NonEmptySchemaException(e, NON_EMPTY_SCHEMA_EXCEPTION, identifier); + } catch (Exception e) { + throw new RuntimeException(e); + } + LOG.info("Dropped Paimon schema (database) {}.", identifier); + return true; } /** @@ -206,5 +284,17 @@ public boolean purgeTable(NameIdentifier identifier) throws UnsupportedOperation } @Override - public void close() throws IOException {} + public void close() throws IOException { + if (paimonTableOps != null) { + try { + paimonTableOps.close(); + } catch (Exception e) { + throw new IOException(e.getMessage()); + } + } + } + + private static String currentUser() { + return System.getProperty("user.name"); + } } diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java new file mode 100644 index 0000000000..468590b5f8 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonCatalogPropertiesMetadata.java @@ -0,0 +1,68 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import static com.datastrato.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry; +import static com.datastrato.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry; + +import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata; +import com.datastrato.gravitino.connector.PropertiesMetadata; +import com.datastrato.gravitino.connector.PropertyEntry; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link PropertiesMetadata} that represents Paimon catalog properties metadata. + */ +public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata { + + public static final String GRAVITINO_CATALOG_BACKEND = "catalog-backend"; + public static final String PAIMON_METASTORE = "metastore"; + public static final String WAREHOUSE = "warehouse"; + public static final String URI = "uri"; + + private static final Map> PROPERTIES_METADATA; + + public static final Map GRAVITINO_CONFIG_TO_PAIMON = + ImmutableMap.of(GRAVITINO_CATALOG_BACKEND, PAIMON_METASTORE, WAREHOUSE, WAREHOUSE, URI, URI); + + static { + List> propertyEntries = + ImmutableList.of( + enumImmutablePropertyEntry( + GRAVITINO_CATALOG_BACKEND, + "Paimon catalog backend type", + true, + PaimonCatalogBackend.class, + null, + false, + false), + stringRequiredPropertyEntry(WAREHOUSE, "Paimon catalog warehouse config", false, false), + stringRequiredPropertyEntry(URI, "Paimon catalog uri config", false, false)); + HashMap> result = Maps.newHashMap(BASIC_CATALOG_PROPERTY_ENTRIES); + result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName)); + PROPERTIES_METADATA = ImmutableMap.copyOf(result); + } + + @Override + protected Map> specificPropertyEntries() { + return PROPERTIES_METADATA; + } + + public Map transformProperties(Map properties) { + Map gravitinoConfig = Maps.newHashMap(); + properties.forEach( + (key, value) -> { + if (GRAVITINO_CONFIG_TO_PAIMON.containsKey(key)) { + gravitinoConfig.put(GRAVITINO_CONFIG_TO_PAIMON.get(key), value); + } + }); + return gravitinoConfig; + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonConfig.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonConfig.java new file mode 100644 index 0000000000..10f6e288f9 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonConfig.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import com.datastrato.gravitino.Config; +import com.datastrato.gravitino.config.ConfigBuilder; +import com.datastrato.gravitino.config.ConfigConstants; +import com.datastrato.gravitino.config.ConfigEntry; +import java.util.Map; +import org.apache.commons.lang3.StringUtils; +import org.apache.paimon.options.CatalogOptions; + +public class PaimonConfig extends Config { + + public static final ConfigEntry CATALOG_BACKEND = + new ConfigBuilder(PaimonCatalogPropertiesMetadata.PAIMON_METASTORE) + .doc(CatalogOptions.METASTORE.description().toString()) + .version(ConfigConstants.VERSION_0_6_0) + .stringConf() + .createWithDefault(CatalogOptions.METASTORE.defaultValue()); + + public static final ConfigEntry CATALOG_WAREHOUSE = + new ConfigBuilder(PaimonCatalogPropertiesMetadata.WAREHOUSE) + .doc(CatalogOptions.WAREHOUSE.description().toString()) + .version(ConfigConstants.VERSION_0_6_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public static final ConfigEntry CATALOG_URI = + new ConfigBuilder(PaimonCatalogPropertiesMetadata.URI) + .doc(CatalogOptions.URI.description().toString()) + .version(ConfigConstants.VERSION_0_6_0) + .stringConf() + .checkValue(StringUtils::isNotBlank, ConfigConstants.NOT_BLANK_ERROR_MSG) + .create(); + + public PaimonConfig() { + super(false); + } + + public PaimonConfig(Map properties) { + super(false); + loadFromMap(properties, k -> true); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java index e73fe079c3..ca5b162383 100644 --- a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchema.java @@ -4,9 +4,14 @@ */ package com.datastrato.gravitino.catalog.lakehouse.paimon; +import static com.datastrato.gravitino.meta.AuditInfo.EMPTY; + import com.datastrato.gravitino.Schema; import com.datastrato.gravitino.connector.BaseSchema; +import java.util.Map; +import java.util.Optional; import lombok.ToString; +import org.apache.paimon.utils.Pair; /** * Implementation of {@link Schema} that represents a Paimon Schema (Database) entity in the Paimon @@ -16,4 +21,66 @@ public class PaimonSchema extends BaseSchema { private PaimonSchema() {} + + /** + * Converts {@link PaimonSchema} instance to inner schema. + * + * @return The converted inner schema. + */ + public Pair> toPaimonSchema() { + return Pair.of(name, properties); + } + + /** + * Creates a new {@link PaimonSchema} instance from inner schema. + * + * @param name The name of inner schema. + * @param properties The properties of inner schema. + * @return A new {@link PaimonSchema} instance. + */ + public static PaimonSchema fromPaimonSchema(String name, Map properties) { + return builder() + .withName(name) + .withComment( + Optional.of(properties) + .map(map -> map.get(PaimonSchemaPropertiesMetadata.COMMENT)) + .orElse(null)) + .withProperties(properties) + .withAuditInfo(EMPTY) + .build(); + } + + /** A builder class for constructing {@link PaimonSchema} instance. */ + public static class Builder extends BaseSchemaBuilder { + + /** Creates a new instance of {@link Builder}. */ + private Builder() {} + + /** + * Internal method to build a {@link PaimonSchema} instance using the provided values. + * + * @return A new {@link PaimonSchema} instance with the configured values. + */ + @Override + protected PaimonSchema internalBuild() { + PaimonSchema paimonSchema = new PaimonSchema(); + paimonSchema.name = name; + paimonSchema.comment = + comment == null + ? (properties == null ? null : properties.get(PaimonSchemaPropertiesMetadata.COMMENT)) + : comment; + paimonSchema.properties = properties; + paimonSchema.auditInfo = auditInfo; + return paimonSchema; + } + } + + /** + * Creates a new instance of {@link Builder}. + * + * @return The new instance. + */ + public static Builder builder() { + return new Builder(); + } } diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java new file mode 100644 index 0000000000..9a228ef6f0 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/PaimonSchemaPropertiesMetadata.java @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import static com.datastrato.gravitino.connector.PropertyEntry.stringReservedPropertyEntry; + +import com.datastrato.gravitino.connector.BasePropertiesMetadata; +import com.datastrato.gravitino.connector.PropertiesMetadata; +import com.datastrato.gravitino.connector.PropertyEntry; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Maps; +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link PropertiesMetadata} that represents Paimon schema properties metadata. + */ +public class PaimonSchemaPropertiesMetadata extends BasePropertiesMetadata { + + public static final String COMMENT = "comment"; + + private static final Map> PROPERTIES_METADATA; + + static { + List> propertyEntries = + ImmutableList.of(stringReservedPropertyEntry(COMMENT, "Schema comment", true)); + PROPERTIES_METADATA = Maps.uniqueIndex(propertyEntries, PropertyEntry::getName); + } + + @Override + protected Map> specificPropertyEntries() { + return PROPERTIES_METADATA; + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonTableOps.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonTableOps.java new file mode 100644 index 0000000000..da23708acc --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/ops/PaimonTableOps.java @@ -0,0 +1,51 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.ops; + +import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils.loadCatalogBackend; + +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import java.util.List; +import java.util.Map; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Catalog.DatabaseAlreadyExistException; +import org.apache.paimon.catalog.Catalog.DatabaseNotEmptyException; +import org.apache.paimon.catalog.Catalog.DatabaseNotExistException; +import org.apache.paimon.utils.Pair; + +/** Table operation proxy that handles table operations of an underlying Paimon catalog. */ +public class PaimonTableOps implements AutoCloseable { + + protected Catalog catalog; + + public PaimonTableOps(PaimonConfig paimonConfig) { + catalog = loadCatalogBackend(paimonConfig); + } + + @Override + public void close() throws Exception { + if (catalog != null) { + catalog.close(); + } + } + + public List listDatabases() { + return catalog.listDatabases(); + } + + public Map loadDatabase(String databaseName) throws DatabaseNotExistException { + return catalog.loadDatabaseProperties(databaseName); + } + + public void createDatabase(Pair> database) + throws DatabaseAlreadyExistException { + catalog.createDatabase(database.getKey(), false, database.getRight()); + } + + public void dropDatabase(String databaseName, boolean cascade) + throws DatabaseNotExistException, DatabaseNotEmptyException { + catalog.dropDatabase(databaseName, false, cascade); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/CatalogUtils.java b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/CatalogUtils.java new file mode 100644 index 0000000000..a683cc079c --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/main/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/CatalogUtils.java @@ -0,0 +1,48 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon.utils; + +import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_BACKEND; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_URI; +import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig.CATALOG_WAREHOUSE; + +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalogBackend; +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import com.google.common.base.Preconditions; +import java.util.Locale; +import org.apache.commons.lang.StringUtils; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.CatalogContext; +import org.apache.paimon.catalog.CatalogFactory; +import org.apache.paimon.options.Options; + +/** Utilities of {@link Catalog} to support catalog management. */ +public class CatalogUtils { + + /** + * Loads {@link Catalog} instance with given {@link PaimonConfig}. + * + * @param paimonConfig The Paimon configuration. + * @return The {@link Catalog} instance of catalog backend. + */ + public static Catalog loadCatalogBackend(PaimonConfig paimonConfig) { + String metastore = paimonConfig.get(CATALOG_BACKEND).toLowerCase(Locale.ROOT); + Preconditions.checkArgument( + StringUtils.isNotBlank(metastore), "Paimon Catalog metastore can not be null or empty."); + String warehouse = paimonConfig.get(CATALOG_WAREHOUSE).toLowerCase(Locale.ROOT); + Preconditions.checkArgument( + StringUtils.isNotBlank(warehouse), "Paimon Catalog warehouse can not be null or empty."); + if (!PaimonCatalogBackend.FILESYSTEM + .name() + .equals(paimonConfig.get(CATALOG_BACKEND).toLowerCase(Locale.ROOT))) { + String uri = paimonConfig.get(CATALOG_URI).toLowerCase(Locale.ROOT); + Preconditions.checkArgument( + StringUtils.isNotBlank(uri), "Paimon Catalog uri can not be null or empty."); + } + CatalogContext catalogContext = + CatalogContext.create(Options.fromMap(paimonConfig.getAllConfig())); + return CatalogFactory.createCatalog(catalogContext); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonConfig.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonConfig.java new file mode 100644 index 0000000000..f86fc16ba0 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonConfig.java @@ -0,0 +1,30 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import com.datastrato.gravitino.config.ConfigBuilder; +import com.datastrato.gravitino.config.ConfigEntry; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Tests for {@link PaimonConfig}. */ +public class TestPaimonConfig { + + @Test + public void testLoadPaimonConfig() { + ConfigEntry testConf = new ConfigBuilder("k1").stringConf().create(); + Map properties = ImmutableMap.of(testConf.getKey(), "v1"); + + PaimonConfig paimonConfig = new PaimonConfig(); + paimonConfig.loadFromMap(properties, k -> k.startsWith("gravitino.")); + Assertions.assertNull(paimonConfig.get(testConf)); + + paimonConfig = new PaimonConfig(properties); + Assertions.assertEquals("v1", paimonConfig.get(testConf)); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java new file mode 100644 index 0000000000..2f0d83a125 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/TestPaimonSchema.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ +package com.datastrato.gravitino.catalog.lakehouse.paimon; + +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.Schema; +import com.datastrato.gravitino.SchemaChange; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.meta.AuditInfo; +import com.datastrato.gravitino.meta.CatalogEntity; +import com.google.common.collect.Maps; +import java.time.Instant; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class TestPaimonSchema { + + private static final String META_LAKE_NAME = "metalake"; + + private static final String COMMENT_VALUE = "comment"; + + private static AuditInfo AUDIT_INFO = + AuditInfo.builder().withCreator("testPaimonUser").withCreateTime(Instant.now()).build(); + + @Test + public void testCreateIcebergSchema() { + PaimonCatalog paimonCatalog = initPaimonCatalog("testCreatePaimonSchema"); + PaimonCatalogOperations paimonCatalogOperations = (PaimonCatalogOperations) paimonCatalog.ops(); + + NameIdentifier ident = + NameIdentifier.of("metalake", paimonCatalog.name(), "test_paimon_schema"); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + + Schema schema = paimonCatalogOperations.createSchema(ident, COMMENT_VALUE, properties); + Assertions.assertEquals(ident.name(), schema.name()); + Assertions.assertEquals(COMMENT_VALUE, schema.comment()); + Assertions.assertEquals(properties, schema.properties()); + + Assertions.assertTrue(paimonCatalogOperations.schemaExists(ident)); + + Set names = + Arrays.stream(paimonCatalogOperations.listSchemas(ident.namespace())) + .map(NameIdentifier::name) + .collect(Collectors.toSet()); + Assertions.assertTrue(names.contains(ident.name())); + + // Test schema already exists + Throwable exception = + Assertions.assertThrows( + SchemaAlreadyExistsException.class, + () -> { + paimonCatalogOperations.createSchema(ident, COMMENT_VALUE, properties); + }); + Assertions.assertTrue(exception.getMessage().contains("already exists")); + } + + @Test + public void testListSchema() { + PaimonCatalog paimonCatalog = initPaimonCatalog("testListPaimonSchema"); + PaimonCatalogOperations paimonCatalogOperations = (PaimonCatalogOperations) paimonCatalog.ops(); + NameIdentifier ident = + NameIdentifier.of("metalake", paimonCatalog.name(), "test_paimon_schema"); + paimonCatalogOperations.createSchema(ident, COMMENT_VALUE, Maps.newHashMap()); + + NameIdentifier[] schemas = paimonCatalogOperations.listSchemas(ident.namespace()); + Assertions.assertEquals(1, schemas.length); + Assertions.assertEquals(ident.name(), schemas[0].name()); + Assertions.assertEquals(ident.namespace(), schemas[0].namespace()); + } + + @Test + public void testAlterSchema() { + PaimonCatalog paimonCatalog = initPaimonCatalog("testListPaimonSchema"); + PaimonCatalogOperations paimonCatalogOperations = (PaimonCatalogOperations) paimonCatalog.ops(); + + NameIdentifier ident = + NameIdentifier.of("metalake", paimonCatalog.name(), "test_paimon_schema"); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + + paimonCatalogOperations.createSchema(ident, COMMENT_VALUE, properties); + Assertions.assertTrue(paimonCatalogOperations.schemaExists(ident)); + + Map properties1 = paimonCatalogOperations.loadSchema(ident).properties(); + Assertions.assertEquals("val1", properties1.get("key1")); + Assertions.assertEquals("val2", properties1.get("key2")); + + Assertions.assertThrowsExactly( + UnsupportedOperationException.class, + () -> paimonCatalogOperations.alterSchema(ident, SchemaChange.removeProperty("key1"))); + } + + @Test + public void testDropSchema() { + PaimonCatalog paimonCatalog = initPaimonCatalog("testListPaimonSchema"); + PaimonCatalogOperations paimonCatalogOperations = (PaimonCatalogOperations) paimonCatalog.ops(); + + NameIdentifier ident = + NameIdentifier.of("metalake", paimonCatalog.name(), "test_paimon_schema"); + Map properties = Maps.newHashMap(); + properties.put("key1", "val1"); + properties.put("key2", "val2"); + + paimonCatalogOperations.createSchema(ident, COMMENT_VALUE, properties); + Assertions.assertTrue(paimonCatalogOperations.schemaExists(ident)); + paimonCatalogOperations.dropSchema(ident, false); + Assertions.assertFalse(paimonCatalogOperations.schemaExists(ident)); + + Assertions.assertFalse(paimonCatalogOperations.dropSchema(ident, false)); + } + + private PaimonCatalog initPaimonCatalog(String name) { + CatalogEntity entity = + CatalogEntity.builder() + .withId(1L) + .withName(name) + .withNamespace(Namespace.of(META_LAKE_NAME)) + .withType(PaimonCatalog.Type.RELATIONAL) + .withProvider("lakehouse-paimon") + .withAuditInfo(AUDIT_INFO) + .build(); + + Map conf = Maps.newHashMap(); + return new PaimonCatalog().withCatalogConf(conf).withCatalogEntity(entity); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java new file mode 100644 index 0000000000..25e092ae9b --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonBaseIT.java @@ -0,0 +1,192 @@ +package com.datastrato.gravitino.catalog.lakehouse.paimon.integration.test; + +import com.datastrato.gravitino.Catalog; +import com.datastrato.gravitino.NameIdentifier; +import com.datastrato.gravitino.Namespace; +import com.datastrato.gravitino.Schema; +import com.datastrato.gravitino.SchemaChange; +import com.datastrato.gravitino.SupportsSchemas; +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import com.datastrato.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils; +import com.datastrato.gravitino.client.GravitinoMetalake; +import com.datastrato.gravitino.exceptions.NoSuchSchemaException; +import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException; +import com.datastrato.gravitino.integration.test.container.ContainerSuite; +import com.datastrato.gravitino.integration.test.util.AbstractIT; +import com.datastrato.gravitino.integration.test.util.GravitinoITUtils; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.Set; +import org.apache.paimon.catalog.Catalog.DatabaseNotExistException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.platform.commons.util.StringUtils; + +public abstract class CatalogPaimonBaseIT extends AbstractIT { + + protected static final ContainerSuite containerSuite = ContainerSuite.getInstance(); + + private static final String provider = "lakehouse-paimon"; + private static final String catalog_comment = "catalog_comment"; + private static final String schema_comment = "schema_comment"; + private String metalakeName = GravitinoITUtils.genRandomName("paimon_it_metalake"); + private String catalogName = GravitinoITUtils.genRandomName("paimon_it_catalog"); + private String schemaName = GravitinoITUtils.genRandomName("paimon_it_schema"); + private GravitinoMetalake metalake; + private Catalog catalog; + private org.apache.paimon.catalog.Catalog paimonCatalog; + + @BeforeAll + public void startup() { + containerSuite.startHiveContainer(); + initPaimonCatalogProperties(); + createMetalake(); + createCatalog(); + createSchema(); + } + + @AfterAll + public void stop() { + clearTableAndSchema(); + metalake.dropCatalog(catalogName); + client.dropMetalake(metalakeName); + } + + @AfterEach + private void resetSchema() { + clearTableAndSchema(); + createSchema(); + } + + protected abstract Map initPaimonCatalogProperties(); + + private void clearTableAndSchema() { + if (catalog.asSchemas().schemaExists(schemaName)) { + NameIdentifier[] nameIdentifiers = + catalog.asTableCatalog().listTables(Namespace.of(metalakeName, catalogName, schemaName)); + for (NameIdentifier nameIdentifier : nameIdentifiers) { + catalog.asTableCatalog().dropTable(nameIdentifier); + } + catalog.asSchemas().dropSchema(schemaName, false); + } + } + + private void createMetalake() { + GravitinoMetalake[] gravitinoMetalakes = client.listMetalakes(); + Assertions.assertEquals(0, gravitinoMetalakes.length); + + GravitinoMetalake createdMetalake = + client.createMetalake(metalakeName, "comment", Collections.emptyMap()); + GravitinoMetalake loadMetalake = client.loadMetalake(metalakeName); + Assertions.assertEquals(createdMetalake, loadMetalake); + + metalake = loadMetalake; + } + + private void createCatalog() { + Map catalogProperties = initPaimonCatalogProperties(); + Catalog createdCatalog = + metalake.createCatalog( + catalogName, Catalog.Type.RELATIONAL, provider, catalog_comment, catalogProperties); + Catalog loadCatalog = metalake.loadCatalog(catalogName); + Assertions.assertEquals(createdCatalog, loadCatalog); + catalog = loadCatalog; + + String type = + catalogProperties + .get(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND) + .toLowerCase(Locale.ROOT); + Preconditions.checkArgument( + StringUtils.isNotBlank(type), "Paimon Catalog backend type can not be null or empty."); + catalogProperties.put(PaimonCatalogPropertiesMetadata.PAIMON_METASTORE, type); + paimonCatalog = CatalogUtils.loadCatalogBackend(new PaimonConfig(catalogProperties)); + } + + private void createSchema() { + NameIdentifier ident = NameIdentifier.of(metalakeName, catalogName, schemaName); + Map prop = Maps.newHashMap(); + prop.put("key1", "val1"); + prop.put("key2", "val2"); + + Schema createdSchema = catalog.asSchemas().createSchema(ident.name(), schema_comment, prop); + Schema loadSchema = catalog.asSchemas().loadSchema(ident.name()); + Assertions.assertEquals(createdSchema.name(), loadSchema.name()); + prop.forEach((key, value) -> Assertions.assertEquals(loadSchema.properties().get(key), value)); + } + + @Test + void testPaimonSchemaOperations() throws DatabaseNotExistException { + SupportsSchemas schemas = catalog.asSchemas(); + + // list schema check. + Set schemaNames = new HashSet<>(Arrays.asList(schemas.listSchemas())); + Assertions.assertTrue(schemaNames.contains(schemaName)); + List paimonDatabaseNames = paimonCatalog.listDatabases(); + Assertions.assertTrue(paimonDatabaseNames.contains(schemaName)); + + // create schema check. + String testSchemaName = GravitinoITUtils.genRandomName("test_schema_1"); + NameIdentifier schemaIdent = NameIdentifier.of(metalakeName, catalogName, testSchemaName); + Map schemaProperties = Maps.newHashMap(); + schemaProperties.put("key1", "val1"); + schemaProperties.put("key2", "val2"); + schemas.createSchema(schemaIdent.name(), schema_comment, schemaProperties); + + schemaNames = new HashSet<>(Arrays.asList(schemas.listSchemas())); + Assertions.assertTrue(schemaNames.contains(testSchemaName)); + paimonDatabaseNames = paimonCatalog.listDatabases(); + Assertions.assertTrue(paimonDatabaseNames.contains(testSchemaName)); + + // load schema check. + Schema schema = schemas.loadSchema(schemaIdent.name()); + schema.properties().forEach((k, v) -> Assertions.assertEquals(schemaProperties.get(k), v)); + + paimonCatalog + .loadDatabaseProperties(schemaIdent.name()) + .forEach((k, v) -> Assertions.assertEquals(schemaProperties.get(k), v)); + + Map emptyMap = Collections.emptyMap(); + Assertions.assertThrows( + SchemaAlreadyExistsException.class, + () -> schemas.createSchema(schemaIdent.name(), schema_comment, emptyMap)); + + // alter schema check. + // unSupport alter schema operation. + Assertions.assertThrowsExactly( + UnsupportedOperationException.class, + () -> schemas.alterSchema(schemaIdent.name(), SchemaChange.setProperty("k1", "v1"))); + + // drop schema check. + schemas.dropSchema(schemaIdent.name(), false); + Assertions.assertThrows( + NoSuchSchemaException.class, () -> schemas.loadSchema(schemaIdent.name())); + Assertions.assertThrows( + DatabaseNotExistException.class, + () -> { + paimonCatalog.loadDatabaseProperties(schemaIdent.name()); + }); + + schemaNames = new HashSet<>(Arrays.asList(schemas.listSchemas())); + Assertions.assertFalse(schemaNames.contains(testSchemaName)); + Assertions.assertFalse(schemas.dropSchema(schemaIdent.name(), false)); + Assertions.assertFalse(schemas.dropSchema("no-exits", false)); + + // list schema check. + schemaNames = new HashSet<>(Arrays.asList(schemas.listSchemas())); + Assertions.assertTrue(schemaNames.contains(schemaName)); + Assertions.assertFalse(schemaNames.contains(testSchemaName)); + paimonDatabaseNames = paimonCatalog.listDatabases(); + Assertions.assertTrue(paimonDatabaseNames.contains(schemaName)); + Assertions.assertFalse(paimonDatabaseNames.contains(testSchemaName)); + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java new file mode 100644 index 0000000000..8d6dc2c840 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/integration/test/CatalogPaimonFileSystemIT.java @@ -0,0 +1,31 @@ +package com.datastrato.gravitino.catalog.lakehouse.paimon.integration.test; + +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalogPropertiesMetadata; +import com.datastrato.gravitino.integration.test.container.HiveContainer; +import com.google.common.collect.Maps; +import java.util.Map; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.TestInstance; + +@Tag("gravitino-docker-it") +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +public class CatalogPaimonFileSystemIT extends CatalogPaimonBaseIT { + + @Override + protected Map initPaimonCatalogProperties() { + + Map catalogProperties = Maps.newHashMap(); + catalogProperties.put("key1", "val1"); + catalogProperties.put("key2", "val2"); + + catalogProperties.put(PaimonCatalogPropertiesMetadata.GRAVITINO_CATALOG_BACKEND, "filesystem"); + catalogProperties.put( + PaimonCatalogPropertiesMetadata.WAREHOUSE, + String.format( + "hdfs://%s:%d/user/hive/warehouse-catalog-paimon/", + containerSuite.getHiveContainer().getContainerIpAddress(), + HiveContainer.HDFS_DEFAULTFS_PORT)); + + return catalogProperties; + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java new file mode 100644 index 0000000000..eef31b3c43 --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/java/com/datastrato/gravitino/catalog/lakehouse/paimon/utils/TestCatalogUtils.java @@ -0,0 +1,50 @@ +/* + * Copyright 2024 Datastrato Pvt Ltd. + * This software is licensed under the Apache License version 2. + */ + +package com.datastrato.gravitino.catalog.lakehouse.paimon.utils; + +import static com.datastrato.gravitino.catalog.lakehouse.paimon.utils.CatalogUtils.loadCatalogBackend; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrowsExactly; + +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonCatalogBackend; +import com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonConfig; +import com.google.common.collect.ImmutableMap; +import java.util.Locale; +import java.util.function.Consumer; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.FileSystemCatalog; +import org.apache.paimon.factories.FactoryException; +import org.junit.jupiter.api.Test; + +/** Tests for {@link CatalogUtils}. */ +public class TestCatalogUtils { + + @Test + void testLoadCatalogBackend() throws Exception { + // Test load FileSystemCatalog for filesystem metastore. + assertCatalog(PaimonCatalogBackend.FILESYSTEM.name(), FileSystemCatalog.class); + // Test load catalog exception for other metastore. + assertThrowsExactly(FactoryException.class, () -> assertCatalog("other", catalog -> {})); + } + + private void assertCatalog(String metastore, Class expected) throws Exception { + assertCatalog( + metastore.toLowerCase(Locale.ROOT), catalog -> assertEquals(expected, catalog.getClass())); + } + + private void assertCatalog(String metastore, Consumer consumer) throws Exception { + try (Catalog catalog = + loadCatalogBackend( + new PaimonConfig( + ImmutableMap.of( + PaimonConfig.CATALOG_BACKEND.getKey(), + metastore, + PaimonConfig.CATALOG_WAREHOUSE.getKey(), + "file:///tmp/paimon_catalog_warehouse")))) { + consumer.accept(catalog); + } + } +} diff --git a/catalogs/catalog-lakehouse-paimon/src/test/resources/log4j2.properties b/catalogs/catalog-lakehouse-paimon/src/test/resources/log4j2.properties new file mode 100644 index 0000000000..67cbe8f91d --- /dev/null +++ b/catalogs/catalog-lakehouse-paimon/src/test/resources/log4j2.properties @@ -0,0 +1,33 @@ +# +# Copyright 2024 Datastrato Pvt Ltd. +# This software is licensed under the Apache License version 2. +# + +# Set to debug or trace if log4j initialization is failing +status = info + +# Name of the configuration +name = ConsoleLogConfig + +# Console appender configuration +appender.console.type = Console +appender.console.name = consoleLogger +appender.console.layout.type = PatternLayout +appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# Log files location +property.logPath = ${sys:gravitino.log.path:-catalog-lakehouse-paimon/build/paimon-integration-test.log} + +# File appender configuration +appender.file.type = File +appender.file.name = fileLogger +appender.file.fileName = ${logPath} +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n + +# Root logger level +rootLogger.level = info + +# Root logger referring to console and file appenders +rootLogger.appenderRef.stdout.ref = consoleLogger +rootLogger.appenderRef.file.ref = fileLogger