Skip to content

Commit

Permalink
[SPARK-32030][SQL] Support unlimited MATCHED and NOT MATCHED clauses …
Browse files Browse the repository at this point in the history
…in MERGE INTO

### What changes were proposed in this pull request?
This PR add unlimited MATCHED and NOT MATCHED clauses in MERGE INTO statement.

### Why are the changes needed?
Now the MERGE INTO syntax is,
```
MERGE INTO [db_name.]target_table [AS target_alias]
 USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
 ON <merge_condition>
 [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
 [ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
 [ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
```
It would be nice if we support unlimited MATCHED and NOT MATCHED clauses in MERGE INTO statement, because users may want to deal with different "AND <condition>"s, the result of which just like a series of "CASE WHEN"s. The expected syntax looks like
```
MERGE INTO [db_name.]target_table [AS target_alias]
 USING [db_name.]source_table [<time_travel_version>] [AS source_alias]
 ON <merge_condition>
 [when_matched_clause [, ...]]
 [when_not_matched_clause [, ...]]
```
where when_matched_clause is
```
WHEN MATCHED [ AND <condition> ] THEN <matched_action>
```
and when_not_matched_clause is
```
WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action>
 ```
matched_action can be one of
```
DELETE
UPDATE SET * or
UPDATE SET col1 = value1 [, col2 = value2, ...]
```
and not_matched_action can be one of
```
INSERT *
INSERT (col1 [, col2, ...]) VALUES (value1 [, value2, ...])
```
### Does this PR introduce _any_ user-facing change?
Yes. The SQL command changes, but it is backward compatible.

### How was this patch tested?
New tests added.

Closes #28875 from xianyinxin/SPARK-32030.

Authored-by: xy_xin <xianyin.xxy@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
xy_xin authored and cloud-fan committed Jun 29, 2020
1 parent a6b6a1f commit 20cd47e
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 71 deletions.
Expand Up @@ -411,12 +411,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging

val mergeCondition = expression(ctx.mergeCondition)

val matchedClauses = ctx.matchedClause()
if (matchedClauses.size() > 2) {
throw new ParseException("There should be at most 2 'WHEN MATCHED' clauses.",
matchedClauses.get(2))
}
val matchedActions = matchedClauses.asScala.map {
val matchedActions = ctx.matchedClause().asScala.map {
clause => {
if (clause.matchedAction().DELETE() != null) {
DeleteAction(Option(clause.matchedCond).map(expression))
Expand All @@ -435,12 +430,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
}
}
}
val notMatchedClauses = ctx.notMatchedClause()
if (notMatchedClauses.size() > 1) {
throw new ParseException("There should be at most 1 'WHEN NOT MATCHED' clause.",
notMatchedClauses.get(1))
}
val notMatchedActions = notMatchedClauses.asScala.map {
val notMatchedActions = ctx.notMatchedClause().asScala.map {
clause => {
if (clause.notMatchedAction().INSERT() != null) {
val condition = Option(clause.notMatchedCond).map(expression)
Expand Down Expand Up @@ -468,13 +458,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
throw new ParseException("There must be at least one WHEN clause in a MERGE statement", ctx)
}
// children being empty means that the condition is not set
if (matchedActions.length == 2 && matchedActions.head.children.isEmpty) {
throw new ParseException("When there are 2 MATCHED clauses in a MERGE statement, " +
"the first MATCHED clause must have a condition", ctx)
}
if (matchedActions.groupBy(_.getClass).mapValues(_.size).exists(_._2 > 1)) {
throw new ParseException(
"UPDATE and DELETE can appear at most once in MATCHED clauses in a MERGE statement", ctx)
val matchedActionSize = matchedActions.length
if (matchedActionSize >= 2 && !matchedActions.init.forall(_.condition.nonEmpty)) {
throw new ParseException("When there are more than one MATCHED clauses in a MERGE " +
"statement, only the last MATCHED clause can omit the condition.", ctx)
}
val notMatchedActionSize = notMatchedActions.length
if (notMatchedActionSize >= 2 && !notMatchedActions.init.forall(_.condition.nonEmpty)) {
throw new ParseException("When there are more than one NOT MATCHED clauses in a MERGE " +
"statement, only the last NOT MATCHED clause can omit the condition.", ctx)
}

MergeIntoTable(
Expand Down
Expand Up @@ -346,25 +346,25 @@ case class MergeIntoTable(
override def children: Seq[LogicalPlan] = Seq(targetTable, sourceTable)
}

sealed abstract class MergeAction(
condition: Option[Expression]) extends Expression with Unevaluable {
sealed abstract class MergeAction extends Expression with Unevaluable {
def condition: Option[Expression]
override def foldable: Boolean = false
override def nullable: Boolean = false
override def dataType: DataType = throw new UnresolvedException(this, "nullable")
override def children: Seq[Expression] = condition.toSeq
}

case class DeleteAction(condition: Option[Expression]) extends MergeAction(condition)
case class DeleteAction(condition: Option[Expression]) extends MergeAction

case class UpdateAction(
condition: Option[Expression],
assignments: Seq[Assignment]) extends MergeAction(condition) {
assignments: Seq[Assignment]) extends MergeAction {
override def children: Seq[Expression] = condition.toSeq ++ assignments
}

case class InsertAction(
condition: Option[Expression],
assignments: Seq[Assignment]) extends MergeAction(condition) {
assignments: Seq[Assignment]) extends MergeAction {
override def children: Seq[Expression] = condition.toSeq ++ assignments
}

Expand Down
Expand Up @@ -1134,58 +1134,74 @@ class DDLParserSuite extends AnalysisTest {
}
}

test("merge into table: at most two matched clauses") {
val exc = intercept[ParseException] {
parsePlan(
"""
|MERGE INTO testcat1.ns1.ns2.tbl AS target
|USING testcat2.ns1.ns2.tbl AS source
|ON target.col1 = source.col1
|WHEN MATCHED AND (target.col2='delete') THEN DELETE
|WHEN MATCHED AND (target.col2='update1') THEN UPDATE SET target.col2 = source.col2
|WHEN MATCHED AND (target.col2='update2') THEN UPDATE SET target.col2 = source.col2
|WHEN NOT MATCHED AND (target.col2='insert')
|THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
""".stripMargin)
}

assert(exc.getMessage.contains("There should be at most 2 'WHEN MATCHED' clauses."))
test("merge into table: multi matched and not matched clauses") {
parseCompare(
"""
|MERGE INTO testcat1.ns1.ns2.tbl AS target
|USING testcat2.ns1.ns2.tbl AS source
|ON target.col1 = source.col1
|WHEN MATCHED AND (target.col2='delete') THEN DELETE
|WHEN MATCHED AND (target.col2='update1') THEN UPDATE SET target.col2 = 1
|WHEN MATCHED AND (target.col2='update2') THEN UPDATE SET target.col2 = 2
|WHEN NOT MATCHED AND (target.col2='insert1')
|THEN INSERT (target.col1, target.col2) values (source.col1, 1)
|WHEN NOT MATCHED AND (target.col2='insert2')
|THEN INSERT (target.col1, target.col2) values (source.col1, 2)
""".stripMargin,
MergeIntoTable(
SubqueryAlias("target", UnresolvedRelation(Seq("testcat1", "ns1", "ns2", "tbl"))),
SubqueryAlias("source", UnresolvedRelation(Seq("testcat2", "ns1", "ns2", "tbl"))),
EqualTo(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
Seq(DeleteAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("delete")))),
UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update1"))),
Seq(Assignment(UnresolvedAttribute("target.col2"), Literal(1)))),
UpdateAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("update2"))),
Seq(Assignment(UnresolvedAttribute("target.col2"), Literal(2))))),
Seq(InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert1"))),
Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
Assignment(UnresolvedAttribute("target.col2"), Literal(1)))),
InsertAction(Some(EqualTo(UnresolvedAttribute("target.col2"), Literal("insert2"))),
Seq(Assignment(UnresolvedAttribute("target.col1"), UnresolvedAttribute("source.col1")),
Assignment(UnresolvedAttribute("target.col2"), Literal(2)))))))
}

test("merge into table: at most one not matched clause") {
test("merge into table: only the last matched clause can omit the condition") {
val exc = intercept[ParseException] {
parsePlan(
"""
|MERGE INTO testcat1.ns1.ns2.tbl AS target
|USING testcat2.ns1.ns2.tbl AS source
|ON target.col1 = source.col1
|WHEN MATCHED AND (target.col2='delete') THEN DELETE
|WHEN MATCHED AND (target.col2='update1') THEN UPDATE SET target.col2 = source.col2
|WHEN NOT MATCHED AND (target.col2='insert1')
|THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
|WHEN NOT MATCHED AND (target.col2='insert2')
|WHEN MATCHED AND (target.col2 == 'update1') THEN UPDATE SET target.col2 = 1
|WHEN MATCHED THEN UPDATE SET target.col2 = 2
|WHEN MATCHED THEN DELETE
|WHEN NOT MATCHED AND (target.col2='insert')
|THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
""".stripMargin)
}

assert(exc.getMessage.contains("There should be at most 1 'WHEN NOT MATCHED' clause."))
assert(exc.getMessage.contains("only the last MATCHED clause can omit the condition"))
}

test("merge into table: the first matched clause must have a condition if there's a second") {
test("merge into table: only the last not matched clause can omit the condition") {
val exc = intercept[ParseException] {
parsePlan(
"""
|MERGE INTO testcat1.ns1.ns2.tbl AS target
|USING testcat2.ns1.ns2.tbl AS source
|ON target.col1 = source.col1
|WHEN MATCHED AND (target.col2 == 'update') THEN UPDATE SET target.col2 = source.col2
|WHEN MATCHED THEN DELETE
|WHEN MATCHED THEN UPDATE SET target.col2 = source.col2
|WHEN NOT MATCHED AND (target.col2='insert')
|WHEN NOT MATCHED AND (target.col2='insert1')
|THEN INSERT (target.col1, target.col2) values (source.col1, 1)
|WHEN NOT MATCHED
|THEN INSERT (target.col1, target.col2) values (source.col1, 2)
|WHEN NOT MATCHED
|THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
""".stripMargin)
}

assert(exc.getMessage.contains("the first MATCHED clause must have a condition"))
assert(exc.getMessage.contains("only the last NOT MATCHED clause can omit the condition"))
}

test("merge into table: there must be a when (not) matched condition") {
Expand All @@ -1201,26 +1217,6 @@ class DDLParserSuite extends AnalysisTest {
assert(exc.getMessage.contains("There must be at least one WHEN clause in a MERGE statement"))
}

test("merge into table: there can be only a single use DELETE or UPDATE") {
Seq("UPDATE SET *", "DELETE").foreach { op =>
val exc = intercept[ParseException] {
parsePlan(
s"""
|MERGE INTO testcat1.ns1.ns2.tbl AS target
|USING testcat2.ns1.ns2.tbl AS source
|ON target.col1 = source.col1
|WHEN MATCHED AND (target.col2='delete') THEN $op
|WHEN MATCHED THEN $op
|WHEN NOT MATCHED AND (target.col2='insert')
|THEN INSERT (target.col1, target.col2) values (source.col1, source.col2)
""".stripMargin)
}

assert(exc.getMessage.contains(
"UPDATE and DELETE can appear at most once in MATCHED clauses"))
}
}

test("show tables") {
comparePlans(
parsePlan("SHOW TABLES"),
Expand Down

0 comments on commit 20cd47e

Please sign in to comment.