diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala index 6cbc17c67381..185dc5ec54f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/AssignmentUtils.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.catalyst.util.CharVarcharUtils import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.getDefaultValueExprOrNullLit import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ import org.apache.spark.sql.errors.QueryCompilationErrors -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{DataType, StructType} import org.apache.spark.util.ArrayImplicits._ @@ -183,7 +182,7 @@ object AssignmentUtils extends SQLConfHelper with CastSupport { } else if (exactAssignments.isEmpty && fieldAssignments.isEmpty) { TableOutputResolver.checkNullability(colExpr, col, conf, colPath) } else if (exactAssignments.nonEmpty) { - if (SQLConf.get.mergeUpdateStructsByField && updateStar) { + if (updateStar) { val value = exactAssignments.head.value col.dataType match { case structType: StructType => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala index 93ef98e3183a..d1b8eab13191 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveRowLevelCommandAssignments.scala @@ -53,7 +53,7 @@ object ResolveRowLevelCommandAssignments extends Rule[LogicalPlan] { case m: MergeIntoTable if !m.skipSchemaResolution && m.resolved && m.rewritable && !m.aligned && !m.needSchemaEvolution => validateStoreAssignmentPolicy() - val coerceNestedTypes = SQLConf.get.mergeCoerceNestedTypes + val coerceNestedTypes = SQLConf.get.coerceMergeNestedTypes m.copy( targetTable = cleanAttrMetadata(m.targetTable), matchedActions = alignActions(m.targetTable.output, m.matchedActions, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 6ad2031bbd8b..8b50abbe4052 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -6704,18 +6704,6 @@ object SQLConf { .booleanConf .createWithDefault(true) - val MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD = - buildConf("spark.sql.merge.nested.type.assign.by.field") - .internal() - .doc("If enabled and spark.sql.merge.source.nested.type.coercion.enabled is true," + - "allow MERGE INTO with UPDATE SET * action to set nested structs field by field. " + - "In updated rows, target structs will preserve the original value for fields missing " + - "in the the source struct. If disabled, the entire target struct will be replaced, " + - "and fields missing in the source struct will be null.") - .version("4.1.0") - .booleanConf - .createWithDefault(true) - /** * Holds information about keys that have been deprecated. * @@ -7915,12 +7903,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf { def legacyXMLParserEnabled: Boolean = getConf(SQLConf.LEGACY_XML_PARSER_ENABLED) - def mergeCoerceNestedTypes: Boolean = + def coerceMergeNestedTypes: Boolean = getConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED) - def mergeUpdateStructsByField: Boolean = - getConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD) - /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala index 7051a0b455e3..b7a8ff374b84 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MergeIntoTableSuiteBase.scala @@ -3231,180 +3231,161 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase test("merge into schema evolution replace column with nested struct and set all columns") { Seq(true, false).foreach { withSchemaEvolution => - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString) { - withTempView("source") { - // Create table using Spark SQL - sql( - s"""CREATE TABLE $tableNameAsString ( - |pk INT NOT NULL, - |s STRUCT, m: MAP>>, - |dep STRING) - |PARTITIONED BY (dep) - |""".stripMargin) - - // Insert data using DataFrame API with objects - val tableSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - StructField("a", ArrayType(IntegerType)), - StructField("m", MapType(StringType, StringType)) - ))) - ))), - StructField("dep", StringType) - )) - val targetData = Seq( - Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr") - ) - spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema) - .coalesce(1).writeTo(tableNameAsString).append() + withTempView("source") { + // Create table using Spark SQL + sql( + s"""CREATE TABLE $tableNameAsString ( + |pk INT NOT NULL, + |s STRUCT, m: MAP>>, + |dep STRING) + |PARTITIONED BY (dep) + |""".stripMargin) + // Insert data using DataFrame API with objects + val tableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", ArrayType(IntegerType)), + StructField("m", MapType(StringType, StringType)) + ))) + ))), + StructField("dep", StringType) + )) + val targetData = Seq( + Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr") + ) + spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema) + .coalesce(1).writeTo(tableNameAsString).append() - val sourceTableSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - // missing column 'a' - StructField("m", MapType(StringType, StringType)), - StructField("c3", BooleanType) // new column - ))) - ))), - StructField("dep", StringType) - )) - val sourceData = Seq( - Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"), - Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering") - ) - spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema) - .createOrReplaceTempView("source") + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + // missing column 'a' + StructField("m", MapType(StringType, StringType)), + StructField("c3", BooleanType) // new column + ))) + ))), + StructField("dep", StringType) + )) + val sourceData = Seq( + Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema) + .createOrReplaceTempView("source") - val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" - val mergeStmt = - s"""MERGE $schemaEvolutionClause - |INTO $tableNameAsString t - |USING source src - |ON t.pk = src.pk - |WHEN MATCHED THEN - | UPDATE SET * - |WHEN NOT MATCHED THEN - | INSERT * - |""".stripMargin - if (withSchemaEvolution) { - sql(mergeStmt) - if (updateByFields) { - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), "sales"), - Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering"))) - } else { - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "sales"), - Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering"))) - } - } else { - val exception = intercept[org.apache.spark.sql.AnalysisException] { - sql(mergeStmt) - } - assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") - assert(exception.getMessage.contains( - "Cannot write extra fields `c3` to the struct `s`.`c2`")) - } + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + if (withSchemaEvolution) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, Row(10, Row(Seq(1, 2), Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") + assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + assert(exception.getMessage.contains( + "Cannot write extra fields `c3` to the struct `s`.`c2`")) } } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") } } test("merge into schema evolution replace column with nested struct and update " + "top level struct") { Seq(true, false).foreach { withSchemaEvolution => - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString) { - withTempView("source") { - // Create table using Spark SQL - sql( - s"""CREATE TABLE $tableNameAsString ( - |pk INT NOT NULL, - |s STRUCT, m: MAP>>, - |dep STRING) - |PARTITIONED BY (dep) - |""".stripMargin) + withTempView("source") { + // Create table using Spark SQL + sql( + s"""CREATE TABLE $tableNameAsString ( + |pk INT NOT NULL, + |s STRUCT, m: MAP>>, + |dep STRING) + |PARTITIONED BY (dep) + |""".stripMargin) - // Insert data using DataFrame API with objects - val tableSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - StructField("a", ArrayType(IntegerType)), - StructField("m", MapType(StringType, StringType)) - ))) - ))), - StructField("dep", StringType) - )) - val targetData = Seq( - Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr") - ) - spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema) - .coalesce(1).writeTo(tableNameAsString).append() + // Insert data using DataFrame API with objects + val tableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", ArrayType(IntegerType)), + StructField("m", MapType(StringType, StringType)) + ))) + ))), + StructField("dep", StringType) + )) + val targetData = Seq( + Row(1, Row(2, Row(Array(1, 2), Map("a" -> "b"))), "hr") + ) + spark.createDataFrame(spark.sparkContext.parallelize(targetData), tableSchema) + .coalesce(1).writeTo(tableNameAsString).append() - val sourceTableSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - // missing column 'a' - StructField("m", MapType(StringType, StringType)), - StructField("c3", BooleanType) // new column - ))) - ))), - StructField("dep", StringType) - )) - val sourceData = Seq( - Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"), - Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering") - ) - spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema) - .createOrReplaceTempView("source") + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + // missing column 'a' + StructField("m", MapType(StringType, StringType)), + StructField("c3", BooleanType) // new column + ))) + ))), + StructField("dep", StringType) + )) + val sourceData = Seq( + Row(1, Row(10, Row(Map("c" -> "d"), false)), "sales"), + Row(2, Row(20, Row(Map("e" -> "f"), true)), "engineering") + ) + spark.createDataFrame(spark.sparkContext.parallelize(sourceData), sourceTableSchema) + .createOrReplaceTempView("source") - val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" - val mergeStmt = - s"""MERGE $schemaEvolutionClause - |INTO $tableNameAsString t - |USING source src - |ON t.pk = src.pk - |WHEN MATCHED THEN - | UPDATE SET s = src.s - |WHEN NOT MATCHED THEN - | INSERT * - |""".stripMargin - if (withSchemaEvolution) { - sql(mergeStmt) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"), - Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering"))) - } else { - val exception = intercept[org.apache.spark.sql.AnalysisException] { - sql(mergeStmt) - } - assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") - assert(exception.getMessage.contains( - "Cannot write extra fields `c3` to the struct `s`.`c2`")) - } + val schemaEvolutionClause = if (withSchemaEvolution) "WITH SCHEMA EVOLUTION" else "" + val mergeStmt = + s"""MERGE $schemaEvolutionClause + |INTO $tableNameAsString t + |USING source src + |ON t.pk = src.pk + |WHEN MATCHED THEN + | UPDATE SET s = src.s + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin + if (withSchemaEvolution) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, Row(10, Row(null, Map("c" -> "d"), false)), "hr"), + Row(2, Row(20, Row(null, Map("e" -> "f"), true)), "engineering"))) + } else { + val exception = intercept[org.apache.spark.sql.AnalysisException] { + sql(mergeStmt) } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") + assert(exception.errorClass.get == "INCOMPATIBLE_DATA_FOR_TABLE.EXTRA_STRUCT_FIELDS") + assert(exception.getMessage.contains( + "Cannot write extra fields `c3` to the struct `s`.`c2`")) } } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") } } @@ -4661,124 +4642,163 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("merge with null struct") { - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString) { - withTempView("source") { - createAndInitTable( - s"""pk INT NOT NULL, - |s STRUCT, - |dep STRING""".stripMargin, - """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } - |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }""" - .stripMargin) + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }""" + .stripMargin) - // Source table matches target table schema - val sourceTableSchema = StructType(Seq( - StructField("pk", IntegerType), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StringType) - ))), - StructField("dep", StringType) - )) + // Source table matches target table schema + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + ))), + StructField("dep", StringType) + )) - val data = Seq( - Row(1, null, "engineering"), - Row(2, null, "finance") - ) - spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) - .createOrReplaceTempView("source") + val data = Seq( + Row(1, null, "engineering"), + Row(2, null, "finance") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") - sql( - s"""MERGE INTO $tableNameAsString t USING source - |ON t.pk = source.pk - |WHEN MATCHED THEN - | UPDATE SET * - |WHEN NOT MATCHED THEN - | INSERT * - |""".stripMargin) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(0, Row(1, "a"), "sales"), - Row(1, null, "engineering"), - Row(2, null, "finance"))) - } - } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, "a"), "sales"), + Row(1, null, "engineering"), + Row(2, null, "finance"))) } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") } - test("merge with null struct - update field") { - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString) { - withTempView("source") { - createAndInitTable( - s"""pk INT NOT NULL, - |s STRUCT, - |dep STRING""".stripMargin, - """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } - |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }""" - .stripMargin) - // Source table matches target table schema - val sourceTableSchema = StructType(Seq( - StructField("pk", IntegerType), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StringType) - ))), - StructField("dep", StringType) - )) + test("merge with null struct - update field") { + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }""" + .stripMargin) - val data = Seq( - Row(1, null, "engineering"), - Row(2, null, "finance") - ) - spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) - .createOrReplaceTempView("source") + // Source table matches target table schema + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + ))), + StructField("dep", StringType) + )) - sql( - s"""MERGE INTO $tableNameAsString t USING source - |ON t.pk = source.pk - |WHEN MATCHED THEN - | UPDATE SET s = source.s - |WHEN NOT MATCHED THEN - | INSERT * - |""".stripMargin) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(0, Row(1, "a"), "sales"), - Row(1, null, "hr"), - Row(2, null, "finance"))) - } - } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") + val data = Seq( + Row(1, null, "engineering"), + Row(2, null, "finance") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET s = source.s + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, "a"), "sales"), + Row(1, null, "hr"), + Row(2, null, "finance"))) } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") } test("merge with null struct into non-nullable struct column") { - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString) { + withTempView("source") { + createAndInitTable( + s"""pk INT NOT NULL, + |s STRUCT NOT NULL, + |dep STRING""".stripMargin, + """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }""" + .stripMargin) + + // Source table has null for the struct column + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StringType) + ))), + StructField("dep", StringType) + )) + + val data = Seq( + Row(1, null, "engineering"), + Row(2, null, "finance") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + // Should throw an exception when trying to insert/update null into NOT NULL column + val exception = intercept[Exception] { + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + } + assert(exception.getMessage.contains( + "NULL value appeared in non-nullable field")) + } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") + } + + test("merge with with null struct with missing nested field") { + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { withTempView("source") { + // Target table has nested struct with fields c1 and c2 createAndInitTable( s"""pk INT NOT NULL, - |s STRUCT NOT NULL, + |s STRUCT>, |dep STRING""".stripMargin, - """{ "pk": 0, "s": { "c1": 1, "c2": "a" }, "dep": "sales" } - |{ "pk": 1, "s": { "c1": 2, "c2": "b" }, "dep": "hr" }""" + """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep": "sales" } + |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, "dep": "hr" }""" .stripMargin) - // Source table has null for the struct column + // Source table has null for the nested struct val sourceTableSchema = StructType(Seq( StructField("pk", IntegerType), StructField("s", StructType(Seq( StructField("c1", IntegerType), - StructField("c2", StringType) + StructField("c2", StructType(Seq( + StructField("a", IntegerType) + // missing field 'b' + ))) ))), StructField("dep", StringType) )) @@ -4790,107 +4810,45 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) .createOrReplaceTempView("source") - // Should throw an exception when trying to insert/update null into NOT NULL column - val exception = intercept[Exception] { - sql( - s"""MERGE INTO $tableNameAsString t USING source - |ON t.pk = source.pk - |WHEN MATCHED THEN - | UPDATE SET * - |WHEN NOT MATCHED THEN - | INSERT * - |""".stripMargin) - } - assert(exception.getMessage.contains( - "NULL value appeared in non-nullable field")) - } - } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") - } - } - - test("merge with with null struct with missing nested field") { - Seq(true, false).foreach { updateByFields => - Seq(true, false).foreach { coerceNestedTypes => - withSQLConf( - SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString, - SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> - coerceNestedTypes.toString) { - withTempView("source") { - // Target table has nested struct with fields c1 and c2 - createAndInitTable( - s"""pk INT NOT NULL, - |s STRUCT>, - |dep STRING""".stripMargin, - """{ "pk": 0, "s": { "c1": 1, "c2": { "a": 10, "b": "x" } }, "dep": "sales" } - |{ "pk": 1, "s": { "c1": 2, "c2": { "a": 20, "b": "y" } }, "dep": "hr" }""" - .stripMargin) - - // Source table has null for the nested struct - val sourceTableSchema = StructType(Seq( - StructField("pk", IntegerType), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - StructField("a", IntegerType) - // missing field 'b' - ))) - ))), - StructField("dep", StringType) - )) - - val data = Seq( - Row(1, null, "engineering"), - Row(2, null, "finance") - ) - spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) - .createOrReplaceTempView("source") - - val mergeStmt = - s"""MERGE INTO $tableNameAsString t USING source - |ON t.pk = source.pk - |WHEN MATCHED THEN - | UPDATE SET * - |WHEN NOT MATCHED THEN - | INSERT * - |""".stripMargin + val mergeStmt = + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin - if (coerceNestedTypes) { + if (coerceNestedTypes) { + sql(mergeStmt) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, Row(10, "x")), "sales"), + Row(1, null, "engineering"), + Row(2, null, "finance"))) + } else { + // Without coercion, the merge should fail due to missing field + val exception = intercept[org.apache.spark.sql.AnalysisException] { sql(mergeStmt) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, null, "engineering"), - Row(2, null, "finance"))) - } else { - // Without coercion, the merge should fail due to missing field - val exception = intercept[org.apache.spark.sql.AnalysisException] { - sql(mergeStmt) - } - assert(exception.errorClass.get == - "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") - assert(exception.getMessage.contains( - "Cannot write incompatible data for the table ``: " + - "Cannot find data for the output column `s`.`c2`.`b`.")) } + assert(exception.errorClass.get == + "INCOMPATIBLE_DATA_FOR_TABLE.CANNOT_FIND_DATA") + assert(exception.getMessage.contains( + "Cannot write incompatible data for the table ``: " + + "Cannot find data for the output column `s`.`c2`.`b`.")) } } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") } } test("merge null struct with schema evolution - source with missing and extra nested fields") { - Seq(true, false).foreach { updateByFields => - Seq(true, false).foreach { withSchemaEvolution => - Seq(true, false).foreach { coerceNestedTypes => - withSQLConf( - SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString, - SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> - coerceNestedTypes.toString) { + Seq(true, false).foreach { withSchemaEvolution => + Seq(true, false).foreach { coerceNestedTypes => + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> + coerceNestedTypes.toString) { withTempView("source") { // Target table has nested struct with fields c1 and c2 createAndInitTable( @@ -4967,17 +4925,14 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } } } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") - } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") } } } test("merge null struct with non-nullable nested field - source with missing " + "and extra nested fields") { - withSQLConf( - SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> "true", SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> "true") { withTempView("source") { // Target table has nested struct with NON-NULLABLE field b @@ -5033,153 +4988,142 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase } test("merge with null struct using default value") { - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString) { - withTempView("source") { - // Target table has nested struct with a default value - sql( - s"""CREATE TABLE $tableNameAsString ( - | pk INT NOT NULL, - | s STRUCT> DEFAULT - | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')), - | dep STRING) - |PARTITIONED BY (dep) - |""".stripMargin) + withTempView("source") { + // Target table has nested struct with a default value + sql( + s"""CREATE TABLE $tableNameAsString ( + | pk INT NOT NULL, + | s STRUCT> DEFAULT + | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')), + | dep STRING) + |PARTITIONED BY (dep) + |""".stripMargin) - // Insert initial data using DataFrame API - val initialSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - StructField("a", IntegerType), - StructField("b", StringType) - ))) - ))), - StructField("dep", StringType) - )) - val initialData = Seq( - Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, Row(2, Row(20, "y")), "hr") - ) - spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema) - .writeTo(tableNameAsString).append() + // Insert initial data using DataFrame API + val initialSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", StringType) + ))) + ))), + StructField("dep", StringType) + )) + val initialData = Seq( + Row(0, Row(1, Row(10, "x")), "sales"), + Row(1, Row(2, Row(20, "y")), "hr") + ) + spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema) + .writeTo(tableNameAsString).append() - // Source table has null for the nested struct - val sourceTableSchema = StructType(Seq( - StructField("pk", IntegerType), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - StructField("a", IntegerType) - // missing field 'b' - ))) - ))), - StructField("dep", StringType) - )) + // Source table has null for the nested struct + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType) + // missing field 'b' + ))) + ))), + StructField("dep", StringType) + )) - val data = Seq( - Row(1, null, "engineering"), - Row(2, null, "finance") - ) - spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) - .createOrReplaceTempView("source") + val data = Seq( + Row(1, null, "engineering"), + Row(2, null, "finance") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") - sql( - s"""MERGE INTO $tableNameAsString t USING source - |ON t.pk = source.pk - |WHEN MATCHED THEN - | UPDATE SET * - |WHEN NOT MATCHED THEN - | INSERT * - |""".stripMargin) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, null, "engineering"), - Row(2, null, "finance"))) - } - } - sql(s"DROP TABLE IF EXISTS $tableNameAsString") + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET * + |WHEN NOT MATCHED THEN + | INSERT * + |""".stripMargin) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, Row(10, "x")), "sales"), + Row(1, null, "engineering"), + Row(2, null, "finance"))) } + sql(s"DROP TABLE IF EXISTS $tableNameAsString") } + test("merge with source missing struct column with default value") { - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key -> - updateByFields.toString) { - withTempView("source") { - // Target table has nested struct with a default value - sql( - s"""CREATE TABLE $tableNameAsString ( - | pk INT NOT NULL, - | s STRUCT> DEFAULT - | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')), - | dep STRING) - |PARTITIONED BY (dep) - |""".stripMargin) + withTempView("source") { + // Target table has nested struct with a default value + sql( + s"""CREATE TABLE $tableNameAsString ( + | pk INT NOT NULL, + | s STRUCT> DEFAULT + | named_struct('c1', 999, 'c2', named_struct('a', 999, 'b', 'default')), + | dep STRING) + |PARTITIONED BY (dep) + |""".stripMargin) - // Insert initial data using DataFrame API - val initialSchema = StructType(Seq( - StructField("pk", IntegerType, nullable = false), - StructField("s", StructType(Seq( - StructField("c1", IntegerType), - StructField("c2", StructType(Seq( - StructField("a", IntegerType), - StructField("b", StringType) - ))) - ))), - StructField("dep", StringType) - )) - val initialData = Seq( - Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, Row(2, Row(20, "y")), "hr") - ) - spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema) - .writeTo(tableNameAsString).append() + // Insert initial data using DataFrame API + val initialSchema = StructType(Seq( + StructField("pk", IntegerType, nullable = false), + StructField("s", StructType(Seq( + StructField("c1", IntegerType), + StructField("c2", StructType(Seq( + StructField("a", IntegerType), + StructField("b", StringType) + ))) + ))), + StructField("dep", StringType) + )) + val initialData = Seq( + Row(0, Row(1, Row(10, "x")), "sales"), + Row(1, Row(2, Row(20, "y")), "hr") + ) + spark.createDataFrame(spark.sparkContext.parallelize(initialData), initialSchema) + .writeTo(tableNameAsString).append() - // Source table is completely missing the struct column 's' - val sourceTableSchema = StructType(Seq( - StructField("pk", IntegerType), - StructField("dep", StringType) - )) + // Source table is completely missing the struct column 's' + val sourceTableSchema = StructType(Seq( + StructField("pk", IntegerType), + StructField("dep", StringType) + )) - val data = Seq( - Row(1, "engineering"), - Row(2, "finance") - ) - spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) - .createOrReplaceTempView("source") + val data = Seq( + Row(1, "engineering"), + Row(2, "finance") + ) + spark.createDataFrame(spark.sparkContext.parallelize(data), sourceTableSchema) + .createOrReplaceTempView("source") + + // When inserting without specifying the struct column, default should be used + sql( + s"""MERGE INTO $tableNameAsString t USING source + |ON t.pk = source.pk + |WHEN MATCHED THEN + | UPDATE SET dep = source.dep + |WHEN NOT MATCHED THEN + | INSERT (pk, dep) VALUES (source.pk, source.dep) + |""".stripMargin) + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(0, Row(1, Row(10, "x")), "sales"), + Row(1, Row(2, Row(20, "y")), "engineering"), + Row(2, Row(999, Row(999, "default")), "finance"))) - // When inserting without specifying the struct column, default should be used - sql( - s"""MERGE INTO $tableNameAsString t USING source - |ON t.pk = source.pk - |WHEN MATCHED THEN - | UPDATE SET dep = source.dep - |WHEN NOT MATCHED THEN - | INSERT (pk, dep) VALUES (source.pk, source.dep) - |""".stripMargin) - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(0, Row(1, Row(10, "x")), "sales"), - Row(1, Row(2, Row(20, "y")), "engineering"), - Row(2, Row(999, Row(999, "default")), "finance"))) - } - } sql(s"DROP TABLE IF EXISTS $tableNameAsString") } } test("merge into with source missing fields in nested struct") { Seq(true, false).foreach { nestedTypeCoercion => - Seq(true, false).foreach { updateByFields => - withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_UPDATE_BY_FIELD.key - -> updateByFields.toString, - SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key + withSQLConf(SQLConf.MERGE_INTO_NESTED_TYPE_COERCION_ENABLED.key -> nestedTypeCoercion.toString) { withTempView("source") { // Target table has nested struct: s.c1, s.c2.a, s.c2.b @@ -5221,21 +5165,11 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase if (nestedTypeCoercion) { sql(mergeStmt) - if (updateByFields) { - // When updating by fields, only non-null fields are updated - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, Row(10, Row(20, true)), "sales"), - Row(2, Row(20, Row(30, false)), "engineering"))) - } else { - // When updating by top level column, the missing field is set to NULL - checkAnswer( - sql(s"SELECT * FROM $tableNameAsString"), - Seq( - Row(1, Row(10, Row(20, null)), "sales"), - Row(2, Row(20, Row(30, null)), "engineering"))) - } + checkAnswer( + sql(s"SELECT * FROM $tableNameAsString"), + Seq( + Row(1, Row(10, Row(20, true)), "sales"), + Row(2, Row(20, Row(30, false)), "engineering"))) } else { val exception = intercept[Exception] { sql(mergeStmt) @@ -5247,7 +5181,6 @@ abstract class MergeIntoTableSuiteBase extends RowLevelOperationSuiteBase sql(s"DROP TABLE IF EXISTS $tableNameAsString") } } - } } test("merge with named_struct missing non-nullable field") {