diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
index e99850bef06b8..70bc39151462c 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/Spark31HoodieParquetFileFormat.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.mapred.FileSplit
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.hadoop.mapreduce.{JobID, TaskAttemptID, TaskID, TaskType}
 import org.apache.hudi.HoodieSparkUtils
@@ -135,14 +136,11 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       assert(!shouldAppendPartitionValues || file.partitionValues.numFields == partitionSchema.size)
 
       val filePath = new Path(new URI(file.filePath))
-      val split =
-        new org.apache.parquet.hadoop.ParquetInputSplit(
-          filePath,
-          file.start,
-          file.start + file.length,
-          file.length,
-          Array.empty,
-          null)
+      /**
+       * Following https://github.com/apache/spark/pull/29542,
+       * we must use org.apache.hadoop.mapred.FileSplit here.
+       */
+      val split = new FileSplit(filePath, file.start, file.length, Array.empty[String])
 
       val sharedConf = broadcastedHadoopConf.value.value
 
@@ -170,7 +168,11 @@ class Spark31HoodieParquetFileFormat(private val shouldAppendPartitionValues: Bo
       // Try to push down filters when filter push-down is enabled.
      val pushed = if (enableParquetFilterPushDown) {
        val parquetSchema = footerFileMetaData.getSchema
-        val parquetFilters = if (HoodieSparkUtils.gteqSpark3_1_3) {
+        /**
+         * Hard-coded adaptation: tspark backports the Spark 3.3 ParquetFilters API to its 3.1 build.
+         */
+        val ctor = classOf[ParquetFilters].getConstructors.head
+        val parquetFilters = if (ctor.getParameterCount == 8 || HoodieSparkUtils.gteqSpark3_1_3) {
           createParquetFilters(
             parquetSchema,
             pushDownDate,
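
For readers comparing the two call sites in the second hunk: below is a minimal, self-contained sketch of the replacement split construction. The path and sizes are made-up values; only the constructor shape, (path, start, length, hosts), is the point. Unlike the removed ParquetInputSplit call, there is no separate end offset (file.start + file.length) to pass.

    import org.apache.hadoop.fs.Path
    import org.apache.hadoop.mapred.FileSplit

    object SplitSketch {
      def main(args: Array[String]): Unit = {
        // FileSplit(path, start, length, hosts): no end offset and no
        // row-group metadata, unlike the removed ParquetInputSplit call.
        val split = new FileSplit(new Path("/tmp/part-00000.parquet"), 0L, 1024L, Array.empty[String])
        println(s"start=${split.getStart}, length=${split.getLength}")
      }
    }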
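
The version-string check alone (HoodieSparkUtils.gteqSpark3_1_3) misses tspark, which reports a pre-3.1.3 version while shipping the newer constructor, so the third hunk probes the constructor's arity instead. Below is a hedged, standalone sketch of that probe. FiltersV1 is a hypothetical stand-in class, not Spark's ParquetFilters, and its parameter list is arbitrary; the real code reflects over ParquetFilters and treats an eight-parameter constructor as the backported Spark 3.3-style API.

    // Hypothetical stand-in class; NOT Spark's ParquetFilters.
    class FiltersV1(schema: String,
                    pushDownDate: Boolean,
                    pushDownDecimal: Boolean,
                    caseSensitive: Boolean)

    object ArityProbe {
      def main(args: Array[String]): Unit = {
        // Mirrors the patch: take the first public constructor and inspect
        // its arity; eight parameters signals the backported 3.3-style API.
        val ctor = classOf[FiltersV1].getConstructors.head
        val hasBackportedApi = ctor.getParameterCount == 8
        println(s"parameterCount=${ctor.getParameterCount}, backported=$hasBackportedApi")
      }
    }

One caveat of getConstructors.head: the JVM does not guarantee constructor ordering, so this pattern implicitly assumes the target class exposes a single public constructor, which is what the patch relies on for ParquetFilters.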