
[AMORO-2276]: UnifiedCatalog for Spark Engine #2269

Merged — 36 commits, Dec 12, 2023
Changes shown from 34 of 36 commits.

Commits (36)
9136dc7
Squash Merged from branch:external-unified-catalog
baiyangtx Nov 7, 2023
9669d64
Merge branch 'master' into spark-unified-catalog
baiyangtx Nov 9, 2023
138e907
spark catalog
baiyangtx Nov 9, 2023
59a6395
Merge branch 'master' into spark-unified-catalog
baiyangtx Nov 15, 2023
1565323
spark test base refactor to adapt UnifiedCatalog test and mixed catal…
baiyangtx Nov 15, 2023
65ed1d6
run tests/ some passed
baiyangtx Nov 15, 2023
acd8bea
fix unit tests in truncate sql.
baiyangtx Nov 20, 2023
7e174dc
unified catalog refactors
baiyangtx Nov 20, 2023
99fdddf
support mixed-iceberg
baiyangtx Nov 21, 2023
a90a403
paimon catalog loaded.
baiyangtx Nov 22, 2023
5984e0d
support unified catalog in spark
baiyangtx Nov 23, 2023
b49a85c
refactor mixed spark catalog. extract common base.
baiyangtx Nov 23, 2023
6f49f01
UnifiedCatalog test.
baiyangtx Nov 27, 2023
84f3c03
remove useless codes
baiyangtx Nov 27, 2023
36a5651
Merge branch 'master' into spark-unified-catalog
baiyangtx Nov 27, 2023
5114a3b
merge from master
baiyangtx Nov 27, 2023
7ae8088
fix unit test cases.
baiyangtx Nov 27, 2023
9d0974e
fix complie error
baiyangtx Nov 27, 2023
0b120db
fix unit tests
baiyangtx Nov 28, 2023
5af7f23
unified catalog and test from spark engine.
baiyangtx Nov 28, 2023
0002e38
Merge branch 'master' into spark-unified-catalog
baiyangtx Nov 28, 2023
190af26
unified session catalog and test case
baiyangtx Nov 28, 2023
80e103b
support sub table for unified session catalog.
baiyangtx Nov 28, 2023
1d0bfad
spotless:apply
baiyangtx Nov 30, 2023
17c731b
remove useless annotation
baiyangtx Nov 30, 2023
04fb603
support paimon in session catalog
baiyangtx Nov 30, 2023
bf40a59
Merge branch 'master' into spark-unified-catalog
baiyangtx Nov 30, 2023
c07bdef
merged from master && mixed module.
baiyangtx Dec 1, 2023
d83e778
Merge branch 'master' into spark-unified-catalog
baiyangtx Dec 1, 2023
919a7a1
fix code reviewer comments.
baiyangtx Dec 7, 2023
2fffedb
Merge branch 'master' into spark-unified-catalog
baiyangtx Dec 7, 2023
05f0465
add catalog() to TableIdWithFormat
baiyangtx Dec 7, 2023
4d24cad
Add docs to class
baiyangtx Dec 7, 2023
dd6fc3b
spotless
baiyangtx Dec 7, 2023
153cbeb
fix review comments
baiyangtx Dec 12, 2023
2f18c81
fix review comments
baiyangtx Dec 12, 2023
24 changes: 23 additions & 1 deletion core/src/main/java/com/netease/arctic/CommonUnifiedCatalog.java
@@ -41,16 +41,29 @@
private Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newHashMap();
private final Map<String, String> properties = Maps.newHashMap();

private TableMetaStore tableMetaStore;

public CommonUnifiedCatalog(
Supplier<CatalogMeta> catalogMetaSupplier, Map<String, String> properties) {
CatalogMeta catalogMeta = catalogMetaSupplier.get();
CatalogUtil.mergeCatalogProperties(catalogMeta, properties);
this.meta = catalogMeta;
this.tableMetaStore = CatalogUtil.buildMetaStore(catalogMeta);
this.properties.putAll(properties);
this.metaSupplier = catalogMetaSupplier;
initializeFormatCatalogs();
}

@Override
public String metastoreType() {
return meta.getCatalogType();

Codecov (codecov/patch): added line #L59 in CommonUnifiedCatalog.java is not covered by tests.
}

@Override
public TableMetaStore authenticationContext() {
Reviewer comment (Contributor): When I try to complete #2344 Flink Unified Catalog (Iceberg format table), I find that this API is also needed.

return this.tableMetaStore;

Codecov (codecov/patch): added line #L64 in CommonUnifiedCatalog.java is not covered by tests.
}

@Override
public List<String> listDatabases() {
return findFirstFormatCatalog(TableFormat.values()).listDatabases();
@@ -163,19 +176,28 @@
if (newMeta.equals(this.meta)) {
return;
}
this.tableMetaStore = CatalogUtil.buildMetaStore(newMeta);

Codecov (codecov/patch): added line #L179 in CommonUnifiedCatalog.java is not covered by tests.
this.meta = newMeta;
this.initializeFormatCatalogs();
}

@Override
public Map<String, String> properties() {
return this.meta.getCatalogProperties();

Codecov (codecov/patch): added line #L186 in CommonUnifiedCatalog.java is not covered by tests.
}

protected void initializeFormatCatalogs() {
ServiceLoader<FormatCatalogFactory> loader = ServiceLoader.load(FormatCatalogFactory.class);
Set<TableFormat> formats = CatalogUtil.tableFormats(this.meta);
TableMetaStore store = CatalogUtil.buildMetaStore(this.meta);
Map<TableFormat, FormatCatalog> formatCatalogs = Maps.newConcurrentMap();
for (FormatCatalogFactory factory : loader) {
if (formats.contains(factory.format())) {
Map<String, String> catalogProperties =
factory.convertCatalogProperties(
name(), meta.getCatalogType(), meta.getCatalogProperties());
FormatCatalog catalog =
factory.create(name(), meta.getCatalogType(), meta.getCatalogProperties(), store);
factory.create(name(), meta.getCatalogType(), catalogProperties, store);
formatCatalogs.put(factory.format(), catalog);
}
}
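To make the new constructor/refresh flow concrete, here is a minimal, hedged sketch of how an engine-side caller might build and exercise a `CommonUnifiedCatalog`. The `fetchCatalogMetaFromAms` helper is a placeholder for whatever client actually retrieves the `CatalogMeta` registered in AMS; it is not part of this PR.

```java
import com.netease.arctic.CommonUnifiedCatalog;
import com.netease.arctic.UnifiedCatalog;
import com.netease.arctic.ams.api.CatalogMeta;

import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class UnifiedCatalogUsageSketch {

  public static void main(String[] args) {
    // Placeholder: in practice the supplier would fetch the CatalogMeta from AMS.
    Supplier<CatalogMeta> metaSupplier = () -> fetchCatalogMetaFromAms("demo_catalog");

    // Engine-side properties are merged into the CatalogMeta properties on construction.
    Map<String, String> engineProperties = new HashMap<>();

    UnifiedCatalog catalog = new CommonUnifiedCatalog(metaSupplier, engineProperties);

    // metastoreType(), properties() and authenticationContext() are the accessors added here.
    System.out.println(catalog.name() + " is backed by a " + catalog.metastoreType() + " metastore");
    System.out.println(catalog.properties());

    // refresh() re-reads the CatalogMeta and, with this change, also rebuilds the TableMetaStore.
    catalog.refresh();
  }

  private static CatalogMeta fetchCatalogMetaFromAms(String catalogName) {
    throw new UnsupportedOperationException("illustrative placeholder - fetch from AMS here");
  }
}
```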
12 changes: 12 additions & 0 deletions core/src/main/java/com/netease/arctic/FormatCatalogFactory.java
@@ -43,4 +43,16 @@ FormatCatalog create(

/** format of this catalog factory */
TableFormat format();

/**
* Convert UnifiedCatalog Properties to corresponding format Properties and use them to initialize
* the corresponding Catalog.
*
* @param catalogName register in AMS
* @param metastoreType metastore type
* @param unifiedCatalogProperties properties of unified catalog.
* @return properties of the target format.
*/
Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties);
}
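As a hedged illustration of the new contract, the sketch below shows the shape a `FormatCatalogFactory` implementation takes after this change, assuming the interface consists of exactly the three methods visible in this PR. The `demo.*` property keys and the returned format are invented for illustration and are not real Amoro keys.

```java
import com.netease.arctic.FormatCatalog;
import com.netease.arctic.FormatCatalogFactory;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.table.TableMetaStore;

import java.util.HashMap;
import java.util.Map;

/** Illustration only: shows how convertCatalogProperties feeds create(), not a real format. */
public class DemoFormatCatalogFactory implements FormatCatalogFactory {

  @Override
  public FormatCatalog create(
      String name, String metastoreType, Map<String, String> properties, TableMetaStore metaStore) {
    // "properties" here are already the converted, format-specific ones produced below.
    throw new UnsupportedOperationException("illustrative placeholder - build the FormatCatalog here");
  }

  @Override
  public TableFormat format() {
    return TableFormat.ICEBERG; // placeholder; a real factory returns its own format
  }

  @Override
  public Map<String, String> convertCatalogProperties(
      String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
    // Translate the AMS-level (unified) properties into what the target format's catalog expects.
    Map<String, String> converted = new HashMap<>(unifiedCatalogProperties);
    converted.put("demo.catalog-name", catalogName); // invented key, illustration only
    converted.put("demo.metastore", metastoreType); // invented key, illustration only
    return converted;
  }
}
```

Since `CommonUnifiedCatalog#initializeFormatCatalogs` discovers factories through `ServiceLoader`, a real implementation would also be registered under `META-INF/services/com.netease.arctic.FormatCatalogFactory` following the standard ServiceLoader convention.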
12 changes: 12 additions & 0 deletions core/src/main/java/com/netease/arctic/TableIDWithFormat.java
@@ -42,4 +42,16 @@
public TableFormat getTableFormat() {
return tableFormat;
}

public String catalog() {
return this.identifier.getCatalog();

Codecov (codecov/patch): added line #L47 in TableIDWithFormat.java is not covered by tests.
}

public String database() {
return this.identifier.getDatabase();

Codecov (codecov/patch): added line #L51 in TableIDWithFormat.java is not covered by tests.
}

public String table() {
return this.identifier.getTableName();

Codecov (codecov/patch): added line #L55 in TableIDWithFormat.java is not covered by tests.
}
}
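A short, hedged sketch of how these accessors might be consumed by an engine when routing tables by format; how the `TableIDWithFormat` list is obtained (for example from the UnifiedCatalog's "list tables with format" API) is deliberately left out.

```java
import com.netease.arctic.TableIDWithFormat;
import com.netease.arctic.ams.api.TableFormat;

import java.util.List;

public class TableRoutingSketch {

  /** "tables" would typically come from the UnifiedCatalog's "list tables with format" API. */
  static void route(List<TableIDWithFormat> tables) {
    for (TableIDWithFormat id : tables) {
      // catalog()/database()/table() are the accessors added in this PR.
      String qualified = id.catalog() + "." + id.database() + "." + id.table();
      if (TableFormat.PAIMON.equals(id.getTableFormat())) {
        System.out.println(qualified + " -> load through the Paimon format catalog");
      } else {
        System.out.println(qualified + " -> load through the Iceberg/Mixed format catalog");
      }
    }
  }
}
```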
24 changes: 22 additions & 2 deletions core/src/main/java/com/netease/arctic/UnifiedCatalog.java
@@ -18,16 +18,29 @@

package com.netease.arctic;

import com.netease.arctic.table.TableMetaStore;

import java.util.List;
import java.util.Map;

/** UnifiedCatalog is a catalog that can visit tables with all types of formats. */
public interface UnifiedCatalog extends AmoroCatalog {

/** name of this catalog */
/** Name of this catalog */
String name();

/** Metastore type */
String metastoreType();

/**
* Get authentication context of this catalog.
*
* @return table metastore.
*/
TableMetaStore authenticationContext();

/**
* list tables with format
* List tables with format
*
* @param database given database
* @return identifier and format list
@@ -36,4 +49,11 @@ public interface UnifiedCatalog extends AmoroCatalog {

/** Refresh catalog meta */
void refresh();

/**
* Get catalog properties
*
* @return catalog properties
*/
Map<String, String> properties();
}
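The reviewer note in the CommonUnifiedCatalog diff explains why `authenticationContext()` is exposed: engines need to run metastore access inside the catalog's security context. A hedged sketch follows, assuming `TableMetaStore` offers a `doAs(...)`-style helper that runs a callable under its Hadoop/Kerberos configuration; verify against the actual TableMetaStore API before relying on it.

```java
import com.netease.arctic.UnifiedCatalog;
import com.netease.arctic.table.TableMetaStore;

import java.util.List;

public class AuthContextSketch {

  static List<String> listDatabasesSecurely(UnifiedCatalog unifiedCatalog) {
    TableMetaStore metaStore = unifiedCatalog.authenticationContext();
    // Assumption: doAs(...) runs the callable with the catalog's UGI/Kerberos context applied.
    return metaStore.doAs(unifiedCatalog::listDatabases);
  }
}
```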
@@ -48,4 +48,11 @@ public FormatCatalog create(
public TableFormat format() {
return TableFormat.ICEBERG;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
return com.netease.arctic.utils.CatalogUtil.withIcebergCatalogInitializeProperties(
catalogName, metastoreType, unifiedCatalogProperties);
}
}
@@ -21,9 +21,11 @@
import com.netease.arctic.FormatCatalog;
import com.netease.arctic.FormatCatalogFactory;
import com.netease.arctic.ams.api.TableFormat;
import com.netease.arctic.ams.api.properties.CatalogMetaProperties;
import com.netease.arctic.catalog.ArcticCatalog;
import com.netease.arctic.catalog.CatalogLoader;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;

import java.util.Map;

@@ -43,4 +45,14 @@ public FormatCatalog create(
public TableFormat format() {
return TableFormat.MIXED_ICEBERG;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
Map<String, String> properties =
CatalogUtil.withIcebergCatalogInitializeProperties(
catalogName, metastoreType, unifiedCatalogProperties);
properties.put(CatalogMetaProperties.TABLE_FORMATS, format().name());
return properties;
}
}
@@ -49,28 +49,32 @@ public PaimonCatalog create(
url ->
catalogProperties.put(
HiveCatalogOptions.HIVE_CONF_DIR.key(), new File(url.getPath()).getParent()));
return new PaimonCatalog(
paimonCatalog(metastoreType, catalogProperties, metaStore.getConfiguration()), name);
Catalog catalog = paimonCatalog(catalogProperties, metaStore.getConfiguration());
return new PaimonCatalog(catalog, name);
}

public static Catalog paimonCatalog(
String metastoreType, Map<String, String> properties, Configuration configuration) {
public static Catalog paimonCatalog(Map<String, String> properties, Configuration configuration) {
Options options = Options.fromMap(properties);
CatalogContext catalogContext = CatalogContext.create(options, configuration);
return CatalogFactory.createCatalog(catalogContext);
}

@Override
public TableFormat format() {
return TableFormat.PAIMON;
}

@Override
public Map<String, String> convertCatalogProperties(
String catalogName, String metastoreType, Map<String, String> unifiedCatalogProperties) {
Options options = Options.fromMap(unifiedCatalogProperties);
String type;
if (CatalogMetaProperties.CATALOG_TYPE_HADOOP.equalsIgnoreCase(metastoreType)) {
type = FileSystemCatalogFactory.IDENTIFIER;
} else {
type = metastoreType;
}
options.set(CatalogOptions.METASTORE, type);

CatalogContext catalogContext = CatalogContext.create(options, configuration);
return CatalogFactory.createCatalog(catalogContext);
}

@Override
public TableFormat format() {
return TableFormat.PAIMON;
return options.toMap();
}
}
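A hedged usage sketch of the two Paimon entry points touched here, with a placeholder catalog name and warehouse path; it assumes paimon-core and hadoop-common are on the classpath, and that the AMS metastore type `hadoop` maps to Paimon's `filesystem` metastore, as the conversion above implements.

```java
import com.netease.arctic.formats.paimon.PaimonCatalogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.CatalogOptions;

import java.util.HashMap;
import java.util.Map;

public class PaimonConversionSketch {

  public static void main(String[] args) {
    PaimonCatalogFactory factory = new PaimonCatalogFactory();

    Map<String, String> unifiedProperties = new HashMap<>();
    unifiedProperties.put("warehouse", "file:/tmp/paimon-demo"); // placeholder warehouse

    // "hadoop" on the AMS side becomes Paimon's "filesystem" metastore identifier.
    Map<String, String> paimonProperties =
        factory.convertCatalogProperties("demo_catalog", "hadoop", unifiedProperties);
    System.out.println(paimonProperties.get(CatalogOptions.METASTORE.key())); // expected: filesystem

    // The simplified static helper now only takes the converted properties and a Hadoop conf.
    Catalog paimonCatalog = PaimonCatalogFactory.paimonCatalog(paimonProperties, new Configuration());
    System.out.println(paimonCatalog.getClass().getSimpleName());
  }
}
```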
@@ -74,8 +74,10 @@ public void initHiveConf(Configuration hiveConf) {
public AmoroCatalog amoroCatalog() {
IcebergCatalogFactory icebergCatalogFactory = new IcebergCatalogFactory();
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
return icebergCatalogFactory.create(
catalogName, getMetastoreType(), catalogProperties, metaStore);
Map<String, String> properties =
icebergCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), catalogProperties);
return icebergCatalogFactory.create(catalogName, getMetastoreType(), properties, metaStore);
}

@Override
@@ -44,10 +44,13 @@ protected TableFormat format() {

@Override
public AmoroCatalog amoroCatalog() {
MixedIcebergCatalogFactory icebergCatalogFactory = new MixedIcebergCatalogFactory();
MixedIcebergCatalogFactory mixedIcebergCatalogFactory = new MixedIcebergCatalogFactory();
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
return icebergCatalogFactory.create(
catalogName, getMetastoreType(), catalogProperties, metaStore);
Map<String, String> properties =
mixedIcebergCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), catalogProperties);
return mixedIcebergCatalogFactory.create(
catalogName, getMetastoreType(), properties, metaStore);
}

@Override
@@ -23,6 +23,7 @@
import com.netease.arctic.formats.paimon.PaimonCatalogFactory;
import com.netease.arctic.table.TableMetaStore;
import com.netease.arctic.utils.CatalogUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.CatalogOptions;
@@ -62,13 +63,19 @@ protected TableFormat format() {
public AmoroCatalog amoroCatalog() {
PaimonCatalogFactory paimonCatalogFactory = new PaimonCatalogFactory();
TableMetaStore metaStore = CatalogUtil.buildMetaStore(getCatalogMeta());
Map<String, String> paimonCatalogProperties =
paimonCatalogFactory.convertCatalogProperties(
catalogName, getMetastoreType(), getCatalogMeta().getCatalogProperties());
return paimonCatalogFactory.create(
catalogName, getMetastoreType(), catalogProperties, metaStore);
catalogName, getMetastoreType(), paimonCatalogProperties, metaStore);
}

@Override
public Catalog originalCatalog() {
return PaimonCatalogFactory.paimonCatalog(getMetastoreType(), catalogProperties, null);
PaimonCatalogFactory factory = new PaimonCatalogFactory();
Map<String, String> properties =
factory.convertCatalogProperties(catalogName, getMetastoreType(), catalogProperties);
return PaimonCatalogFactory.paimonCatalog(properties, new Configuration());
}

@Override
@@ -34,7 +34,6 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;

import java.util.Map;
@@ -45,16 +44,10 @@ public class HiveCatalogTestHelper implements CatalogTestHelper {
private final Configuration hiveConf;

public static CatalogTestHelper build(Configuration hiveConf, TableFormat... formats) {
Preconditions.checkArgument(formats.length == 1, "Cannot support multiple table formats");
return new HiveCatalogTestHelper(formats[0], hiveConf);
}

public HiveCatalogTestHelper(TableFormat tableFormat, Configuration hiveConf) {
Preconditions.checkArgument(
tableFormat.equals(TableFormat.ICEBERG)
|| tableFormat.equals(TableFormat.MIXED_HIVE)
|| tableFormat.equals(TableFormat.MIXED_ICEBERG),
"Cannot support table format:" + tableFormat);
this.tableFormat = tableFormat;
this.hiveConf = hiveConf;
}
@@ -72,9 +65,8 @@ public TableFormat tableFormat() {
@Override
public CatalogMeta buildCatalogMeta(String baseDir) {
Map<String, String> properties = Maps.newHashMap();
if (TableFormat.MIXED_ICEBERG == tableFormat) {
properties.put(CatalogProperties.URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
}
properties.put(CatalogProperties.URI, hiveConf.get(HiveConf.ConfVars.METASTOREURIS.varname));
properties.put(CatalogProperties.WAREHOUSE_LOCATION, baseDir);
return CatalogTestHelpers.buildHiveCatalogMeta(
TEST_CATALOG_NAME, properties, hiveConf, tableFormat);
}
49 changes: 11 additions & 38 deletions mixed/spark/common/pom.xml
@@ -215,43 +215,16 @@

</dependencies>

<build>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</build>

<profiles>
<profile>
<id>deploy-maven-central</id>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin-version}</version>
<executions>
<execution>
<id>empty-javadoc-jar</id>
<goals>
<goal>jar</goal>
</goals>
<phase>package</phase>
<configuration>
<classifier>javadoc</classifier>
<classesDirectory>${basedir}/javadoc</classesDirectory>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/it/**.java</exclude>
</excludes>
<groups>unit</groups>
<excludedGroups>integration</excludedGroups>
</configuration>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>