
Initial integration for hudi tables within Polaris #1862


Open · wants to merge 10 commits into main from rahil-c/polaris-hudi

Conversation


@rahil-c rahil-c commented Jun 11, 2025

No description provided.

Contributor

dimas-b commented Jun 11, 2025

Thanks for your contribution, @rahil-c! Would you mind opening a discussion for this feature on dev@polaris.apache.org?

@rahil-c rahil-c force-pushed the rahil-c/polaris-hudi branch from 37af09a to 98908b3 Compare June 13, 2025 15:27
Author

rahil-c commented Jun 13, 2025

Thanks @dimas-b, will do so! I have raised an email on the dev list here: https://lists.apache.org/thread/66d39oqkc412kk262gy80bm723r9xmpm

@rahil-c rahil-c force-pushed the rahil-c/polaris-hudi branch from d0011d5 to 5445c48 Compare June 16, 2025 00:21
@rahil-c rahil-c force-pushed the rahil-c/polaris-hudi branch from 5b136d6 to 2bb83cd Compare July 1, 2025 07:20
@rahil-c rahil-c changed the title [DRAFT] Initial integration for hudi tables within Polaris Initial integration for hudi tables within Polaris Jul 1, 2025
@rahil-c rahil-c marked this pull request as ready for review July 1, 2025 07:20
@rahil-c rahil-c force-pushed the rahil-c/polaris-hudi branch from 2bb83cd to 6185ea6 Compare July 1, 2025 07:25
Author

rahil-c commented Jul 1, 2025

cc @flyrain @gh-yzou @singhpk234

@flyrain flyrain requested a review from Copilot July 1, 2025 15:54

@Copilot Copilot AI left a comment


Pull Request Overview

This PR introduces initial support for Hudi tables in the Polaris Spark catalog, enabling Hudi create/load operations alongside existing formats.

  • Extended parameterized tests to cover the new “hudi” format.
  • Added HudiHelper and HudiCatalogUtils for Hudi-specific catalog loading and namespace synchronization.
  • Updated SparkCatalog, PolarisCatalogUtils, and build configurations to wire in Hudi dependencies and behavior.

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.

Summary per file:
  • plugins/spark/v3.5/spark/src/test/java/.../DeserializationTest.java: Updated parameterized tests to accept and assert on format
  • plugins/spark/v3.5/spark/src/test/java/.../SparkCatalogTest.java: Added static mocks and new Hudi namespace/table tests
  • plugins/spark/v3.5/spark/src/test/java/.../NoopHudiCatalog.java: Created a no-op Hudi catalog stub for tests
  • plugins/spark/v3.5/spark/src/main/java/.../PolarisCatalogUtils.java: Introduced useHudi, isHudiExtensionEnabled, Hudi load support, SQL builders
  • plugins/spark/v3.5/spark/src/main/java/.../HudiHelper.java: New helper for instantiating and delegating to Hudi Catalog
  • plugins/spark/v3.5/spark/src/main/java/.../HudiCatalogUtils.java: New utility for syncing namespace operations via SQL
  • plugins/spark/v3.5/spark/src/main/java/.../SparkCatalog.java: Routed create/alter/drop to Hudi catalog when appropriate
  • plugins/spark/v3.5/spark/src/main/java/.../PolarisSparkCatalog.java: Adjusted calls to pass Identifier through Hudi load API
  • plugins/spark/v3.5/spark/build.gradle.kts: Added Hudi dependencies and exclusions
  • plugins/spark/v3.5/integration/.../logback.xml: Enabled Hudi loggers for integration tests
  • plugins/spark/v3.5/integration/.../SparkHudiIT.java: New integration tests for basic and unsupported Hudi ops
  • plugins/spark/v3.5/integration/build.gradle.kts: Added Hive and Hudi bundles to integration dependencies
Comments suppressed due to low confidence (1)

Mockito.when(mockedConfig.get("spark.sql.extensions", null))
.thenReturn(
"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
+ "io.delta.sql.DeltaSparkSessionExtension"

Copilot AI Jul 1, 2025


The returned extension string concatenates DeltaSparkSessionExtension and HoodieSparkSessionExtension without a comma, producing ...DeltaSparkSessionExtensionorg.apache.spark.... This will break isHudiExtensionEnabled detection. Add a separating comma and any necessary whitespace.

Suggested change
+ "io.delta.sql.DeltaSparkSessionExtension"
+ "io.delta.sql.DeltaSparkSessionExtension, "


exclude("io.swagger", "*")
exclude("org.apache.commons", "*")
}
implementation(project(":polaris-core")) {

Copilot AI Jul 1, 2025


The polaris-core project is added twice as an implementation dependency. Consider removing the duplicate block to reduce confusion and avoid redundant classpath entries.


// Reserved properties (owner, location, comment) are automatically filtered out.
try {
SparkSession spark = SparkSession.active();
String ns = String.join(".", namespace);

Copilot AI Jul 1, 2025


The namespace identifier ns is inserted directly into SQL without escaping, which could lead to SQL injection if namespaces contain unexpected characters. Consider validating or sanitizing the namespace string before embedding it in the SQL.

Suggested change
String ns = String.join(".", namespace);
String ns = validateAndSanitizeNamespace(namespace);
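
A hedged sketch of what such a helper could look like (validateAndSanitizeNamespace is only Copilot's suggested name; backtick-quoting each level is one possible approach, not the PR's actual fix):

import java.util.Arrays;
import java.util.stream.Collectors;

// Sketch: quote each namespace level with backticks and escape embedded backticks
// before building statements like "CREATE NAMESPACE IF NOT EXISTS spark_catalog.<ns>".
final class NamespaceSqlUtil {
  static String quoteNamespace(String[] namespace) {
    return Arrays.stream(namespace)
        .map(level -> "`" + level.replace("`", "``") + "`")
        .collect(Collectors.joining("."));
  }
}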


@flyrain flyrain requested a review from gh-yzou July 1, 2025 15:57

// Add spark-hive for Hudi integration - provides HiveExternalCatalog that Hudi needs
testImplementation("org.apache.spark:spark-hive_${scalaVersion}:${spark35Version}") {
// exclude log4j dependencies to match spark-sql exclusions
Contributor

can we remove the spark_sql dependency above?

Author

@rahil-c rahil-c Jul 2, 2025

We will need this as a test dependency: when a Hudi insert is invoked it looks for HiveExternalCatalog, and when the dependency is removed it fails, as suspected, with a class-not-found exception.

org/apache/spark/sql/hive/HiveExternalCatalog$
java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/HiveExternalCatalog$
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHiveSyncConfig(ProvidesHoodieConfig.scala:496)
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHiveSyncConfig$(ProvidesHoodieConfig.scala:467)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.buildHiveSyncConfig(HoodieSpark35Analysis.scala:39)
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHoodieConfig(ProvidesHoodieConfig.scala:63)
	at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHoodieConfig$(ProvidesHoodieConfig.scala:55)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.buildHoodieConfig(HoodieSpark35Analysis.scala:39)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.org$apache$spark$sql$hudi$analysis$HoodieSpark35DataSourceV2ToV1Fallback$$convertToV1(HoodieSpark35Analysis.scala:62)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.apply(HoodieSpark35Analysis.scala:50)
	at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.apply(HoodieSpark35Analysis.scala:39)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)

I have also explained this further in a related comment: #1862 (comment)

Member

Don't we only need it as a test-runtime?

Author

Thanks @RussellSpitzer for the suggestion, I can also try setting this as test runtime

Contributor

Sorry, what I mean is: does spark-hive contain spark-sql? If yes, can we remove

testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
    // exclude log4j dependencies. Explicit dependencies for the log4j libraries are
    // enforced below to ensure the version compatibility
    exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
    exclude("org.apache.logging.log4j", "log4j-1.2-api")
    exclude("org.apache.logging.log4j", "log4j-core")
    exclude("org.slf4j", "jul-to-slf4j")
  }

// enforce the usage of log4j 2.24.3. This is for the log4j-api compatibility
// of spark-sql dependency
testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")

testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0") {
// exclude log4j dependencies to match spark-sql exclusions and prevent version conflicts
Contributor

Does the bundle already contain all the Spark dependencies needed? If that is the case, we shouldn't need the spark-hive dependency anymore, right?

Member

I think this and the above dependency can both be test runtime? Or are we using Delta and Hudi specific code in the test code itself?

Author

Similar to the comment above: #1862 (comment)

When running the following command
jar tf hudi-spark3.5-bundle_2.12-0.15.0.jar | grep -i "org/apache/spark/sql/hive"

I do not see the HiveExternalCatalog provided by the hudi-spark-bundle.

org/apache/spark/sql/hive/
org/apache/spark/sql/hive/HiveClientUtils.class
org/apache/spark/sql/hive/HiveClientUtils$.class

Based on my understanding, the hudi-spark-bundle aims to provide the core Hudi dependencies needed for the Spark-Hudi integration to work, but it expects certain Spark dependencies to already be on the engine classpath.
For example, when checking the jars folder of the OSS Spark distribution by running these commands, you can see that Spark provides these:

cd ~
wget https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
mkdir spark-3.5
tar xzvf spark-3.5.5-bin-hadoop3.tgz  -C spark-3.5 --strip-components=1
cd spark-3.5
 rahil@mac  ~/spark-3.5/jars  ls -l | grep hive
-rw-r--r--@ 1 rahil  staff    183633 Feb 23 12:45 hive-beeline-2.3.9.jar
-rw-r--r--@ 1 rahil  staff     44704 Feb 23 12:45 hive-cli-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    436169 Feb 23 12:45 hive-common-2.3.9.jar
-rw-r--r--@ 1 rahil  staff  10840949 Feb 23 12:45 hive-exec-2.3.9-core.jar
-rw-r--r--@ 1 rahil  staff    116364 Feb 23 12:45 hive-jdbc-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    326585 Feb 23 12:45 hive-llap-common-2.3.9.jar
-rw-r--r--@ 1 rahil  staff   8195966 Feb 23 12:45 hive-metastore-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    916630 Feb 23 12:45 hive-serde-2.3.9.jar
-rw-r--r--@ 1 rahil  staff   1679366 Feb 23 12:45 hive-service-rpc-3.1.3.jar
-rw-r--r--@ 1 rahil  staff     53902 Feb 23 12:45 hive-shims-0.23-2.3.9.jar
-rw-r--r--@ 1 rahil  staff      8786 Feb 23 12:45 hive-shims-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    120293 Feb 23 12:45 hive-shims-common-2.3.9.jar
-rw-r--r--@ 1 rahil  staff     12923 Feb 23 12:45 hive-shims-scheduler-2.3.9.jar
-rw-r--r--@ 1 rahil  staff    258346 Feb 23 12:45 hive-storage-api-2.8.1.jar
-rw-r--r--@ 1 rahil  staff    572320 Feb 23 12:45 spark-hive-thriftserver_2.12-3.5.5.jar
-rw-r--r--@ 1 rahil  staff    725252 Feb 23 12:45 spark-hive_2.12-3.5.5.jar
 rahil@mac  ~/spark-3.5/jars 

This is what allows Hudi to avoid the class-not-found exception (at least when testing via my local Spark). Therefore I believe we will need to explicitly provide this in the test environment in order for the Spark-Hudi integration test to work.

Author

@RussellSpitzer For Delta it seems we declared the test dep with testImplementation: https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/integration/build.gradle.kts#L68
So I had done the same for Hudi.

There is one place in SparkCatalogTest where I assert on a specific Hudi class to verify that the table returned is in fact a Hudi table.

} else if (PolarisCatalogUtils.useHudi(format)) {
        assertThat(loadedTable).isInstanceOf(HoodieInternalV2Table.class);
      }

So for now I have left this as testImplementation. If this is a concern though, let me know and I can look into an alternative here.

@@ -32,6 +32,9 @@ out the configuration if you would like ot see all spark debug log during the ru
</encoder>
</appender>

<!-- Hudi-specific loggers for test -->
<logger name="org.apache.hudi" level="INFO"/>
Contributor

Does Hudi output a lot of logs? Too much logging was causing problems for test efficiency, so we only turned on error-level logging here. How long does the integration test take now?

Author

This is a good point; I can remove this INFO then, to avoid the extra logging. I am curious though whether we should keep the test root logger at WARN instead of its current ERROR?

Contributor

@gh-yzou gh-yzou Jul 7, 2025

We can if the WARN-level log output doesn't significantly slow the test, but based on my previous testing with Spark, even with WARN there seem to be quite a lot of Spark logs, which was part of the reason we set the level to ERROR. If the test fails, people can always tune the log level to check more logs if needed, so I wouldn't worry too much about the restricted log level.

}

implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")
compileOnly("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0")
Contributor

I don't think we need to depend on the Hudi bundle here, similar to how we handle Delta.

Author

Currently I am directly using one Hudi class, org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table, within PolarisCatalogUtils:
https://github.com/apache/polaris/pull/1862/files#diff-f351cfc050ac63c907c7f35f8052eb88ee6241c39f95a8b81df3d895349bafa0R139

So we will need to keep a provided/compileOnly dependency to ensure that compilation succeeds, as the user will have to provide the Hudi Spark bundle directly anyway, like this: https://hudi.apache.org/docs/0.15.0/quick-start-guide

If we are still concerned about having the bundle as a provided/compileOnly dependency, I can try to narrow it down to just the Hudi Spark artifact that contains this class, hudi-spark3.2plus-common (based on what I see in IntelliJ), and try declaring that instead.

Screenshot 2025-07-02 at 10 06 46 AM

Member

I'm in agreement here; I think we don't want any datasource-specific code as a compile-time dependency (although runtime is fine).

My main reasons are

  1. Avoiding a dependency that would be unneeded by users without Hudi
  2. Licensing headaches

Could we check via a String and Reflection? I think "provided" is maybe an ok workaround, but that ties this fairly strongly, version-wise, to the Hudi client library. I was hoping we could keep the Catalog plugin basically version-independent and relying pretty much only on DataSource APIs.
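
To make the string-and-reflection idea concrete, here is a rough sketch (an assumption about how it could look, not code from this PR) that detects Hudi's HoodieInternalV2Table without a compile-time dependency on it:

// Sketch: detect the Hudi table class by name via reflection so the client does not
// need hudi-spark3.5-bundle at compile time. The class name below matches the Hudi
// 0.15 source tree and may change across Hudi versions.
final class HudiTableTypeCheck {
  private static final String HOODIE_V2_TABLE =
      "org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table";

  static boolean isHoodieV2Table(Object table) {
    for (Class<?> c = table.getClass(); c != null; c = c.getSuperclass()) {
      if (HOODIE_V2_TABLE.equals(c.getName())) {
        return true;
      }
    }
    return false;
  }
}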


// Then for actually creating the hudi table, we load HoodieCatalog
// to create the .hoodie folder in cloud storage
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
Contributor

It might be better to first make sure hudiCatalog.createTable can complete successfully, and then make the catalog change with the remote service.

Author

Thanks Yun for the suggestion,

My question is: shouldn't the engine first consult the Polaris catalog service to see if it is even authorized to perform a createTable? https://github.com/apache/polaris/blob/main/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java#L74

If this returns an exception to the client, then I am not sure it makes sense for the client to go ahead and create the table-format metadata in cloud storage.


Rahil, that makes sense. Just for your info, for the ALTER TABLE and CREATE Hudi table commands, it seems we always make the actual Hudi storage-layer change before making the catalog change. I would suggest we keep the pattern consistent.

@@ -270,18 +286,24 @@ public Map<String, String> loadNamespaceMetadata(String[] namespace)
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
HudiCatalogUtils.createNamespace(namespace, metadata);
Contributor

What is the difference between delta and hudi? I assume the hudi catalog will be used as spark session catalog and directly called for namespace operations

Contributor

Why are we always calling this?

Author

@rahil-c rahil-c Jul 2, 2025

This is only being called when the Spark session has also added HoodieSparkSessionExtensions: https://github.com/apache/polaris/pull/1862/files#diff-b9c22ea4a50bbb1b3e3db1f420b3656557c4875019321c24be2f1ee718e67b25R54

https://github.com/apache/polaris/pull/1862/files
I can restructure the code to make this clearer.

private static final Logger LOG = LoggerFactory.getLogger(HudiCatalogUtils.class);

/**
* Synchronizes namespace creation to session catalog when Hudi extension is enabled. This ensures
Contributor

@rahil-c this part of the change is actually outside my expectation; I might need some more time to understand the motivation for it. Can you update the project README with instructions on how to use Hudi? I want to check out the code and try it out. Thanks!

Author

Yes, I can provide this.

} catch (NoSuchTableException e) {
LOG.debug("No table currently exists, as an initial create table");
}
return new HoodieInternalV2Table(
Contributor

Does the Hudi catalog provide any load-table functionality that also reads the Hudi logs? If yes, we can load the Hudi catalog here and call those functions; it would be better if we could avoid any table-format-specific dependency in the client.

Author

PolarisCatalogUtils.loadSparkTable is meant to return something that is a child of Spark's Table, as it is invoked by PolarisSparkCatalog#loadTable:
https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java#L67

When checking the Hudi code base, the only class I see that extends this Table is HoodieInternalV2Table: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala#L44

And when checking the Hudi code base I did not see any util class for providing the HoodieInternalV2Table, hence why I had to construct it in this manner.

Contributor

I took a look at how HoodieCatalog.scala works (https://github.com/apache/hudi/blob/3369b09fd8fca1db1f8e655f79ecb7a97c57367b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala#L123); it actually works very similarly to DeltaCatalog (https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L230). Previously, the approach I tried was to have PolarisSparkCatalog return a V1Table object and, in SparkCatalog, simply call DeltaCatalog.loadTable, which returns the Delta V2 table; that way we do not introduce any format-specific dependency on the client side. I didn't end up doing that because Delta actually provides DataSourceV2 support, and the load-table utilities automatically load the Delta V2 table.

Since Hudi doesn't support DataSourceV2 yet, I think you can go with my original approach: let PolarisSparkCatalog return a V1Table on load, and let HudiCatalog.loadTable take care of the final returned table format.
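
A rough sketch of that suggestion (assuming the PR's hudiHelper, polarisSparkCatalog, and PolarisCatalogUtils.useHudi names, and assuming the surrounding SparkCatalog class; this is the proposal, not the merged implementation):

// Sketch of SparkCatalog#loadTable under the proposed approach: Polaris returns a
// V1-style Table, and the Hudi-specific resolution is delegated to HoodieCatalog so
// the client never references Hudi classes directly.
@Override
public Table loadTable(Identifier ident) throws NoSuchTableException {
  Table table = this.polarisSparkCatalog.loadTable(ident); // V1-style table from Polaris
  String provider = table.properties().get(TableCatalog.PROP_PROVIDER);
  if (PolarisCatalogUtils.useHudi(provider)) {
    // HoodieCatalog.loadTable reads the Hudi metadata and returns its own Table impl.
    TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
    return hudiCatalog.loadTable(ident);
  }
  return table;
}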

TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
return hudiCatalog.createTable(ident, schema, transforms, properties);
}
return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
Contributor

Let's keep the if { } else if {} else { } structure

"CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'",
huditb1, getTableLocation(huditb1));
sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1);
List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY id DESC", huditb1);
Contributor

nit: Some of these queries could be cleaned up

Comment on lines +162 to +163
// create hudi table with no location
assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING HUDI", huditb))
Contributor

Why should this fail?


+1, this should not fail as some default location will be used instead

exclude("io.swagger", "*")
exclude("org.apache.commons", "*")
}
implementation(project(":polaris-api-catalog-service")) {
Contributor

ditto, why is this new dep needed for the implementation?

Author

Have removed these

String ns = String.join(".", namespace);

// Build CREATE NAMESPACE SQL with metadata properties (filtered for reserved properties)
String createSql = String.format("CREATE NAMESPACE IF NOT EXISTS spark_catalog.%s", ns);
Contributor

This doesn't look correct; why do we need to keep the spark catalog in sync with the actual Polaris catalog?

Contributor

Why was this comment resolved?

*/
private static void handleNamespaceSyncError(
String[] namespace, String operation, Exception e, boolean throwOnError) {
String errorMsg = e.getMessage();
Contributor

Necessity aside, this error handling looks quite brittle. You should probably be traversing the cause chain and looking at exception types rather than just doing string-matching on the message.
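
For reference, a hedged sketch of what cause-chain traversal could look like (the class and method names are illustrative; the concrete exception types to match would depend on what the sync code actually throws):

// Sketch: decide how to handle a namespace-sync failure by walking the cause chain
// and matching exception types, instead of string-matching on e.getMessage().
final class CauseChain {
  static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
    Throwable current = error;
    while (current != null) {
      if (type.isInstance(current)) {
        return true;
      }
      Throwable next = current.getCause();
      current = (next == current) ? null : next; // guard against self-referential causes
    }
    return false;
  }
}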

Author

This is a fair point; I will try to improve on this.

public static Table loadSparkTable(GenericTable genericTable) {
public static Table loadSparkTable(GenericTable genericTable, Identifier identifier) {
if (genericTable.getFormat().equalsIgnoreCase("hudi")) {
// hudi does not implement table provider interface, so will need to catch it
Contributor

Catch it?

It's not clear to me why this is different from Delta?

Author

Currently PolarisCatalogUtils.loadSparkTable uses a DataSourceV2Utils utility to load the table via Spark's TableProvider, as seen here.

In the case of Delta this goes through Delta's datasource impl, which implements the V2 TableProvider interface; see DeltaDataSource: https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala#L57

PolarisCatalogUtils.loadSparkTable currently assumes that other formats also implement this same TableProvider interface. However, if they do not, this will fail with an exception.

Hudi's Spark integration does not implement that interface; see Hudi's datasource entry-point class DefaultSource: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala#L55. This means we have to provide another way to load the Hudi table, which is why I added this condition and a method called loadHudiSparkTable.

Contributor

  1. But you're not catching anything here
  2. Sounds like a limitation on the Hudi side; maybe it makes sense for us to only support V2TableProviderInterface then?

@@ -68,7 +68,7 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException {
try {
GenericTable genericTable =
this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
return PolarisCatalogUtils.loadSparkTable(genericTable);
return PolarisCatalogUtils.loadSparkTable(genericTable, identifier);
Contributor

It's not clear to me why the interface had to change, is this related to the namespace thing?

Author

@rahil-c rahil-c Jul 2, 2025

Currently PolarisCatalogUtils itself is not leveraging any interface; it's just a util method (https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java#L69) used to provide back an impl of Spark's Table interface. So there are no interface changes, just an additional param passed to this method.

I needed to do this for Hudi, as it needs the full identifier of the table in order to construct a HoodieInternalV2Table, which extends Table.

Contributor

@eric-maynard eric-maynard Jul 7, 2025

So there are no interfaces changes just an additional param passed in this method.

In this case, the method itself is the functional interface being changed. Java doesn't support static methods in interfaces, so there's obviously not an actual java interface being changed.

It's still not clear to me why we can't load a table directly from object storage.


assertThatThrownBy(
() ->
sql(
"CREATE TABLE %s USING HUDI LOCATION '%s' AS SELECT 1 AS id",


CTAS, as long as we write it properly, should not fail.

@@ -46,6 +46,8 @@ dependencies {
// TODO: extract a polaris-rest module as a thin layer for
// client to depends on.
implementation(project(":polaris-core")) { isTransitive = false }
compileOnly("org.apache.hudi:hudi-spark3.2plus-common:0.15.0")


Maybe Ethan knows this part better; starting with Hudi 1.x we deprecated some old Spark version support. Not sure about the status of Spark 3.2.



@@ -182,7 +194,11 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
// using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.alterTable(ident, changes);
} else if (PolarisCatalogUtils.useHudi(provider)) {
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);


+1


}

@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
HudiCatalogUtils.alterNamespace(namespace, changes);
Contributor

Looks like a bunch of comments got cleared, but I still see all these namespace-related changes which should not be necessary.

Contributor

@rahil-c I tried out the current client without your change for Hudi usage; although it fails on loadTable, it doesn't really complain about namespaces, so I strongly believe we don't need any namespace-specific changes here. I will check out the current change to verify as well.

Contributor

After checking out your code and testing it, I believe the reason you need those namespace operations is that you are calling the following code when loading a Hudi table:

catalogTable = sparkSession.sessionState().catalog().getTableMetadata(tableIdentifier);

This triggers Spark's internal session catalog, which actually checks whether the database exists. However, I don't think we need to do that for loading Hudi tables. I think we should just create a V1Table and let the HudiCatalog take care of the final table format. To construct the V1Table, you can either do it manually, filling in all the required fields, or do it in a similar way to Unity Catalog here: https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/io/unitycatalog/spark/UCSingleCatalog.scala#L283


Contributor

As for createTable, I think the real problem comes from here: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala#L198
Similar to Delta, when we are using a REST catalog as delegation, I think we should call into the catalog plugin for table creation, instead of the Spark session catalog: https://github.com/delta-io/delta/blob/2d89954008b6c53e49744f09435136c5c63b9f2c/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L218

Delta today triggers a special check for Unity Catalog here: https://github.com/delta-io/delta/blob/2d89954008b6c53e49744f09435136c5c63b9f2c/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L77. One way I am thinking about is to introduce a special flag for the Polaris SparkCatalog to indicate that a third-party catalog plugin is used, and then do a similar thing as DeltaCatalog.

Contributor

gh-yzou commented Jul 8, 2025

@rahil-c sorry, I made my comment yesterday but forgot to push it. I have pushed it now and added some more comments; please let me know if you have more questions about this!
As we have discussed, there are two main concerns for this PR:

  1. The Hudi dependency introduced for the Spark client, which is caused by the usage of HoodieInternalV2Table. This can be resolved by loading a V1Table and then letting HudiCatalog.loadTable handle the final table result: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala#L123
  2. The extra namespace creation for HudiCatalog. The Polaris Spark client reuses the whole Iceberg namespace; ideally we do not want to maintain extra namespace creation just for a specific table format. The extra namespace creation is needed because HudiCatalog only works with the Spark session catalog and HiveCatalog today (https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala#L198); however, since Polaris is a REST catalog, this will not work anymore. We want to see if we can push the Hudi community to improve the catalog implementation with regard to third-party catalog plugins, similar to how Delta special-cased Unity Catalog here: https://github.com/delta-io/delta/blob/2d89954008b6c53e49744f09435136c5c63b9f2c/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L218
