Initial integration for hudi tables within Polaris #1862
Conversation
Thanks for your contribution, @rahil-c! Would you mind opening a discussion for this feature on dev@polaris.apache.org?
Thanks @dimas-b, will do so! Have raised an email on the dev list here: https://lists.apache.org/thread/66d39oqkc412kk262gy80bm723r9xmpm
Pull Request Overview
This PR introduces initial support for Hudi tables in the Polaris Spark catalog, enabling Hudi create/load operations alongside existing formats.
- Extended parameterized tests to cover the new "hudi" format.
- Added HudiHelper and HudiCatalogUtils for Hudi-specific catalog loading and namespace synchronization.
- Updated SparkCatalog, PolarisCatalogUtils, and build configurations to wire in Hudi dependencies and behavior.
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| plugins/spark/v3.5/spark/src/test/java/.../DeserializationTest.java | Updated parameterized tests to accept and assert on format |
| plugins/spark/v3.5/spark/src/test/java/.../SparkCatalogTest.java | Added static mocks and new Hudi namespace/table tests |
| plugins/spark/v3.5/spark/src/test/java/.../NoopHudiCatalog.java | Created a no-op Hudi catalog stub for tests |
| plugins/spark/v3.5/spark/src/main/java/.../PolarisCatalogUtils.java | Introduced useHudi, isHudiExtensionEnabled, Hudi load support, SQL builders |
| plugins/spark/v3.5/spark/src/main/java/.../HudiHelper.java | New helper for instantiating and delegating to Hudi Catalog |
| plugins/spark/v3.5/spark/src/main/java/.../HudiCatalogUtils.java | New utility for syncing namespace operations via SQL |
| plugins/spark/v3.5/spark/src/main/java/.../SparkCatalog.java | Routed create/alter/drop to Hudi catalog when appropriate |
| plugins/spark/v3.5/spark/src/main/java/.../PolarisSparkCatalog.java | Adjusted calls to pass Identifier through Hudi load API |
| plugins/spark/v3.5/spark/build.gradle.kts | Added Hudi dependencies and exclusions |
| plugins/spark/v3.5/integration/.../logback.xml | Enabled Hudi loggers for integration tests |
| plugins/spark/v3.5/integration/.../SparkHudiIT.java | New integration tests for basic and unsupported Hudi ops |
| plugins/spark/v3.5/integration/build.gradle.kts | Added Hive and Hudi bundles to integration dependencies |
Comments suppressed due to low confidence (1)
Mockito.when(mockedConfig.get("spark.sql.extensions", null))
    .thenReturn(
        "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,"
            + "io.delta.sql.DeltaSparkSessionExtension"
The returned extension string concatenates DeltaSparkSessionExtension and HoodieSparkSessionExtension without a comma, producing ...DeltaSparkSessionExtensionorg.apache.spark.... This will break isHudiExtensionEnabled detection. Add a separating comma and any necessary whitespace.
+ "io.delta.sql.DeltaSparkSessionExtension" | |
+ "io.delta.sql.DeltaSparkSessionExtension, " |
exclude("io.swagger", "*") | ||
exclude("org.apache.commons", "*") | ||
} | ||
implementation(project(":polaris-core")) { |
The polaris-core project is added twice as an implementation dependency. Consider removing the duplicate block to reduce confusion and avoid redundant classpath entries.
// Reserved properties (owner, location, comment) are automatically filtered out.
try {
SparkSession spark = SparkSession.active();
String ns = String.join(".", namespace);
The namespace identifier ns is inserted directly into SQL without escaping, which could lead to SQL injection if namespaces contain unexpected characters. Consider validating or sanitizing the namespace string before embedding it in the SQL.
Suggested change:
String ns = validateAndSanitizeNamespace(namespace);
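As a hedged illustration of what such sanitization could look like (quoteNamespace is a hypothetical helper, not part of this PR): backtick-quote each namespace level and double any embedded backticks before joining the parts into the SQL string.

// Hypothetical sketch: quote each namespace level for safe embedding in generated Spark SQL.
final class NamespaceQuotingSketch {
  static String quoteNamespace(String[] namespace) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < namespace.length; i++) {
      if (i > 0) {
        sb.append('.');
      }
      // double embedded backticks, then wrap the level in backticks
      sb.append('`').append(namespace[i].replace("`", "``")).append('`');
    }
    return sb.toString();
  }
}

// usage (sketch):
// String createSql = String.format("CREATE NAMESPACE IF NOT EXISTS spark_catalog.%s",
//     NamespaceQuotingSketch.quoteNamespace(namespace));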
// Add spark-hive for Hudi integration - provides HiveExternalCatalog that Hudi needs
testImplementation("org.apache.spark:spark-hive_${scalaVersion}:${spark35Version}") {
// exclude log4j dependencies to match spark-sql exclusions
can we remove the spark_sql dependency above?
We will need this dependency as a test dependency: when a Hudi insert is invoked it looks for HiveExternalCatalog, and when removing this dependency it fails, as suspected, with a class-not-found exception:
java.lang.NoClassDefFoundError: org/apache/spark/sql/hive/HiveExternalCatalog$
at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHiveSyncConfig(ProvidesHoodieConfig.scala:496)
at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHiveSyncConfig$(ProvidesHoodieConfig.scala:467)
at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.buildHiveSyncConfig(HoodieSpark35Analysis.scala:39)
at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHoodieConfig(ProvidesHoodieConfig.scala:63)
at org.apache.spark.sql.hudi.ProvidesHoodieConfig.buildHoodieConfig$(ProvidesHoodieConfig.scala:55)
at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.buildHoodieConfig(HoodieSpark35Analysis.scala:39)
at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.org$apache$spark$sql$hudi$analysis$HoodieSpark35DataSourceV2ToV1Fallback$$convertToV1(HoodieSpark35Analysis.scala:62)
at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.apply(HoodieSpark35Analysis.scala:50)
at org.apache.spark.sql.hudi.analysis.HoodieSpark35DataSourceV2ToV1Fallback.apply(HoodieSpark35Analysis.scala:39)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
Have also explained further here in related comment: #1862 (comment)
Don't we only need it as a test-runtime?
Thanks @RussellSpitzer for the suggestion, I can also try setting this as test runtime
Sorry, what I mean is: does spark-hive contain spark-sql? If yes, can we remove
testImplementation("org.apache.spark:spark-sql_${scalaVersion}:${spark35Version}") {
// exclude log4j dependencies. Explicit dependencies for the log4j libraries are
// enforced below to ensure the version compatibility
exclude("org.apache.logging.log4j", "log4j-slf4j2-impl")
exclude("org.apache.logging.log4j", "log4j-1.2-api")
exclude("org.apache.logging.log4j", "log4j-core")
exclude("org.slf4j", "jul-to-slf4j")
}
// enforce the usage of log4j 2.24.3. This is for the log4j-api compatibility
// of spark-sql dependency
testRuntimeOnly("org.apache.logging.log4j:log4j-core:2.24.3")
testRuntimeOnly("org.apache.logging.log4j:log4j-slf4j2-impl:2.24.3")

testImplementation("io.delta:delta-spark_${scalaVersion}:3.3.1")
testImplementation("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0") {
// exclude log4j dependencies to match spark-sql exclusions and prevent version conflicts
Does the bundle already contain all the Spark dependencies needed? If that is the case, we shouldn't need the spark-hive dependency anymore, right?
I think this and the above dependency can both be test runtime? Or are we using Delta and Hudi specific code in the test code itself?
Similar to the comment above: #1862 (comment)
When running the following command:
jar tf hudi-spark3.5-bundle_2.12-0.15.0.jar | grep -i "org/apache/spark/sql/hive"
I do not see HiveExternalCatalog provided by the hudi-spark-bundle:
org/apache/spark/sql/hive/
org/apache/spark/sql/hive/HiveClientUtils.class
org/apache/spark/sql/hive/HiveClientUtils$.class
Based on my understanding of the hudi-spark-bundle, it aims to provide the core Hudi dependencies needed for the Spark-Hudi integration to work, but expects certain Spark dependencies to be on the engine classpath.
For example, when checking the OSS Spark engine jars folder by running these commands, Spark provides these:
cd ~
wget https://archive.apache.org/dist/spark/spark-3.5.5/spark-3.5.5-bin-hadoop3.tgz
mkdir spark-3.5
tar xzvf spark-3.5.5-bin-hadoop3.tgz -C spark-3.5 --strip-components=1
cd spark-3.5
rahil@mac ~/spark-3.5/jars ls -l | grep hive
-rw-r--r--@ 1 rahil staff 183633 Feb 23 12:45 hive-beeline-2.3.9.jar
-rw-r--r--@ 1 rahil staff 44704 Feb 23 12:45 hive-cli-2.3.9.jar
-rw-r--r--@ 1 rahil staff 436169 Feb 23 12:45 hive-common-2.3.9.jar
-rw-r--r--@ 1 rahil staff 10840949 Feb 23 12:45 hive-exec-2.3.9-core.jar
-rw-r--r--@ 1 rahil staff 116364 Feb 23 12:45 hive-jdbc-2.3.9.jar
-rw-r--r--@ 1 rahil staff 326585 Feb 23 12:45 hive-llap-common-2.3.9.jar
-rw-r--r--@ 1 rahil staff 8195966 Feb 23 12:45 hive-metastore-2.3.9.jar
-rw-r--r--@ 1 rahil staff 916630 Feb 23 12:45 hive-serde-2.3.9.jar
-rw-r--r--@ 1 rahil staff 1679366 Feb 23 12:45 hive-service-rpc-3.1.3.jar
-rw-r--r--@ 1 rahil staff 53902 Feb 23 12:45 hive-shims-0.23-2.3.9.jar
-rw-r--r--@ 1 rahil staff 8786 Feb 23 12:45 hive-shims-2.3.9.jar
-rw-r--r--@ 1 rahil staff 120293 Feb 23 12:45 hive-shims-common-2.3.9.jar
-rw-r--r--@ 1 rahil staff 12923 Feb 23 12:45 hive-shims-scheduler-2.3.9.jar
-rw-r--r--@ 1 rahil staff 258346 Feb 23 12:45 hive-storage-api-2.8.1.jar
-rw-r--r--@ 1 rahil staff 572320 Feb 23 12:45 spark-hive-thriftserver_2.12-3.5.5.jar
-rw-r--r--@ 1 rahil staff 725252 Feb 23 12:45 spark-hive_2.12-3.5.5.jar
rahil@mac ~/spark-3.5/jars
This allows Hudi to avoid the class-not-found exception (at least when testing with my local Spark). Therefore I believe we will need to explicitly provide this in the test environment in order for the Spark-Hudi integration tests to work.
@RussellSpitzer For Delta it seems we have declared the test dep with testImplementation: https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/integration/build.gradle.kts#L68
So I had done the same for Hudi.
There is one place in SparkCatalogTest where an assertion on a specific Hudi class checks that the table returned was in fact a Hudi table:
} else if (PolarisCatalogUtils.useHudi(format)) {
assertThat(loadedTable).isInstanceOf(HoodieInternalV2Table.class);
}
So for now I have left this as testImplementation. If this is a concern, though, let me know and I can look into an alternative here.
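If the compile-time test dependency ever becomes a concern, one hedged alternative (not part of this PR) would be to assert on the class name rather than on the class itself, which would let the Hudi bundle move to testRuntimeOnly:

// assumes AssertJ, which these tests already use for assertThat
assertThat(loadedTable.getClass().getName())
    .isEqualTo("org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table");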
@@ -32,6 +32,9 @@ out the configuration if you would like ot see all spark debug log during the ru
</encoder>
</appender>

<!-- Hudi-specific loggers for test -->
<logger name="org.apache.hudi" level="INFO"/>
Does Hudi output a lot of logs? Too much logging was causing problems for test efficiency, so we only turned on the error log here. How long does the integration test take now?
This is a good point, I can remove this INFO then, in order to avoid the extra logging. I am curious, though, whether we should keep the test root logger at WARN instead of its current ERROR?
We can if the WARN-level log output doesn't significantly slow the test, but based on my previous testing with Spark, even with WARN there is quite a lot of Spark log output; that was part of the reason why we set the level to ERROR. If the test fails, people can always tune the log level to check more logs if needed, so I wouldn't worry too much about the restricted log level.
}

implementation("org.apache.iceberg:iceberg-core:${icebergVersion}")
compileOnly("org.apache.hudi:hudi-spark3.5-bundle_${scalaVersion}:0.15.0")
I don't think we need to depend on the Hudi bundle here, similar to how we handle Delta.
Currently I am directly using this one Hudi class
import org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table;
within PolarisCatalogUtils: https://github.com/apache/polaris/pull/1862/files#diff-f351cfc050ac63c907c7f35f8052eb88ee6241c39f95a8b81df3d895349bafa0R139
So we will need to keep a provided/compileOnly dependency to ensure that compilation succeeds; the user will in any case have to provide the Hudi Spark bundle directly, like this: https://hudi.apache.org/docs/0.15.0/quick-start-guide
If we are still concerned about having the bundle as a provided/compileOnly dependency, I can try to narrow it to just the exact Hudi Spark dependency that contains this class, which is hudi-spark3.2plus-common based on what I see in IntelliJ, and try declaring that instead.

I'm in agreement here, I think we don't want to have any datasource-specific code as a compile-time dependency here (although runtime is fine).
My main reasons are
- Avoiding a dependency that would be unneeded by users without Hudi
- Licensing headaches
Could we check via a String and reflection? I think "provided" is maybe an OK workaround, but that makes this a bit strongly tied in version to the Hudi client library? I was hoping we could keep the catalog plugin basically version independent and rely pretty much only on the DataSource APIs.
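A minimal sketch of the string-and-reflection idea, assuming only the HoodieInternalV2Table class name already discussed in this thread; a name-based check avoids the compile-time dependency, though constructing the table reflectively would still have to match the Hudi constructor signature of the targeted version:

import org.apache.spark.sql.connector.catalog.Table;

final class HudiReflectionSketch {
  private static final String HOODIE_V2_TABLE =
      "org.apache.spark.sql.hudi.catalog.HoodieInternalV2Table";

  // true if the loaded table is HoodieInternalV2Table (or a subclass), checked by name only
  static boolean isHoodieV2Table(Table table) {
    for (Class<?> c = table.getClass(); c != null; c = c.getSuperclass()) {
      if (HOODIE_V2_TABLE.equals(c.getName())) {
        return true;
      }
    }
    return false;
  }

  // true if the Hudi runtime jar is on the classpath at all
  static boolean hudiOnClasspath() {
    try {
      Class.forName(HOODIE_V2_TABLE);
      return true;
    } catch (ClassNotFoundException e) {
      return false;
    }
  }
}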
// Then for actually creating the hudi table, we load HoodieCatalog
// to create the .hoodie folder in cloud storage
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
it might be better to first make sure the hudiCatalog.createTable can be done successfully, and then create the table with the remote catalog service
Thanks Yun for the suggestion.
My question is: shouldn't the engine first consult the Polaris catalog service to see if it is even authorized to perform a createTable? https://github.com/apache/polaris/blob/main/service/common/src/main/java/org/apache/polaris/service/catalog/generic/GenericTableCatalogHandler.java#L74
If this returns an exception to the client, then I am not sure it makes sense for the client to go ahead and create the table format metadata in cloud storage.
Rahil, that makes sense. Just for your info, for the alter table and create hoodie table commands, it seems we always make the actual Hudi storage-layer change before making the catalog change. I would suggest we keep the pattern consistent.
@@ -270,18 +286,24 @@ public Map<String, String> loadNamespaceMetadata(String[] namespace)
public void createNamespace(String[] namespace, Map<String, String> metadata)
throws NamespaceAlreadyExistsException {
this.icebergsSparkCatalog.createNamespace(namespace, metadata);
HudiCatalogUtils.createNamespace(namespace, metadata);
What is the difference between delta and hudi? I assume the hudi catalog will be used as spark session catalog and directly called for namespace operations
Why are we always calling this?
This is only being called when the Spark session has also added HoodieSparkSessionExtensions:
https://github.com/apache/polaris/pull/1862/files#diff-b9c22ea4a50bbb1b3e3db1f420b3656557c4875019321c24be2f1ee718e67b25R54
https://github.com/apache/polaris/pull/1862/files
I can restructure the code to make this clearer.
private static final Logger LOG = LoggerFactory.getLogger(HudiCatalogUtils.class);

/**
* Synchronizes namespace creation to session catalog when Hudi extension is enabled. This ensures
@rahil-c this part of the change is actually outside my expectations; I might need some more time to understand the motivation for this part. Can you update the README for the project with instructions on how to use Hudi? I want to check out the code and try it out. Thanks!
Yes, I can provide this.
} catch (NoSuchTableException e) {
LOG.debug("No table currently exists, as an initial create table");
}
return new HoodieInternalV2Table(
Does the Hudi catalog provide any load-table functionality which also reads the Hudi logs? If yes, we can load the Hudi catalog here and call those functions; it would be better if we could avoid any table-format-specific dependency in the client.
PolarisCatalogUtils.loadSparkTable needs to return something that is a child of Spark's Table, as it is invoked by PolarisSparkCatalog#loadTable: https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/PolarisSparkCatalog.java#L67
When checking the Hudi code base, the only class I see that extends this Table is HoodieInternalV2Table: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieInternalV2Table.scala#L44
And when checking the Hudi code base I did not see any util class for providing the HoodieInternalV2Table, hence why I had to construct it in this manner.
I took a look at how HoodieCatalog.scala works (https://github.com/apache/hudi/blob/3369b09fd8fca1db1f8e655f79ecb7a97c57367b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala#L123); it actually works very similarly to DeltaCatalog (https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L230). Previously, the approach I tried was to let PolarisSparkCatalog return a V1Table object, and in SparkCatalog we just call DeltaCatalog.loadTable, which returns the DeltaV2Table; this way we do not introduce any format-specific dependency on the client side. I didn't end up doing that because Delta actually provides DataSourceV2 support, and the load-table utils automatically load the DeltaV2Table.
Since Hudi doesn't support DataSourceV2 yet, I think you can go with my original approach: let PolarisSparkCatalog return a V1Table on load, and let HudiCatalog.loadTable take care of the final returned table format.
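A rough sketch of that delegation on the client side; the wiring is illustrative (only the Hudi format check, hudiHelper.loadHudiCatalog, and PolarisSparkCatalog come from this PR), the point being that the Hudi catalog resolves its own metadata so the client never constructs HoodieInternalV2Table directly:

import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCatalog;

final class HudiLoadDispatchSketch {
  // hudiCatalog stands in for the catalog returned by hudiHelper.loadHudiCatalog(polarisSparkCatalog)
  static Table loadByFormat(
      String format, Identifier ident, TableCatalog hudiCatalog, TableCatalog polarisSparkCatalog)
      throws NoSuchTableException {
    if ("hudi".equalsIgnoreCase(format)) {
      // the Hudi catalog reads its own table metadata and returns its table implementation
      return hudiCatalog.loadTable(ident);
    }
    // other formats keep the existing load path
    return polarisSparkCatalog.loadTable(ident);
  }
}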
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
return hudiCatalog.createTable(ident, schema, transforms, properties);
}
return this.polarisSparkCatalog.createTable(ident, schema, transforms, properties);
Let's keep the if { } else if { } else { } structure.
"CREATE TABLE %s (id INT, name STRING) USING HUDI LOCATION '%s'", | ||
huditb1, getTableLocation(huditb1)); | ||
sql("INSERT INTO %s VALUES (1, 'anna'), (2, 'bob')", huditb1); | ||
List<Object[]> results = sql("SELECT id,name FROM %s WHERE id > 1 ORDER BY id DESC", huditb1); |
nit: Some of these queries could be cleaned up
// create hudi table with no location
assertThatThrownBy(() -> sql("CREATE TABLE %s (id INT, name STRING) USING HUDI", huditb))
Why should this fail?
+1, this should not fail as some default location will be used instead
exclude("io.swagger", "*") | ||
exclude("org.apache.commons", "*") | ||
} | ||
implementation(project(":polaris-api-catalog-service")) { |
ditto, why is this new dep needed for the implementation?
Have removed these
String ns = String.join(".", namespace);

// Build CREATE NAMESPACE SQL with metadata properties (filtered for reserved properties)
String createSql = String.format("CREATE NAMESPACE IF NOT EXISTS spark_catalog.%s", ns);
This doesn't look correct; why do we need to keep the spark catalog in sync with the actual Polaris catalog?
Why was this comment resolved?
*/
private static void handleNamespaceSyncError(
String[] namespace, String operation, Exception e, boolean throwOnError) {
String errorMsg = e.getMessage();
Necessity aside, this error handling looks quite brittle. You should probably be traversing the cause chain and looking at exception types rather than just doing string-matching on the message.
This is a fair point, will try to improve on this.
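A small illustrative sketch of that suggestion (the helper name is hypothetical, not from this PR): walk the cause chain and match on exception types instead of string-matching getMessage():

// Hypothetical helper: type-based check over the whole cause chain.
final class CauseChainSketch {
  static boolean hasCause(Throwable t, Class<? extends Throwable> type) {
    for (Throwable cur = t; cur != null; cur = cur.getCause()) {
      if (type.isInstance(cur)) {
        return true;
      }
    }
    return false;
  }
}

// usage (sketch):
// if (CauseChainSketch.hasCause(e, org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException.class)) { ... }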
public static Table loadSparkTable(GenericTable genericTable) {
public static Table loadSparkTable(GenericTable genericTable, Identifier identifier) {
if (genericTable.getFormat().equalsIgnoreCase("hudi")) {
// hudi does not implement table provider interface, so will need to catch it
Catch it?
It's not clear to me why this is different from Delta?
Currently PolarisCatalogUtils.loadSparkTable is using a DataSourceV2Utils util in order to load the table using Spark's table provider, as seen here.
In the case of Delta this goes through Delta's datasource impl, which implements this V2TableProviderInterface; see DeltaDataSource https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaDataSource.scala#L57
PolarisCatalogUtils.loadSparkTable currently assumes that other formats also implement this same TableProvider interface. However, if they do not, this will fail with an exception.
Hudi, in its Spark integration, does not implement that interface; see the entry-point class for the Hudi datasource, DefaultSource https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/DefaultSource.scala#L55. This means we have to provide another way to load the Hudi table, which is why I have added this condition and a method called loadHudiSparkTable.
- But you're not catching anything here
- Sounds like a limitation on the Hudi side; maybe it makes sense for us to only support V2TableProviderInterface then?
@@ -68,7 +68,7 @@ public Table loadTable(Identifier identifier) throws NoSuchTableException {
try {
GenericTable genericTable =
this.polarisCatalog.loadGenericTable(Spark3Util.identifierToTableIdentifier(identifier));
return PolarisCatalogUtils.loadSparkTable(genericTable);
return PolarisCatalogUtils.loadSparkTable(genericTable, identifier);
It's not clear to me why the interface had to change, is this related to the namespace thing?
Currently PolarisCatalogUtils itself is not leveraging any interface; it's just a util method (https://github.com/apache/polaris/blob/main/plugins/spark/v3.5/spark/src/main/java/org/apache/polaris/spark/utils/PolarisCatalogUtils.java#L69) that is used to provide back an impl of Spark's Table interface. So there are no interface changes, just an additional param passed to this method.
I needed to do this for Hudi as it needs the full identifier of the table in order to construct a HoodieInternalV2Table, which extends Table.
So there are no interfaces changes just an additional param passed in this method.
In this case, the method itself is the functional interface being changed. Java doesn't support static methods in interfaces, so there's obviously not an actual java interface being changed.
It's still not clear to me why we can't load a table directly from object storage.
assertThatThrownBy(
() ->
sql(
"CREATE TABLE %s USING HUDI LOCATION '%s' AS SELECT 1 AS id",
As long as we write the CTAS properly, it should not fail.
@@ -46,6 +46,8 @@ dependencies {
// TODO: extract a polaris-rest module as a thin layer for
// client to depends on.
implementation(project(":polaris-core")) { isTransitive = false }
compileOnly("org.apache.hudi:hudi-spark3.2plus-common:0.15.0")
Maybe Ethan knows this part better; starting with Hudi 1.x we deprecated some old Spark version support, and I'm not sure about the status of Spark 3.2.
@@ -182,7 +194,11 @@ public Table alterTable(Identifier ident, TableChange... changes) throws NoSuchT
// using ALTER TABLE ...SET LOCATION, and ALTER TABLE ... SET FILEFORMAT.
TableCatalog deltaCatalog = deltaHelper.loadDeltaCatalog(this.polarisSparkCatalog);
return deltaCatalog.alterTable(ident, changes);
} else if (PolarisCatalogUtils.useHudi(provider)) {
TableCatalog hudiCatalog = hudiHelper.loadHudiCatalog(this.polarisSparkCatalog);
+1
}

@Override
public void alterNamespace(String[] namespace, NamespaceChange... changes)
throws NoSuchNamespaceException {
this.icebergsSparkCatalog.alterNamespace(namespace, changes);
if (PolarisCatalogUtils.isHudiExtensionEnabled()) {
HudiCatalogUtils.alterNamespace(namespace, changes);
Looks like a bunch of comments got cleared, but I still see all these namespace-related changes which should not be necessary.
@rahil-c I tried out the current client without your change for Hudi usage; although it fails on loadTable, it doesn't really complain about namespaces, so I strongly believe we don't need any namespace-specific changes here. I will check out the current change to verify as well.
After checking out your code and testing, I believe the reason you need those namespace operations is that you are calling the following code when loading a Hudi table:
catalogTable = sparkSession.sessionState().catalog().getTableMetadata(tableIdentifier);
This triggers the Spark internal session catalog, which actually checks whether the db exists. However, I don't think we need to do that for loading Hudi tables. I think we should just create a V1Table and let the HudiCatalog take care of the final table format. To construct a V1Table, you can either do that manually, filling in all the required fields, or do it in a similar way as Unity Catalog here: https://github.com/unitycatalog/unitycatalog/blob/main/connectors/spark/src/main/scala/io/unitycatalog/spark/UCSingleCatalog.scala#L283
For the CatalogTable creation, maybe you can follow how Hudi creates the catalog table here: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/catalog/HoodieCatalog.scala#L293.
As for createTable, I think the real problem comes from here: https://github.com/apache/hudi/blob/master/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/CreateHoodieTableCommand.scala#L198
Similar to Delta, when we are using the REST catalog as the delegation, I think we should call into the catalog plugin for table creation instead of the Spark session catalog: https://github.com/delta-io/delta/blob/2d89954008b6c53e49744f09435136c5c63b9f2c/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L218
Delta today triggers a special check for Unity Catalog here: https://github.com/delta-io/delta/blob/2d89954008b6c53e49744f09435136c5c63b9f2c/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala#L77. One way I am thinking about is that we can introduce a special flag for the Polaris SparkCatalog to represent that a third-party catalog plugin is used, and then do a similar thing as DeltaCatalog.
@rahil-c sorry, I made my comment yesterday but forgot to push it. I did a push and added some more comments; please let me know if you have more questions about this!
No description provided.