Change the logic of appendMandatoryColumns instead
yihua committed Apr 21, 2022
1 parent 85985e3 commit dd8db55
Showing 2 changed files with 8 additions and 8 deletions.

HoodieSparkUtils.scala
@@ -21,7 +21,6 @@ package org.apache.hudi
 import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.avro.HoodieAvroUtils.rewriteRecord
 import org.apache.hudi.client.utils.SparkRowSerDe
 import org.apache.hudi.common.config.TypedProperties
@@ -323,9 +322,7 @@ object HoodieSparkUtils extends SparkAdapterSupport {
     val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
     // Here have to create a new Schema.Field object
     // to prevent throwing exceptions like "org.apache.avro.AvroRuntimeException: Field already used".
-    // For a nested field, we include the root-level field
-    val requiredFields = requiredColumns.map(c => HoodieAvroUtils.getRootLevelFieldName(c))
-      .distinct.map(c => name2Fields(c))
+    val requiredFields = requiredColumns.map(c => name2Fields(c))
       .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList
     val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
       tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)
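
For reference, a minimal, self-contained sketch of the pruning step above. The table schema, record name, and requested columns here are hypothetical, and the sketch assumes requiredColumns now arrives already reduced to root-level field names (that responsibility moves to appendMandatoryColumns below); it is an illustration, not the Hudi code itself.

import org.apache.avro.{Schema, SchemaBuilder}
import scala.collection.JavaConverters._

object RequiredSchemaSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical table schema standing in for tableAvroSchema.
    val tableAvroSchema: Schema = SchemaBuilder.record("trip").namespace("hoodie.example")
      .fields()
      .requiredString("_hoodie_record_key")
      .requiredString("rider")
      .requiredDouble("fare")
      .endRecord()

    // Hypothetical root-level columns requested by the query.
    val requiredColumns = Array("_hoodie_record_key", "fare")

    val name2Fields = tableAvroSchema.getFields.asScala.map(f => f.name() -> f).toMap
    // New Schema.Field instances are created because reusing fields of the original schema
    // fails with "org.apache.avro.AvroRuntimeException: Field already used".
    val requiredFields = requiredColumns.map(c => name2Fields(c))
      .map(f => new Schema.Field(f.name(), f.schema(), f.doc(), f.defaultVal(), f.order())).toList

    // Pruned record schema containing only the requested root-level fields.
    val requiredAvroSchema = Schema.createRecord(tableAvroSchema.getName, tableAvroSchema.getDoc,
      tableAvroSchema.getNamespace, tableAvroSchema.isError, requiredFields.asJava)

    println(requiredAvroSchema.toString(true))
  }
}
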
HoodieBaseRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
 import org.apache.hudi.HoodieConversionUtils.toScalaOption
+import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
@@ -39,10 +40,8 @@ import org.apache.hudi.io.storage.HoodieHFileReader
 import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
-import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
+import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
 import org.apache.spark.sql.execution.FileRelation
 import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
 import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -336,7 +335,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
   }

   protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
-    val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
+    // For a nested field in mandatory columns, we should first get the root-level field, and then
+    // check for any missing column, as the requestedColumns should only contain root-level fields
+    // We should only append root-level field as well
+    val missing = mandatoryColumns.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
+      .filter(rootField => !requestedColumns.contains(rootField))
     requestedColumns ++ missing
   }

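
A minimal, self-contained sketch of the new appendMandatoryColumns behaviour. Here getRootLevelFieldName is a local stand-in for what HoodieAvroUtils.getRootLevelFieldName is assumed to do (keep only the part of a field name before the first dot), and the mandatory and requested column names are hypothetical.

object AppendMandatoryColumnsSketch {

  // Stand-in: map a possibly nested reference such as "nested.recordKey" to its
  // root-level column "nested"; a plain column name is returned unchanged.
  private def getRootLevelFieldName(field: String): String = field.split('.').head

  // Hypothetical mandatory columns of a relation; one of them is a nested field.
  private val mandatoryColumns: Seq[String] = Seq("_hoodie_record_key", "nested.recordKey")

  def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
    // Reduce each mandatory column to its root-level field before checking what is missing,
    // since requestedColumns is expected to contain only root-level fields; only the
    // root-level field is appended as well.
    val missing = mandatoryColumns.map(col => getRootLevelFieldName(col))
      .filter(rootField => !requestedColumns.contains(rootField))
    requestedColumns ++ missing
  }

  def main(args: Array[String]): Unit = {
    // "nested.recordKey" collapses to "nested", which is appended exactly once.
    println(appendMandatoryColumns(Array("_hoodie_record_key", "fare")).mkString(", "))
    // prints: _hoodie_record_key, fare, nested
  }
}

Appending only the root-level field here is consistent with the HoodieSparkUtils change above, which now looks up requested columns in the table schema directly instead of reducing them to root-level names itself.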
