
Improve readability
yihua authored and nsivabalan committed Apr 21, 2022
1 parent 9acfdea commit 5aae2f9
Showing 5 changed files with 14 additions and 12 deletions.
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
-import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
+import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -54,8 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,

override type FileSplit = HoodieBaseFileSplit

-override lazy val mandatoryColumns: Seq[String] =
-// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
+override lazy val mandatoryFields: Seq[String] =
+// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
Seq(recordKeyField)

override def imbueConfigs(sqlContext: SQLContext): Unit = {
@@ -198,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*
* @VisibleInTests
*/
-val mandatoryColumns: Seq[String]
+val mandatoryFields: Seq[String]
+
+protected def mandatoryRootFields: Seq[String] =
+  mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))

protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -245,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
//
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
// PROJECTION
-val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
+val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)

val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -361,12 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
!SubqueryExpression.hasSubquery(condition)
}

-protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
+protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
// For a nested field in mandatory columns, we should first get the root-level field, and then
// check for any missing column, as the requestedColumns should only contain root-level fields
// We should only append root-level field as well
-val missing = mandatoryColumns.map(col => HoodieAvroUtils.getRootLevelFieldName(col))
-  .filter(rootField => !requestedColumns.contains(rootField))
+val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
requestedColumns ++ missing
}

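For context, the behavior of the renamed helper can be shown with a small, self-contained sketch. This is an approximation only: `rootLevelFieldName` here mimics `HoodieAvroUtils.getRootLevelFieldName` by taking the part before the first dot, and the field names are hypothetical.

```scala
object MandatoryRootFieldsSketch {
  // Approximation of HoodieAvroUtils.getRootLevelFieldName: keep the part before the first dot.
  private def rootLevelFieldName(field: String): String = field.split('.').head

  // Mirrors appendMandatoryRootFields: append only the root-level fields that are still missing,
  // without reordering the columns that were originally requested.
  def appendMandatoryRootFields(requestedColumns: Array[String],
                                mandatoryFields: Seq[String]): Array[String] = {
    val mandatoryRootFields = mandatoryFields.map(rootLevelFieldName)
    val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
    requestedColumns ++ missing
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical nested record key: "meta.recordKey" contributes only its root field "meta".
    val requested = Array("name", "age")
    val mandatory = Seq("meta.recordKey")
    println(appendMandatoryRootFields(requested, mandatory).mkString(", "))
    // prints: name, age, meta
  }
}
```

Precomputing `mandatoryRootFields` once keeps the projection logic in one place, while the requested columns stay in their original order so the upstream projection is not broken.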
@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}

-override lazy val mandatoryColumns: Seq[String] = {
+override lazy val mandatoryFields: Seq[String] = {
// NOTE: This columns are required for Incremental flow to be able to handle the rows properly, even in
// cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
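The note above explains why the record-key and commit-time metadata fields stay mandatory for the incremental flow even when the query requests no columns. A minimal usage sketch of that case, assuming a Hudi table already exists at the (hypothetical) `basePath` and using a placeholder begin instant:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("incremental-count").getOrCreate()
val basePath = "/tmp/hudi_trips_cow" // hypothetical table path

// Incremental query that requests no data columns explicitly: count() still has to filter
// rows by commit time, which is why the metadata fields remain mandatory.
val incrementalCount = spark.read
  .format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20220101000000") // hypothetical instant
  .load(basePath)
  .count()
```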
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,

override type FileSplit = HoodieMergeOnReadFileSplit

-override lazy val mandatoryColumns: Seq[String] =
+override lazy val mandatoryFields: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())

protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
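The expression above appends the precombine field only when one is configured. A tiny standalone sketch of that idiom, with hypothetical field names:

```scala
val recordKeyField = "uuid"                         // hypothetical record key field
val preCombineFieldOpt: Option[String] = Some("ts") // hypothetical precombine field

val mandatoryFields: Seq[String] =
  Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
// => Seq("uuid", "ts"); with preCombineFieldOpt = None it is just Seq("uuid")
```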
@@ -19,7 +19,7 @@ package org.apache.hudi.functional

import org.apache.avro.Schema
import org.apache.hudi.common.config.HoodieMetadataConfig
-import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
+import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
logWarning(s"Not matching bytes read ($bytesRead)")
}

-val readColumns = targetColumns ++ relation.mandatoryColumns
+val readColumns = targetColumns ++ relation.mandatoryFields
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)

val row: InternalRow = rows.take(1).head
