Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,17 @@ import org.apache.hudi.common.config.HoodieMetadataConfig.{DEFAULT_METADATA_ENAB
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver}
import org.apache.hudi.common.table.timeline.HoodieTimeline
import org.apache.hudi.common.util.StringUtils.isNullOrEmpty
import org.apache.hudi.common.util.{ConfigUtils, StringUtils}
import org.apache.hudi.common.util.ValidationUtils.checkState
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter
import org.apache.hudi.keygen.{CustomAvroKeyGenerator, CustomKeyGenerator, TimestampBasedAvroKeyGenerator, TimestampBasedKeyGenerator}
import org.apache.hudi.metadata.HoodieTableMetadataUtil
import org.apache.hudi.storage.StoragePath
import org.apache.spark.sql.{SparkSession, SQLContext}

import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.execution.datasources.{FileFormat, FileIndex, FileStatusCache, HadoopFsRelation, HoodieMultipleBaseFileFormat}
Expand All @@ -60,7 +63,8 @@ trait HoodieHadoopFsRelationFactory {
abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
val metaClient: HoodieTableMetaClient,
val options: Map[String, String],
val schemaSpec: Option[StructType]
val schemaSpec: Option[StructType],
val isBootstrap: Boolean
) extends SparkAdapterSupport with HoodieHadoopFsRelationFactory {
protected lazy val sparkSession: SparkSession = sqlContext.sparkSession
protected lazy val optParams: Map[String, String] = options
Expand All @@ -75,6 +79,19 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
protected lazy val basePath: StoragePath = metaClient.getBasePath
protected lazy val partitionColumns: Array[String] = tableConfig.getPartitionFields.orElse(Array.empty)

private lazy val keygenTypeHasVariablePartitionCols: Boolean = {
    //key generators whose partition-path values may differ from the raw field values
    val variablePartitionKeyGenerators = Set(
      classOf[TimestampBasedKeyGenerator].getName,
      classOf[TimestampBasedAvroKeyGenerator].getName,
      classOf[CustomKeyGenerator].getName,
      classOf[CustomAvroKeyGenerator].getName)
    val keyGenClassName = tableConfig.getKeyGeneratorClassName
    !isNullOrEmpty(keyGenClassName) && variablePartitionKeyGenerators.contains(keyGenClassName)
  }

protected lazy val partitionColumnsToRead: Seq[String] =
    //partition columns must be read from the files only when the key generator can
    //produce partition-path values that differ from the field values and we are not
    //extracting the values from the partition path itself
    if (keygenTypeHasVariablePartitionCols && !shouldExtractPartitionValuesFromPartitionPath) {
      //TODO: [HUDI-8098] filter for timestamp keygen columns when using custom keygen
      tableConfig.getPartitionFields.orElse(Array.empty).toSeq
    } else {
      Seq.empty
    }

protected lazy val (tableAvroSchema: Schema, internalSchemaOpt: Option[InternalSchema]) = {
val schemaResolver = new TableSchemaResolver(metaClient)
val internalSchemaOpt = if (!isSchemaEvolutionEnabledOnRead(optParams, sparkSession)) {
Expand Down Expand Up @@ -165,12 +182,9 @@ abstract class HoodieBaseHadoopFsRelationFactory(val sqlContext: SQLContext,
val shouldExtractPartitionValueFromPath =
optParams.getOrElse(DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.key,
DataSourceReadOptions.EXTRACT_PARTITION_VALUES_FROM_PARTITION_PATH.defaultValue.toString).toBoolean
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath
shouldOmitPartitionColumns || shouldExtractPartitionValueFromPath || isBootstrap
}

protected lazy val mandatoryFieldsForMerging: Seq[String] =
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all this is computed in the fg reader so we don't need this

Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())

protected lazy val shouldUseRecordPosition: Boolean = checkIfAConfigurationEnabled(HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS)

protected def queryTimestamp: Option[String] =
Expand Down Expand Up @@ -200,7 +214,7 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
override val options: Map[String, String],
override val schemaSpec: Option[StructType],
isBootstrap: Boolean)
extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec) {
extends HoodieBaseHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {

val fileIndex: HoodieFileIndex = new HoodieFileIndex(
sparkSession,
Expand Down Expand Up @@ -229,7 +243,7 @@ class HoodieMergeOnReadSnapshotHadoopFsRelationFactory(override val sqlContext:
recordMergerImpls = recordMergerImpls,
recordMergerStrategy = recordMergerStrategy
)
val mandatoryFields: Seq[String] = mandatoryFieldsForMerging
val mandatoryFields: Seq[String] = partitionColumnsToRead

override def buildFileIndex(): FileIndex = fileIndex

Expand Down Expand Up @@ -271,7 +285,7 @@ class HoodieMergeOnReadIncrementalHadoopFsRelationFactory(override val sqlContex
isBootstrap: Boolean)
extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {

override val mandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ mandatoryFieldsForMerging
override val mandatoryFields: Seq[String] = Seq(HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++ partitionColumnsToRead

override val fileIndex = new HoodieIncrementalFileIndex(
sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), true, true)
Expand Down Expand Up @@ -299,7 +313,7 @@ class HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(override val sqlContext:
isBootstrap: Boolean)
extends HoodieMergeOnReadSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {

override val mandatoryFields: Seq[String] = Seq.empty
override val mandatoryFields: Seq[String] = partitionColumnsToRead

override val fileIndex: HoodieFileIndex = HoodieFileIndex(
sparkSession,
Expand Down Expand Up @@ -332,7 +346,7 @@ class HoodieCopyOnWriteIncrementalHadoopFsRelationFactory(override val sqlContex
extends HoodieCopyOnWriteSnapshotHadoopFsRelationFactory(sqlContext, metaClient, options, schemaSpec, isBootstrap) {

override val mandatoryFields: Seq[String] = Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
preCombineFieldOpt.map(Seq(_)).getOrElse(Seq()) ++ partitionColumnsToRead

override val fileIndex = new HoodieIncrementalFileIndex(
sparkSession, metaClient, schemaSpec, options, FileStatusCache.getOrCreate(sparkSession), false, isBootstrap)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,18 @@ import org.apache.hudi.common.table.read.HoodieFileGroupReader
import org.apache.hudi.internal.schema.InternalSchema
import org.apache.hudi.storage.StorageConfiguration
import org.apache.hudi.storage.hadoop.{HadoopStorageConfiguration, HoodieHadoopStorage}

import org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.JoinedRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PARQUET_VECTORIZED_READER_ENABLED
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.{ColumnarBatch, ColumnarBatchUtils}
import org.apache.spark.util.SerializableConfiguration

import java.io.Closeable
Expand Down Expand Up @@ -87,6 +91,32 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
supportBatchResult
}

//for partition columns that we read from the file, we don't want them to be constant column vectors so we
//modify the vector types in this scenario
override def vectorTypes(requiredSchema: StructType,
                         partitionSchema: StructType,
                         sqlConf: SQLConf): Option[Seq[String]] = {
  val baseVectorTypes = super.vectorTypes(requiredSchema, partitionSchema, sqlConf)
  if (mandatoryFields.isEmpty) {
    baseVectorTypes
  } else {
    //partition columns we read from the file cannot be constant vectors, so use a
    //regular (mutable) column vector class for them
    val mutableVectorClass =
      if (sqlConf.offHeapColumnVectorEnabled) classOf[OffHeapColumnVector].getName
      else classOf[OnHeapColumnVector].getName
    baseVectorTypes.map { vectorTypes =>
      vectorTypes.zipWithIndex.map { case (vectorType, idx) =>
        //columns at positions >= requiredSchema.length correspond to partition columns
        val partitionIdx = idx - requiredSchema.length
        if (partitionIdx >= 0 && mandatoryFields.contains(partitionSchema.fields(partitionIdx).name)) {
          mutableVectorClass
        } else {
          vectorType
        }
      }
    }
  }
}

private lazy val internalSchemaOpt: org.apache.hudi.common.util.Option[InternalSchema] = if (tableSchema.internalSchema.isEmpty) {
org.apache.hudi.common.util.Option.empty()
} else {
Expand All @@ -104,16 +134,21 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
//dataSchema is not always right due to spark bugs
val partitionColumns = partitionSchema.fieldNames
val preCombineField = options.getOrElse(HoodieTableConfig.PRECOMBINE_FIELD.key, "")
val dataSchema = StructType(tableSchema.structTypeSchema.fields.filter(f => !partitionColumns.contains(f.name)
|| preCombineField.equals(f.name)))
val outputSchema = StructType(requiredSchema.fields ++ partitionSchema.fields)
val isCount = requiredSchema.isEmpty && !isMOR && !isIncremental
val augmentedStorageConf = new HadoopStorageConfiguration(hadoopConf).getInline
setSchemaEvolutionConfigs(augmentedStorageConf)
val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requiredSchema, sanitizedTableName)
val (remainingPartitionSchemaArr, fixedPartitionIndexesArr) = partitionSchema.fields.toSeq.zipWithIndex.filter(p => !mandatoryFields.contains(p._1.name)).unzip

// The schema of the partition cols we want to append the value instead of reading from the file
val remainingPartitionSchema = StructType(remainingPartitionSchemaArr)

// index positions of the remainingPartitionSchema fields in partitionSchema
val fixedPartitionIndexes = fixedPartitionIndexesArr.toSet

// schema that we want fg reader to output to us
val requestedSchema = StructType(requiredSchema.fields ++ partitionSchema.fields.filter(f => mandatoryFields.contains(f.name)))
val requestedAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(requestedSchema, sanitizedTableName)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

filter out partition columns that we will read from the parquet. Also get the indexes in the partition schema so that we can prune the partition column values to only the remaining partition schema columns

val dataAvroSchema = AvroConversionUtils.convertStructTypeToAvroSchema(dataSchema, sanitizedTableName)
val parquetFileReader = spark.sparkContext.broadcast(sparkAdapter.createParquetFileReader(supportBatchResult,
spark.sessionState.conf, options, augmentedStorageConf.unwrap()))
Expand Down Expand Up @@ -154,18 +189,23 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
// Append partition values to rows and project to output schema
appendPartitionAndProject(
reader.getClosableIterator,
requiredSchema,
partitionSchema,
requestedSchema,
remainingPartitionSchema,
outputSchema,
fileSliceMapping.getPartitionValues)
fileSliceMapping.getPartitionValues,
fixedPartitionIndexes)

case _ => parquetFileReader.value.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
case _ =>
readBaseFile(file, parquetFileReader.value, requestedSchema, remainingPartitionSchema, fixedPartitionIndexes,
requiredSchema, partitionSchema, outputSchema, filters, storageConf)
}
// CDC queries.
case hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping =>
buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping, parquetFileReader.value, storageConf, fileIndexProps, requiredSchema)

case _ => parquetFileReader.value.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
case _ =>
readBaseFile(file, parquetFileReader.value, requestedSchema, remainingPartitionSchema, fixedPartitionIndexes,
requiredSchema, partitionSchema, outputSchema, filters, storageConf)
}
}
}
Expand All @@ -177,11 +217,11 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
}
}

protected def buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping,
parquetFileReader: SparkParquetReader,
storageConf: StorageConfiguration[Configuration],
props: TypedProperties,
requiredSchema: StructType): Iterator[InternalRow] = {
private def buildCDCRecordIterator(hoodiePartitionCDCFileGroupSliceMapping: HoodiePartitionCDCFileGroupMapping,
parquetFileReader: SparkParquetReader,
storageConf: StorageConfiguration[Configuration],
props: TypedProperties,
requiredSchema: StructType): Iterator[InternalRow] = {
val fileSplits = hoodiePartitionCDCFileGroupSliceMapping.getFileSplits().toArray
val cdcFileGroupSplit: HoodieCDCFileGroupSplit = HoodieCDCFileGroupSplit(fileSplits)
props.setProperty(HoodieTableConfig.HOODIE_TABLE_NAME_KEY, tableName)
Expand All @@ -204,13 +244,22 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
inputSchema: StructType,
partitionSchema: StructType,
to: StructType,
partitionValues: InternalRow): Iterator[InternalRow] = {
partitionValues: InternalRow,
fixedPartitionIndexes: Set[Int]): Iterator[InternalRow] = {
if (partitionSchema.isEmpty) {
//'inputSchema' and 'to' should be the same so the projection will just be an identity func
projectSchema(iter, inputSchema, to)
} else {
val fixedPartitionValues = if (partitionSchema.length == partitionValues.numFields) {
//need to append all of the partition fields
partitionValues
} else {
//some partition fields read from file, some were not
getFixedPartitionValues(partitionValues, partitionSchema, fixedPartitionIndexes)
}
val unsafeProjection = generateUnsafeProjection(StructType(inputSchema.fields ++ partitionSchema.fields), to)
val joinedRow = new JoinedRow()
makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(joinedRow(d, partitionValues)))
makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(joinedRow(d, fixedPartitionValues)))
}
}

Expand All @@ -221,8 +270,8 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
makeCloseableFileGroupMappingRecordIterator(iter, d => unsafeProjection(d))
}

def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
mappingFunction: Function[InternalRow, InternalRow]): Iterator[InternalRow] = {
private def makeCloseableFileGroupMappingRecordIterator(closeableFileGroupRecordIterator: HoodieFileGroupReader.HoodieFileGroupReaderIterator[InternalRow],
mappingFunction: Function[InternalRow, InternalRow]): Iterator[InternalRow] = {
new Iterator[InternalRow] with Closeable {
override def hasNext: Boolean = closeableFileGroupRecordIterator.hasNext

Expand All @@ -231,4 +280,43 @@ class HoodieFileGroupReaderBasedParquetFileFormat(tableState: HoodieTableState,
override def close(): Unit = closeableFileGroupRecordIterator.close()
}
}

private def readBaseFile(file: PartitionedFile, parquetFileReader: SparkParquetReader, requestedSchema: StructType,
remainingPartitionSchema: StructType, fixedPartitionIndexes: Set[Int], requiredSchema: StructType,
partitionSchema: StructType, outputSchema: StructType, filters: Seq[Filter],
storageConf: StorageConfiguration[Configuration]): Iterator[InternalRow] = {
if (remainingPartitionSchema.fields.length == partitionSchema.fields.length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are all of these different branches covered by the tests?

Copy link
Contributor Author

@jonvex jonvex Sep 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tested with TestSparkSqlWithCustomKeyGenerator by changing partitionColumnsToRead in HoodieHadoopFsRelationFactory by doing:

//TODO: [HUDI-8098] filter for timestamp keygen columns when using custom keygen
    tableConfig.getPartitionFields.orElse(Array.empty).filter(p => p == "ts").toSeq

to fake what [HUDI-8036] + [HUDI-8098] will do. This exposed a case that I didn't test. For MOR with log files where we read some, but not all of the partition columns, I was not doing appending correctly. I have updated HoodieFileGroupReaderBasedParquetFileFormat.appendPartitionAndProject to do this correctly now.

I feel like appendPartitionAndProject and readBaseFile have overlapping logic, but can't think of a better way to do this for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a follow-up ticket to write proper tests covering the scenarios, after HUDI-8036 and HUDI-8098 are landed.

//none of partition fields are read from the file, so the reader will do the appending for us
parquetFileReader.read(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, storageConf)
} else if (remainingPartitionSchema.fields.length == 0) {
//we read all of the partition fields from the file
val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
//we need to modify the partitioned file so that the partition values are empty
val modifiedFile = pfileUtils.createPartitionedFile(InternalRow.empty, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
//and we pass an empty schema for the partition schema
parquetFileReader.read(modifiedFile, outputSchema, new StructType(), internalSchemaOpt, filters, storageConf)
} else {
//need to do an additional projection here. The case in mind is that partition schema is "a,b,c" mandatoryFields is "a,c",
//then we will read (dataSchema + a + c) and append b. So the final schema will be (data schema + a + c +b)
//but expected output is (data schema + a + b + c)
val pfileUtils = sparkAdapter.getSparkPartitionedFileUtils
val partitionValues = getFixedPartitionValues(file.partitionValues, partitionSchema, fixedPartitionIndexes)
val modifiedFile = pfileUtils.createPartitionedFile(partitionValues, pfileUtils.getPathFromPartitionedFile(file), file.start, file.length)
val iter = parquetFileReader.read(modifiedFile, requestedSchema, remainingPartitionSchema, internalSchemaOpt, filters, storageConf)
projectIter(iter, StructType(requestedSchema.fields ++ remainingPartitionSchema.fields), outputSchema)
}
}

private def projectIter(iter: Iterator[Any], from: StructType, to: StructType): Iterator[InternalRow] = {
  //the upstream iterator may emit either row-wise or columnar elements, so build
  //both projection kinds up front and dispatch per element
  val rowProjection = generateUnsafeProjection(from, to)
  val columnarProjection = ColumnarBatchUtils.generateProjection(from, to)
  val projected = iter.map {
    case row: InternalRow => rowProjection(row)
    case batch: ColumnarBatch => columnarProjection(batch)
  }
  projected.asInstanceOf[Iterator[InternalRow]]
}

private def getFixedPartitionValues(allPartitionValues: InternalRow, partitionSchema: StructType, fixedPartitionIndexes: Set[Int]): InternalRow = {
  //keep only the partition values at positions listed in fixedPartitionIndexes
  //(i.e. the partition columns that were NOT read from the file)
  val retainedValues = allPartitionValues.toSeq(partitionSchema)
    .zipWithIndex
    .collect { case (value, idx) if fixedPartitionIndexes.contains(idx) => value }
  InternalRow.fromSeq(retainedValues)
}
}
Loading