
[HUDI-6198] Support Hudi on Spark 3.4.0 #8885

Merged · 42 commits · Jun 10, 2023
Conversation

yihua (Contributor) commented Jun 5, 2023

Change Logs

This PR adds support for Apache Hudi on Spark 3.4.0.

There are a few changes in Spark 3.4.0 that the Hudi Spark integration needs to adjust to, particularly:

  • SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED (spark.sql.parquet.inferTimestampNTZ.enabled) is required for Parquet reading
  • case class changes of MergeIntoTable (five to six arguments)
Before Spark 3.4.0 (five arguments):
 case class MergeIntoTable(
    targetTable: LogicalPlan,
    sourceTable: LogicalPlan,
    mergeCondition: Expression,
    matchedActions: Seq[MergeAction],
    notMatchedActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery
   
Since Spark 3.4.0 (six arguments):
 case class MergeIntoTable(
    targetTable: LogicalPlan,
    sourceTable: LogicalPlan,
    mergeCondition: Expression,
    matchedActions: Seq[MergeAction],
    notMatchedActions: Seq[MergeAction],
    notMatchedBySourceActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery
  • API change of SchemaUtils.checkColumnNameDuplication
  • PartitionedFile change
Before Spark 3.4.0,
case class PartitionedFile(
  partitionValues: InternalRow,
  filePath: String,
  start: Long,
  length: Long,
  @transient locations: Array[String] = Array.empty)

Since Spark 3.4.0, filePath is switched to [[SparkPath]] for type safety (a compatibility sketch follows the code below):
case class PartitionedFile(
  partitionValues: InternalRow,
  filePath: SparkPath,
  start: Long,
  length: Long,
  @transient locations: Array[String] = Array.empty,
  modificationTime: Long = 0L,
  fileSize: Long = 0L)
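
For illustration, a minimal sketch of how an adapter can bridge the two shapes, assuming the Spark 3.4 SparkPath API (toPath). getPathFromPartitionedFile is quoted later in this review; the string accessor's name here is hypothetical.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.datasources.PartitionedFile

// Version-agnostic accessors; each Spark version module supplies its own implementation.
trait HoodieSparkPartitionedFileUtils {
  def getPathFromPartitionedFile(partitionedFile: PartitionedFile): Path
  def getStringPathFromPartitionedFile(partitionedFile: PartitionedFile): String // hypothetical name
}

// Spark 3.4+: filePath is a SparkPath, which exposes toPath.
class HoodieSpark34PartitionedFileUtilsSketch extends HoodieSparkPartitionedFileUtils {
  override def getPathFromPartitionedFile(partitionedFile: PartitionedFile): Path =
    partitionedFile.filePath.toPath
  override def getStringPathFromPartitionedFile(partitionedFile: PartitionedFile): String =
    partitionedFile.filePath.toPath.toString
}

// Spark 3.3 and earlier: filePath is a plain String, so the Path accessor
// would instead be: new Path(partitionedFile.filePath)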

To support Apache Hudi on Spark 3.4.0, here are the changes made:

  • Adds adapter support for Spark 3.4 in SparkAdapterSupport and HoodieSparkUtils.
  • Adds adapter framework changes and Hudi-related logic adjustment:
    • To account for the config SQLConf.PARQUET_INFER_TIMESTAMP_NTZ_ENABLED (spark.sql.parquet.inferTimestampNTZ.enabled) required for Parquet reading in Spark 3.4.0, the config is now set in HoodieSparkFileReaderFactory.
    • To account for the case class change of MergeIntoTable (five to six arguments), a new function def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)] in HoodieCatalystPlansUtils and a corresponding implementation for each Spark version are added to the Spark adapter framework (see the sketch after this list).
    • To account for the API change of SchemaUtils.checkColumnNameDuplication, HoodieSchemaUtils with a checkColumnNameDuplication function and a corresponding implementation for each Spark version are added to the Spark adapter framework.
    • To account for the PartitionedFile change, where filePath is switched to [[SparkPath]] for type safety, HoodieSparkPartitionedFileUtils and a corresponding implementation for each Spark version are added to the Spark adapter framework.
    • Creates BASE_PATH_PARAM = "basePath", as Spark 3.4.0 uses FileIndexOptions.BASE_PATH_PARAM instead of PartitioningAwareFileIndex.BASE_PATH_PARAM.
    • Uses the above adapter functions to adjust logic in BaseFileOnlyRelation, HoodieBaseRelation, HoodieBootstrapRelation, HoodieDataSourceHelper, Iterators, MergeOnReadSnapshotRelation, HoodieCDCRDD, AlterHoodieTableAddColumnsCommand, and HoodieAnalysis.
  • Spark 3.4 now supports the vectorized reader on nested fields. However, Hudi does not support this yet due to its custom schema evolution logic, so logic is added to override supportBatch in HoodieParquetFileFormat for Spark 3.4.
  • Uses ColumnVector instead of WritableColumnVector for generality in Spark32PlusHoodieVectorizedParquetRecordReader.
  • Adds the hudi-spark3.4.x module by mainly copying the code from the hudi-spark3.3.x module and making the following adjustments (more details below in the code comparison):
    • Spark32PlusHoodieParquetFileFormat is removed. Each spark version module has its own file format class. Spark34HoodieParquetFileFormat adds custom logic on when to enable vectorized reader based on various conditions, compared to Spark33HoodieParquetFileFormat.
    • HoodieSpark34PartitionedFileUtils is different from HoodieSpark33PartitionedFileUtils because of the PartitionedFile changes.
    • Spark34ResolveHudiAlterTableCommand adjusts logic based on AlterColumn class changes.
    • HoodieSpark3_4ExtendedSqlAstBuilder adjusts logic based on Spark 3.4.
    • HoodieSpark34CatalystExpressionUtils adjusts logic based on ParseToTimestamp change and removal of AnsiCast compared to HoodieSpark33CatalystExpressionUtils.
    • HoodieSpark34CatalystPlanUtils adjusts logic based on MergeIntoTable change.
    • HoodieSpark34SchemaUtils is different from HoodieSpark33SchemaUtils because of the API change of SchemaUtils.checkColumnNameDuplication.
  • Adds GitHub Actions to run Spark tests and bundle validation on Spark 3.4.0.
  • Adjusts tests due to changes in Spark 3.4.0.
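
As referenced in the list above, here is a hedged sketch of the MergeIntoTable extractor: the trait name and function signature are taken from this description, while the Spark 3.4 implementation body is illustrative (the PR's actual class is HoodieSpark34CatalystPlanUtils).

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeIntoTable}

trait HoodieCatalystPlansUtils {
  def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)]
}

// Spark 3.4: MergeIntoTable carries six fields, so the pattern has six positions;
// only the version-independent pieces are extracted for HoodieAnalysis to match on.
class Spark34CatalystPlanUtilsSketch extends HoodieCatalystPlansUtils {
  override def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)] =
    plan match {
      case MergeIntoTable(targetTable, sourceTable, mergeCondition, _, _, _) =>
        Some((targetTable, sourceTable, mergeCondition))
      case _ => None
    }
}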

This PR addresses all compatibility issues in the Hudi Spark integration across Spark versions on top of #8826, which adds Spark 3.4.0 support on the Hudi 0.13.1 release only. @mansipp, @rahil-c, @CTTY, and @umehrot2 have contributed significantly to #8826 to make the Hudi 0.13.1 release work on Spark 3.4.0 for the EMR release. This PR makes sure all recent changes in the Hudi Spark integration since Hudi 0.13.1 are considered and taken care of.

More details

Major changes from the hudi-spark3.3.x to the hudi-spark3.4.x module (screenshots of the side-by-side code comparisons are omitted here):

  • Spark33HoodieParquetFileFormat vs Spark34HoodieParquetFileFormat
  • HoodieSpark33PartitionedFileUtils vs HoodieSpark34PartitionedFileUtils
  • Spark33ResolveHudiAlterTableCommand vs Spark34ResolveHudiAlterTableCommand
  • HoodieSpark3_3ExtendedSqlAstBuilder vs HoodieSpark3_4ExtendedSqlAstBuilder
  • HoodieSpark33CatalystExpressionUtils vs HoodieSpark34CatalystExpressionUtils
  • HoodieSpark33CatalystPlanUtils vs HoodieSpark34CatalystPlanUtils
  • HoodieSpark33SchemaUtils vs HoodieSpark34SchemaUtils

Impact

This enables Hudi workloads to run on Spark 3.4.0.

Risk level

Medium

The PR is thoroughly tested by the Java CI, Azure CI, and integration tests on EMR and S3.

Documentation Update

We'll update the Hudi website on the Spark 3.4.0 support: HUDI-6341.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

melin commented Jun 6, 2023

Spark 2.4 and Spark 3.0 were recently dropped by Iceberg. Would Hudi consider reducing the number of maintained Spark versions?

yihua (Contributor, Author) commented Jun 6, 2023

Spark 2.4 and Spark 3.0 were recently dropped by Iceberg. Would Hudi consider reducing the number of maintained Spark versions?

Maintaining compatibility across multiple Spark versions is indeed challenging and time-consuming. However, there are still users running Hudi on Spark 2.4 and 3.0, so we cannot drop support for Spark 2.4 and 3.0, at least for the next two releases.

yihua added the dependencies, priority:blocker, spark, and release-0.14.0 labels on Jun 7, 2023
yihua changed the title from [DNM][HUDI-6198] Support Hudi on Spark 3.4.0 to [HUDI-6198] Support Hudi on Spark 3.4.0 on Jun 8, 2023
@@ -34,6 +34,15 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport

override def toString: String = "Hoodie-Parquet"

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
yihua (Contributor, Author):

FR: Spark 3.4 now supports the vectorized reader on nested fields. However, Hudi does not support this yet due to its custom schema evolution logic, so we add logic to override supportBatch in HoodieParquetFileFormat for Spark 3.4.

rahil-c (Contributor) commented Jun 9, 2023:

Can we add the JIRA (https://issues.apache.org/jira/browse/HUDI-6262) as a code comment here, explaining why we have this override and that we should remove it once we support complex types in Hudi?

yihua (Contributor, Author):

JIRA to track: HUDI-6347

@@ -56,30 +58,37 @@ class TestIndexSyntax extends HoodieSparkSqlTestBase {

var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName")
var resolvedLogicalPlan = analyzer.execute(logicalPlan)
assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table.identifier.quotedString)
yihua (Contributor, Author):

FR: table.identifier.quotedString now also has the catalog name as the prefix.
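
For illustration, the version-aware assertion this implies could look as follows; spark_catalog is Spark's default session catalog name, though the exact expected string in the PR's test may differ:

val expectedTableString =
  if (HoodieSparkUtils.gteqSpark3_4) s"`spark_catalog`.`default`.`$tableName`"
  else s"`default`.`$tableName`"
assertResult(expectedTableString)(
  resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].table.identifier.quotedString)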

@@ -733,8 +734,8 @@ object HoodieBaseRelation extends SparkAdapterSupport {

partitionedFile => {
val hadoopConf = hadoopConfBroadcast.value.get()
val reader = new HoodieAvroHFileReader(hadoopConf, new Path(partitionedFile.filePath),
new CacheConfig(hadoopConf))
val filePath = sparkAdapter.getSparkPartitionedFileUtils.getPathFromPartitionedFile(partitionedFile)
yihua (Contributor, Author):

For Reviewer (FR): all the changes in the common module introducing new adapter support are due to Spark 3.4 class and API changes.

Contributor:

sg

yihua (Contributor, Author) commented Jun 9, 2023

Hi @zhangyue19921010 @xiarixiaoyao @nsivabalan @xushiyan @danny0405, could you also review this PR?

yihua (Contributor, Author) commented Jun 9, 2023

@CTTY Thanks for the review. I addressed all your comments. @rahil-c @mansipp let me know if you have more comments.

// dialect: JdbcDialect,
// alwaysNullable: Boolean = false,
// isTimestampNTZ: Boolean = false): StructType
JdbcUtils.getSchema(resultSet, dialect, alwaysNullable)
Contributor:

For my own understanding, where does this get used?

yihua (Contributor, Author):

This is used by UtilHelpers.getJDBCSchema for the JDBC-based schema provider.
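
A minimal sketch of the compatibility point in the snippet above: Spark 3.4 added an isTimestampNTZ parameter (with a default value) to JdbcUtils.getSchema, so a three-argument call compiles against both the old and the new signatures.

import java.sql.ResultSet
import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.StructType

object JdbcSchemaCompatSketch {
  // Relies on isTimestampNTZ defaulting to false on Spark 3.4.
  def getSchema(resultSet: ResultSet, dialect: JdbcDialect, alwaysNullable: Boolean): StructType =
    JdbcUtils.getSchema(resultSet, dialect, alwaysNullable)
}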

* notMatchedActions: Seq[MergeAction],
* notMatchedBySourceActions: Seq[MergeAction]) extends BinaryCommand with SupportsSubquery
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)]
Contributor:

For my own understanding, where does this get used?

yihua (Contributor, Author):

This is ultimately used in HoodieAnalysis to match the plan.

override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
this.copy(prunedDataSchema = Some(prunedSchema))

override def imbueConfigs(sqlContext: SQLContext): Unit = {
super.imbueConfigs(sqlContext)
// TODO Issue with setting this to true in spark 332
if (!HoodieSparkUtils.gteqSpark3_3_2) {
if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {
rahil-c (Contributor) commented Jun 9, 2023:

@yihua Now that I think about it, the changes we made in 3.4 (specifically around supportBatch and supportsColumnar in ParquetFileFormat) should be brought to our 3.3.2 implementation. This would then allow us to remove this if check altogether and allow the vectorized reader to be enabled regardless of Spark version.

yihua (Contributor, Author):

I would say let's keep the logic for other Spark versions as is and not balloon the scope of this PR.

Contributor:

I think if we sync I can try to help you get past this error for the other spark version.

yihua (Contributor, Author):

JIRA to track: HUDI-6347

@@ -34,6 +34,15 @@ class HoodieParquetFileFormat extends ParquetFileFormat with SparkAdapterSupport

override def toString: String = "Hoodie-Parquet"

override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
if (HoodieSparkUtils.gteqSpark3_4) {
Contributor:

I'm wondering if this should be for greater than or equal to Spark 3.3.2, so that we can have this fix there as well.

yihua (Contributor, Author):

Let's take this separately since this PR is not related to Spark 3.3.2. The code here is to make sure all other versions maintain the same logic.

yihua (Contributor, Author):

The tests fail for other Spark versions if I don't add this check.

Merge Hudi to Hudi *** FAILED ***
  org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3194.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3194.0 (TID 3768) (fv-az1128-658 executor driver): java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatchRow cannot be cast to org.apache.spark.sql.vectorized.ColumnarBatch
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:560)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.next(DataSourceScanExec.scala:549)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
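
A hedged sketch of the guard under discussion, based on the hunk above and the pre-3.4 Parquet rule that batch reads require flat schemas of atomic types; the exact condition in the PR may differ:

import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.types.{AtomicType, StructType}

class HoodieParquetFileFormatSketch extends ParquetFileFormat {
  override def supportBatch(sparkSession: SparkSession, schema: StructType): Boolean = {
    if (HoodieSparkUtils.gteqSpark3_4) {
      // Spark 3.4 would otherwise enable the vectorized reader on nested fields,
      // which Hudi's schema-evolution path cannot consume yet (HUDI-6262/HUDI-6347),
      // leading to cast failures like the one shown above when the batch and
      // row-based paths disagree.
      schema.fields.forall(_.dataType.isInstanceOf[AtomicType]) &&
        super.supportBatch(sparkSession, schema)
    } else {
      super.supportBatch(sparkSession, schema)
    }
  }
}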

* <li>Schema on-read</li>
* </ol>
*/
class Spark33HoodieParquetFileFormat(private val shouldAppendPartitionValues: Boolean) extends ParquetFileFormat {
rahil-c (Contributor) commented Jun 9, 2023:

Just to confirm: for this class, did we migrate the logic as Spark32Plus originally had it? Are there any other changes added here, and is this what Spark 3.3.2 will use?

I'm wondering if we should make a ParquetFileFormat for Spark 3.3.2 and add a similar supportBatch and supportsColumnar so we can avoid the cast issues instead of disabling the vectorized reader for Spark 3.3.2.

yihua (Contributor, Author):

This is copied from Spark32PlusHoodieParquetFileFormat. Let's take the Spark 3.3.2 improvement in a separate PR and not put everything in the same one.

hudi-bot commented:

CI report:

Bot commands: @hudi-bot supports the following commands:
  • @hudi-bot run azure: re-run the last Azure build

nsivabalan (Contributor) left a comment:

LGTM. A few minor comments. Good job on the 3.4.0 support, folks 👏

* @param partitionedFile Spark [[PartitionedFile]] instance.
* @return Hadoop [[Path]] instance.
*/
def getPathFromPartitionedFile(partitionedFile: PartitionedFile): Path
Contributor:

May I know why we need two methods, one to fetch the String version and another to fetch the Path? We could return the String and let the caller construct the Path if need be.

yihua (Contributor, Author):

SparkPath has a toPath function to efficiently return the Path instance in Spark 3.4, so I opted to have two separate APIs here.

override def updatePrunedDataSchema(prunedSchema: StructType): Relation =
this.copy(prunedDataSchema = Some(prunedSchema))

override def imbueConfigs(sqlContext: SQLContext): Unit = {
super.imbueConfigs(sqlContext)
// TODO Issue with setting this to true in spark 332
if (!HoodieSparkUtils.gteqSpark3_3_2) {
if (HoodieSparkUtils.gteqSpark3_4 || !HoodieSparkUtils.gteqSpark3_3_2) {
Contributor:

Should we introduce "HoodieSparkUtils.ltSpark3_3_2" instead of "!HoodieSparkUtils.gteqSpark3_3_2"?

yihua (Contributor, Author):

We can do that. I'll keep the existing code the same in this PR (see a sketch of the suggested helper below).
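
For illustration, the suggested helper would be a one-line addition next to the existing predicates in HoodieSparkUtils (hypothetical; not part of this PR):

// Hypothetical, alongside the existing gteqSpark3_3_2 and gteqSpark3_4 checks:
def ltSpark3_3_2: Boolean = !gteqSpark3_3_2

// The condition at the call site above would then read:
// if (HoodieSparkUtils.gteqSpark3_4 || HoodieSparkUtils.ltSpark3_3_2)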


yihua (Contributor, Author) commented Jun 10, 2023

Thank you, folks, for the review. I'll merge this PR now. Feel free to add more comments if there are any, given the ongoing discussions on the Hudi Spark integration.

yihua merged commit 96ca7aa into apache:master on Jun 10, 2023
22 checks passed