From 3d55f220c10a032314cdd2204332f9e004da62c2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 13:24:51 -0700 Subject: [PATCH 01/26] Tidying up --- .../InsertIntoHoodieTableCommand.scala | 62 +++++++++++-------- 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index be1ad8e9b8a5..8105aaddd428 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieSparkSqlWriter import org.apache.spark.internal.Logging +import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal} import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -28,16 +29,27 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode, SparkSession} /** - * Command for insert into hoodie table. + * Command for insert into Hudi table. + * + * This is correspondent to Spark's native [[InsertIntoStatement]] + * + * @param logicalRelation the [[LogicalRelation]] representing the table to be writing into. + * @param query the logical plan representing data to be written + * @param partitionSpec a map from the partition key to the partition value (optional). + * If the value is missing, dynamic partition insert will be performed. + * As an example, `INSERT INTO tbl PARTITION (a=1, b=2) AS` would have + * Map('a' -> Some('1'), 'b' -> Some('2')), + * and `INSERT INTO tbl PARTITION (a=1, b) AS ...` + * would have Map('a' -> Some('1'), 'b' -> None). + * @param overwrite overwrite existing table or partitions. */ -case class InsertIntoHoodieTableCommand( - logicalRelation: LogicalRelation, - query: LogicalPlan, - partition: Map[String, Option[String]], - overwrite: Boolean) +case class InsertIntoHoodieTableCommand(logicalRelation: LogicalRelation, + query: LogicalPlan, + partitionSpec: Map[String, Option[String]], + overwrite: Boolean) extends HoodieLeafRunnableCommand { override def innerChildren: Seq[QueryPlan[_]] = Seq(query) @@ -45,7 +57,7 @@ case class InsertIntoHoodieTableCommand( assert(logicalRelation.catalogTable.isDefined, "Missing catalog table") val table = logicalRelation.catalogTable.get - InsertIntoHoodieTableCommand.run(sparkSession, table, query, partition, overwrite) + InsertIntoHoodieTableCommand.run(sparkSession, table, query, partitionSpec, overwrite) Seq.empty[Row] } } @@ -56,7 +68,7 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { * @param sparkSession The spark session. * @param table The insert table. * @param query The insert query. - * @param insertPartitions The specified insert partition map. + * @param partitionSpec The specified insert partition map. * e.g. 
"insert into h(dt = '2021') select id, name from src" * "dt" is the key in the map and "2021" is the partition value. If the * partition value has not specified(in the case of dynamic partition) @@ -66,15 +78,15 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { * @param extraOptions Extra options for insert. */ def run(sparkSession: SparkSession, - table: CatalogTable, - query: LogicalPlan, - insertPartitions: Map[String, Option[String]], - overwrite: Boolean, - refreshTable: Boolean = true, - extraOptions: Map[String, String] = Map.empty): Boolean = { + table: CatalogTable, + query: LogicalPlan, + partitionSpec: Map[String, Option[String]], + overwrite: Boolean, + refreshTable: Boolean = true, + extraOptions: Map[String, String] = Map.empty): Boolean = { val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table) - val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, insertPartitions, extraOptions) + val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, partitionSpec, extraOptions) val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) { // insert overwrite non-partition table @@ -107,23 +119,21 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { * Aligned the type and name of query's output fields with the result table's fields. * @param query The insert query which to aligned. * @param hoodieCatalogTable The result hoodie catalog table. - * @param insertPartitions The insert partition map. + * @param partitionsSpec The insert partition map. * @param conf The SQLConf. * @return */ - private def alignOutputFields( - query: LogicalPlan, - hoodieCatalogTable: HoodieCatalogTable, - insertPartitions: Map[String, Option[String]], - conf: SQLConf): LogicalPlan = { + private def alignOutputFields(query: LogicalPlan, + hoodieCatalogTable: HoodieCatalogTable, + partitionsSpec: Map[String, Option[String]], + conf: SQLConf): LogicalPlan = { val targetPartitionSchema = hoodieCatalogTable.partitionSchema - val staticPartitionValues = insertPartitions.filter(p => p._2.isDefined).mapValues(_.get) + val staticPartitionValues = partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get) assert(staticPartitionValues.isEmpty || - insertPartitions.size == targetPartitionSchema.size, - s"Required partition columns is: ${targetPartitionSchema.json}, Current input partitions " + - s"is: ${staticPartitionValues.mkString("," + "")}") + partitionsSpec.size == targetPartitionSchema.size, + s"Required partition schema is: ${targetPartitionSchema.json}, partition spec is: ${staticPartitionValues.mkString(",")}") val queryOutputWithoutMetaFields = removeMetaFields(query.output) assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size From 47e86f37c913dd32d826d829a579b7782fcd4f5a Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 13:25:24 -0700 Subject: [PATCH 02/26] Avoid unnecessary RDD dereferencing of the `Dataset` --- .../hudi/command/InsertIntoHoodieTableCommand.scala | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 8105aaddd428..a7452df51a33 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala 
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -96,15 +96,12 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { SaveMode.Append } val conf = sparkSession.sessionState.conf - val alignedQuery = alignOutputFields(query, hoodieCatalogTable, insertPartitions, conf) - // If we create dataframe using the Dataset.ofRows(sparkSession, alignedQuery), - // The nullable attribute of fields will lost. - // In order to pass the nullable attribute to the inputDF, we specify the schema - // of the rdd. - val inputDF = sparkSession.createDataFrame( - Dataset.ofRows(sparkSession, alignedQuery).rdd, alignedQuery.schema) + val alignedQuery = alignOutputFields(query, hoodieCatalogTable, partitionSpec, conf) + val alignedDF = Dataset.ofRows(sparkSession, alignedQuery) + val success = - HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, inputDF)._1 + HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, alignedDF)._1 + if (success) { if (refreshTable) { sparkSession.catalog.refreshTable(table.identifier.unquotedString) From ed0b9c9b7056cb9d21897c0a28651606aa1b26c8 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 13:31:15 -0700 Subject: [PATCH 03/26] Tidying up --- .../apache/hudi/HoodieSparkSqlWriter.scala | 3 +-- .../InsertIntoHoodieTableCommand.scala | 23 ++++++++----------- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 7324a5ca5be7..9b4323e6b04b 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -72,8 +72,7 @@ object HoodieSparkSqlWriter { hoodieTableConfigOpt: Option[HoodieTableConfig] = Option.empty, hoodieWriteClient: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]] = Option.empty, asyncCompactionTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty, - asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty - ) + asyncClusteringTriggerFn: Option[SparkRDDWriteClient[HoodieRecordPayload[Nothing]] => Unit] = Option.empty) : (Boolean, common.util.Option[String], common.util.Option[String], common.util.Option[String], SparkRDDWriteClient[HoodieRecordPayload[Nothing]], HoodieTableConfig) = { diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index a7452df51a33..a788c6b777bb 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -84,32 +84,27 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { overwrite: Boolean, refreshTable: Boolean = true, extraOptions: Map[String, String] = Map.empty): Boolean = { - val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table) val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, 
partitionSpec, extraOptions) + // NOTE: In case of partitioned table we override specified "overwrite" parameter + // to instead append to the dataset val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) { - // insert overwrite non-partition table SaveMode.Overwrite } else { - // for insert into or insert overwrite partition we use append mode. SaveMode.Append } - val conf = sparkSession.sessionState.conf - val alignedQuery = alignOutputFields(query, hoodieCatalogTable, partitionSpec, conf) + + val alignedQuery = alignOutputFields(query, hoodieCatalogTable, partitionSpec, sparkSession.sessionState.conf) val alignedDF = Dataset.ofRows(sparkSession, alignedQuery) - val success = - HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, alignedDF)._1 + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, alignedDF) - if (success) { - if (refreshTable) { - sparkSession.catalog.refreshTable(table.identifier.unquotedString) - } - true - } else { - false + if (success && refreshTable) { + sparkSession.catalog.refreshTable(table.identifier.unquotedString) } + + success } /** From c5829c9ddb6d97db46ac1fe2979c7be5d534451e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 14:25:03 -0700 Subject: [PATCH 04/26] Cleaned up query ouput alignment seq to do proper validations and avoid unnecessary conversions; Tidying up --- .../spark/sql/hudi/HoodieSqlCommonUtils.scala | 4 +- .../InsertIntoHoodieTableCommand.scala | 139 +++++++++++------- 2 files changed, 86 insertions(+), 57 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala index 8328882239ec..b02881bc3dab 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala @@ -317,8 +317,8 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport { def castIfNeeded(child: Expression, dataType: DataType, conf: SQLConf): Expression = { child match { case Literal(nul, NullType) => Literal(nul, dataType) - case _ => if (child.dataType != dataType) - Cast(child, dataType, Option(conf.sessionLocalTimeZone)) else child + case expr if child.dataType != dataType => Cast(expr, dataType, Option(conf.sessionLocalTimeZone)) + case _ => child } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index a788c6b777bb..08454cdfdcfc 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StructField, StructType} import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode, SparkSession} +import scala.Predef.assert + /** * Command for insert into Hudi table. 
* @@ -84,21 +86,20 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { overwrite: Boolean, refreshTable: Boolean = true, extraOptions: Map[String, String] = Map.empty): Boolean = { - val hoodieCatalogTable = new HoodieCatalogTable(sparkSession, table) - val config = buildHoodieInsertConfig(hoodieCatalogTable, sparkSession, overwrite, partitionSpec, extraOptions) + val catalogTable = new HoodieCatalogTable(sparkSession, table) + val config = buildHoodieInsertConfig(catalogTable, sparkSession, overwrite, partitionSpec, extraOptions) // NOTE: In case of partitioned table we override specified "overwrite" parameter // to instead append to the dataset - val mode = if (overwrite && hoodieCatalogTable.partitionFields.isEmpty) { + val mode = if (overwrite && catalogTable.partitionFields.isEmpty) { SaveMode.Overwrite } else { SaveMode.Append } - val alignedQuery = alignOutputFields(query, hoodieCatalogTable, partitionSpec, sparkSession.sessionState.conf) - val alignedDF = Dataset.ofRows(sparkSession, alignedQuery) + val alignedQuery = alignQueryOutput(query, catalogTable, partitionSpec, sparkSession.sessionState.conf) - val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, alignedDF) + val (success, _, _, _, _, _) = HoodieSparkSqlWriter.write(sparkSession.sqlContext, mode, config, Dataset.ofRows(sparkSession, alignedQuery)) if (success && refreshTable) { sparkSession.catalog.refreshTable(table.identifier.unquotedString) @@ -108,63 +109,91 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { } /** - * Aligned the type and name of query's output fields with the result table's fields. - * @param query The insert query which to aligned. - * @param hoodieCatalogTable The result hoodie catalog table. - * @param partitionsSpec The insert partition map. - * @param conf The SQLConf. - * @return + * Align provided [[query]]'s output with the expected [[catalogTable]] schema by + * + *
+ * <ul>
+ *   <li>Performing type coercion (casting corresponding outputs, where needed)</li>
+ *   <li>Adding aliases (matching column names) to corresponding outputs</li>
+ * </ul>
+ * + * @param query target query whose output is to be inserted + * @param catalogTable catalog table + * @param partitionsSpec partition spec specifying static/dynamic partition values + * @param conf Spark's [[SQLConf]] */ - private def alignOutputFields(query: LogicalPlan, - hoodieCatalogTable: HoodieCatalogTable, - partitionsSpec: Map[String, Option[String]], - conf: SQLConf): LogicalPlan = { - - val targetPartitionSchema = hoodieCatalogTable.partitionSchema + private def alignQueryOutput(query: LogicalPlan, + catalogTable: HoodieCatalogTable, + partitionsSpec: Map[String, Option[String]], + conf: SQLConf): LogicalPlan = { + val targetPartitionSchema = catalogTable.partitionSchema val staticPartitionValues = partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get) - assert(staticPartitionValues.isEmpty || - partitionsSpec.size == targetPartitionSchema.size, - s"Required partition schema is: ${targetPartitionSchema.json}, partition spec is: ${staticPartitionValues.mkString(",")}") val queryOutputWithoutMetaFields = removeMetaFields(query.output) - assert(staticPartitionValues.size + queryOutputWithoutMetaFields.size - == hoodieCatalogTable.tableSchemaWithoutMetaFields.size, - s"Required select columns count: ${hoodieCatalogTable.tableSchemaWithoutMetaFields.size}, " + - s"Current select columns(including static partition column) count: " + - s"${staticPartitionValues.size + queryOutputWithoutMetaFields.size},columns: " + - s"(${(queryOutputWithoutMetaFields.map(_.name) ++ staticPartitionValues.keys).mkString(",")})") - - val dataAndDynamicPartitionSchemaWithoutMetaFields = StructType( - hoodieCatalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name))) - val dataProjectsWithoutMetaFields = getTableFieldsAlias(queryOutputWithoutMetaFields, - dataAndDynamicPartitionSchemaWithoutMetaFields.fields, conf) - - val partitionProjects = targetPartitionSchema.fields.filter(f => staticPartitionValues.contains(f.name)) - .map(f => { - val staticPartitionValue = staticPartitionValues.getOrElse(f.name, - s"Missing static partition value for: ${f.name}") - val castAttr = castIfNeeded(Literal.create(staticPartitionValue), f.dataType, conf) - Alias(castAttr, f.name)() - }) - Project(dataProjectsWithoutMetaFields ++ partitionProjects, query) + validate(queryOutputWithoutMetaFields, staticPartitionValues, catalogTable) + + // To validate and align properly output of the query, we simply filter out partition columns with already + // provided static values from the table's schema + val expectedColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) + + val transformedQueryOutput = transformQueryOutput(queryOutputWithoutMetaFields, expectedColumns, conf) + val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf) + + Project(transformedQueryOutput ++ staticPartitionValuesExprs, query) + } + + private def validate(queryOutput: Seq[Attribute], staticPartitionValues: Map[String, String], table: HoodieCatalogTable): Unit = { + // Validate that either + // - There's no static-partition values + // - All of the partition columns are provided w/ static values + // + // NOTE: Dynamic partition-value are not currently supported + assert(staticPartitionValues.isEmpty || + staticPartitionValues.size == table.partitionSchema.size, + s"Required partition schema is: ${table.partitionSchema.json}, partition spec is: ${staticPartitionValues.mkString(",")}") + + val 
queryOutputColumnNames = queryOutput.map(_.name) + val expectedColumnNames = table.tableSchemaWithoutMetaFields.filterNot(sf => staticPartitionValues.contains(sf.name)) + + // Asert that query's output is appropriately ordered + assert(queryOutputColumnNames == expectedColumnNames, + s"Expected table's columns in the following ordering: $expectedColumnNames, received: $queryOutputColumnNames") + + val fullQueryOutputColumnNames = queryOutputColumnNames ++ staticPartitionValues.keys + + // Assert that query provides all the required columns + assert(fullQueryOutputColumnNames.toSet == table.tableSchemaWithoutMetaFields.fieldNames.toSet, + s"Expected table's schema: ${table.tableSchemaWithoutMetaFields.json}, query's output (including static partition values): $fullQueryOutputColumnNames" + } + + private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String], + partitionSchema: StructType, + conf: SQLConf) = { + partitionSchema.fields + .filter(pf => staticPartitionValues.contains(pf.name)) + .map(pf => { + val staticPartitionValue = staticPartitionValues(pf.name) + val castExpr = castIfNeeded(Literal.create(staticPartitionValue), pf.dataType, conf) + + Alias(castExpr, pf.name)() + }) } - private def getTableFieldsAlias( - queryOutputWithoutMetaFields: Seq[Attribute], - schemaWithoutMetaFields: Seq[StructField], - conf: SQLConf): Seq[Alias] = { - queryOutputWithoutMetaFields.zip(schemaWithoutMetaFields).map { case (dataAttr, dataField) => - val targetAttrOption = if (dataAttr.name.startsWith("col")) { - None - } else { - queryOutputWithoutMetaFields.find(_.name.equals(dataField.name)) - } - val targetAttr = targetAttrOption.getOrElse(dataAttr) - val castAttr = castIfNeeded(targetAttr.withNullability(dataField.nullable), - dataField.dataType, conf) - Alias(castAttr, dataField.name)() + private def transformQueryOutput(queryOutput: Seq[Attribute], + expectedColumns: Seq[StructField], + conf: SQLConf): Seq[Alias] = { + // NOTE: This code assumes that query's output and corresponding [[StructField]]s + // are properly ordered (which is asserted in the validation step) + queryOutput.zip(expectedColumns).map { case (attr, field) => + // Lookup (by name) corresponding column from the table definition. 
In case there's + // no match, assume the column by the relative ordering + val targetColumn = expectedColumns.find(_.name.equals(attr.name)).getOrElse(field) + // Since query output might be providing data in a different format we might need to wrap + // output reference into the cast expression + val castExpr = castIfNeeded(attr.withNullability(targetColumn.nullable), targetColumn.dataType, conf) + + Alias(castExpr, targetColumn.name)() } } } From d6c34b0d08659c1ceabf3bfdcd6f1926e1774fe3 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 14:26:29 -0700 Subject: [PATCH 05/26] Relaxed ordering requirement --- .../InsertIntoHoodieTableCommand.scala | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 08454cdfdcfc..510759d1aea8 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -137,7 +137,7 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { // provided static values from the table's schema val expectedColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) - val transformedQueryOutput = transformQueryOutput(queryOutputWithoutMetaFields, expectedColumns, conf) + val transformedQueryOutput = coerceQueryOutput(queryOutputWithoutMetaFields, expectedColumns, conf) val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf) Project(transformedQueryOutput ++ staticPartitionValuesExprs, query) @@ -153,18 +153,11 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { staticPartitionValues.size == table.partitionSchema.size, s"Required partition schema is: ${table.partitionSchema.json}, partition spec is: ${staticPartitionValues.mkString(",")}") - val queryOutputColumnNames = queryOutput.map(_.name) - val expectedColumnNames = table.tableSchemaWithoutMetaFields.filterNot(sf => staticPartitionValues.contains(sf.name)) - - // Asert that query's output is appropriately ordered - assert(queryOutputColumnNames == expectedColumnNames, - s"Expected table's columns in the following ordering: $expectedColumnNames, received: $queryOutputColumnNames") - - val fullQueryOutputColumnNames = queryOutputColumnNames ++ staticPartitionValues.keys + val fullQueryOutputColumnNames = queryOutput.map(_.name) ++ staticPartitionValues.keys // Assert that query provides all the required columns assert(fullQueryOutputColumnNames.toSet == table.tableSchemaWithoutMetaFields.fieldNames.toSet, - s"Expected table's schema: ${table.tableSchemaWithoutMetaFields.json}, query's output (including static partition values): $fullQueryOutputColumnNames" + s"Expected table's schema: ${table.tableSchemaWithoutMetaFields.json}, query's output (including static partition values): $fullQueryOutputColumnNames") } private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String], @@ -180,11 +173,9 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { }) } - private def transformQueryOutput(queryOutput: Seq[Attribute], - 
expectedColumns: Seq[StructField], - conf: SQLConf): Seq[Alias] = { - // NOTE: This code assumes that query's output and corresponding [[StructField]]s - // are properly ordered (which is asserted in the validation step) + private def coerceQueryOutput(queryOutput: Seq[Attribute], + expectedColumns: Seq[StructField], + conf: SQLConf): Seq[Alias] = { queryOutput.zip(expectedColumns).map { case (attr, field) => // Lookup (by name) corresponding column from the table definition. In case there's // no match, assume the column by the relative ordering From 4be35f1d3ba935c790f913a0a4de385a7190fd63 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 14:51:26 -0700 Subject: [PATCH 06/26] Fixing compilation --- .../sql/hudi/command/InsertIntoHoodieTableCommand.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 510759d1aea8..92130d6613cd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -19,9 +19,8 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieSparkSqlWriter import org.apache.spark.internal.Logging -import org.apache.spark.sql.HoodieUnsafeRDDUtils.createDataFrame import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -29,9 +28,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StructField, StructType} -import org.apache.spark.sql.{Dataset, HoodieUnsafeRDDUtils, Row, SaveMode, SparkSession} - -import scala.Predef.assert +import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} /** * Command for insert into Hudi table. 
@@ -162,7 +159,7 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String], partitionSchema: StructType, - conf: SQLConf) = { + conf: SQLConf): Seq[NamedExpression] = { partitionSchema.fields .filter(pf => staticPartitionValues.contains(pf.name)) .map(pf => { From 0188b774fe086723002148fbb4e4d0f0d78489c2 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 15:18:27 -0700 Subject: [PATCH 07/26] Fixed validating sequence to properly assert whether query output conforms to the expected table's schema --- .../InsertIntoHoodieTableCommand.scala | 41 ++++++++++++++----- 1 file changed, 30 insertions(+), 11 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 92130d6613cd..68ba4704af53 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -18,7 +18,9 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieSparkSqlWriter +import org.apache.hudi.exception.HoodieException import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.analysis.TypeCoercion.canCast import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -27,7 +29,7 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{StructField, StructType} +import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} /** @@ -126,35 +128,38 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { val targetPartitionSchema = catalogTable.partitionSchema val staticPartitionValues = partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get) - val queryOutputWithoutMetaFields = removeMetaFields(query.output) - - validate(queryOutputWithoutMetaFields, staticPartitionValues, catalogTable) + validate(removeMetaFields(query.schema), staticPartitionValues, catalogTable) // To validate and align properly output of the query, we simply filter out partition columns with already // provided static values from the table's schema val expectedColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) + val queryOutputWithoutMetaFields = removeMetaFields(query.output) + val transformedQueryOutput = coerceQueryOutput(queryOutputWithoutMetaFields, expectedColumns, conf) val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf) Project(transformedQueryOutput ++ staticPartitionValuesExprs, query) } - private def validate(queryOutput: Seq[Attribute], staticPartitionValues: Map[String, String], table: HoodieCatalogTable): Unit = { + private def validate(queryOutputSchema: StructType, staticPartitionValues: 
Map[String, String], table: HoodieCatalogTable): Unit = { // Validate that either // - There's no static-partition values // - All of the partition columns are provided w/ static values // // NOTE: Dynamic partition-value are not currently supported - assert(staticPartitionValues.isEmpty || - staticPartitionValues.size == table.partitionSchema.size, - s"Required partition schema is: ${table.partitionSchema.json}, partition spec is: ${staticPartitionValues.mkString(",")}") + if (staticPartitionValues.nonEmpty && staticPartitionValues.size != table.partitionSchema.size) { + throw new HoodieException(s"Required partition schema is: ${table.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " + + s"partition spec is: ${staticPartitionValues.mkString(",")}") + } - val fullQueryOutputColumnNames = queryOutput.map(_.name) ++ staticPartitionValues.keys + val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ staticPartitionValues.keys.map(StructField(_, StringType, nullable = true))) // Assert that query provides all the required columns - assert(fullQueryOutputColumnNames.toSet == table.tableSchemaWithoutMetaFields.fieldNames.toSet, - s"Expected table's schema: ${table.tableSchemaWithoutMetaFields.json}, query's output (including static partition values): $fullQueryOutputColumnNames") + if (!conforms(fullQueryOutputSchema, table.tableSchemaWithoutMetaFields)) { + throw new HoodieException(s"Expected table's schema: ${table.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " + + s"query's output (including static partition values): ${fullQueryOutputSchema.fields.mkString("[", ", ", "]")}") + } } private def createStaticPartitionValuesExpressions(staticPartitionValues: Map[String, String], @@ -184,4 +189,18 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { Alias(castExpr, targetColumn.name)() } } + + private def conforms(sourceSchema: StructType, targetSchema: StructType): Boolean = { + if (sourceSchema.fields.length != targetSchema.fields.length) { + false + } else { + targetSchema.fields.zip(sourceSchema).forall { + case (targetColumn, correspondingColumn) => + // Determine matching source column by either a name or corresponding ordering + val matchingSourceColumn = sourceSchema.find(_.name == targetColumn.name).getOrElse(correspondingColumn) + // Make sure we can cast source column to the target column type + canCast(matchingSourceColumn.dataType, targetColumn.dataType) + } + } + } } From 023be639923aeae82038dc7da63eeca3c4311677 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 15:44:12 -0700 Subject: [PATCH 08/26] Fixed partition-spec assertion; Tidying up --- .../InsertIntoHoodieTableCommand.scala | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 68ba4704af53..99418cee6581 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -126,9 +126,9 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { conf: SQLConf): LogicalPlan = { val targetPartitionSchema = catalogTable.partitionSchema - val staticPartitionValues = 
partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get) + val staticPartitionValues = filterStaticPartitionValues(partitionsSpec) - validate(removeMetaFields(query.schema), staticPartitionValues, catalogTable) + validate(removeMetaFields(query.schema), partitionsSpec, catalogTable) // To validate and align properly output of the query, we simply filter out partition columns with already // provided static values from the table's schema @@ -142,22 +142,20 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { Project(transformedQueryOutput ++ staticPartitionValuesExprs, query) } - private def validate(queryOutputSchema: StructType, staticPartitionValues: Map[String, String], table: HoodieCatalogTable): Unit = { - // Validate that either - // - There's no static-partition values - // - All of the partition columns are provided w/ static values - // - // NOTE: Dynamic partition-value are not currently supported - if (staticPartitionValues.nonEmpty && staticPartitionValues.size != table.partitionSchema.size) { - throw new HoodieException(s"Required partition schema is: ${table.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " + - s"partition spec is: ${staticPartitionValues.mkString(",")}") + private def validate(queryOutputSchema: StructType, partitionsSpec: Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = { + // Validate that partition-spec has proper format (it could be empty if all of the partition values are dynamic, + // ie there are no static partition-values specified) + if (partitionsSpec.nonEmpty && partitionsSpec.size != catalogTable.partitionSchema.size) { + throw new HoodieException(s"Required partition schema is: ${catalogTable.partitionSchema.fieldNames.mkString("[", ", ", "]")}, " + + s"partition spec is: ${partitionsSpec.mkString("[", ", ", "]")}") } - val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ staticPartitionValues.keys.map(StructField(_, StringType, nullable = true))) + val staticPartitionValues = filterStaticPartitionValues(partitionsSpec) + val fullQueryOutputSchema = StructType(queryOutputSchema.fields ++ staticPartitionValues.keys.map(StructField(_, StringType))) // Assert that query provides all the required columns - if (!conforms(fullQueryOutputSchema, table.tableSchemaWithoutMetaFields)) { - throw new HoodieException(s"Expected table's schema: ${table.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " + + if (!conforms(fullQueryOutputSchema, catalogTable.tableSchemaWithoutMetaFields)) { + throw new HoodieException(s"Expected table's schema: ${catalogTable.tableSchemaWithoutMetaFields.fields.mkString("[", ", ", "]")}, " + s"query's output (including static partition values): ${fullQueryOutputSchema.fields.mkString("[", ", ", "]")}") } } @@ -203,4 +201,7 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { } } } + + private def filterStaticPartitionValues(partitionsSpec: Map[String, Option[String]]): Map[String, String] = + partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get) } From a8b0a25c196c58a48df52e85c8622ebe91465298 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 15:58:00 -0700 Subject: [PATCH 09/26] Fixing tests --- .../apache/spark/sql/hudi/TestInsertTable.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 8d21fe32eadb..210b3a443a13 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -481,14 +481,17 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | tblproperties (primaryKey = 'id') | partitioned by (dt) """.stripMargin) - checkException(s"insert into $tableName partition(dt = '2021-06-20')" + - s" select 1, 'a1', 10, '2021-06-20'") ( - "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" + - " count: 5,columns: (1,a1,10,2021-06-20,dt)" + checkException(s"insert into $tableName partition(dt = '2021-06-20') select 1, 'a1', 10, '2021-06-20'") ( + "Expected table's schema: " + + "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " + + "query's output (including static partition values): " + + "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false), StructField(2021-06-20,StringType,false), StructField(dt,StringType,true)]" ) checkException(s"insert into $tableName select 1, 'a1', 10")( - "assertion failed: Required select columns count: 4, Current select columns(including static partition column)" + - " count: 3,columns: (1,a1,10)" + "Expected table's schema: " + + "[StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(price,DoubleType,true), StructField(dt,StringType,true)], " + + "query's output (including static partition values): " + + "[StructField(1,IntegerType,false), StructField(a1,StringType,false), StructField(10,IntegerType,false)]" ) spark.sql("set hoodie.sql.bulk.insert.enable = true") spark.sql("set hoodie.sql.insert.mode = strict") From 27bb52fc88ee9dfb6bea5590e3042ef46dc9dbc7 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 16:21:31 -0700 Subject: [PATCH 10/26] Fixed invalid ref --- .../sql/hudi/command/InsertIntoHoodieTableCommand.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 99418cee6581..b19573404a2b 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -20,9 +20,8 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieSparkSqlWriter import org.apache.hudi.exception.HoodieException import org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.analysis.TypeCoercion.canCast import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -197,7 +196,7 @@ object 
InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { // Determine matching source column by either a name or corresponding ordering val matchingSourceColumn = sourceSchema.find(_.name == targetColumn.name).getOrElse(correspondingColumn) // Make sure we can cast source column to the target column type - canCast(matchingSourceColumn.dataType, targetColumn.dataType) + Cast.canCast(matchingSourceColumn.dataType, targetColumn.dataType) } } } From 225d037bf5ba36273871d0fa58c3ef976dc51e89 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 19:24:06 -0700 Subject: [PATCH 11/26] Duct-tape the issue of incorrect schema handling in `HoodieSparkSqlWriter` --- .../common/table/TableSchemaResolver.java | 3 +++ .../apache/hudi/HoodieSparkSqlWriter.scala | 24 +++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 5fc989e2e518..8f4c362ffbfa 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -372,7 +372,10 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) { * @param convertTableSchemaToAddNamespace {@code true} if table schema needs to be converted. {@code false} otherwise. * @param converterFn converter function to be called over table schema (to add namespace may be). Each caller can decide if any conversion is required. * @return the latest schema. + * + * @deprecated will be removed (HUDI-4472) */ + @Deprecated public Schema getLatestSchema(Schema writeSchema, boolean convertTableSchemaToAddNamespace, Function1 converterFn) { Schema latestSchema = writeSchema; diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 9b4323e6b04b..d41073ab80f9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -31,7 +31,7 @@ import org.apache.hudi.common.fs.FSUtils import org.apache.hudi.common.model._ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, TableSchemaResolver} -import org.apache.hudi.common.util.{CommitUtils, StringUtils} +import org.apache.hudi.common.util.{CommitUtils, Functions, StringUtils} import org.apache.hudi.config.HoodieBootstrapConfig.{BASE_PATH, INDEX_CLASS_NAME} import org.apache.hudi.config.{HoodieInternalConfig, HoodieWriteConfig} import org.apache.hudi.exception.HoodieException @@ -240,8 +240,9 @@ object HoodieSparkSqlWriter { sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) + // TODO(HUDI-4472) revisit and simplify schema handling var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - val lastestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema) + val latestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema) var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) if (reconcileSchema && 
parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean && internalSchemaOpt.isEmpty) { @@ -249,22 +250,22 @@ object HoodieSparkSqlWriter { internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema)) } if (reconcileSchema) { - schema = lastestSchema + schema = latestSchema } if (internalSchemaOpt.isDefined) { // Apply schema evolution. val mergedSparkSchema = if (!reconcileSchema) { - AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, lastestSchema)) + AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, latestSchema)) } else { // Auto merge write schema and read schema. val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get) - AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, lastestSchema.getName)) + AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, latestSchema.getName)) } schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace) } if (reconcileSchema && internalSchemaOpt.isEmpty) { - schema = lastestSchema + schema = latestSchema } validateSchemaForHoodieIsDeleted(schema) sparkContext.getConf.registerAvroSchemas(schema) @@ -392,7 +393,16 @@ object HoodieSparkSqlWriter { if (FSUtils.isTableExists(basePath.toString, fs)) { val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - latestSchema = tableSchemaResolver.getLatestSchema(schema, false, null) + // TODO(HUDI-4472): clean up + // NOTE: Repackaging schema to override name/namespace of the table's schema is required to workaround + // the issue of this method improperly comparing table schema w/ writer's schema resulting in improper + // discarding of the table's schema (instead proceeding with writer's schema) + latestSchema = tableSchemaResolver.getLatestSchema(schema, true, new Functions.Function1[Schema, Schema] { + override def apply(tableSchema: Schema): Schema = { + val repackagedFields = tableSchema.getFields.map { f => new Schema.Field(f.name, f.schema(), f.doc, f.defaultVal(), f.order()) } + Schema.createRecord(schema.getName, tableSchema.getDoc, schema.getNamespace, false, repackagedFields) + } + }) } latestSchema } From c7f268830c25aac287515eb4f6da64cb5591cccf Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Mon, 25 Jul 2022 20:44:15 -0700 Subject: [PATCH 12/26] Revert back to relative-order based mathing (no name lookup) --- .../command/InsertIntoHoodieTableCommand.scala | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index b19573404a2b..fbdb06936cea 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -175,15 +175,12 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { private def coerceQueryOutput(queryOutput: 
Seq[Attribute], expectedColumns: Seq[StructField], conf: SQLConf): Seq[Alias] = { - queryOutput.zip(expectedColumns).map { case (attr, field) => - // Lookup (by name) corresponding column from the table definition. In case there's - // no match, assume the column by the relative ordering - val targetColumn = expectedColumns.find(_.name.equals(attr.name)).getOrElse(field) + queryOutput.zip(expectedColumns).map { case (attr, targetField) => // Since query output might be providing data in a different format we might need to wrap // output reference into the cast expression - val castExpr = castIfNeeded(attr.withNullability(targetColumn.nullable), targetColumn.dataType, conf) + val castExpr = castIfNeeded(attr.withNullability(targetField.nullable), targetField.dataType, conf) - Alias(castExpr, targetColumn.name)() + Alias(castExpr, targetField.name)() } } @@ -192,11 +189,9 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { false } else { targetSchema.fields.zip(sourceSchema).forall { - case (targetColumn, correspondingColumn) => - // Determine matching source column by either a name or corresponding ordering - val matchingSourceColumn = sourceSchema.find(_.name == targetColumn.name).getOrElse(correspondingColumn) + case (targetColumn, sourceColumn) => // Make sure we can cast source column to the target column type - Cast.canCast(matchingSourceColumn.dataType, targetColumn.dataType) + Cast.canCast(sourceColumn.dataType, targetColumn.dataType) } } } From ec3cbfbb73d920566f1c4486bebd032b4e131048 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 12:51:37 -0700 Subject: [PATCH 13/26] Simplify scehma reconciliation, schema evolution handling --- .../apache/hudi/HoodieSparkSqlWriter.scala | 31 ++++++++++--------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d41073ab80f9..9f11c7c387c3 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -240,33 +240,34 @@ object HoodieSparkSqlWriter { sparkContext.getConf.registerKryoClasses( Array(classOf[org.apache.avro.generic.GenericData], classOf[org.apache.avro.Schema])) + // TODO(HUDI-4472) revisit and simplify schema handling var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) val latestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema) + + val enabledSchemaEvolution = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) - if (reconcileSchema && parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean - && internalSchemaOpt.isEmpty) { - // force apply full schema evolution. - internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema)) - } + if (reconcileSchema) { - schema = latestSchema + if (enabledSchemaEvolution && internalSchemaOpt.isEmpty) { + // force apply full schema evolution. + internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema)) + } else { + schema = latestSchema + } } + if (internalSchemaOpt.isDefined) { - // Apply schema evolution. 
- val mergedSparkSchema = if (!reconcileSchema) { - AvroConversionUtils.convertAvroSchemaToStructType(AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, latestSchema)) - } else { + // Apply schema evolution + schema = if (reconcileSchema) { // Auto merge write schema and read schema. val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get) - AvroConversionUtils.convertAvroSchemaToStructType(AvroInternalSchemaConverter.convert(mergedInternalSchema, latestSchema.getName)) + AvroInternalSchemaConverter.convert(mergedInternalSchema, latestSchema.getName) + } else { + AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, latestSchema) } - schema = AvroConversionUtils.convertStructTypeToAvroSchema(mergedSparkSchema, structName, nameSpace) } - if (reconcileSchema && internalSchemaOpt.isEmpty) { - schema = latestSchema - } validateSchemaForHoodieIsDeleted(schema) sparkContext.getConf.registerAvroSchemas(schema) log.info(s"Registered avro schema : ${schema.toString(true)}") From b2f5007c237748d05242b7862611c2d0fa23d008 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 13:04:49 -0700 Subject: [PATCH 14/26] Properly reconcile nullability attributes --- .../scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 9f11c7c387c3..b78b043d5460 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -249,12 +249,19 @@ object HoodieSparkSqlWriter { var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) if (reconcileSchema) { + // In case we need to reconcile the schema and schema evolution is enabled, + // we will force-apply schema evolution to the writer's schema. + // Otherwise we simply fallback to the latest schema committed if (enabledSchemaEvolution && internalSchemaOpt.isEmpty) { - // force apply full schema evolution. 
internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema)) } else { schema = latestSchema } + } else { + // In case reconciliation is disabled, we still have to do nullability attributes + // (minor) reconciliation, making sure schema of the incoming batch is in-line with + // the data already committed in the table + schema = AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, latestSchema) } if (internalSchemaOpt.isDefined) { From 193fa73d80f0b44fe505df6e63085b1497ff0ade Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 15:42:14 -0700 Subject: [PATCH 15/26] Rebased `InsertIntoHoodieTableCommand` to rely on Spark's `TableSchemaResolver` instead of bespoke implementation --- .../InsertIntoHoodieTableCommand.scala | 47 ++++++++++++------- 1 file changed, 29 insertions(+), 18 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index fbdb06936cea..99b90bf263d6 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hudi.command import org.apache.hudi.HoodieSparkSqlWriter import org.apache.hudi.exception.HoodieException import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.analysis.TableOutputResolver import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan @@ -29,7 +30,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.sql.{Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession} /** * Command for insert into Hudi table. 
@@ -129,16 +130,29 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { validate(removeMetaFields(query.schema), partitionsSpec, catalogTable) + // Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway) + val cleanedQuery = stripMetaFields(query) // To validate and align properly output of the query, we simply filter out partition columns with already // provided static values from the table's schema - val expectedColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) + val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) - val queryOutputWithoutMetaFields = removeMetaFields(query.output) - - val transformedQueryOutput = coerceQueryOutput(queryOutputWithoutMetaFields, expectedColumns, conf) + val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf) val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf) - Project(transformedQueryOutput ++ staticPartitionValuesExprs, query) + Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, coercedQueryOutput) + } + + private def coerceQueryOutputColumns(expectedSchema: StructType, + query: LogicalPlan, + catalogTable: HoodieCatalogTable, + conf: SQLConf): LogicalPlan = { + try { + TableOutputResolver.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf) + } catch { + // NOTE: In case matching by name didn't match the query output, we will attempt positional matching + case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") => + TableOutputResolver.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf) + } } private def validate(queryOutputSchema: StructType, partitionsSpec: Map[String, Option[String]], catalogTable: HoodieCatalogTable): Unit = { @@ -172,18 +186,6 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { }) } - private def coerceQueryOutput(queryOutput: Seq[Attribute], - expectedColumns: Seq[StructField], - conf: SQLConf): Seq[Alias] = { - queryOutput.zip(expectedColumns).map { case (attr, targetField) => - // Since query output might be providing data in a different format we might need to wrap - // output reference into the cast expression - val castExpr = castIfNeeded(attr.withNullability(targetField.nullable), targetField.dataType, conf) - - Alias(castExpr, targetField.name)() - } - } - private def conforms(sourceSchema: StructType, targetSchema: StructType): Boolean = { if (sourceSchema.fields.length != targetSchema.fields.length) { false @@ -196,6 +198,15 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { } } + def stripMetaFields(query: LogicalPlan): LogicalPlan = { + val filteredOutput = query.output.filterNot(attr => isMetaField(attr.name)) + if (filteredOutput == query.output) { + query + } else { + Project(filteredOutput, query) + } + } + private def filterStaticPartitionValues(partitionsSpec: Map[String, Option[String]]): Map[String, String] = partitionsSpec.filter(p => p._2.isDefined).mapValues(_.get) } From 813ea6fc362370d4bd8b16f46c7dc6a4e47fed5e Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 16:11:38 -0700 Subject: [PATCH 16/26] Fixed tests --- 
.../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 9 +++++---- .../org/apache/spark/sql/hudi/TestInsertTable.scala | 12 ++++++------ .../apache/spark/sql/hudi/TestMergeIntoTable.scala | 2 +- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index 5e2afd749066..2325acc41883 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -145,11 +145,12 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { assertResult(true)(hasException) } - - protected def removeQuotes(value: Any): Any = { + protected def extractRawValue(value: Any): Any = { value match { - case s: String => s.stripPrefix("'").stripSuffix("'") - case _=> value + case s: String => + // We need to strip out data-type prefixes like "DATE", "TIMESTAMP" + s.stripPrefix("DATE").stripPrefix("TIMESTAMP").stripPrefix("'").stripSuffix("'") + case _ => value } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index 210b3a443a13..b2c9d313b4ca 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -396,8 +396,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { ("string", "'1000'"), ("int", 1000), ("bigint", 10000), - ("timestamp", "'2021-05-20 00:00:00'"), - ("date", "'2021-05-20'") + ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"), + ("date", "DATE'2021-05-20'") ) typeAndValue.foreach { case (partitionType, partitionValue) => val tableName = generateTableName @@ -409,8 +409,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { test("Test TimestampType Partition Column With Consistent Logical Timestamp Enabled") { withTempDir { tmp => val typeAndValue = Seq( - ("timestamp", "'2021-05-20 00:00:00'"), - ("date", "'2021-05-20'") + ("timestamp", "TIMESTAMP'2021-05-20 00:00:00'"), + ("date", "DATE'2021-05-20'") ) typeAndValue.foreach { case (partitionType, partitionValue) => val tableName = generateTableName @@ -436,8 +436,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10") spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue") checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")( - Seq(1, "a1", 10, removeQuotes(partitionValue).toString), - Seq(2, "a2", 10, removeQuotes(partitionValue).toString) + Seq(1, "a1", 10, extractRawValue(partitionValue).toString), + Seq(2, "a2", 10, extractRawValue(partitionValue).toString) ) } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala index ac11f83d5311..58c808d28a70 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala +++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestMergeIntoTable.scala
@@ -908,7 +908,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase {
          | when not matched then insert *
          |""".stripMargin)
       checkAnswer(s"select id, name, cast(value as string), ts from $tableName")(
-        Seq(1, "a1", removeQuotes(dataValue), 1000)
+        Seq(1, "a1", extractRawValue(dataValue), 1000)
       )
     }
   }

From f7dae901df0730e26f131f758dfd151a1ee2847a Mon Sep 17 00:00:00 2001
From: Alexey Kudinkin
Date: Tue, 26 Jul 2022 16:36:23 -0700
Subject: [PATCH 17/26] Extracted query output resolving/reshaping into `HoodieCatalystPlansUtils`

---
 .../spark/sql/HoodieCatalystPlansUtils.scala  | 23 ++++++++++++++++++-
 .../InsertIntoHoodieTableCommand.scala        | 15 ++++++------
 .../sql/HoodieSpark2CatalystPlanUtils.scala   | 12 ++++++++--
 .../sql/HoodieSpark3CatalystPlanUtils.scala   | 12 ++++++++--
 4 files changed, 50 insertions(+), 12 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index c277dcb3e670..7566458b1b9e 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -19,12 +19,33 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
+import org.apache.spark.sql.internal.SQLConf
 
 trait HoodieCatalystPlansUtils {
 
+  /**
+   * Resolves the output of the provided [[query]] against the [[expected]] list of [[Attribute]],
+   * and returns a new (reshaped) instance of the [[LogicalPlan]]
+   *
+   * @param tableName used purely for more human-readable error output (if any)
+   * @param expected list of attributes the output of the query has to adhere to
+   * @param query query whose output has to be reshaped
+   * @param byName whether the matching should occur by-name or positionally
+   * @param conf instance of [[SQLConf]]
+   * @return [[LogicalPlan]] whose output is aligned to match that of [[expected]]
+   */
+  def resolveOutputColumns(tableName: String,
+                           expected: Seq[Attribute],
+                           query: LogicalPlan,
+                           byName: Boolean,
+                           conf: SQLConf): LogicalPlan
+
+  /**
+   * Instantiates an [[Explain]] command
+   */
   def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan
 
   /**
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
index 99b90bf263d6..4129586ec843 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.sql.hudi.command
 
-import org.apache.hudi.HoodieSparkSqlWriter
 import org.apache.hudi.exception.HoodieException
+import org.apache.hudi.{HoodieSparkSqlWriter, SparkAdapterSupport}
 import
org.apache.spark.internal.Logging -import org.apache.spark.sql.catalyst.analysis.TableOutputResolver import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HoodieCatalogTable} -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, Literal, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Cast, Literal, NamedExpression} import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} import org.apache.spark.sql.execution.datasources.LogicalRelation @@ -30,7 +29,7 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils._ import org.apache.spark.sql.hudi.ProvidesHoodieConfig import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{StringType, StructField, StructType} -import org.apache.spark.sql.{AnalysisException, Dataset, Row, SaveMode, SparkSession} +import org.apache.spark.sql._ /** * Command for insert into Hudi table. @@ -63,7 +62,8 @@ case class InsertIntoHoodieTableCommand(logicalRelation: LogicalRelation, } } -object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { +object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig with SparkAdapterSupport { + /** * Run the insert query. We support both dynamic partition insert and static partition insert. * @param sparkSession The spark session. @@ -146,12 +146,13 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig { query: LogicalPlan, catalogTable: HoodieCatalogTable, conf: SQLConf): LogicalPlan = { + val planUtils = sparkAdapter.getCatalystPlanUtils try { - TableOutputResolver.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf) + planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = true, conf) } catch { // NOTE: In case matching by name didn't match the query output, we will attempt positional matching case ae: AnalysisException if ae.getMessage().startsWith("Cannot write incompatible data to table") => - TableOutputResolver.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf) + planUtils.resolveOutputColumns(catalogTable.catalogTableName, expectedSchema.toAttributes, query, byName = false, conf) } } diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala index 2797b8caa1da..cf54504d0d54 100644 --- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala @@ -18,14 +18,22 @@ package org.apache.spark.sql import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelation} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan} import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.internal.SQLConf object 
HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils { + def resolveOutputColumns(tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean, + conf: SQLConf): LogicalPlan = + SimpleAnalyzer.ResolveOutputRelation.resolveOutputColumns(tableName, expected, query, byName) + def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan = ExplainCommand(plan, extended = extended) diff --git a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala index 0cdf5782c0a4..abece34dea86 100644 --- a/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala +++ b/hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/HoodieSpark3CatalystPlanUtils.scala @@ -18,17 +18,25 @@ package org.apache.spark.sql import org.apache.hudi.spark3.internal.ReflectUtil -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.{Expression, Like} +import org.apache.spark.sql.catalyst.analysis.{TableOutputResolver, UnresolvedRelation} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like} import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoStatement, Join, JoinHint, LogicalPlan} import org.apache.spark.sql.catalyst.{AliasIdentifier, TableIdentifier} import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.execution.command.ExplainCommand import org.apache.spark.sql.execution.{ExtendedMode, SimpleMode} +import org.apache.spark.sql.internal.SQLConf abstract class HoodieSpark3CatalystPlanUtils extends HoodieCatalystPlansUtils { + def resolveOutputColumns(tableName: String, + expected: Seq[Attribute], + query: LogicalPlan, + byName: Boolean, + conf: SQLConf): LogicalPlan = + TableOutputResolver.resolveOutputColumns(tableName, expected, query, byName, conf) + def createExplainCommand(plan: LogicalPlan, extended: Boolean): LogicalPlan = ExplainCommand(plan, mode = if (extended) ExtendedMode else SimpleMode) From 83bec46f2014ef33534a34b8df751ccc5a9f9b40 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 18:57:57 -0700 Subject: [PATCH 18/26] Fixed tests --- .../apache/spark/sql/hudi/TestSpark3DDL.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index b64d386f1fb4..e4af982e3131 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -55,11 +55,11 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { spark.sql( s""" | insert into $tableName values - | (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001','2021-12-25','2021-12-25 12:01:01',true,'a01','2021-12-25'), - | (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002','2021-12-25','2021-12-25 12:02:02',true,'a02','2021-12-25'), - | (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003','2021-12-25','2021-12-25 12:03:03',false,'a03','2021-12-25'), - | 
(4,4,14,100004,104.04,1004.0004,100004.0004,'a000004','2021-12-26','2021-12-26 12:04:04',true,'a04','2021-12-26'), - | (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005','2021-12-26','2021-12-26 12:05:05',false,'a05','2021-12-26') + | (1,1,11,100001,101.01,1001.0001,100001.0001,'a000001',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:01:01',true,X'a01',TIMESTAMP'2021-12-25'), + | (2,2,12,100002,102.02,1002.0002,100002.0002,'a000002',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:02:02',true,X'a02',TIMESTAMP'2021-12-25'), + | (3,3,13,100003,103.03,1003.0003,100003.0003,'a000003',DATE'2021-12-25',TIMESTAMP'2021-12-25 12:03:03',false,X'a03',TIMESTAMP'2021-12-25'), + | (4,4,14,100004,104.04,1004.0004,100004.0004,'a000004',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:04:04',true,X'a04',TIMESTAMP'2021-12-26'), + | (5,5,15,100005,105.05,1005.0005,100005.0005,'a000005',DATE'2021-12-26',TIMESTAMP'2021-12-26 12:05:05',false,X'a05',TIMESTAMP'2021-12-26') |""".stripMargin) } @@ -70,6 +70,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" if (HoodieSparkUtils.gteqSpark3_1) { spark.sql("set hoodie.schema.on.read.enable=true") + // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x + // and are disallowed now by default in Spark 3.x + spark.sql("set spark.sql.storeAssignmentPolicy=legacy") createAndPreparePartitionTable(spark, tableName, tablePath, tableType) // date -> string -> date spark.sql(s"alter table $tableName alter column col6 type String") @@ -138,6 +141,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" if (HoodieSparkUtils.gteqSpark3_1) { spark.sql("set hoodie.schema.on.read.enable=true") + // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x + // and are disallowed now by default in Spark 3.x + spark.sql("set spark.sql.storeAssignmentPolicy=legacy") createAndPreparePartitionTable(spark, tableName, tablePath, tableType) // float -> double -> decimal -> String spark.sql(s"alter table $tableName alter column col2 type double") @@ -172,6 +178,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { val tablePath = s"${new Path(tmp.getCanonicalPath, tableName).toUri.toString}" if (HoodieSparkUtils.gteqSpark3_1) { spark.sql("set hoodie.schema.on.read.enable=true") + // NOTE: This is required since as this tests use type coercions which were only permitted in Spark 2.x + // and are disallowed now by default in Spark 3.x + spark.sql("set spark.sql.storeAssignmentPolicy=legacy") createAndPreparePartitionTable(spark, tableName, tablePath, tableType) // test set properties From f37753ae4820d4d87ead70c6327d34e5cc1029bf Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 19:34:57 -0700 Subject: [PATCH 19/26] Added new method `TableSchemaResolver#getTableLatestAvroSchema` to return most recent table's schema; Rebased `HoodieSparkSqlWriter` onto `TableSchemaResolver#getTableLatestAvroSchema` --- .../common/table/TableSchemaResolver.java | 60 ++++++++++++------- .../apache/hudi/HoodieSparkSqlWriter.scala | 27 ++++----- 2 files changed, 49 insertions(+), 38 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index 8f4c362ffbfa..f80eb4978f74 100644 --- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -135,7 +135,7 @@ public Schema getTableAvroSchema() throws Exception { * @throws Exception */ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); + return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).get(); } /** @@ -144,7 +144,7 @@ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception * @param instant as of which table's schema will be fetched */ public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception { - return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)); + return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).get(); } /** @@ -169,33 +169,36 @@ public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception { return getTableAvroSchema(false); } - private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option instantOpt) { - Schema schema = - (instantOpt.isPresent() - ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) - : getTableSchemaFromLatestCommitMetadata(includeMetadataFields)) - .or(() -> - metaClient.getTableConfig().getTableCreateSchema() - .map(tableSchema -> - includeMetadataFields - ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get()) - : tableSchema) - ) - .orElseGet(() -> { - Schema schemaFromDataFile = getTableAvroSchemaFromDataFile(); - return includeMetadataFields - ? schemaFromDataFile - : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile); - }); + private Option getTableAvroSchemaInternal(boolean includeMetadataFields, Option instantOpt) { + if (!metaClient.isTimelineNonEmpty()) { + return Option.empty(); + } + + Option schemaFromCommitMetadata = instantOpt.isPresent() + ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) + : getTableSchemaFromLatestCommitMetadata(includeMetadataFields); + + Schema schema = schemaFromCommitMetadata.or(() -> + metaClient.getTableConfig().getTableCreateSchema() + .map(tableSchema -> + includeMetadataFields + ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get()) + : tableSchema)) + .orElseGet(() -> { + Schema schemaFromDataFile = getTableAvroSchemaFromDataFile(); + return includeMetadataFields + ? schemaFromDataFile + : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile); + }); // TODO partition columns have to be appended in all read-paths if (metaClient.getTableConfig().shouldDropPartitionColumns()) { return metaClient.getTableConfig().getPartitionFields() .map(partitionFields -> appendPartitionColumns(schema, partitionFields)) - .orElse(schema); + .or(() -> Option.of(schema)); } - return schema; + return Option.of(schema); } private Option getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { @@ -316,6 +319,9 @@ private MessageType convertAvroSchemaToParquet(Schema schema) { * @param oldSchema Older schema to check. * @param newSchema Newer schema to check. 
* @return True if the schema validation is successful + * + * TODO revisit this method: it's implemented incorrectly as it might be applying different criteria + * to top-level record and nested record (for ex, if that nested record is contained w/in an array) */ public static boolean isSchemaCompatible(Schema oldSchema, Schema newSchema) { if (oldSchema.getType() == newSchema.getType() && newSchema.getType() == Schema.Type.RECORD) { @@ -366,6 +372,16 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) { return isSchemaCompatible(new Schema.Parser().parse(oldSchema), new Schema.Parser().parse(newSchema)); } + /** + * Returns table's latest Avro {@link Schema} + * + * This method is {@link #getTableAvroSchema(boolean)} counterpart that is returning an + * {@link Option} instead of failing in case table is empty + */ + public Option getTableLatestAvroSchema(boolean includeMetadataFields) throws Exception { + return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); + } + /** * Get latest schema either from incoming schema or table schema. * @param writeSchema incoming batch's write schema. diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index b78b043d5460..d3f68f89d494 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -22,7 +22,7 @@ import org.apache.avro.generic.GenericRecord import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hudi.DataSourceWriteOptions._ -import org.apache.hudi.HoodieConversionUtils.toProperties +import org.apache.hudi.HoodieConversionUtils.{toProperties, toScalaOption} import org.apache.hudi.HoodieWriterUtils._ import org.apache.hudi.avro.HoodieAvroUtils import org.apache.hudi.client.{HoodieWriteResult, SparkRDDWriteClient} @@ -243,7 +243,7 @@ object HoodieSparkSqlWriter { // TODO(HUDI-4472) revisit and simplify schema handling var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - val latestSchema = getLatestTableSchema(fs, basePath, sparkContext, schema) + val latestSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(schema) val enabledSchemaEvolution = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) @@ -396,23 +396,18 @@ object HoodieSparkSqlWriter { * @param schema incoming record's schema. * @return Pair of(boolean, table schema), where first entry will be true only if schema conversion is required. 
*/ - def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext, schema: Schema): Schema = { - var latestSchema: Schema = schema + def getLatestTableSchema(fs: FileSystem, basePath: Path, sparkContext: SparkContext): Option[Schema] = { if (FSUtils.isTableExists(basePath.toString, fs)) { - val tableMetaClient = HoodieTableMetaClient.builder.setConf(sparkContext.hadoopConfiguration).setBasePath(basePath.toString).build() + val tableMetaClient = HoodieTableMetaClient.builder + .setConf(sparkContext.hadoopConfiguration) + .setBasePath(basePath.toString) + .build() val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - // TODO(HUDI-4472): clean up - // NOTE: Repackaging schema to override name/namespace of the table's schema is required to workaround - // the issue of this method improperly comparing table schema w/ writer's schema resulting in improper - // discarding of the table's schema (instead proceeding with writer's schema) - latestSchema = tableSchemaResolver.getLatestSchema(schema, true, new Functions.Function1[Schema, Schema] { - override def apply(tableSchema: Schema): Schema = { - val repackagedFields = tableSchema.getFields.map { f => new Schema.Field(f.name, f.schema(), f.doc, f.defaultVal(), f.order()) } - Schema.createRecord(schema.getName, tableSchema.getDoc, schema.getNamespace, false, repackagedFields) - } - }) + + toScalaOption(tableSchemaResolver.getTableLatestAvroSchema(false)) + } else { + None } - latestSchema } def registerKryoClassesAndGetGenericRecords(tblName: String, sparkContext: SparkContext, df: Dataset[Row], From 52a46e8dc7cb1f070681912888037720ba457249 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 21:53:44 -0700 Subject: [PATCH 20/26] Fixing tests --- .../spark/sql/hudi/HoodieSparkSqlTestBase.scala | 13 ++++++++++++- .../org/apache/spark/sql/hudi/TestInsertTable.scala | 4 +++- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala index 2325acc41883..e7848320ff35 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/HoodieSparkSqlTestBase.scala @@ -145,11 +145,22 @@ class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll { assertResult(true)(hasException) } + def dropTypeLiteralPrefix(value: Any): Any = { + value match { + case s: String => + s.stripPrefix("DATE").stripPrefix("TIMESTAMP").stripPrefix("X") + case _ => value + } + } + protected def extractRawValue(value: Any): Any = { value match { case s: String => // We need to strip out data-type prefixes like "DATE", "TIMESTAMP" - s.stripPrefix("DATE").stripPrefix("TIMESTAMP").stripPrefix("'").stripSuffix("'") + dropTypeLiteralPrefix(s) + .asInstanceOf[String] + .stripPrefix("'") + .stripSuffix("'") case _ => value } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala index b2c9d313b4ca..ced6fef72d45 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestInsertTable.scala @@ -24,6 +24,7 @@ 
import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.exception.HoodieDuplicateKeyException import org.apache.hudi.keygen.ComplexKeyGenerator import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.internal.SQLConf import java.io.File @@ -433,7 +434,8 @@ class TestInsertTable extends HoodieSparkSqlTestBase { | partitioned by (dt) | location '${tmp.getCanonicalPath}/$tableName' """.stripMargin) - spark.sql(s"insert into $tableName partition(dt = $partitionValue) select 1, 'a1', 10") + // NOTE: We have to drop type-literal prefix since Spark doesn't parse type literals appropriately + spark.sql(s"insert into $tableName partition(dt = ${dropTypeLiteralPrefix(partitionValue)}) select 1, 'a1', 10") spark.sql(s"insert into $tableName select 2, 'a2', 10, $partitionValue") checkAnswer(s"select id, name, price, cast(dt as string) from $tableName order by id")( Seq(1, "a1", 10, extractRawValue(partitionValue).toString), From 3fa6184937f4c84484195747afbac83403dd423d Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 21:54:38 -0700 Subject: [PATCH 21/26] Tidying up --- .../sql/hudi/command/InsertIntoHoodieTableCommand.scala | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala index 4129586ec843..8bd81df3d271 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/InsertIntoHoodieTableCommand.scala @@ -129,14 +129,18 @@ object InsertIntoHoodieTableCommand extends Logging with ProvidesHoodieConfig wi val staticPartitionValues = filterStaticPartitionValues(partitionsSpec) validate(removeMetaFields(query.schema), partitionsSpec, catalogTable) - // Make sure we strip out meta-fields from the incoming dataset (these will have to be discarded anyway) val cleanedQuery = stripMetaFields(query) // To validate and align properly output of the query, we simply filter out partition columns with already // provided static values from the table's schema + // + // NOTE: This is a crucial step: since coercion might rely on either of a) name-based or b) positional-based + // matching it's important to strip out partition columns, having static values provided in the partition spec, + // since such columns wouldn't be otherwise specified w/in the query itself and therefore couldn't be matched + // positionally for example val expectedQueryColumns = catalogTable.tableSchemaWithoutMetaFields.filterNot(f => staticPartitionValues.contains(f.name)) - val coercedQueryOutput = coerceQueryOutputColumns(StructType(expectedQueryColumns), cleanedQuery, catalogTable, conf) + val staticPartitionValuesExprs = createStaticPartitionValuesExpressions(staticPartitionValues, targetPartitionSchema, conf) Project(coercedQueryOutput.output ++ staticPartitionValuesExprs, coercedQueryOutput) From e089df9b5b53374e47b23fb6e0494ef88f3bea59 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 22:28:36 -0700 Subject: [PATCH 22/26] Refactored schema handling in `HoodieSparkSqlWriter` to make sure all cases are handled correctly: - Full Schema Evolution - Schema reconciliation (w/o FSE) - No schema reconciliation at all --- .../apache/hudi/HoodieSparkSqlWriter.scala | 63 
++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index d3f68f89d494..0e55a7aa0d8e 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -242,46 +242,47 @@ object HoodieSparkSqlWriter { classOf[org.apache.avro.Schema])) // TODO(HUDI-4472) revisit and simplify schema handling - var schema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) - val latestSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(schema) + val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) + val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema) val enabledSchemaEvolution = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) - if (reconcileSchema) { - // In case we need to reconcile the schema and schema evolution is enabled, - // we will force-apply schema evolution to the writer's schema. - // Otherwise we simply fallback to the latest schema committed - if (enabledSchemaEvolution && internalSchemaOpt.isEmpty) { - internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(schema)) + val writerSchema: Schema = + if (reconcileSchema) { + // In case we need to reconcile the schema and schema evolution is enabled, + // we will force-apply schema evolution to the writer's schema + if (enabledSchemaEvolution && internalSchemaOpt.isEmpty) { + internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema)) + } + + if (internalSchemaOpt.isDefined) { + // Apply schema evolution, by auto-merging write schema and read schema + val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(sourceSchema, internalSchemaOpt.get) + AvroInternalSchemaConverter.convert(mergedInternalSchema, latestTableSchema.getName) + } else if (TableSchemaResolver.isSchemaCompatible(sourceSchema, latestTableSchema)) { + // In case schema reconciliation is enabled and source and latest table schemas + // are compatible (as defined by [[TableSchemaResolver#isSchemaCompatible]], then we will + // pick latest table's schema as the writer's schema + latestTableSchema + } else { + // Otherwise fallback to original source's schema + sourceSchema + } } else { - schema = latestSchema + // In case reconciliation is disabled, we still have to do nullability attributes + // (minor) reconciliation, making sure schema of the incoming batch is in-line with + // the data already committed in the table + AvroSchemaEvolutionUtils.canonicalizeColumnNullability(sourceSchema, latestTableSchema) } - } else { - // In case reconciliation is disabled, we still have to do nullability attributes - // (minor) reconciliation, making sure schema of the incoming batch is in-line with - // the data already committed in the table - schema = AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, latestSchema) - } - - if (internalSchemaOpt.isDefined) { - // Apply schema evolution - schema = if (reconcileSchema) { - // Auto merge write schema and read schema. 
- val mergedInternalSchema = AvroSchemaEvolutionUtils.reconcileSchema(schema, internalSchemaOpt.get) - AvroInternalSchemaConverter.convert(mergedInternalSchema, latestSchema.getName) - } else { - AvroSchemaEvolutionUtils.canonicalizeColumnNullability(schema, latestSchema) - } - } - validateSchemaForHoodieIsDeleted(schema) - sparkContext.getConf.registerAvroSchemas(schema) - log.info(s"Registered avro schema : ${schema.toString(true)}") + validateSchemaForHoodieIsDeleted(writerSchema) + sparkContext.getConf.registerAvroSchemas(writerSchema) + log.info(s"Registered avro schema : ${writerSchema.toString(true)}") // Convert to RDD[HoodieRecord] val genericRecords: RDD[GenericRecord] = HoodieSparkUtils.createRdd(df, structName, nameSpace, reconcileSchema, - org.apache.hudi.common.util.Option.of(schema)) + org.apache.hudi.common.util.Option.of(writerSchema)) val shouldCombine = parameters(INSERT_DROP_DUPS.key()).toBoolean || operation.equals(WriteOperationType.UPSERT) || parameters.getOrElse(HoodieWriteConfig.COMBINE_BEFORE_INSERT.key(), @@ -303,7 +304,7 @@ object HoodieSparkSqlWriter { hoodieRecord }).toJavaRDD() - val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, schema) else schema + val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema // Create a HoodieWriteClient & issue the write. val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path, From 0088598307dd596f4534575e8fe765d4cf2f2e17 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Tue, 26 Jul 2022 23:04:28 -0700 Subject: [PATCH 23/26] Reverting changes in `TableSchemaResolver` --- .../common/table/TableSchemaResolver.java | 62 ++++++++++--------- .../apache/hudi/HoodieSparkSqlWriter.scala | 2 +- 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java index f80eb4978f74..4ada97e35ce6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java @@ -135,7 +135,7 @@ public Schema getTableAvroSchema() throws Exception { * @throws Exception */ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()).get(); + return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); } /** @@ -144,7 +144,7 @@ public Schema getTableAvroSchema(boolean includeMetadataFields) throws Exception * @param instant as of which table's schema will be fetched */ public Schema getTableAvroSchema(HoodieInstant instant, boolean includeMetadataFields) throws Exception { - return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).get(); + return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)); } /** @@ -169,36 +169,33 @@ public Schema getTableAvroSchemaWithoutMetadataFields() throws Exception { return getTableAvroSchema(false); } - private Option getTableAvroSchemaInternal(boolean includeMetadataFields, Option instantOpt) { - if (!metaClient.isTimelineNonEmpty()) { - return Option.empty(); - } - - Option schemaFromCommitMetadata = instantOpt.isPresent() - ? 
getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) - : getTableSchemaFromLatestCommitMetadata(includeMetadataFields); - - Schema schema = schemaFromCommitMetadata.or(() -> - metaClient.getTableConfig().getTableCreateSchema() - .map(tableSchema -> - includeMetadataFields - ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get()) - : tableSchema)) - .orElseGet(() -> { - Schema schemaFromDataFile = getTableAvroSchemaFromDataFile(); - return includeMetadataFields - ? schemaFromDataFile - : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile); - }); + private Schema getTableAvroSchemaInternal(boolean includeMetadataFields, Option instantOpt) { + Schema schema = + (instantOpt.isPresent() + ? getTableSchemaFromCommitMetadata(instantOpt.get(), includeMetadataFields) + : getTableSchemaFromLatestCommitMetadata(includeMetadataFields)) + .or(() -> + metaClient.getTableConfig().getTableCreateSchema() + .map(tableSchema -> + includeMetadataFields + ? HoodieAvroUtils.addMetadataFields(tableSchema, hasOperationField.get()) + : tableSchema) + ) + .orElseGet(() -> { + Schema schemaFromDataFile = getTableAvroSchemaFromDataFile(); + return includeMetadataFields + ? schemaFromDataFile + : HoodieAvroUtils.removeMetadataFields(schemaFromDataFile); + }); // TODO partition columns have to be appended in all read-paths if (metaClient.getTableConfig().shouldDropPartitionColumns()) { return metaClient.getTableConfig().getPartitionFields() .map(partitionFields -> appendPartitionColumns(schema, partitionFields)) - .or(() -> Option.of(schema)); + .orElse(schema); } - return Option.of(schema); + return schema; } private Option getTableSchemaFromLatestCommitMetadata(boolean includeMetadataFields) { @@ -373,13 +370,18 @@ public static boolean isSchemaCompatible(String oldSchema, String newSchema) { } /** - * Returns table's latest Avro {@link Schema} + * Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least + * a single commit) * - * This method is {@link #getTableAvroSchema(boolean)} counterpart that is returning an - * {@link Option} instead of failing in case table is empty + * This method differs from {@link #getTableAvroSchema(boolean)} in that it won't fallback + * to use table's schema used at creation */ - public Option getTableLatestAvroSchema(boolean includeMetadataFields) throws Exception { - return getTableAvroSchemaInternal(includeMetadataFields, Option.empty()); + public Option getTableAvroSchemaFromLatestCommit(boolean includeMetadataFields) throws Exception { + if (metaClient.isTimelineNonEmpty()) { + return Option.of(getTableAvroSchemaInternal(includeMetadataFields, Option.empty())); + } + + return Option.empty(); } /** diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 0e55a7aa0d8e..9cbfa69a6ab9 100644 --- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -405,7 +405,7 @@ object HoodieSparkSqlWriter { .build() val tableSchemaResolver = new TableSchemaResolver(tableMetaClient) - toScalaOption(tableSchemaResolver.getTableLatestAvroSchema(false)) + toScalaOption(tableSchemaResolver.getTableAvroSchemaFromLatestCommit(false)) } else { None } From 40b432ea72f13976b94f81791ebfe7c12f52bc03 Mon Sep 17 00:00:00 2001 From: 
Alexey Kudinkin Date: Tue, 26 Jul 2022 23:39:19 -0700 Subject: [PATCH 24/26] Fixed tests --- .../org/apache/spark/sql/hudi/TestSpark3DDL.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala index e4af982e3131..65357b903b5b 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestSpark3DDL.scala @@ -411,7 +411,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { spark.sql(s"alter table $tableName alter column members.value.a first") - spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStruct', 29, 100), 1000)") + spark.sql(s"insert into ${tableName} values(1, 'jack', map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStruct', 29, 100), 1000)") // rename column spark.sql(s"alter table ${tableName} rename column user to userx") @@ -433,7 +433,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { checkAnswer(spark.sql(s"select name, userx.name, userx.score from ${tableName}").collect())(Seq(null, null, null)) // insert again - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000)") + spark.sql(s"insert into ${tableName} values(2 , map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000)") // check again checkAnswer(spark.sql(s"select name, userx.name as uxname, userx.score as uxs from ${tableName} order by id").collect())( @@ -449,9 +449,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { Seq(291, 2, "jacknew")) // test map value type change spark.sql(s"alter table ${tableName} add columns(mxp map)") - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 9))") + spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 9))") spark.sql(s"alter table ${tableName} alter column mxp.value type double") - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") + spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") spark.sql(s"select * from $tableName").show(false) checkAnswer(spark.sql(s"select mxp from ${tableName} order by id").collect())( Seq(null), @@ -462,7 +462,7 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase { spark.sql(s"alter table ${tableName} rename column userx to us") spark.sql(s"alter table ${tableName} rename column us.age to age1") - spark.sql(s"insert into ${tableName} values(2 , map('k1', struct('v1', 100), 'k2', struct('v2', 200)), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") + spark.sql(s"insert into ${tableName} values(2, map('k1', struct(100, 'v1'), 'k2', struct(200, 'v2')), struct('jackStructNew', 291 , 101), 'jacknew', 1000, map('t1', 10))") spark.sql(s"select mem.value.nn, us.age1 from $tableName 
order by id").show() checkAnswer(spark.sql(s"select mem.value.nn, us.age1 from $tableName order by id").collect())( Seq(null, 29), From d0983425ffdc6140a8fe523832816bc781c69c34 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Wed, 27 Jul 2022 10:06:56 -0700 Subject: [PATCH 25/26] Fixed more tests (for Spark 2) --- .../spark/sql/hudi/TestDeleteTable.scala | 39 ++++++++++++++----- .../spark/sql/hudi/TestShowPartitions.scala | 20 ++++++++-- .../spark/sql/hudi/TestUpdateTable.scala | 28 ++++++++++--- 3 files changed, 67 insertions(+), 20 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala index 4c7c6269667a..3ab52a0bac7f 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestDeleteTable.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.hudi import org.apache.hudi.DataSourceWriteOptions._ +import org.apache.hudi.HoodieSparkUtils.isSpark2 import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.keygen.SimpleKeyGenerator import org.apache.spark.sql.SaveMode @@ -93,11 +94,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql( - s""" - |insert into $tableName - |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000) - """.stripMargin) + if (isSpark2) { + spark.sql( + s""" + |insert into $tableName + |values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', cast(20.0 as double), 1000), (3, 'a2', cast(30.0 as double), 1000) + |""".stripMargin) + } else { + spark.sql( + s""" + |insert into $tableName + |values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000), (3, 'a2', 30.0, 1000) + |""".stripMargin) + } + checkAnswer(s"select id, name, price, ts from $tableName")( Seq(1, "a1", 10.0, 1000), Seq(2, "a2", 20.0, 1000), @@ -132,11 +142,20 @@ class TestDeleteTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql( - s""" - |insert into $ptTableName - |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") - """.stripMargin) + if (isSpark2) { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022") + |""".stripMargin) + } else { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") + |""".stripMargin) + } + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( Seq(1, "a1", 10.0, 1000, "2021"), Seq(2, "a2", 20.0, 1000, "2021"), diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala index 005d5fed710e..59ee64286107 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestShowPartitions.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.HoodieSparkUtils.isSpark2 import org.apache.hudi.common.util.PartitionPathEncodeUtils.DEFAULT_PARTITION_PATH class TestShowPartitions 
extends HoodieSparkSqlTestBase { @@ -84,11 +85,22 @@ class TestShowPartitions extends HoodieSparkSqlTestBase { checkAnswer(s"show partitions $tableName partition(dt='2021-01-02')")(Seq("dt=2021-01-02")) // Insert into null partition - spark.sql( - s""" - | insert into $tableName - | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt + if (isSpark2) { + // Spark 2 isn't able to convert NullType to any other type w/ appropriate nullability, so + // explicit cast is required + spark.sql( + s""" + | insert into $tableName + | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, cast(null as string) as dt """.stripMargin) + } else { + spark.sql( + s""" + | insert into $tableName + | select 3 as id, 'a3' as name, 10 as price, 1000 as ts, null as dt + """.stripMargin) + } + checkAnswer(s"show partitions $tableName")( Seq("dt=2021-01-01"), Seq("dt=2021-01-02"), Seq("dt=%s".format(DEFAULT_PARTITION_PATH)) ) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala index 8c709ab37a6e..2d8d6ceca714 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestUpdateTable.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hudi +import org.apache.hudi.HoodieSparkUtils.isSpark2 + class TestUpdateTable extends HoodieSparkSqlTestBase { test("Test Update Table") { @@ -84,7 +86,12 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000)") + if (isSpark2) { + spark.sql(s"insert into $tableName values (1, 'a1', cast(10.0 as double), 1000), (2, 'a2', cast(20.0 as double), 1000)") + } else { + spark.sql(s"insert into $tableName values (1, 'a1', 10.0, 1000), (2, 'a2', 20.0, 1000)") + } + checkAnswer(s"select id, name, price, ts from $tableName")( Seq(1, "a1", 10.0, 1000), Seq(2, "a2", 20.0, 1000) @@ -119,11 +126,20 @@ class TestUpdateTable extends HoodieSparkSqlTestBase { """.stripMargin) // insert data to table - spark.sql( - s""" - |insert into $ptTableName - |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") - """.stripMargin) + if (isSpark2) { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', cast(10.0 as double), 1000, "2021"), (2, 'a2', cast(20.0 as double), 1000, "2021"), (3, 'a2', cast(30.0 as double), 1000, "2022") + |""".stripMargin) + } else { + spark.sql( + s""" + |insert into $ptTableName + |values (1, 'a1', 10.0, 1000, "2021"), (2, 'a2', 20.0, 1000, "2021"), (3, 'a2', 30.0, 1000, "2022") + |""".stripMargin) + } + checkAnswer(s"select id, name, price, ts, pt from $ptTableName")( Seq(1, "a1", 10.0, 1000, "2021"), Seq(2, "a2", 20.0, 1000, "2021"), From 4a639a47b823e71425b6450883dc1427ec4c8898 Mon Sep 17 00:00:00 2001 From: Alexey Kudinkin Date: Thu, 28 Jul 2022 08:52:47 -0700 Subject: [PATCH 26/26] Tidying up --- .../main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala index 9cbfa69a6ab9..167001863da4 100644 --- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala +++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala @@ -245,14 +245,14 @@ object HoodieSparkSqlWriter { val sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema, structName, nameSpace) val latestTableSchema = getLatestTableSchema(fs, basePath, sparkContext).getOrElse(sourceSchema) - val enabledSchemaEvolution = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean + val schemaEvolutionEnabled = parameters.getOrDefault(DataSourceReadOptions.SCHEMA_EVOLUTION_ENABLED.key(), "false").toBoolean var internalSchemaOpt = getLatestTableInternalSchema(fs, basePath, sparkContext) val writerSchema: Schema = if (reconcileSchema) { // In case we need to reconcile the schema and schema evolution is enabled, // we will force-apply schema evolution to the writer's schema - if (enabledSchemaEvolution && internalSchemaOpt.isEmpty) { + if (schemaEvolutionEnabled && internalSchemaOpt.isEmpty) { internalSchemaOpt = Some(AvroInternalSchemaConverter.convert(sourceSchema)) } @@ -304,10 +304,10 @@ object HoodieSparkSqlWriter { hoodieRecord }).toJavaRDD() - val writeSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema + val writerDataSchema = if (dropPartitionColumns) generateSchemaWithoutPartitionColumns(partitionColumns, writerSchema) else writerSchema // Create a HoodieWriteClient & issue the write. - val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writeSchema.toString, path, + val client = hoodieWriteClient.getOrElse(DataSourceUtils.createHoodieClient(jsc, writerDataSchema.toString, path, tblName, mapAsJavaMap(addSchemaEvolutionParameters(parameters, internalSchemaOpt) - HoodieWriteConfig.AUTO_COMMIT_ENABLE.key) )).asInstanceOf[SparkRDDWriteClient[HoodieRecordPayload[Nothing]]]
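
A minimal usage sketch follows, summarizing the write-path configuration the series distinguishes: schema reconciliation with full schema evolution, reconciliation against the latest compatible table schema, and the default nullability-only canonicalization. It is illustrative only — the table name, field names and paths are placeholders, and the option keys are assumed from Hudi's DataSourceWriteOptions/DataSourceReadOptions (only `hoodie.schema.on.read.enable` appears verbatim in the patches).

import org.apache.spark.sql.{SaveMode, SparkSession}

object SchemaReconcileSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("hudi-schema-reconcile-sketch").getOrCreate()
    // Placeholder incoming batch whose schema may differ from the table's latest schema
    val df = spark.read.parquet("/tmp/incoming_batch")

    df.write.format("hudi")
      .option("hoodie.table.name", "h")                          // placeholder table name
      .option("hoodie.datasource.write.recordkey.field", "id")   // assumed record key field
      .option("hoodie.datasource.write.precombine.field", "ts")  // assumed precombine field
      // Reconcile the incoming batch's schema against the latest committed table schema ...
      .option("hoodie.datasource.write.reconcile.schema", "true")
      // ... and force-apply full schema evolution (schema on read). Without this flag the writer
      // falls back to the latest compatible table schema; with reconciliation disabled entirely,
      // only nullability canonicalization of the incoming schema is applied.
      .option("hoodie.schema.on.read.enable", "true")
      .mode(SaveMode.Append)
      .save("/tmp/hudi/h")                                       // placeholder base path

    spark.stop()
  }
}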