-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields #4308
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,7 +18,6 @@ | |
package org.apache.hudi | ||
|
||
import org.apache.hadoop.fs.{FileStatus, Path} | ||
|
||
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL} | ||
import org.apache.hudi.client.common.HoodieSparkEngineContext | ||
import org.apache.hudi.common.config.HoodieMetadataConfig | ||
|
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.FileSlice | |
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ | ||
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView} | ||
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver} | ||
|
||
import org.apache.spark.api.java.JavaSparkContext | ||
import org.apache.spark.internal.Logging | ||
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate} | ||
|
@@ -37,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N | |
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr | ||
import org.apache.spark.sql.hudi.HoodieSqlUtils | ||
import org.apache.spark.sql.internal.SQLConf | ||
import org.apache.spark.sql.types.StructType | ||
import org.apache.spark.sql.types.{StructField, StructType} | ||
import org.apache.spark.sql.{AnalysisException, Column, SparkSession} | ||
import org.apache.spark.unsafe.types.UTF8String | ||
|
||
|
@@ -108,7 +106,7 @@ case class HoodieFileIndex( | |
private lazy val _partitionSchemaFromProperties: StructType = { | ||
val tableConfig = metaClient.getTableConfig | ||
val partitionColumns = tableConfig.getPartitionFields | ||
val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap | ||
val nameFieldMap = generateNameFieldMap(Right(schema)) | ||
|
||
if (partitionColumns.isPresent) { | ||
val partitionFields = partitionColumns.get().map(column => | ||
|
@@ -123,6 +121,25 @@ case class HoodieFileIndex( | |
} | ||
} | ||
|
||
/** | ||
* This method traverses StructType recursively to build map of columnName -> StructField | ||
* Note : If there is nesting of columns like ["a.b.c.d", "a.b.c.e"] -> final map will have keys corresponding | ||
* only to ["a.b.c.d", "a.b.c.e"] and not for subsets like ["a.b.c", "a.b"] | ||
* @param structField | ||
* @return map of ( columns names -> StructField ) | ||
*/ | ||
private def generateNameFieldMap(structField: Either[StructField, StructType]) : Map[String, StructField] = { | ||
structField match { | ||
case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap | ||
case Left(field) => field.dataType match { | ||
case struct: StructType => generateNameFieldMap(Right(struct)).map { | ||
case (key: String, sf: StructField) => (field.name + "." + key, sf) | ||
} | ||
case _ => Map(field.name -> field) | ||
} | ||
} | ||
} | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. pls notice the code style of scala: |
||
private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext)) | ||
|
||
private lazy val configProperties = { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please delete the space before ‘:’
private def generateNameFieldMap(structField : Either[StructField, StructType]) : Map[String, StructField] =>
private def generateNameFieldMap(structField: Either[StructField, StructType]): Map[String, StructField]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done, thanks for pointing out code style of scala.