Initial integration for hudi tables within Polaris #1862

Status: Open · wants to merge 10 commits into base: main
39 changes: 39 additions & 0 deletions plugins/spark/v3.5/integration/build.gradle.kts
@@ -60,12 +60,51 @@ dependencies {
exclude("org.apache.logging.log4j", "log4j-core")
exclude("org.slf4j", "jul-to-slf4j")
}

// Add spark-hive for Hudi integration - provides HiveExternalCatalog that Hudi needs
testRuntimeOnly("org.apache.spark:spark-hive_${scalaVersion}:${spark35Version}") {
// exclude log4j dependencies to match spark-sql exclusions
Contributor: can we remove the spark_sql dependency above?

Author (@rahil-c, Jul 2, 2025): We will need this as a test dependency: when a Hudi insert is invoked it looks for HiveExternalCatalog, and removing this dependency fails, as suspected, with a class-not-found exception:

org/apache/spark/sql/hive/HiveExternalCatalog$
java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/HiveExternalCatalog$
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHiveSyncConfig(ProvidesHoodieConfig.scala:496)
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHiveSyncConfig$(ProvidesHoodieConfig.scala:467)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.buildHiveSyncConfig(HoodieSpark35Analysis.scala:39)
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHoodieConfig(ProvidesHoodieConfig.scala:63)
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHoodieConfig$(ProvidesHoodieConfig.scala:55)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.buildHoodieConfig(HoodieSpark35Analysis.scala:39)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.org$apache$spark$sql$hudi$analysis$HoodieSpark35DataSourceV2ToV1Fallback$$convertToV1(HoodieSpark35Analysis.scala:62)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.apply(HoodieSpark35Analysis.scala:50)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.apply(HoodieSpark35Analysis.scala:39)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)

Have also explained further here in related comment: #1862 (comment)
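For illustration, a minimal classpath check along these lines (a sketch only; the test class name is made up, and JUnit 5 on the test classpath is assumed) would surface a missing spark-hive dependency directly, rather than deep inside Hudi's analysis rules:

import org.junit.jupiter.api.Test;

public class HiveClasspathSmokeTest {
  @Test
  public void hiveExternalCatalogIsOnTestClasspath() throws Exception {
    // Hudi's ProvidesHoodieConfig references HiveExternalCatalog when building the Hive
    // sync config; without spark-hive on the runtime classpath this lookup fails with
    // ClassNotFoundException, mirroring the NoClassDefFoundError shown above.
    Class.forName("org.apache.spark.sql.hive.HiveExternalCatalog");
  }
}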

Member: Don't we only need it as a test-runtime?

Author: Thanks @RussellSpitzer for the suggestion; I can also try setting this as test runtime.

Contributor: Sorry, what I mean is: does spark-hive contain spark-sql? If yes, can we remove

testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
    // exclude log4j dependencies. Explicit dependencies for the log4j libraries are
    // enforced below to ensure the version compatibility
    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
    exclude("org.apache.logging.log4j", "log4j-1.2-api")
    exclude("org.apache.logging.log4j", "log4j-core")
    exclude("org.slf4j", "jul-to-slf4j")
  }

exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
exclude("org.apache.logging.log4j", "log4j-1.2-api")
exclude("org.apache.logging.log4j", "log4j-core")
exclude("org.slf4j", "jul-to-slf4j")
// exclude old slf4j 1.x to log4j 2.x bridge that conflicts with slf4j 2.x bridge
exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
}
// enforce the usage of log4j 2.24.3. This is for the log4j-api compatibility
// of spark-sql dependency
testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")

testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0") {
// exclude log4j dependencies to match spark-sql exclusions and prevent version conflicts
Contributor: Does the bundle already contain all the Spark dependencies needed? If that is the case, we shouldn't need the spark-hive dependency anymore, right?

Member: I think this and the above dependency can both be test runtime? Or are we using Delta and Hudi specific code in the test code itself?

Author: Similar to the comment above: #1862 (comment)

When running the following command:
jar tf hudi-spark3.5-bundle_2.12-0.15.0.jar | grep -i "org/apache/spark/sql/hive"

I do not see the HiveExternalCatalog provided by the hudi-spark-bundle.

org/apache/spark/sql/hive/
org/apache/spark/sql/hive/HiveClientUtils.class
org/apache/spark/sql/hive/HiveClientUtils$.class

Based on my understanding, the hudi-spark-bundle aims to provide the core Hudi dependencies needed for the Spark-Hudi integration to work, but it expects certain Spark dependencies to be on the engine classpath.
For example, when checking the jars folder of an OSS Spark distribution by running the commands below, Spark provides these:

cd ~
wget https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
mkdir spark-3.5
tar xzvf spark-3.5.5-bin-hadoop3.tgz  -C spark-3.5 --strip-components=1
cd spark-3.5
 rahil@mac  ~/spark-3.5/jars  ls -l | grep hive
-rw-r--r--@ 1 rahil  staff    183633 Feb 23 12:45 hive-beeline-2.3.9.jar
-rw-r--r--@ 1 rahil  staff     44704 Feb 23 12:45 hive-cli-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    436169 Feb 23 12:45 hive-common-2.3.9.jar
-rw-r--r--@ 1 rahil  staff  10840949 Feb 23 12:45 hive-exec-2.3.9-core.jar
-rw-r--r--@ 1 rahil  staff    116364 Feb 23 12:45 hive-jdbc-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    326585 Feb 23 12:45 hive-llap-common-2.3.9.jar
-rw-r--r--@ 1 rahil  staff   8195966 Feb 23 12:45 hive-metastore-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    916630 Feb 23 12:45 hive-serde-2.3.9.jar
-rw-r--r--@ 1 rahil  staff   1679366 Feb 23 12:45 hive-service-rpc-3.1.3.jar
-rw-r--r--@ 1 rahil  staff     53902 Feb 23 12:45 hive-shims-0.23-2.3.9.jar
-rw-r--r--@ 1 rahil  staff      8786 Feb 23 12:45 hive-shims-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    120293 Feb 23 12:45 hive-shims-common-2.3.9.jar
-rw-r--r--@ 1 rahil  staff     12923 Feb 23 12:45 hive-shims-scheduler-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    258346 Feb 23 12:45 hive-storage-api-2.8.1.jar
-rw-r--r--@ 1 rahil  staff    572320 Feb 23 12:45 spark-hive-thriftserver_2.12-3.5.5.jar
-rw-r--r--@ 1 rahil  staff    725252 Feb 23 12:45 spark-hive_2.12-3.5.5.jar

This allows Hudi to avoid the class-not-found exception (at least when testing via my local Spark). Therefore I believe we will need to explicitly provide this in the test environment in order for the Spark-Hudi integration test to work.

Author: @RussellSpitzer For Delta it seems we declared the test dependency with testImplementation (https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/integration/build.gradle.kts#L68), so I had done the same for Hudi.

In one place in SparkCatalogTest I use an assertion on a specific Hudi class to check that the table returned was in fact a Hudi table:

} else if (PolarisCatalogUtils.useHudi(format)) {
        assertThat(loadedTable).isInstanceOf(HoodieInternalV2Table.class);
      }

So for now I have left this as testImplementation. If this is a concern, let me know and I can look for an alternative here.

exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
exclude("org.apache.logging.log4j", "log4j-1.2-api")
exclude("org.apache.logging.log4j", "log4j-core")
exclude("org.slf4j", "jul-to-slf4j")
exclude("org.slf4j", "slf4j-log4j12")
exclude("org.slf4j", "slf4j-reload4j")
exclude("ch.qos.reload4j", "reload4j")
exclude("log4j", "log4j")
// exclude old slf4j 1.x to log4j 2.x bridge that conflicts with slf4j 2.x bridge
exclude("org.apache.logging.log4j", "log4j-slf4j-impl")
}

// The hudi-spark-bundle includes most Hive libraries but excludes hive-exec to keep size
// manageable
// This matches what Spark 3.5 distribution provides (hive-exec-2.3.9-core.jar)
testImplementation("org.apache.hive:hive-exec:2.3.9:core") {
// Exclude conflicting dependencies to use Spark's versions
exclude("org.apache.hadoop", "*")
exclude("org.apache.commons", "*")
exclude("org.slf4j", "*")
exclude("log4j", "*")
exclude("org.apache.logging.log4j", "*")
exclude("org.pentaho", "*")
exclude("org.apache.calcite", "*")
exclude("org.apache.tez", "*")
}

testImplementation(platform(libs.jackson.bom))
testImplementation("com.fasterxml.jackson.jakarta.rs:jackson-jakarta-rs-json-provider")
@@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.polaris.spark.quarkus.it;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import io.quarkus.test.junit.QuarkusIntegrationTest;
import java.io.File;
import java.nio.file.Path;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.polaris.service.it.env.IntegrationTestsHelper;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

@QuarkusIntegrationTest
public class SparkHudiIT extends SparkIntegrationBase {

@Override
protected SparkSession.Builder withCatalog(SparkSession.Builder builder, String catalogName) {
return builder
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.config(
"spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
.config(
String.format("spark.sql.catalog.%s", catalogName),
"org.apache.polaris.spark.SparkCatalog")
.config("spark.sql.warehouse.dir", warehouseDir.toString())
.config(String.format("spark.sql.catalog.%s.type", catalogName), "rest")
.config(
String.format("spark.sql.catalog.%s.uri", catalogName),
endpoints.catalogApiEndpoint().toString())
.config(String.format("spark.sql.catalog.%s.warehouse", catalogName), catalogName)
.config(String.format("spark.sql.catalog.%s.scope", catalogName), "PRINCIPAL_ROLE:ALL")
.config(
String.format("spark.sql.catalog.%s.header.realm", catalogName), endpoints.realmId())
.config(String.format("spark.sql.catalog.%s.token", catalogName), sparkToken)
.config(String.format("spark.sql.catalog.%s.s3.access-key-id", catalogName), "fakekey")
.config(
String.format("spark.sql.catalog.%s.s3.secret-access-key", catalogName), "fakesecret")
.config(String.format("spark.sql.catalog.%s.s3.region", catalogName), "us-west-2")
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
// disabled for the initial integration test; revisit enabling in the future
.config("hoodie.metadata.enable", "false");
}

private String defaultNs;
private String tableRootDir;

private String getTableLocation(String tableName) {
return String.format("%s/%s", tableRootDir, tableName);
}

private String getTableNameWithRandomSuffix() {
return generateName("huditb");
}

@BeforeEach
public void createDefaultResources(@TempDir Path tempDir) {
spark.sparkContext().setLogLevel("INFO");
defaultNs = generateName("hudi");
// create a default namespace
sql("CREATE NAMESPACE %s", defaultNs);
sql("USE NAMESPACE %s", defaultNs);
tableRootDir =
IntegrationTestsHelper.getTemporaryDirectory(tempDir).resolve(defaultNs).getPath();
}

@AfterEach
public void cleanupHudiData() {
// clean up hudi data
File dirToDelete = new File(tableRootDir);
FileUtils.deleteQuietly(dirToDelete);
sql("DROP NAMESPACE %s", defaultNs);
}

@Test
public void testBasicTableOperations() {
// create a regular hudi table
String huditb1 = "huditb1";
sql(
"CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
huditb1, getTableLocation(huditb1));
sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY id DESC", huditb1);
Contributor: nit: Some of these queries could be cleaned up.

assertThat(results.size()).isEqualTo(1);
assertThat(results.get(0)).isEqualTo(new Object[] {2, "bob"});

// create a hudi table with partition
String huditb2 = "huditb2";
sql(
"CREATE TABLE %s (name String, age INT, country STRING) USING HUDI PARTITIONED BY (country) LOCATION '%s'",
huditb2, getTableLocation(huditb2));
sql(
"INSERT INTO %s VALUES ('anna', 10, 'US'), ('james', 32, 'US'), ('yan', 16, 'CHINA')",
huditb2);
results = sql("SELECT name, country FROM %s ORDER BY age", huditb2);
assertThat(results.size()).isEqualTo(3);
assertThat(results.get(0)).isEqualTo(new Object[] {"anna", "US"});
assertThat(results.get(1)).isEqualTo(new Object[] {"yan", "CHINA"});
assertThat(results.get(2)).isEqualTo(new Object[] {"james", "US"});

// verify the partition dir is created
List<String> subDirs = listDirs(getTableLocation(huditb2));
assertThat(subDirs).contains(".hoodie", "country=CHINA", "country=US");

// test listTables
List<Object[]> tables = sql("SHOW TABLES");
assertThat(tables.size()).isEqualTo(2);
assertThat(tables)
.contains(
new Object[] {defaultNs, huditb1, false}, new Object[] {defaultNs, huditb2, false});

sql("DROP TABLE %s", huditb1);
sql("DROP TABLE %s", huditb2);
tables = sql("SHOW TABLES");
assertThat(tables.size()).isEqualTo(0);
}

@Test
public void testUnsupportedAlterTableOperations() {
String huditb = getTableNameWithRandomSuffix();
sql(
"CREATE TABLE %s (name String, age INT, country STRING) USING HUDI PARTITIONED BY (country) LOCATION '%s'",
huditb, getTableLocation(huditb));

// ALTER TABLE ... RENAME TO ... fails
assertThatThrownBy(() -> sql("ALTER TABLE %s RENAME TO new_hudi", huditb))
.isInstanceOf(UnsupportedOperationException.class);

// ALTER TABLE ... SET LOCATION ... fails
assertThatThrownBy(() -> sql("ALTER TABLE %s SET LOCATION '/tmp/new/path'", huditb))
.isInstanceOf(UnsupportedOperationException.class);

sql("DROP TABLE %s", huditb);
}

@Test
public void testUnsupportedTableCreateOperations() {
String huditb = getTableNameWithRandomSuffix();
// create hudi table with no location
assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING HUDI", huditb))
.isInstanceOf(UnsupportedOperationException.class);

Contributor (on lines +162 to +163): Why should this fail?

Reviewer: +1, this should not fail, as some default location will be used instead.

// CTAS fails
assertThatThrownBy(
() ->
sql(
"CREATE TABLE %s USING HUDI LOCATION '%s' AS SELECT 1 AS id",

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CTAS as long as we write it properly, it should not fail.

huditb, getTableLocation(huditb)))
.isInstanceOf(IllegalArgumentException.class);
}
}
@@ -32,6 +32,9 @@ out the configuration if you would like to see all spark debug log during the run
</encoder>
</appender>

<!-- Hudi-specific loggers for test -->
<logger name="org.apache.hudi" level="INFO"/>
Contributor: Does Hudi output a lot of logs? Too much logging was causing problems for test efficiency, so we only turned on the error log here. How long does the integration test take now?

Author: This is a good point; I can remove this INFO then, to avoid the extra logging. I am curious though whether we should keep the test root logger at WARN instead of its current ERROR?

Contributor (@gh-yzou, Jul 7, 2025): We can if the WARN-level log output doesn't significantly slow the test, but based on my previous testing with Spark, even with WARN there are quite a lot of Spark logs; that was part of the reason we set the level to ERROR. If the test fails, people can always tune the log level to check more logs if needed, so I wouldn't worry too much about the restricted log level.


<root level="ERROR">
<appender-ref ref="CONSOLE"/>
</root>
2 changes: 2 additions & 0 deletions plugins/spark/v3.5/spark/build.gradle.kts
@@ -46,6 +46,8 @@ dependencies {
// TODO: extract a polaris-rest module as a thin layer for
// client to depends on.
implementation(project(":polaris-core")) { isTransitive = false }
compileOnly("org.apache.hudi:hudi-spark3.2plus-common:0.15.0")

Reviewer: Maybe Ethan knows this part better; starting with Hudi 1.x we deprecated some old Spark version support, and I'm not sure about the status of Spark 3.2.

testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0")

implementation(
"org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}:${icebergVersion}"
@@ -68,7 +68,7 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException {
try {
GenericTable genericTable =
this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
return PolarisCatalogUtils.loadSparkTable(genericTable);
return PolarisCatalogUtils.loadSparkTable(genericTable, identifier);
Contributor: It's not clear to me why the interface had to change; is this related to the namespace thing?

Author (@rahil-c, Jul 2, 2025): Currently PolarisCatalogUtils itself is not leveraging any interface; it's just a util method (https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java#L69) that returns an implementation of Spark's Table interface. So there are no interface changes, just an additional parameter passed to this method.

I needed to do this for Hudi, as it needs the full identifier of the table in order to construct a HoodieInternalV2Table, which extends Table.
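For illustration, a sketch of how the added Identifier could be turned into the fully qualified name a Hudi table needs (HudiIdentifierSketch and toFullTableName are hypothetical names; the actual HoodieInternalV2Table construction in this PR may differ):

import org.apache.spark.sql.connector.catalog.Identifier;

final class HudiIdentifierSketch {
  // e.g. catalog "polaris_catalog", namespace ["hudi_ns"], table "huditb1"
  //   -> "polaris_catalog.hudi_ns.huditb1"
  static String toFullTableName(String catalogName, Identifier identifier) {
    String namespace = String.join(".", identifier.namespace());
    return namespace.isEmpty()
        ? catalogName + "." + identifier.name()
        : catalogName + "." + namespace + "." + identifier.name();
  }
}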

Contributor (@eric-maynard, Jul 7, 2025), quoting "So there are no interface changes, just an additional parameter passed to this method": In this case, the method itself is the functional interface being changed. Java doesn't support static methods in interfaces, so there's obviously not an actual Java interface being changed.

It's still not clear to me why we can't load a table directly from object storage.

} catch (org.apache.iceberg.exceptions.NoSuchTableException e) {
throw new NoSuchTableException(identifier);
}
@@ -86,7 +86,7 @@ public Table createTable(
GenericTable genericTable =
this.polarisCatalog.createGenericTable(
Spark3Util.identifierToTableIdentifier(identifier), format, null, properties);
return PolarisCatalogUtils.loadSparkTable(genericTable);
return PolarisCatalogUtils.loadSparkTable(genericTable, identifier);
} catch (AlreadyExistsException e) {
throw new TableAlreadyExistsException(identifier);
}
@@ -30,6 +30,8 @@
import org.apache.iceberg.spark.SupportsReplaceView;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.polaris.spark.utils.DeltaHelper;
import org.apache.polaris.spark.utils.HudiCatalogUtils;
import org.apache.polaris.spark.utils.HudiHelper;
import org.apache.polaris.spark.utils.PolarisCatalogUtils;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
import org.apache.spark.sql.catalyst.analysis.NoSuchNamespaceException;
@@ -72,6 +74,7 @@ public class SparkCatalog
@VisibleForTesting protected org.apache.iceberg.spark.SparkCatalog icebergsSparkCatalog = null;
@VisibleForTesting protected PolarisSparkCatalog polarisSparkCatalog = null;
@VisibleForTesting protected DeltaHelper deltaHelper = null;
@VisibleForTesting protected HudiHelper hudiHelper = null;

@Override
public String name() {
@@ -133,6 +136,7 @@ public void initialize(String name, CaseInsensitiveStringMap options) {
this.catalogName = name;
initRESTCatalog(name, options);
this.deltaHelper = new DeltaHelper(options);
this.hudiHelper = new HudiHelper(options);
}

@Override
@@ -156,15 +160,23 @@ public Table createTable(
throw new UnsupportedOperationException(
"Create table without location key is not supported by Polaris. Please provide location or path on table creation.");
}

if (PolarisCatalogUtils.useDelta(provider)) {
// For delta table, we load the delta catalog to help dealing with the
// delta log creation.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.createTable(ident, schema, transforms, properties);
} else {
return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
}
if (PolarisCatalogUtils.useHudi(provider)) {
// First make a call via polaris's spark catalog
// to ensure an entity is created within the catalog and is authorized
polarisSparkCatalog.createTable(ident, schema, transforms, properties);

// Then for actually creating the hudi table, we load HoodieCatalog
// to create the .hoodie folder in cloud storage
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
Contributor: It might be better to first make sure the hudiCatalog.createTable can be done successfully, and then create the table with the remote service.

Author: Thanks Yun for the suggestion.

My question is: shouldn't the engine first consult the Polaris catalog service to see if it is even authorized to perform a createTable? https://github.com/apache/polaris/blob/main/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java#L74

If this returns an exception to the client, then I am not sure it makes sense for the client to go on and create the table-format metadata in cloud storage.

Reviewer: Rahil, that makes sense. Just for your info: for the ALTER TABLE and CREATE Hudi table commands, it seems we always make the actual Hudi storage-layer change before making the catalog change. I would suggest we keep the pattern consistent.
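For illustration, a storage-first variant of the Hudi branch in SparkCatalog.createTable could look roughly like the sketch below. This is only a sketch of the ordering being discussed, not the PR's code; it reuses the hudiHelper and polarisSparkCatalog fields shown in this diff, and the rollback step is a hypothetical addition:

private Table createHudiTableStorageFirst(
    Identifier ident, StructType schema, Transform[] transforms, Map<String, String> properties)
    throws TableAlreadyExistsException, NoSuchNamespaceException {
  TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
  // 1) create the .hoodie metadata in storage first, matching Hudi's own command ordering
  Table hudiTable = hudiCatalog.createTable(ident, schema, transforms, properties);
  try {
    // 2) then register the generic-table entity with the remote Polaris service
    this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
  } catch (RuntimeException | TableAlreadyExistsException | NoSuchNamespaceException e) {
    // best-effort rollback of the storage-side table if the catalog call is rejected
    hudiCatalog.dropTable(ident);
    throw e;
  }
  return hudiTable;
}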

return hudiCatalog.createTable(ident, schema, transforms, properties);
}
return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
Contributor: Let's keep the if { } else if { } else { } structure.

}
}

@@ -182,7 +194,11 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchTableException
// using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.alterTable(ident, changes);
} else if (PolarisCatalogUtils.useHudi(provider)) {
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
Contributor: The same comment above applies here, right? Let's restructure to make that more clear.

Reviewer: +1

return hudiCatalog.alterTable(ident, changes);
}

return this.polarisSparkCatalog.alterTable(ident);
}
}
@@ -263,25 +279,39 @@ public String[][] listNamespaces(String[] namespace) throws NoSuchNamespaceException
@Override
public Map<String, String> loadNamespaceMetadata(String[] namespace)
throws NoSuchNamespaceException {
return this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
Map<String, String> metadata = this.icebergsSparkCatalog.loadNamespaceMetadata(namespace);
if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
HudiCatalogUtils.loadNamespaceMetadata(namespace, metadata);
}
return metadata;
}

@Override
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
HudiCatalogUtils.createNamespace(namespace, metadata);
}
}

@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
HudiCatalogUtils.alterNamespace(namespace, changes);
Contributor: Looks like a bunch of comments got cleared, but I still see all these namespace-related changes which should not be necessary.

Contributor: @rahil-c I tried out the current client without your change for Hudi usage; although it fails on loadTable, it doesn't really complain about namespaces, so I strongly believe we don't need any namespace-specific changes here. I will check out the current change to verify as well.

Contributor: After checking out your code and testing, I believe the reason you need those namespace operations is that you are calling the following code when loading a Hudi table:

catalogTable = sparkSession.sessionState().catalog().getTableMetadata(tableIdentifier);

This triggers the Spark internal session catalog, which actually checks whether the database exists. However, I don't think we need to do that for loading Hudi tables. I think we should just create a V1Table and let the HudiCatalog take care of the final table format. To construct the V1Table, you can either do it manually, filling in all the required fields, or do it in a similar way to Unity Catalog here: https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/io/unitycatalog/spark/UCSingleCatalog.scala#L283


Contributor: As for createTable, I think the real problem comes from here: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala#L198

Similar to Delta, when we are using the REST catalog as delegation, I think we should call into the catalog plugin for table creation instead of the Spark session catalog: https://github.com/delta-io/delta/blob/2d89954008b6c53e49744f09435136c5c63b9f2c/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L218

Delta today triggers a special check for Unity Catalog here: https://github.com/delta-io/delta/blob/2d89954008b6c53e49744f09435136c5c63b9f2c/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L77. One idea is to introduce a special flag for the Polaris SparkCatalog to indicate that a third-party catalog plugin is in use, and then do a similar thing as DeltaCatalog.
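For illustration, such a flag could be read in SparkCatalog.initialize as sketched below (the option name delegate-to-catalog-plugin and the field are assumptions, not part of this PR):

private boolean delegateToCatalogPlugin = true;

@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
  this.catalogName = name;
  initRESTCatalog(name, options);
  this.deltaHelper = new DeltaHelper(options);
  this.hudiHelper = new HudiHelper(options);
  // hypothetical flag: a third-party catalog plugin (Polaris) owns the table metadata,
  // so Hudi/Delta code paths should delegate to it instead of the Spark session catalog
  this.delegateToCatalogPlugin = options.getBoolean("delegate-to-catalog-plugin", true);
}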

}
}

@Override
public boolean dropNamespace(String[] namespace, boolean cascade)
throws NoSuchNamespaceException {
return this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
boolean result = this.icebergsSparkCatalog.dropNamespace(namespace, cascade);
if (result && PolarisCatalogUtils.isHudiExtensionEnabled()) {
HudiCatalogUtils.dropNamespace(namespace, cascade);
}
return result;
}

@Override