@@ -571,52 +571,40 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
       baseFileFormat = baseFileFormat
     )
 
-    // Check whether fields required for merging were also requested to be fetched
-    // by the query:
-    //    - In case they were, there's no optimization we could apply here (we will have
-    //      to fetch such fields)
-    //    - In case they were not, we will provide 2 separate file-readers
-    //        a) One which would be applied to file-groups w/ delta-logs (merging)
-    //        b) One which would be applied to file-groups w/ no delta-logs or
-    //           in case query-mode is skipping merging
+    // For file groups without delta logs, we can make the following optimizations:
+    //    a) If the requested columns are not included in mandatoryColumns, they can be removed from requiredDataSchema.
+    //    b) Apply filters to reader for data skipping since no merging.
     val mandatoryColumns = mandatoryFields.map(HoodieAvroUtils.getRootLevelFieldName)
-    if (mandatoryColumns.forall(requestedColumns.contains)) {
-      HoodieMergeOnReadBaseFileReaders(
-        fullSchemaReader = fullSchemaReader,
-        requiredSchemaReader = requiredSchemaReader,
-        requiredSchemaReaderSkipMerging = requiredSchemaReader
-      )
+    val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
+    val prunedRequiredSchema = if (unusedMandatoryColumnNames.isEmpty) {
+      requiredDataSchema
     } else {
-      val prunedRequiredSchema = {
-        val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
-        val prunedStructSchema =
-          StructType(requiredDataSchema.structTypeSchema.fields
-            .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
+      val prunedStructSchema =
+        StructType(requiredDataSchema.structTypeSchema.fields
+          .filterNot(f => unusedMandatoryColumnNames.contains(f.name)))
 
-        HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
-      }
-
-      val requiredSchemaReaderSkipMerging = createBaseFileReader(
-        spark = sqlContext.sparkSession,
-        partitionSchema = partitionSchema,
-        dataSchema = dataSchema,
-        requiredDataSchema = prunedRequiredSchema,
-        // This file-reader is only used in cases when no merging is performed, therefore it's safe to push
-        // down these filters to the base file readers
-        filters = requiredFilters ++ optionalFilters,
-        options = optParams,
-        // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
-        //       to configure Parquet reader appropriately
-        hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema),
-        baseFileFormat = baseFileFormat
-      )
-
-      HoodieMergeOnReadBaseFileReaders(
-        fullSchemaReader = fullSchemaReader,
-        requiredSchemaReader = requiredSchemaReader,
-        requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging
-      )
+      HoodieTableSchema(prunedStructSchema, convertToAvroSchema(prunedStructSchema, tableName).toString)
     }
+
+    val requiredSchemaReaderSkipMerging = createBaseFileReader(
+      spark = sqlContext.sparkSession,
+      partitionSchema = partitionSchema,
+      dataSchema = dataSchema,
+      requiredDataSchema = prunedRequiredSchema,
+      // This file-reader is only used in cases when no merging is performed, therefore it's safe to push
+      // down these filters to the base file readers
+      filters = requiredFilters ++ optionalFilters,
+      options = optParams,
+      // NOTE: We have to fork the Hadoop Config here as Spark will be modifying it
+      //       to configure Parquet reader appropriately
+      hadoopConf = embedInternalSchema(new Configuration(conf), requiredDataSchema.internalSchema),
+      baseFileFormat = baseFileFormat
+    )
+
+    HoodieMergeOnReadBaseFileReaders(
+      fullSchemaReader = fullSchemaReader,
+      requiredSchemaReader = requiredSchemaReader,
+      requiredSchemaReaderSkipMerging = requiredSchemaReaderSkipMerging)
   }
 
   /**
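Note on the pruning logic in this hunk: for file groups with no delta logs there is nothing to merge, so mandatory merge-only columns (e.g. Hudi meta fields such as the record key) can be dropped from the read schema whenever the query did not request them. Below is a minimal, standalone Scala sketch of that idea using plain Spark StructType; the column lists and field names (requestedColumns, mandatoryColumns, uuid, fare) are illustrative only, not Hudi's actual values, and the snippet just assumes spark-sql on the classpath.

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

object PruneUnusedMandatoryColumns {
  def main(args: Array[String]): Unit = {
    // Hypothetical query projection and merge-only (mandatory) columns
    val requestedColumns = Seq("uuid", "fare")
    val mandatoryColumns = Seq("_hoodie_record_key", "_hoodie_commit_time")

    // Read schema before pruning: meta fields plus the requested data columns
    val requiredSchema = StructType(Seq(
      StructField("_hoodie_commit_time", StringType),
      StructField("_hoodie_record_key", StringType),
      StructField("uuid", StringType),
      StructField("fare", LongType)))

    // Mandatory columns the query never asked for can be dropped when no merging happens
    val unusedMandatoryColumnNames = mandatoryColumns.filterNot(requestedColumns.contains)
    val prunedSchema =
      if (unusedMandatoryColumnNames.isEmpty) requiredSchema
      else StructType(requiredSchema.fields.filterNot(f => unusedMandatoryColumnNames.contains(f.name)))

    // Prints: uuid, fare -- only the columns the no-merge reader actually has to fetch
    println(prunedSchema.fieldNames.mkString(", "))
  }
}

The second optimization in the hunk follows the same reasoning: filters can be pushed into this no-merge reader (the filters = requiredFilters ++ optionalFilters argument) because, with no log records to merge, base-file values are final and data skipping cannot change query results.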