[HUDI-3936] Fix projection for a nested field as pre-combined key #5379

Merged
32 changes: 22 additions & 10 deletions hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -18,6 +18,17 @@

package org.apache.hudi.avro;

import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Conversions;
import org.apache.avro.Conversions.DecimalConversion;
@@ -42,16 +53,6 @@
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hudi.common.config.SerializableSchema;
import org.apache.hudi.common.model.HoodieOperation;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.SchemaCompatibilityException;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -470,6 +471,17 @@ public static Schema generateProjectionSchema(Schema originalSchema, List<String
return projectedSchema;
}

/**
* Obtain the root-level field name of a full field name, possibly a nested field.
* For example, given "a.b.c", the output is "a"; given "a", the output is "a".
*
* @param fieldName The field name.
* @return Root-level field name
*/
public static String getRootLevelFieldName(String fieldName) {
return fieldName.split("\\.")[0];
}
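A small usage sketch, not part of this PR's diff: the helper simply takes the segment before the first dot, so a nested pre-combine field such as "fare.currency" resolves to its root column "fare", while a non-nested name passes through unchanged. Scala is used here because the new helper is consumed from the Spark relations changed below; the values are illustrative.

import org.apache.hudi.avro.HoodieAvroUtils

// Illustrative only: resolve nested and non-nested field names to their root-level columns.
val nested = HoodieAvroUtils.getRootLevelFieldName("fare.currency") // returns "fare"
val simple = HoodieAvroUtils.getRootLevelFieldName("_row_key")      // returns "_row_key"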

/**
* Obtain value of the provided field as string, denoted by dot notation. e.g: a.b.c
*/
TestHoodieAvroUtils.java
@@ -257,6 +257,13 @@ public void testRemoveFields() {
assertEquals(expectedSchema, rec1.getSchema());
}

@Test
public void testGetRootLevelFieldName() {
assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a.b.c"));
assertEquals("a", HoodieAvroUtils.getRootLevelFieldName("a"));
assertEquals("", HoodieAvroUtils.getRootLevelFieldName(""));
}

@Test
public void testGetNestedFieldVal() {
GenericRecord rec = new GenericData.Record(new Schema.Parser().parse(EXAMPLE_SCHEMA));
BaseFileOnlyRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hudi.hadoop.HoodieROTablePathFilter
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.execution.datasources.parquet.{HoodieParquetFileFormat, ParquetFileFormat}
import org.apache.spark.sql.execution.datasources.parquet.HoodieParquetFileFormat
import org.apache.spark.sql.hive.orc.OrcFileFormat
import org.apache.spark.sql.sources.{BaseRelation, Filter}
import org.apache.spark.sql.types.StructType
@@ -54,8 +54,8 @@ class BaseFileOnlyRelation(sqlContext: SQLContext,

override type FileSplit = HoodieBaseFileSplit

override lazy val mandatoryColumns: Seq[String] =
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
override lazy val mandatoryFields: Seq[String] =
// TODO reconcile, record's key shouldn't be mandatory for base-file only relation
Seq(recordKeyField)

override def imbueConfigs(sqlContext: SQLContext): Unit = {
HoodieBaseRelation.scala
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig
import org.apache.hadoop.mapred.JobConf
import org.apache.hudi.HoodieBaseRelation.{convertToAvroSchema, createHFileReader, generateUnsafeProjection, getPartitionPath}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.config.{HoodieMetadataConfig, SerializableConfiguration}
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.model.{HoodieFileFormat, HoodieRecord}
@@ -39,10 +40,8 @@ import org.apache.hudi.io.storage.HoodieHFileReader
import org.apache.spark.execution.datasources.HoodieInMemoryFileIndex
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.avro.HoodieAvroSchemaConverters
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}
import org.apache.spark.sql.execution.FileRelation
import org.apache.spark.sql.execution.datasources.{FileStatusCache, PartitionedFile, PartitioningUtils}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
@@ -199,7 +198,10 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
*
* @VisibleInTests
*/
val mandatoryColumns: Seq[String]
val mandatoryFields: Seq[String]

protected def mandatoryRootFields: Seq[String] =
mandatoryFields.map(col => HoodieAvroUtils.getRootLevelFieldName(col))

protected def timeline: HoodieTimeline =
// NOTE: We're including compaction here since it's not considering a "commit" operation
@@ -246,7 +248,7 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
//
// (!!!) IT'S CRITICAL TO AVOID REORDERING OF THE REQUESTED COLUMNS AS THIS WILL BREAK THE UPSTREAM
// PROJECTION
val fetchedColumns: Array[String] = appendMandatoryColumns(requiredColumns)
val fetchedColumns: Array[String] = appendMandatoryRootFields(requiredColumns)

val (requiredAvroSchema, requiredStructSchema, requiredInternalSchema) =
HoodieSparkUtils.getRequiredSchema(tableAvroSchema, fetchedColumns, internalSchema)
@@ -362,8 +364,11 @@ abstract class HoodieBaseRelation(val sqlContext: SQLContext,
!SubqueryExpression.hasSubquery(condition)
}

protected final def appendMandatoryColumns(requestedColumns: Array[String]): Array[String] = {
val missing = mandatoryColumns.filter(col => !requestedColumns.contains(col))
protected final def appendMandatoryRootFields(requestedColumns: Array[String]): Array[String] = {
// For a nested field among the mandatory columns, we should first resolve its root-level field and
// only then check whether it is missing, since requestedColumns contains only root-level fields;
// likewise, only the root-level field should be appended
val missing = mandatoryRootFields.filter(rootField => !requestedColumns.contains(rootField))
requestedColumns ++ missing
}
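A hedged, worked example (column names are illustrative, not taken from the diff) of how appendMandatoryRootFields is expected to behave: the requested ordering is preserved and only missing root-level fields are appended, so a nested name like "fare.currency" no longer leaks into the projection.

import org.apache.hudi.avro.HoodieAvroUtils

// Suppose the query projects only "begin_lat" and the relation declares a nested pre-combine field.
val requestedColumns    = Array("begin_lat")
val mandatoryFields     = Seq("_hoodie_record_key", "fare.currency")
// Root-level resolution: "fare.currency" -> "fare"
val mandatoryRootFields = mandatoryFields.map(f => HoodieAvroUtils.getRootLevelFieldName(f))
// Requested columns keep their original order; only the missing root fields are appended.
val fetched = requestedColumns ++ mandatoryRootFields.filterNot(f => requestedColumns.contains(f))
// fetched == Array("begin_lat", "_hoodie_record_key", "fare")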

@@ -153,7 +153,7 @@ trait HoodieIncrementalRelationTrait extends HoodieBaseRelation {
Seq(isNotNullFilter, largerThanFilter, lessThanFilter)
}

override lazy val mandatoryColumns: Seq[String] = {
override lazy val mandatoryFields: Seq[String] = {
// NOTE: These columns are required for Incremental flow to be able to handle the rows properly, even in
// cases when no columns are requested to be fetched (for ex, when using {@code count()} API)
Seq(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.COMMIT_TIME_METADATA_FIELD) ++
MergeOnReadSnapshotRelation.scala
@@ -47,7 +47,7 @@ class MergeOnReadSnapshotRelation(sqlContext: SQLContext,

override type FileSplit = HoodieMergeOnReadFileSplit

override lazy val mandatoryColumns: Seq[String] =
override lazy val mandatoryFields: Seq[String] =
Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
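For context, a sketch under assumed values (not part of the diff) showing why this matters for MOR snapshot reads: with a nested pre-combine field, mandatoryFields now carries the nested name, and the root-level resolution shown above is what keeps the resulting projection valid.

import org.apache.hudi.avro.HoodieAvroUtils

// Illustrative values: a table keyed on "_row_key" with pre-combine field "fare.currency".
val recordKeyField     = "_row_key"
val preCombineFieldOpt = Some("fare.currency")

val mandatoryFields = Seq(recordKeyField) ++ preCombineFieldOpt.map(Seq(_)).getOrElse(Seq())
// Previously the nested name was appended to the requested columns verbatim, even though the
// table schema exposes only the root column "fare"; resolving it first avoids a broken projection.
val appendedRoots = mandatoryFields.map(f => HoodieAvroUtils.getRootLevelFieldName(f))
// appendedRoots == Seq("_row_key", "fare")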

protected val mergeType: String = optParams.getOrElse(DataSourceReadOptions.REALTIME_MERGE.key,
TestMORDataSourceStorage.scala
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.testutils.HoodieTestDataGenerator
import org.apache.hudi.common.testutils.RawTripTestPayload.recordsToStrings
import org.apache.hudi.common.util.StringUtils
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions, HoodieDataSourceHelpers}
@@ -32,7 +33,7 @@ import org.apache.spark.sql.functions.{col, lit}
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Tag
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import org.junit.jupiter.params.provider.CsvSource

import scala.collection.JavaConversions._

@@ -57,19 +58,28 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
val updatedVerificationVal: String = "driver_update"

@ParameterizedTest
@ValueSource(booleans = Array(true, false))
def testMergeOnReadStorage(isMetadataEnabled: Boolean) {
val dataGen = new HoodieTestDataGenerator()
@CsvSource(Array(
"true,",
"true,fare.currency",
"false,",
"false,fare.currency"
))
def testMergeOnReadStorage(isMetadataEnabled: Boolean, preCombineField: String) {
var options: Map[String, String] = commonOpts +
(HoodieMetadataConfig.ENABLE.key -> String.valueOf(isMetadataEnabled))
if (!StringUtils.isNullOrEmpty(preCombineField)) {
options += (DataSourceWriteOptions.PRECOMBINE_FIELD.key() -> preCombineField)
}
val dataGen = new HoodieTestDataGenerator(0xDEEF)
val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
// Bulk Insert Operation
val records1 = recordsToStrings(dataGen.generateInserts("001", 100)).toList
val inputDF1: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records1, 2))
inputDF1.write.format("org.apache.hudi")
.options(commonOpts)
.options(options)
.option("hoodie.compact.inline", "false") // else fails due to compaction & deltacommit instant times being same
.option(DataSourceWriteOptions.OPERATION.key, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.TABLE_TYPE.key, DataSourceWriteOptions.MOR_TABLE_TYPE_OPT_VAL)
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.mode(SaveMode.Overwrite)
.save(basePath)

@@ -90,8 +100,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
val records2 = recordsToStrings(dataGen.generateUniqueUpdates("002", 100)).toList
val inputDF2: Dataset[Row] = spark.read.json(spark.sparkContext.parallelize(records2, 2))
inputDF2.write.format("org.apache.hudi")
.options(commonOpts)
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(options)
.mode(SaveMode.Append)
.save(basePath)

@@ -110,8 +119,7 @@ class TestMORDataSourceStorage extends SparkClientFunctionalTestHarness {
val inputDF3 = hudiSnapshotDF2.filter(col("_row_key") === verificationRowKey).withColumn(verificationCol, lit(updatedVerificationVal))

inputDF3.write.format("org.apache.hudi")
.options(commonOpts)
.option(HoodieMetadataConfig.ENABLE.key, isMetadataEnabled)
.options(options)
.mode(SaveMode.Append)
.save(basePath)

TestParquetColumnProjection.scala
@@ -19,7 +19,7 @@ package org.apache.hudi.functional

import org.apache.avro.Schema
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.model.{HoodieRecord, OverwriteNonDefaultsWithLatestAvroPayload}
import org.apache.hudi.common.table.HoodieTableConfig
import org.apache.hudi.common.testutils.{HadoopMapRedUtils, HoodieTestDataGenerator}
import org.apache.hudi.config.{HoodieStorageConfig, HoodieWriteConfig}
@@ -332,7 +332,7 @@ class TestParquetColumnProjection extends SparkClientFunctionalTestHarness with
logWarning(s"Not matching bytes read ($bytesRead)")
}

val readColumns = targetColumns ++ relation.mandatoryColumns
val readColumns = targetColumns ++ relation.mandatoryFields
val (_, projectedStructType, _) = HoodieSparkUtils.getRequiredSchema(tableState.schema, readColumns)

val row: InternalRow = rows.take(1).head