[HUDI-3008] Fixing HoodieFileIndex partition column parsing for nested fields
harsh1231 committed Dec 20, 2021
1 parent 9eddfe4 commit e890932
Showing 2 changed files with 30 additions and 7 deletions.
HoodieFileIndex.scala
@@ -106,7 +106,7 @@ case class HoodieFileIndex(
   private lazy val _partitionSchemaFromProperties: StructType = {
     val tableConfig = metaClient.getTableConfig
     val partitionColumns = tableConfig.getPartitionFields
-    val nameFieldMap = convertStructToMap(Right(schema))
+    val nameFieldMap = generateNameFieldMap(Right(schema))

     if (partitionColumns.isPresent) {
       val partitionFields = partitionColumns.get().map(column =>
@@ -128,12 +128,12 @@ case class HoodieFileIndex(
    * @param structField
    * @return map of ( columns names -> StructField )
    */
-  private def convertStructToMap(structField : Either[StructField,StructType]) : Map[String,StructField] = {
+  private def generateNameFieldMap(structField : Either[StructField, StructType]) : Map[String, StructField] = {
     structField match {
-      case Right(field) => field.fields.map(f => convertStructToMap(Left(f))).flatten.toMap
+      case Right(field) => field.fields.map(f => generateNameFieldMap(Left(f))).flatten.toMap
       case Left(field) => field.dataType match {
-        case struct: StructType => convertStructToMap(Right(struct)).map {
-          case (key :String , sf:StructField) => (field.name + "." + key , sf )
+        case struct: StructType => generateNameFieldMap(Right(struct)).map {
+          case (key: String, sf: StructField) => (field.name + "." + key, sf)
         }
         case _ => Map(field.name -> field)
       }
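For context, the renamed helper flattens a possibly nested schema into a map keyed by dotted column paths, which is what allows a partition column such as a.b.c to be resolved against the table schema. Below is a minimal standalone sketch of the same recursion (the example schema and object name are illustrative, not part of this commit):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

object NameFieldMapSketch {
  // Flattens a (possibly nested) StructType into a Map from the
  // dotted column path (e.g. "a.b.c") to the corresponding leaf StructField.
  def generateNameFieldMap(structField: Either[StructField, StructType]): Map[String, StructField] =
    structField match {
      case Right(schema) => schema.fields.flatMap(f => generateNameFieldMap(Left(f))).toMap
      case Left(field) => field.dataType match {
        case struct: StructType =>
          // Prefix every nested key with this field's name.
          generateNameFieldMap(Right(struct)).map {
            case (key, sf) => (field.name + "." + key, sf)
          }
        case _ => Map(field.name -> field)
      }
    }

  def main(args: Array[String]): Unit = {
    // Illustrative schema: struct "a" contains struct "b", which contains leaf "c".
    val schema = StructType(Seq(
      StructField("a", StructType(Seq(
        StructField("b", StructType(Seq(
          StructField("c", StringType)))))))))
    // Produces a single entry keyed "a.b.c", mapped to the leaf field "c".
    println(generateNameFieldMap(Right(schema)))
  }
}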
TestHoodieFileIndex.scala
@@ -18,7 +18,6 @@
 package org.apache.hudi

 import java.util.Properties
-
 import org.apache.hudi.DataSourceWriteOptions._
 import org.apache.hudi.common.config.HoodieMetadataConfig
 import org.apache.hudi.common.table.HoodieTableMetaClient
@@ -31,14 +30,15 @@ import org.apache.hudi.keygen.ComplexKeyGenerator
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator.{Config, TimestampType}
 import org.apache.hudi.testutils.HoodieClientTestBase
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.functions.{lit, struct}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, GreaterThanOrEqual, LessThan, Literal}
 import org.apache.spark.sql.execution.datasources.PartitionDirectory
 import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{SaveMode, SparkSession}
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.BeforeEach
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
+import org.junit.jupiter.params.provider.{CsvSource, ValueSource}

 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -253,6 +253,29 @@ class TestHoodieFileIndex extends HoodieClientTestBase {
     assertEquals(5, readDF2.filter("dt = '2021/03/01' and hh ='10'").count())
   }

+  @ParameterizedTest
+  @CsvSource(Array("true,a.b.c", "false,a.b.c", "true,c", "false,c"))
+  def testQueryPartitionPathsForNestedPartition(useMetaFileList: Boolean, partitionBy: String): Unit = {
+    val inputDF = spark.range(100)
+      .withColumn("c", lit("c"))
+      .withColumn("b", struct("c"))
+      .withColumn("a", struct("b"))
+    inputDF.write.format("hudi")
+      .options(commonOpts)
+      .option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
+      .option(RECORDKEY_FIELD.key, "id")
+      .option(PRECOMBINE_FIELD.key, "id")
+      .option(PARTITIONPATH_FIELD.key, partitionBy)
+      .option(HoodieMetadataConfig.ENABLE.key(), useMetaFileList)
+      .mode(SaveMode.Overwrite)
+      .save(basePath)
+    metaClient = HoodieTableMetaClient.reload(metaClient)
+    val fileIndex = HoodieFileIndex(spark, metaClient, None,
+      queryOpts ++ Map(HoodieMetadataConfig.ENABLE.key -> useMetaFileList.toString))
+    // verify that getAllQueryPartitionPaths does not break when the table is partitioned on nested columns
+    assert(fileIndex.getAllQueryPartitionPaths.get(0).partitionPath.equals("c"))
+  }
+
   private def attribute(partition: String): AttributeReference = {
     AttributeReference(partition, StringType, true)()
   }
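The assertion in the new test expects the partition path "c" for both a.b.c and c: every row's leaf value is the literal "c" (via lit("c")), so the resulting partition path is the same whichever column path is configured. A standalone Spark snippet (a sketch assuming a local SparkSession; not part of the commit) showing the nested column shape the test builds:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{lit, struct}

object NestedPartitionColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("nested-partition-sketch").getOrCreate()
    // Same shape as the test: leaf column "c" wrapped in struct "b", wrapped in struct "a".
    val df = spark.range(3)
      .withColumn("c", lit("c"))
      .withColumn("b", struct("c")) // b: struct<c: string>
      .withColumn("a", struct("b")) // a: struct<b: struct<c: string>>
    // The nested path and the top-level column carry the same value, so
    // partitioning by "a.b.c" or by "c" yields the same partition path.
    df.select("a.b.c", "c").show()
    spark.stop()
  }
}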
