From d1b8bd7d1195b4d2288565aefbdefb2731f7ae15 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 13 May 2021 12:58:24 +0000 Subject: [PATCH] [SPARK-34720][SQL] MERGE ... UPDATE/INSERT * should do by-name resolution ### What changes were proposed in this pull request? In Spark, we have an extension in the MERGE syntax: INSERT/UPDATE *. This is not from ANSI standard or any other mainstream databases, so we need to define the behaviors by our own. The behavior today is very weird: assume the source table has `n1` columns, target table has `n2` columns. We generate the assignments by taking the first `min(n1, n2)` columns from source & target tables and pairing them by ordinal. This PR proposes a more reasonable behavior: take all the columns from target table as keys, and find the corresponding columns from source table by name as values. ### Why are the changes needed? Fix the MEREG INSERT/UPDATE * to be more user-friendly and easy to do schema evolution. ### Does this PR introduce _any_ user-facing change? Yes, but MERGE is only supported by very few data sources. ### How was this patch tested? new tests Closes #32192 from cloud-fan/merge. Authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../sql/catalyst/analysis/Analyzer.scala | 68 +++++++++++-------- .../command/PlanResolutionSuite.scala | 54 ++++++++++++--- 2 files changed, 86 insertions(+), 36 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 18caa05bd556b..f1aac4e2c0bf5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1469,14 +1469,18 @@ class Analyzer(override val catalogManager: CatalogManager) case UpdateAction(updateCondition, assignments) => val resolvedUpdateCondition = updateCondition.map( resolveExpressionByPlanChildren(_, m)) - // The update value can access columns from both target and source tables. UpdateAction( resolvedUpdateCondition, - resolveAssignments(Some(assignments), m, resolveValuesWithSourceOnly = false)) + // The update value can access columns from both target and source tables. + resolveAssignments(assignments, m, resolveValuesWithSourceOnly = false)) case UpdateStarAction(updateCondition) => + val assignments = targetTable.output.map { attr => + Assignment(attr, UnresolvedAttribute(Seq(attr.name))) + } UpdateAction( updateCondition.map(resolveExpressionByPlanChildren(_, m)), - resolveAssignments(assignments = None, m, resolveValuesWithSourceOnly = false)) + // For UPDATE *, the value must from source table. + resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true)) case o => o } val newNotMatchedActions = m.notMatchedActions.map { @@ -1487,15 +1491,18 @@ class Analyzer(override val catalogManager: CatalogManager) resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable))) InsertAction( resolvedInsertCondition, - resolveAssignments(Some(assignments), m, resolveValuesWithSourceOnly = true)) + resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true)) case InsertStarAction(insertCondition) => // The insert action is used when not matched, so its condition and value can only // access columns from the source table. val resolvedInsertCondition = insertCondition.map( resolveExpressionByPlanChildren(_, Project(Nil, m.sourceTable))) + val assignments = targetTable.output.map { attr => + Assignment(attr, UnresolvedAttribute(Seq(attr.name))) + } InsertAction( resolvedInsertCondition, - resolveAssignments(assignments = None, m, resolveValuesWithSourceOnly = true)) + resolveAssignments(assignments, m, resolveValuesWithSourceOnly = true)) case o => o } val resolvedMergeCondition = resolveExpressionByPlanChildren(m.mergeCondition, m) @@ -1513,33 +1520,38 @@ class Analyzer(override val catalogManager: CatalogManager) } def resolveAssignments( - assignments: Option[Seq[Assignment]], + assignments: Seq[Assignment], mergeInto: MergeIntoTable, resolveValuesWithSourceOnly: Boolean): Seq[Assignment] = { - if (assignments.isEmpty) { - val expandedColumns = mergeInto.targetTable.output - val expandedValues = mergeInto.sourceTable.output - expandedColumns.zip(expandedValues).map(kv => Assignment(kv._1, kv._2)) - } else { - assignments.get.map { assign => - val resolvedKey = assign.key match { - case c if !c.resolved => - resolveExpressionByPlanChildren(c, Project(Nil, mergeInto.targetTable)) - case o => o - } - val resolvedValue = assign.value match { - // The update values may contain target and/or source references. - case c if !c.resolved => - if (resolveValuesWithSourceOnly) { - resolveExpressionByPlanChildren(c, Project(Nil, mergeInto.sourceTable)) - } else { - resolveExpressionByPlanChildren(c, mergeInto) - } - case o => o - } - Assignment(resolvedKey, resolvedValue) + assignments.map { assign => + val resolvedKey = assign.key match { + case c if !c.resolved => + resolveMergeExprOrFail(c, Project(Nil, mergeInto.targetTable)) + case o => o } + val resolvedValue = assign.value match { + // The update values may contain target and/or source references. + case c if !c.resolved => + if (resolveValuesWithSourceOnly) { + resolveMergeExprOrFail(c, Project(Nil, mergeInto.sourceTable)) + } else { + resolveMergeExprOrFail(c, mergeInto) + } + case o => o + } + Assignment(resolvedKey, resolvedValue) + } + } + + private def resolveMergeExprOrFail(e: Expression, p: LogicalPlan): Expression = { + val resolved = resolveExpressionByPlanChildren(e, p) + resolved.references.filter(!_.resolved).foreach { a => + // Note: This will throw error only on unresolved attribute issues, + // not other resolution errors like mismatched data types. + val cols = p.inputSet.toSeq.map(_.sql).mkString(", ") + a.failAnalysis(s"cannot resolve ${a.sql} in MERGE command given columns [$cols]") } + resolved } // This method is used to trim groupByExpressions/selectedGroupByExpressions's top-level diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index a87f977eb1602..d48264c6d1c43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -55,6 +55,20 @@ class PlanResolutionSuite extends AnalysisTest { t } + private val table1: Table = { + val t = mock(classOf[Table]) + when(t.schema()).thenReturn(new StructType().add("s", "string").add("i", "int")) + when(t.partitioning()).thenReturn(Array.empty[Transform]) + t + } + + private val table2: Table = { + val t = mock(classOf[Table]) + when(t.schema()).thenReturn(new StructType().add("i", "int").add("x", "string")) + when(t.partitioning()).thenReturn(Array.empty[Transform]) + t + } + private val tableWithAcceptAnySchemaCapability: Table = { val t = mock(classOf[Table]) when(t.schema()).thenReturn(new StructType().add("i", "int")) @@ -91,7 +105,8 @@ class PlanResolutionSuite extends AnalysisTest { when(newCatalog.loadTable(any())).thenAnswer((invocation: InvocationOnMock) => { invocation.getArgument[Identifier](0).name match { case "tab" => table - case "tab1" => table + case "tab1" => table1 + case "tab2" => table2 case name => throw new NoSuchTableException(name) } }) @@ -107,7 +122,7 @@ class PlanResolutionSuite extends AnalysisTest { case "v1Table1" => v1Table case "v1HiveTable" => v1HiveTable case "v2Table" => table - case "v2Table1" => table + case "v2Table1" => table1 case "v2TableWithAcceptAnySchemaCapability" => tableWithAcceptAnySchemaCapability case "view" => view case name => throw new NoSuchTableException(name) @@ -1385,7 +1400,7 @@ class PlanResolutionSuite extends AnalysisTest { // cte val sql5 = s""" - |WITH source(i, s) AS + |WITH source(s, i) AS | (SELECT * FROM $source) |MERGE INTO $target AS target |USING source @@ -1405,7 +1420,7 @@ class PlanResolutionSuite extends AnalysisTest { updateAssigns)), Seq(InsertAction(Some(EqualTo(il: AttributeReference, StringLiteral("insert"))), insertAssigns))) => - assert(source.output.map(_.name) == Seq("i", "s")) + assert(source.output.map(_.name) == Seq("s", "i")) checkResolution(target, source, mergeCondition, Some(dl), Some(ul), Some(il), updateAssigns, insertAssigns) @@ -1414,8 +1429,7 @@ class PlanResolutionSuite extends AnalysisTest { } // no aliases - Seq(("v2Table", "v2Table1"), - ("testcat.tab", "testcat.tab1")).foreach { pair => + Seq(("v2Table", "v2Table1"), ("testcat.tab", "testcat.tab1")).foreach { pair => val target = pair._1 val source = pair._2 @@ -1507,7 +1521,7 @@ class PlanResolutionSuite extends AnalysisTest { assert(e5.message.contains("Reference 's' is ambiguous")) } - val sql6 = + val sql1 = s""" |MERGE INTO non_exist_target |USING non_exist_source @@ -1516,13 +1530,37 @@ class PlanResolutionSuite extends AnalysisTest { |WHEN MATCHED THEN UPDATE SET * |WHEN NOT MATCHED THEN INSERT * """.stripMargin - val parsed = parseAndResolve(sql6) + val parsed = parseAndResolve(sql1) parsed match { case u: MergeIntoTable => assert(u.targetTable.isInstanceOf[UnresolvedRelation]) assert(u.sourceTable.isInstanceOf[UnresolvedRelation]) case _ => fail("Expect MergeIntoTable, but got:\n" + parsed.treeString) } + + // UPDATE * with incompatible schema between source and target tables. + val sql2 = + """ + |MERGE INTO testcat.tab + |USING testcat.tab2 + |ON 1 = 1 + |WHEN MATCHED THEN UPDATE SET * + |""".stripMargin + val e2 = intercept[AnalysisException](parseAndResolve(sql2)) + assert(e2.message.contains( + "cannot resolve s in MERGE command given columns [testcat.tab2.i, testcat.tab2.x]")) + + // INSERT * with incompatible schema between source and target tables. + val sql3 = + """ + |MERGE INTO testcat.tab + |USING testcat.tab2 + |ON 1 = 1 + |WHEN NOT MATCHED THEN INSERT * + |""".stripMargin + val e3 = intercept[AnalysisException](parseAndResolve(sql3)) + assert(e3.message.contains( + "cannot resolve s in MERGE command given columns [testcat.tab2.i, testcat.tab2.x]")) } test("MERGE INTO TABLE - skip resolution on v2 tables that accept any schema") {