[AMORO-2276]: UnifiedCatalog for Spark Engine (apache#2269)
* Add UnifiedSparkCatalog under the spark common module (a configuration sketch follows the change summary below)
* Extract MixedSparkCatalogBase and MixedSparkSessionCatalogBase into the spark common module
* Refactor the Spark unit test framework to accommodate both unified catalog tests and mixed format tests.
baiyangtx authored and ShawHee committed Dec 29, 2023
1 parent 0c6284a commit 5ffac89
Showing 138 changed files with 2,742 additions and 4,382 deletions.
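For orientation, a catalog exposed through the new UnifiedSparkCatalog is typically registered via Spark's catalog plugin mechanism. The sketch below is illustrative only: the package of the plugin class, the catalog name "unified", and the "uri" property key are assumptions for the example, not values taken from this commit.

import org.apache.spark.sql.SparkSession;

public class UnifiedSparkCatalogDemo {
  public static void main(String[] args) {
    SparkSession spark =
        SparkSession.builder()
            .master("local[*]")
            .appName("unified-spark-catalog-demo")
            // Register a Spark catalog named "unified" backed by the new plugin class
            // (package name assumed for illustration).
            .config("spark.sql.catalog.unified", "com.netease.arctic.spark.UnifiedSparkCatalog")
            // Point the catalog at the AMS-managed unified catalog (illustrative URI).
            .config("spark.sql.catalog.unified.uri", "thrift://127.0.0.1:1260/my_catalog")
            .getOrCreate();

    // Once registered, tables of any supported format are reachable through one catalog name.
    spark.sql("SHOW NAMESPACES IN unified").show();
    spark.stop();
  }
}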
24 changes: 23 additions & 1 deletion core/src/main/java/com/netease/arctic/CommonUnifiedCatalog.java
@@ -41,16 +41,29 @@ public class CommonUnifiedCatalog implements UnifiedCatalog {
private Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newHashMap();
private final Map<String, String> properties = Maps.newHashMap();

private TableMetaStore tableMetaStore;

public CommonUnifiedCatalog(
Supplier<CatalogMeta> catalogMetaSupplier, Map<String, String> properties) {
CatalogMeta catalogMeta = catalogMetaSupplier.get();
CatalogUtil.mergeCatalogProperties(catalogMeta, properties);
this.meta = catalogMeta;
this.tableMetaStore = CatalogUtil.buildMetaStore(catalogMeta);
this.properties.putAll(properties);
this.metaSupplier = catalogMetaSupplier;
initializeFormatCatalogs();
}

@Override
public String metastoreType() {
return meta.getCatalogType();
}

@Override
public TableMetaStore authenticationContext() {
return this.tableMetaStore;
}

@Override
public List<String> listDatabases() {
return findFirstFormatCatalog(TableFormat.values()).listDatabases();
@@ -163,19 +176,28 @@ public synchronized void refresh() {
if (newMeta.equals(this.meta)) {
return;
}
this.tableMetaStore = CatalogUtil.buildMetaStore(newMeta);
this.meta = newMeta;
this.initializeFormatCatalogs();
}

@Override
public Map<String, String> properties() {
return this.meta.getCatalogProperties();
}

protected void initializeFormatCatalogs() {
ServiceLoader<FormatCatalogFactory> loader = ServiceLoader.load(FormatCatalogFactory.class);
Set<TableFormat> formats = CatalogUtil.tableFormats(this.meta);
TableMetaStore store = CatalogUtil.buildMetaStore(this.meta);
Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newConcurrentMap();
for (FormatCatalogFactory factory : loader) {
if (formats.contains(factory.format())) {
Map<String, String> catalogProperties =
factory.convertCatalogProperties(
name(), meta.getCatalogType(), meta.getCatalogProperties());
FormatCatalog catalog =
factory.create(name(), meta.getCatalogType(), meta.getCatalogProperties(), store);
factory.create(name(), meta.getCatalogType(), catalogProperties, store);
formatCatalogs.put(factory.format(), catalog);
}
}
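To make the refreshed API above concrete, the following usage sketch constructs a CommonUnifiedCatalog and exercises the methods touched by this change. It assumes the Supplier of CatalogMeta is provided by the caller (for example fetched from AMS) and that CatalogMeta lives in the com.netease.arctic.ams.api package.

import com.netease.arctic.CommonUnifiedCatalog;
import com.netease.arctic.UnifiedCatalog;
import com.netease.arctic.ams.api.CatalogMeta;
import com.netease.arctic.table.TableMetaStore;

import java.util.HashMap;
import java.util.function.Supplier;

public class CommonUnifiedCatalogSketch {

  UnifiedCatalog open(Supplier<CatalogMeta> metaSupplier) {
    // Format catalogs are initialized from the supplied CatalogMeta in the constructor.
    UnifiedCatalog catalog = new CommonUnifiedCatalog(metaSupplier, new HashMap<>());

    String metastoreType = catalog.metastoreType();               // e.g. "hive" or "hadoop"
    TableMetaStore authContext = catalog.authenticationContext(); // authentication context built from the meta
    System.out.println("metastore type: " + metastoreType + ", auth context: " + authContext);

    catalog.listDatabases().forEach(System.out::println);
    catalog.refresh(); // re-reads the CatalogMeta and rebuilds format catalogs when it changed
    return catalog;
  }
}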
12 changes: 12 additions & 0 deletions core/src/main/java/com/netease/arctic/FormatCatalogFactory.java
@@ -43,4 +43,16 @@ FormatCatalog create(

/** format of this catalog factory */
TableFormat format();

/**
* Convert unified catalog properties to the corresponding format's properties, which are then
* used to initialize that format's catalog.
*
* @param catalogName catalog name registered in AMS
* @param metastoreType metastore type
* @param unifiedCatalogProperties properties of the unified catalog.
* @return properties of the target format.
*/
Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties);
}
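The two factory methods are intended to be used together: a caller first converts the unified catalog properties for a given format and then hands the result to create(). A condensed caller-side sketch, mirroring CommonUnifiedCatalog#initializeFormatCatalogs above (the class, method, and variable names here are illustrative):

import com.netease.arctic.FormatCatalog;
import com.netease.arctic.FormatCatalogFactory;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.table.TableMetaStore;

import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.Set;

class FormatCatalogLoading {

  static Map<TableFormat, FormatCatalog> loadFormatCatalogs(
      String catalogName,
      String metastoreType,
      Map<String, String> unifiedCatalogProperties,
      Set<TableFormat> enabledFormats,
      TableMetaStore store) {
    Map<TableFormat, FormatCatalog> result = new HashMap<>();
    for (FormatCatalogFactory factory : ServiceLoader.load(FormatCatalogFactory.class)) {
      if (enabledFormats.contains(factory.format())) {
        // Translate the unified catalog properties into the format's own property set ...
        Map<String, String> formatProperties =
            factory.convertCatalogProperties(catalogName, metastoreType, unifiedCatalogProperties);
        // ... then create the format catalog with them and the shared authentication context.
        result.put(
            factory.format(),
            factory.create(catalogName, metastoreType, formatProperties, store));
      }
    }
    return result;
  }
}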
12 changes: 12 additions & 0 deletions core/src/main/java/com/netease/arctic/TableIDWithFormat.java
@@ -42,4 +42,16 @@ public TableIdentifier getIdentifier() {
public TableFormat getTableFormat() {
return tableFormat;
}

public String catalog() {
return this.identifier.getCatalog();
}

public String database() {
return this.identifier.getDatabase();
}

public String table() {
return this.identifier.getTableName();
}
}
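The new shortcut accessors mainly make identifier mapping in engine integrations more compact. For example, a Spark catalog implementation could translate a TableIDWithFormat into a Spark Identifier roughly as follows (an illustrative sketch, not code from this commit):

import com.netease.arctic.TableIDWithFormat;
import org.apache.spark.sql.connector.catalog.Identifier;

class IdentifierMapping {

  static Identifier toSparkIdentifier(TableIDWithFormat id) {
    // database() and table() are the shortcuts introduced in this change.
    return Identifier.of(new String[] {id.database()}, id.table());
  }
}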
24 changes: 22 additions & 2 deletions core/src/main/java/com/netease/arctic/UnifiedCatalog.java
@@ -18,16 +18,29 @@

package com.netease.arctic;

import com.netease.arctic.table.TableMetaStore;

import java.util.List;
import java.util.Map;

/** UnifiedCatalog is a catalog that can visit tables with all types of formats. */
public interface UnifiedCatalog extends AmoroCatalog {

/** name of this catalog */
/** Name of this catalog */
String name();

/** Metastore type */
String metastoreType();

/**
* Get authentication context of this catalog.
*
* @return table metastore.
*/
TableMetaStore authenticationContext();

/**
* list tables with format
* List tables with format
*
* @param database given database
* @return identifier and format list
@@ -36,4 +49,11 @@ public interface UnifiedCatalog extends AmoroCatalog {

/** Refresh catalog meta */
void refresh();

/**
* Get catalog properties
*
* @return catalog properties
*/
Map<String, String> properties();
}
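A short sketch of how a caller can walk a UnifiedCatalog and branch on table format; it assumes listTables(database) returns a List of TableIDWithFormat, as the javadoc above indicates:

import com.netease.arctic.TableIDWithFormat;
import com.netease.arctic.UnifiedCatalog;
import com.netease.arctic.ams.api.TableFormat;

class UnifiedCatalogWalker {

  static void walk(UnifiedCatalog catalog) {
    System.out.println("catalog " + catalog.name() + " uses metastore " + catalog.metastoreType());
    for (String database : catalog.listDatabases()) {
      for (TableIDWithFormat id : catalog.listTables(database)) {
        if (id.getTableFormat() == TableFormat.PAIMON) {
          System.out.println("Paimon table: " + database + "." + id.table());
        } else {
          System.out.println(id.getTableFormat() + " table: " + database + "." + id.table());
        }
      }
    }
  }
}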
Original file line number Diff line number Diff line change
@@ -48,4 +48,11 @@ public FormatCatalog create(
public TableFormat format() {
return TableFormat.ICEBERG;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
return com.netease.arctic.utils.CatalogUtil.withIcebergCatalogInitializeProperties(
catalogName, metastoreType, unifiedCatalogProperties);
}
}
Original file line number Diff line number Diff line change
@@ -21,9 +21,11 @@
import com.netease.arctic.FormatCatalog;
import com.netease.arctic.FormatCatalogFactory;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.ams.api.properties.CatalogMetaProperties;
import com.netease.arctic.catalog.ArcticCatalog;
import com.netease.arctic.catalog.CatalogLoader;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;

import java.util.Map;

@@ -43,4 +45,14 @@ public FormatCatalog create(
public TableFormat format() {
return TableFormat.MIXED_ICEBERG;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
Map<String, String> properties =
CatalogUtil.withIcebergCatalogInitializeProperties(
catalogName, metastoreType, unifiedCatalogProperties);
properties.put(CatalogMetaProperties.TABLE_FORMATS, format().name());
return properties;
}
}
Original file line number Diff line number Diff line change
@@ -49,28 +49,32 @@ public PaimonCatalog create(
url ->
catalogProperties.put(
HiveCatalogOptions.HIVE_CONF_DIR.key(), new File(url.getPath()).getParent()));
return new PaimonCatalog(
paimonCatalog(metastoreType, catalogProperties, metaStore.getConfiguration()), name);
Catalog catalog = paimonCatalog(catalogProperties, metaStore.getConfiguration());
return new PaimonCatalog(catalog, name);
}

public static Catalog paimonCatalog(
String metastoreType, Map<String, String> properties, Configuration configuration) {
public static Catalog paimonCatalog(Map<String, String> properties, Configuration configuration) {
Options options = Options.fromMap(properties);
CatalogContext catalogContext = CatalogContext.create(options, configuration);
return CatalogFactory.createCatalog(catalogContext);
}

@Override
public TableFormat format() {
return TableFormat.PAIMON;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
Options options = Options.fromMap(unifiedCatalogProperties);
String type;
if (CatalogMetaProperties.CATALOG_TYPE_HADOOP.equalsIgnoreCase(metastoreType)) {
type = FileSystemCatalogFactory.IDENTIFIER;
} else {
type = metastoreType;
}
options.set(CatalogOptions.METASTORE, type);

CatalogContext catalogContext = CatalogContext.create(options, configuration);
return CatalogFactory.createCatalog(catalogContext);
}

@Override
public TableFormat format() {
return TableFormat.PAIMON;
return options.toMap();
}
}
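As a concrete illustration of the Paimon conversion above: for a unified catalog whose metastore type is hadoop (assuming CATALOG_TYPE_HADOOP is the string "hadoop"), the factory rewrites Paimon's metastore option to the filesystem catalog identifier and passes the remaining options through. The warehouse value below is made up for the example.

import com.netease.arctic.formats.paimon.PaimonCatalogFactory;

import java.util.HashMap;
import java.util.Map;

class PaimonPropertyConversionExample {

  static Map<String, String> example() {
    Map<String, String> unifiedProperties = new HashMap<>();
    unifiedProperties.put("warehouse", "/tmp/amoro/warehouse"); // illustrative location

    // The returned map keeps "warehouse" and additionally carries the rewritten
    // "metastore" option (CatalogOptions.METASTORE -> FileSystemCatalogFactory.IDENTIFIER).
    return new PaimonCatalogFactory()
        .convertCatalogProperties("demo_catalog", "hadoop", unifiedProperties);
  }
}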
Original file line number Diff line number Diff line change
@@ -74,8 +74,10 @@ public void initHiveConf(Configuration hiveConf) {
public AmoroCatalog amoroCatalog() {
IcebergCatalogFactory icebergCatalogFactory = new IcebergCatalogFactory();
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
return icebergCatalogFactory.create(
catalogName, getMetastoreType(), catalogProperties, metaStore);
Map<String, String> properties =
icebergCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), catalogProperties);
return icebergCatalogFactory.create(catalogName, getMetastoreType(), properties, metaStore);
}

@Override
Original file line number Diff line number Diff line change
@@ -44,10 +44,13 @@ protected TableFormat format() {

@Override
public AmoroCatalog amoroCatalog() {
MixedIcebergCatalogFactory icebergCatalogFactory = new MixedIcebergCatalogFactory();
MixedIcebergCatalogFactory mixedIcebergCatalogFactory = new MixedIcebergCatalogFactory();
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
return icebergCatalogFactory.create(
catalogName, getMetastoreType(), catalogProperties, metaStore);
Map<String, String> properties =
mixedIcebergCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), catalogProperties);
return mixedIcebergCatalogFactory.create(
catalogName, getMetastoreType(), properties, metaStore);
}

@Override
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@
import com.netease.arctic.formats.paimon.PaimonCatalogFactory;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
@@ -62,13 +63,19 @@ protected TableFormat format() {
public AmoroCatalog amoroCatalog() {
PaimonCatalogFactory paimonCatalogFactory = new PaimonCatalogFactory();
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
Map<String, String> paimonCatalogProperties =
paimonCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), getCatalogMeta().getCatalogProperties());
return paimonCatalogFactory.create(
catalogName, getMetastoreType(), catalogProperties, metaStore);
catalogName, getMetastoreType(), paimonCatalogProperties, metaStore);
}

@Override
public Catalog originalCatalog() {
return PaimonCatalogFactory.paimonCatalog(getMetastoreType(), catalogProperties, null);
PaimonCatalogFactory factory = new PaimonCatalogFactory();
Map<String, String> properties =
factory.convertCatalogProperties(catalogName, getMetastoreType(), catalogProperties);
return PaimonCatalogFactory.paimonCatalog(properties, new Configuration());
}

@Override
Original file line number Diff line number Diff line change
@@ -34,7 +34,6 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

import java.util.Map;
@@ -45,16 +44,10 @@ public class HiveCatalogTestHelper implements CatalogTestHelper {
private final Configuration hiveConf;

public static CatalogTestHelper build(Configuration hiveConf, TableFormat... formats) {
Preconditions.checkArgument(formats.length == 1, "Cannot support multiple table formats");
return new HiveCatalogTestHelper(formats[0], hiveConf);
}

public HiveCatalogTestHelper(TableFormat tableFormat, Configuration hiveConf) {
Preconditions.checkArgument(
tableFormat.equals(TableFormat.ICEBERG)
|| tableFormat.equals(TableFormat.MIXED_HIVE)
|| tableFormat.equals(TableFormat.MIXED_ICEBERG),
"Cannot support table format:" + tableFormat);
this.tableFormat = tableFormat;
this.hiveConf = hiveConf;
}
@@ -72,9 +65,8 @@ public TableFormat tableFormat() {
@Override
public CatalogMeta buildCatalogMeta(String baseDir) {
Map<String, String> properties = Maps.newHashMap();
if (TableFormat.MIXED_ICEBERG == tableFormat) {
properties.put(CatalogProperties.URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
}
properties.put(CatalogProperties.URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
properties.put(CatalogProperties.WAREHOUSE_LOCATION, baseDir);
return CatalogTestHelpers.buildHiveCatalogMeta(
TEST_CATALOG_NAME, properties, hiveConf, tableFormat);
}
39 changes: 0 additions & 39 deletions mixed/spark/common/pom.xml
@@ -215,43 +215,4 @@

</dependencies>


<profiles>
<profile>
<id>deploy-maven-central</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin-version}</version>
<executions>
<execution>
<id>empty-javadoc-jar</id>
<goals>
<goal>jar</goal>
</goals>
<phase>package</phase>
<configuration>
<classifier>javadoc</classifier>
<classesDirectory>${basedir}/javadoc</classesDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/it/**.java</exclude>
</excludes>
<groups>unit</groups>
<excludedGroups>integration</excludedGroups>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>