[HUDI-3008] Fixes bug in HoodieDeltaStreamer for nested partition lookup
harsh1231 committed Dec 14, 2021
1 parent c8d6bd8 commit f0763e1
Showing 1 changed file with 21 additions and 4 deletions.
@@ -18,7 +18,6 @@
package org.apache.hudi

import org.apache.hadoop.fs.{FileStatus, Path}
-
import org.apache.hudi.DataSourceReadOptions.{QUERY_TYPE, QUERY_TYPE_SNAPSHOT_OPT_VAL}
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.config.HoodieMetadataConfig
@@ -27,7 +26,6 @@ import org.apache.hudi.common.model.FileSlice
import org.apache.hudi.common.model.HoodieTableType.MERGE_ON_READ
import org.apache.hudi.common.table.view.{FileSystemViewStorageConfig, HoodieTableFileSystemView}
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
-
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.avro.SchemaConverters
@@ -39,7 +37,7 @@ import org.apache.spark.sql.execution.datasources.{FileIndex, FileStatusCache, N
import org.apache.spark.sql.hudi.DataSkippingUtils.createColumnStatsIndexFilterExpr
import org.apache.spark.sql.hudi.HoodieSqlUtils
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.{AnalysisException, Column, SparkSession}
import org.apache.spark.unsafe.types.UTF8String

@@ -110,7 +108,7 @@ case class HoodieFileIndex(
private lazy val _partitionSchemaFromProperties: StructType = {
val tableConfig = metaClient.getTableConfig
val partitionColumns = tableConfig.getPartitionFields
-val nameFieldMap = schema.fields.map(filed => filed.name -> filed).toMap
+val nameFieldMap = convertStructToMap(Right(schema))

if (partitionColumns.isPresent) {
val partitionFields = partitionColumns.get().map(column =>
@@ -125,6 +123,25 @@
}
}

+/**
+ * Traverses the StructType recursively to build a map of (column name -> StructField).
+ * Note: for nested columns such as ["a.b.c.d", "a.b.c.e"], the resulting map contains keys
+ * only for the leaf paths ["a.b.c.d", "a.b.c.e"], not for prefixes like ["a.b.c", "a.b"].
+ * @param structField either a single StructField or a whole StructType to traverse
+ * @return map of (column name -> StructField)
+ */
+private def convertStructToMap(structField: Either[StructField, StructType]): Map[String, StructField] = {
+  structField match {
+    case Right(struct) => struct.fields.flatMap(f => convertStructToMap(Left(f))).toMap
+    case Left(field) => field.dataType match {
+      case struct: StructType => convertStructToMap(Right(struct)).map {
+        case (key: String, sf: StructField) => (field.name + "." + key, sf)
+      }
+      case _ => Map(field.name -> field)
+    }
+  }
+}

private lazy val engineContext = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))

private lazy val configProperties = {
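For illustration, here is a minimal, self-contained sketch of the behavior this change enables. The helper's logic is restated outside HoodieFileIndex, and the object name, schema, and column names below are hypothetical. With the old one-level map, a partition column declared as a nested path such as "a.b" was never found; keying the map by leaf paths fixes the lookup.

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object NestedFieldMapSketch extends App {
  // Restatement of the new convertStructToMap logic as a standalone function.
  def convertStructToMap(structField: Either[StructField, StructType]): Map[String, StructField] =
    structField match {
      case Right(struct) => struct.fields.flatMap(f => convertStructToMap(Left(f))).toMap
      case Left(field) => field.dataType match {
        case struct: StructType =>
          convertStructToMap(Right(struct)).map { case (key, sf) => (field.name + "." + key, sf) }
        case _ => Map(field.name -> field)
      }
    }

  // Hypothetical schema: a flat column "id" plus a nested struct "a" with leaves "b" and "c".
  val schema = StructType(Seq(
    StructField("id", IntegerType),
    StructField("a", StructType(Seq(
      StructField("b", StringType),
      StructField("c", IntegerType))))))

  // The pre-fix map (schema.fields.map(f => f.name -> f).toMap) had only the keys: id, a.
  // The recursive version yields leaf paths, so "a.b" declared as a partition column resolves.
  println(convertStructToMap(Right(schema)).keys.toSeq.sorted.mkString(", "))
  // -> a.b, a.c, id
}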
