Skip to content

Commit

Permalink
[SPARK-28892][SQL] support UPDATE in the parser and add the correspon…
Browse files Browse the repository at this point in the history
…ding logical plan

### What changes were proposed in this pull request?

This PR supports UPDATE in the parser and add the corresponding logical plan. The SQL syntax is a standard UPDATE statement:
```
UPDATE tableName tableAlias SET colName=value [, colName=value]+ WHERE predicate?
```

### Why are the changes needed?

With this change, we can start to implement UPDATE in builtin sources and think about how to design the update API in DS v2.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

New test cases added.

Closes #25626 from xianyinxin/SPARK-28892.

Authored-by: xy_xin <xianyin.xxy@alibaba-inc.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
xy_xin authored and cloud-fan committed Sep 23, 2019
1 parent f725d47 commit 655356e
Show file tree
Hide file tree
Showing 8 changed files with 152 additions and 12 deletions.
1 change: 1 addition & 0 deletions docs/sql-keywords.md
Expand Up @@ -280,6 +280,7 @@ Below is a list of all the keywords in Spark SQL.
<tr><td>UNKNOWN</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>UNLOCK</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>UNSET</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>UPDATE</td><td>non-reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>USE</td><td>non-reserved</td><td>non-reserved</td><td>non-reserved</td></tr>
<tr><td>USER</td><td>reserved</td><td>non-reserved</td><td>reserved</td></tr>
<tr><td>USING</td><td>reserved</td><td>strict-non-reserved</td><td>reserved</td></tr>
Expand Down
Expand Up @@ -217,6 +217,7 @@ statement
| SET .*? #setConfiguration
| RESET #resetConfiguration
| DELETE FROM multipartIdentifier tableAlias whereClause? #deleteFromTable
| UPDATE multipartIdentifier tableAlias setClause whereClause? #updateTable
| unsupportedHiveNativeCommands .*? #failNativeCommand
;

Expand Down Expand Up @@ -476,6 +477,14 @@ selectClause
: SELECT (hints+=hint)* setQuantifier? namedExpressionSeq
;

setClause
: SET assign (',' assign)*
;

assign
: key=multipartIdentifier EQ value=expression
;

whereClause
: WHERE booleanExpression
;
Expand Down Expand Up @@ -1085,6 +1094,7 @@ ansiNonReserved
| UNCACHE
| UNLOCK
| UNSET
| UPDATE
| USE
| VALUES
| VIEW
Expand Down Expand Up @@ -1355,6 +1365,7 @@ nonReserved
| UNKNOWN
| UNLOCK
| UNSET
| UPDATE
| USE
| USER
| VALUES
Expand Down Expand Up @@ -1622,6 +1633,7 @@ UNIQUE: 'UNIQUE';
UNKNOWN: 'UNKNOWN';
UNLOCK: 'UNLOCK';
UNSET: 'UNSET';
UPDATE: 'UPDATE';
USE: 'USE';
USER: 'USER';
USING: 'USING';
Expand Down
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.{First, Last}
import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{getZoneId, stringToDate, stringToTimestamp}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, Expression => V2Expression, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -361,6 +361,36 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
DeleteFromStatement(tableId, tableAlias, predicate)
}

override def visitUpdateTable(ctx: UpdateTableContext): LogicalPlan = withOrigin(ctx) {
val tableId = visitMultipartIdentifier(ctx.multipartIdentifier)
val tableAlias = if (ctx.tableAlias() != null) {
val ident = ctx.tableAlias().strictIdentifier()
// We do not allow columns aliases after table alias.
if (ctx.tableAlias().identifierList() != null) {
throw new ParseException("Columns aliases is not allowed in UPDATE.",
ctx.tableAlias().identifierList())
}
if (ident != null) Some(ident.getText) else None
} else {
None
}
val (attrs, values) = ctx.setClause().assign().asScala.map {
kv => visitMultipartIdentifier(kv.key) -> expression(kv.value)
}.unzip
val predicate = if (ctx.whereClause() != null) {
Some(expression(ctx.whereClause().booleanExpression()))
} else {
None
}

UpdateTableStatement(
tableId,
tableAlias,
attrs,
values,
predicate)
}

/**
* Create a partition specification map.
*/
Expand Down
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.Expression

case class UpdateTableStatement(
tableName: Seq[String],
tableAlias: Option[String],
attrs: Seq[Seq[String]],
values: Seq[Expression],
condition: Option[Expression]) extends ParsedStatement
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedAttribute
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableAlterColumnStatement, AlterTableDropColumnsStatement, AlterTableRenameColumnStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, InsertIntoStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement}
import org.apache.spark.sql.connector.expressions.{ApplyTransform, BucketTransform, DaysTransform, FieldReference, HoursTransform, IdentityTransform, LiteralValue, MonthsTransform, Transform, YearsTransform}
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String
Expand Down Expand Up @@ -789,6 +789,48 @@ class DDLParserSuite extends AnalysisTest {
assert(exc.getMessage.contains("Columns aliases is not allowed in DELETE."))
}

test("update table: basic") {
parseCompare(
"""
|UPDATE testcat.ns1.ns2.tbl
|SET t.a='Robert', t.b=32
""".stripMargin,
UpdateTableStatement(
Seq("testcat", "ns1", "ns2", "tbl"),
None,
Seq(Seq("t", "a"), Seq("t", "b")),
Seq(Literal("Robert"), Literal(32)),
None))
}

test("update table: with alias and where clause") {
parseCompare(
"""
|UPDATE testcat.ns1.ns2.tbl AS t
|SET t.a='Robert', t.b=32
|WHERE t.c=2
""".stripMargin,
UpdateTableStatement(
Seq("testcat", "ns1", "ns2", "tbl"),
Some("t"),
Seq(Seq("t", "a"), Seq("t", "b")),
Seq(Literal("Robert"), Literal(32)),
Some(EqualTo(UnresolvedAttribute("t.c"), Literal(2)))))
}

test("update table: columns aliases is not allowed") {
val exc = intercept[ParseException] {
parsePlan(
"""
|UPDATE testcat.ns1.ns2.tbl AS t(a,b,c,d)
|SET b='Robert', c=32
|WHERE d=2
""".stripMargin)
}

assert(exc.getMessage.contains("Columns aliases is not allowed in UPDATE."))
}

test("show tables") {
comparePlans(
parsePlan("SHOW TABLES"),
Expand Down
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{CastSupport, UnresolvedRelation}
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, CatalogUtils}
import org.apache.spark.sql.catalyst.plans.logical.{CreateTableAsSelect, CreateV2Table, DeleteFromTable, DropTable, LogicalPlan, ReplaceTable, ReplaceTableAsSelect, ShowNamespaces, ShowTables, SubqueryAlias}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement}
import org.apache.spark.sql.catalyst.plans.logical.sql.{AlterTableAddColumnsStatement, AlterTableSetLocationStatement, AlterTableSetPropertiesStatement, AlterTableUnsetPropertiesStatement, AlterViewSetPropertiesStatement, AlterViewUnsetPropertiesStatement, CreateTableAsSelectStatement, CreateTableStatement, DeleteFromStatement, DescribeColumnStatement, DescribeTableStatement, DropTableStatement, DropViewStatement, QualifiedColType, ReplaceTableAsSelectStatement, ReplaceTableStatement, ShowNamespacesStatement, ShowTablesStatement, UpdateTableStatement}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog, TableCatalog}
import org.apache.spark.sql.connector.expressions.Transform
Expand Down Expand Up @@ -187,6 +187,9 @@ case class DataSourceResolution(
s"No v2 catalog is available for ${namespace.quoted}")
}

case update: UpdateTableStatement =>
throw new AnalysisException(s"Update table is not supported temporarily.")

case ShowTablesStatement(None, pattern) =>
defaultCatalog match {
case Some(catalog) =>
Expand Down

0 comments on commit 655356e

Please sign in to comment.