Skip to content

Commit

Permalink
[datastrato#3891] feat(catalog-lakehouse-paimon): Support schema oper…
Browse files Browse the repository at this point in the history
…ations for Paimon Catalog
  • Loading branch information
caican committed Jun 19, 2024
1 parent 282fc0d commit 9769db0
Show file tree
Hide file tree
Showing 16 changed files with 940 additions and 53 deletions.
77 changes: 35 additions & 42 deletions catalogs/catalog-lakehouse-paimon/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,21 @@ dependencies {
exclude("com.github.spotbugs")
}
implementation(libs.hadoop2.hdfs)
implementation(libs.hive2.exec) {
artifact {
classifier = "core"
}
exclude("com.google.code.findbugs", "jsr305")
exclude("com.google.protobuf")
exclude("org.apache.avro")
exclude("org.apache.calcite")
exclude("org.apache.calcite.avatica")
exclude("org.apache.curator")
exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
exclude("org.apache.logging.log4j")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.openjdk.jol")
exclude("org.pentaho")
exclude("org.slf4j")
}
implementation(libs.hive2.metastore) {
exclude("co.cask.tephra")
exclude("com.github.spotbugs")
exclude("com.google.code.findbugs", "jsr305")
exclude("com.tdunning", "json")
exclude("javax.transaction", "transaction-api")
exclude("org.apache.avro", "avro")
exclude("org.apache.hbase")
exclude("org.apache.hadoop", "hadoop-yarn-api")
exclude("org.apache.hadoop", "hadoop-yarn-server-applicationhistoryservice")
exclude("org.apache.hadoop", "hadoop-yarn-server-common")
exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
exclude("org.apache.hadoop", "hadoop-yarn-server-web-proxy")
exclude("org.apache.logging.log4j")
exclude("org.apache.parquet", "parquet-hadoop-bundle")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.pentaho") // missing dependency
exclude("org.slf4j", "slf4j-log4j12")
exclude("com.zaxxer", "HikariCP")
exclude("com.sun.jersey", "jersey-server")
}
implementation(libs.hadoop2.mapreduce.client.core)

annotationProcessor(libs.lombok)
compileOnly(libs.lombok)

testImplementation(project(":clients:client-java"))
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)

testImplementation(libs.testcontainers)

testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks {
Expand Down Expand Up @@ -105,6 +75,29 @@ tasks {
}
}

// Test task configuration: Gradle properties select which test classes run.
tasks.test {
// -PskipTests: skip unit tests and run only the integration suite.
val skipUTs = project.hasProperty("skipTests")
if (skipUTs) {
// Only run integration tests
include("**/integration/**")
}

// -PskipITs: exclude the integration suite entirely; otherwise set it up below.
val skipITs = project.hasProperty("skipITs")
if (skipITs) {
// Exclude integration tests
exclude("**/integration/**")
} else {
// Integration tests exercise the assembled jar, so build it first.
dependsOn(tasks.jar)

doFirst {
// Pin the Hive docker image used by the CI integration environment.
environment("GRAVITINO_CI_HIVE_DOCKER_IMAGE", "datastrato/gravitino-ci-hive:0.1.12")
}

// Shared integration-test setup hook published by the root build script via project.extra.
val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
init(this)
}
}

// Maven publication metadata must be generated after the runtime jars are assembled.
tasks.getByName("generateMetadataFileForMavenJavaPublication") {
dependsOn("runtimeJars")
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
/** Implementation of {@link Catalog} that represents a Paimon catalog in Gravitino. */
public class PaimonCatalog extends BaseCatalog<PaimonCatalog> {

static final PaimonCatalogPropertiesMetadata CATALOG_PROPERTIES_META =
new PaimonCatalogPropertiesMetadata();

static final PaimonSchemaPropertiesMetadata SCHEMA_PROPERTIES_META =
new PaimonSchemaPropertiesMetadata();

/** @return The short name of the catalog. */
@Override
public String shortName() {
Expand Down Expand Up @@ -44,13 +50,11 @@ public PropertiesMetadata tablePropertiesMetadata() throws UnsupportedOperationE

@Override
public PropertiesMetadata catalogPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"The catalog does not support catalog properties metadata");
return CATALOG_PROPERTIES_META;
}

@Override
public PropertiesMetadata schemaPropertiesMetadata() throws UnsupportedOperationException {
throw new UnsupportedOperationException(
"The catalog does not support schema properties metadata");
return SCHEMA_PROPERTIES_META;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

/** The type of Paimon catalog backend. */
public enum PaimonCatalogBackend {
// Filesystem-backed metastore; the only backend declared here.
FILESYSTEM
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import static com.datastrato.gravitino.catalog.lakehouse.paimon.PaimonSchema.fromPaimonSchema;
import static com.datastrato.gravitino.connector.BaseCatalog.CATALOG_BYPASS_PREFIX;

import com.datastrato.gravitino.NameIdentifier;
import com.datastrato.gravitino.Namespace;
import com.datastrato.gravitino.SchemaChange;
import com.datastrato.gravitino.catalog.lakehouse.paimon.ops.PaimonTableOps;
import com.datastrato.gravitino.connector.CatalogInfo;
import com.datastrato.gravitino.connector.CatalogOperations;
import com.datastrato.gravitino.connector.HasPropertyMetadata;
Expand All @@ -17,22 +21,41 @@
import com.datastrato.gravitino.exceptions.NonEmptySchemaException;
import com.datastrato.gravitino.exceptions.SchemaAlreadyExistsException;
import com.datastrato.gravitino.exceptions.TableAlreadyExistsException;
import com.datastrato.gravitino.meta.AuditInfo;
import com.datastrato.gravitino.rel.Column;
import com.datastrato.gravitino.rel.TableCatalog;
import com.datastrato.gravitino.rel.TableChange;
import com.datastrato.gravitino.rel.expressions.distributions.Distribution;
import com.datastrato.gravitino.rel.expressions.sorts.SortOrder;
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.utils.MapUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import org.apache.paimon.catalog.Catalog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implementation of {@link CatalogOperations} that represents operations for interacting with the
* Paimon catalog in Gravitino.
*/
public class PaimonCatalogOperations implements CatalogOperations, SupportsSchemas, TableCatalog {

public static final Logger LOG = LoggerFactory.getLogger(PaimonCatalogOperations.class);

@VisibleForTesting public PaimonTableOps paimonTableOps;

private static final String NO_SUCH_SCHEMA_EXCEPTION =
"Paimon schema (database) %s does not exist.";
private static final String NON_EMPTY_SCHEMA_EXCEPTION =
"Paimon schema (database) %s is not empty. One or more tables exist.";
private static final String SCHEMA_ALREADY_EXISTS_EXCEPTION =
"Paimon schema (database) %s already exists.";

/**
* Initializes the Paimon catalog operations with the provided configuration.
*
Expand All @@ -43,7 +66,20 @@ public class PaimonCatalogOperations implements CatalogOperations, SupportsSchem
@Override
public void initialize(
Map<String, String> conf, CatalogInfo info, HasPropertyMetadata propertiesMetadata)
throws RuntimeException {}
throws RuntimeException {
// Key format like gravitino.bypass.a.b
Map<String, String> prefixMap = MapUtils.getPrefixMap(conf, CATALOG_BYPASS_PREFIX);

// Hold keys that lie in GRAVITINO_CONFIG_TO_PAIMON
Map<String, String> gravitinoConfig =
((PaimonCatalogPropertiesMetadata) propertiesMetadata.catalogPropertiesMetadata())
.transformProperties(conf);

Map<String, String> resultConf = Maps.newHashMap(prefixMap);
resultConf.putAll(gravitinoConfig);

this.paimonTableOps = new PaimonTableOps(new PaimonConfig(resultConf));
}

/**
* Lists the schemas under the specified namespace.
Expand All @@ -54,7 +90,9 @@ public void initialize(
*/
@Override
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
throw new UnsupportedOperationException();
return paimonTableOps.listDatabases().stream()
.map(NameIdentifier::of)
.toArray(NameIdentifier[]::new);
}

/**
Expand All @@ -71,7 +109,29 @@ public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogExc
public PaimonSchema createSchema(
NameIdentifier identifier, String comment, Map<String, String> properties)
throws NoSuchCatalogException, SchemaAlreadyExistsException {
throw new UnsupportedOperationException();
String currentUser = currentUser();
PaimonSchema createdSchema =
PaimonSchema.builder()
.withName(identifier.name())
.withComment(comment)
.withProperties(properties)
.withAuditInfo(
AuditInfo.builder().withCreator(currentUser).withCreateTime(Instant.now()).build())
.build();
try {
paimonTableOps.createDatabase(createdSchema.toPaimonSchema());
} catch (Catalog.DatabaseAlreadyExistException e) {
throw new SchemaAlreadyExistsException(e, SCHEMA_ALREADY_EXISTS_EXCEPTION, identifier);
} catch (Exception e) {
throw new RuntimeException(e);
}
LOG.info(
"Created Paimon schema (database): {}.\nCurrent user: {} \nComment: {}.\nMetadata: {}.",
identifier,
currentUser,
comment,
properties);
return createdSchema;
}

/**
Expand All @@ -83,7 +143,14 @@ public PaimonSchema createSchema(
*/
@Override
public PaimonSchema loadSchema(NameIdentifier identifier) throws NoSuchSchemaException {
throw new UnsupportedOperationException();
Map<String, String> properties;
try {
properties = paimonTableOps.loadDatabase(identifier.name());
} catch (Catalog.DatabaseNotExistException e) {
throw new NoSuchSchemaException(e, NO_SUCH_SCHEMA_EXCEPTION, identifier);
}
LOG.info("Loaded Paimon schema (database) {}.", identifier);
return fromPaimonSchema(identifier.name(), properties);
}

/**
Expand All @@ -98,7 +165,7 @@ public PaimonSchema loadSchema(NameIdentifier identifier) throws NoSuchSchemaExc
@Override
public PaimonSchema alterSchema(NameIdentifier identifier, SchemaChange... changes)
throws NoSuchSchemaException {
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("Alter schema is not supported in Paimon Catalog.");
}

/**
Expand All @@ -112,7 +179,18 @@ public PaimonSchema alterSchema(NameIdentifier identifier, SchemaChange... chang
@Override
public boolean dropSchema(NameIdentifier identifier, boolean cascade)
throws NonEmptySchemaException {
throw new UnsupportedOperationException();
try {
paimonTableOps.dropDatabase(identifier.name(), cascade);
} catch (Catalog.DatabaseNotExistException e) {
LOG.warn("Paimon schema (database) {} does not exist.", identifier);
return false;
} catch (Catalog.DatabaseNotEmptyException e) {
throw new NonEmptySchemaException(e, NON_EMPTY_SCHEMA_EXCEPTION, identifier);
} catch (Exception e) {
throw new RuntimeException(e);
}
LOG.info("Dropped Paimon schema (database) {}.", identifier);
return true;
}

/**
Expand Down Expand Up @@ -206,5 +284,17 @@ public boolean purgeTable(NameIdentifier identifier) throws UnsupportedOperation
}

@Override
public void close() throws IOException {}
public void close() throws IOException {
if (paimonTableOps != null) {
try {
paimonTableOps.close();
} catch (Exception e) {
throw new IOException(e.getMessage());
}
}
}

/**
 * Resolves the name of the user running this JVM.
 *
 * @return the {@code user.name} system property, used as the creator in schema audit info
 */
private static String currentUser() {
  final String userName = System.getProperty("user.name");
  return userName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.lakehouse.paimon;

import static com.datastrato.gravitino.connector.PropertyEntry.enumImmutablePropertyEntry;
import static com.datastrato.gravitino.connector.PropertyEntry.stringRequiredPropertyEntry;

import com.datastrato.gravitino.connector.BaseCatalogPropertiesMetadata;
import com.datastrato.gravitino.connector.PropertiesMetadata;
import com.datastrato.gravitino.connector.PropertyEntry;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Implementation of {@link PropertiesMetadata} that represents Paimon catalog properties metadata.
*/
/**
 * Implementation of {@link PropertiesMetadata} that represents Paimon catalog properties metadata.
 *
 * <p>Declares the Gravitino-facing catalog properties (backend type, warehouse, uri) and the
 * mapping from Gravitino property keys to the corresponding Paimon configuration keys.
 */
public class PaimonCatalogPropertiesMetadata extends BaseCatalogPropertiesMetadata {

  public static final String GRAVITINO_CATALOG_BACKEND = "catalog-backend";
  public static final String PAIMON_METASTORE = "metastore";
  public static final String WAREHOUSE = "warehouse";
  public static final String URI = "uri";

  private static final Map<String, PropertyEntry<?>> PROPERTIES_METADATA;

  // Gravitino property key -> Paimon property key. Keys absent from this map are not forwarded
  // by transformProperties.
  public static final Map<String, String> GRAVITINO_CONFIG_TO_PAIMON =
      ImmutableMap.of(GRAVITINO_CATALOG_BACKEND, PAIMON_METASTORE, WAREHOUSE, WAREHOUSE, URI, URI);

  static {
    List<PropertyEntry<?>> propertyEntries =
        ImmutableList.of(
            enumImmutablePropertyEntry(
                GRAVITINO_CATALOG_BACKEND,
                "Paimon catalog backend type",
                true,
                PaimonCatalogBackend.class,
                null,
                false,
                false),
            stringRequiredPropertyEntry(WAREHOUSE, "Paimon catalog warehouse config", false, false),
            stringRequiredPropertyEntry(URI, "Paimon catalog uri config", false, false));
    // Merge catalog-common entries with the Paimon-specific ones, keyed by property name.
    HashMap<String, PropertyEntry<?>> result = Maps.newHashMap(BASIC_CATALOG_PROPERTY_ENTRIES);
    result.putAll(Maps.uniqueIndex(propertyEntries, PropertyEntry::getName));
    PROPERTIES_METADATA = ImmutableMap.copyOf(result);
  }

  @Override
  protected Map<String, PropertyEntry<?>> specificPropertyEntries() {
    return PROPERTIES_METADATA;
  }

  /**
   * Translates Gravitino catalog properties into Paimon configuration properties.
   *
   * <p>Only keys present in {@link #GRAVITINO_CONFIG_TO_PAIMON} are kept; each is re-keyed to its
   * Paimon name. All other entries are dropped.
   *
   * @param properties the Gravitino catalog properties
   * @return a new mutable map containing only the translated Paimon properties
   */
  public Map<String, String> transformProperties(Map<String, String> properties) {
    Map<String, String> gravitinoConfig = Maps.newHashMap();
    properties.forEach(
        (key, value) -> {
          // Single lookup instead of containsKey + get to avoid hashing the key twice.
          String paimonKey = GRAVITINO_CONFIG_TO_PAIMON.get(key);
          if (paimonKey != null) {
            gravitinoConfig.put(paimonKey, value);
          }
        });
    return gravitinoConfig;
  }
}
Loading

0 comments on commit 9769db0

Please sign in to comment.