Skip to content

Commit

Permalink
[SPARK-14691][SQL] Simplify and Unify Error Generation for Unsupporte…
Browse files Browse the repository at this point in the history
…d Alter Table DDL

#### What changes were proposed in this pull request?
So far, we are capturing each unsupported Alter Table in separate visit functions. They should be unified and issue the same ParseException instead.

This PR is to refactor the existing implementation and make error message consistent for Alter Table DDL.

#### How was this patch tested?
Updated the existing test cases and also added new test cases to ensure all the unsupported statements are covered.

Author: gatorsmile <gatorsmile@gmail.com>
Author: xiaoli <lixiao1983@gmail.com>
Author: Xiao Li <xiaoli@Xiaos-MacBook-Pro.local>

Closes #12459 from gatorsmile/cleanAlterTable.
  • Loading branch information
gatorsmile authored and hvanhovell committed Apr 24, 2016
1 parent 8df8a81 commit 337289d
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 138 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,19 @@ statement
SET SERDE STRING (WITH SERDEPROPERTIES tablePropertyList)? #setTableSerDe
| ALTER TABLE tableIdentifier (partitionSpec)?
SET SERDEPROPERTIES tablePropertyList #setTableSerDe
| ALTER TABLE tableIdentifier bucketSpec #bucketTable
| ALTER TABLE tableIdentifier NOT CLUSTERED #unclusterTable
| ALTER TABLE tableIdentifier NOT SORTED #unsortTable
| ALTER TABLE tableIdentifier skewSpec #skewTable
| ALTER TABLE tableIdentifier NOT SKEWED #unskewTable
| ALTER TABLE tableIdentifier NOT STORED AS DIRECTORIES #unstoreTable
| ALTER TABLE tableIdentifier
SET SKEWED LOCATION skewedLocationList #setTableSkewLocations
| ALTER TABLE tableIdentifier ADD (IF NOT EXISTS)?
partitionSpecLocation+ #addTablePartition
| ALTER VIEW tableIdentifier ADD (IF NOT EXISTS)?
partitionSpec+ #addTablePartition
| ALTER TABLE tableIdentifier
from=partitionSpec RENAME TO to=partitionSpec #renameTablePartition
| ALTER TABLE from=tableIdentifier
EXCHANGE partitionSpec WITH TABLE to=tableIdentifier #exchangeTablePartition
| ALTER TABLE tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* PURGE? #dropTablePartitions
| ALTER VIEW tableIdentifier
DROP (IF EXISTS)? partitionSpec (',' partitionSpec)* #dropTablePartitions
| ALTER TABLE tableIdentifier ARCHIVE partitionSpec #archiveTablePartition
| ALTER TABLE tableIdentifier UNARCHIVE partitionSpec #unarchiveTablePartition
| ALTER TABLE tableIdentifier partitionSpec?
SET FILEFORMAT fileFormat #setTableFileFormat
| ALTER TABLE tableIdentifier partitionSpec? SET locationSpec #setTableLocation
| ALTER TABLE tableIdentifier TOUCH partitionSpec? #touchTable
| ALTER TABLE tableIdentifier partitionSpec? COMPACT STRING #compactTable
| ALTER TABLE tableIdentifier partitionSpec? CONCATENATE #concatenateTable
| ALTER TABLE tableIdentifier partitionSpec?
CHANGE COLUMN? oldName=identifier colType
(FIRST | AFTER after=identifier)? (CASCADE | RESTRICT)? #changeColumn
Expand Down Expand Up @@ -179,6 +164,19 @@ unsupportedHiveNativeCommands
| kw1=CREATE kw2=TEMPORARY kw3=MACRO
| kw1=DROP kw2=TEMPORARY kw3=MACRO
| kw1=MSCK kw2=REPAIR kw3=TABLE
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=CLUSTERED
| kw1=ALTER kw2=TABLE tableIdentifier kw3=CLUSTERED kw4=BY
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SORTED
| kw1=ALTER kw2=TABLE tableIdentifier kw3=SKEWED kw4=BY
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=SKEWED
| kw1=ALTER kw2=TABLE tableIdentifier kw3=NOT kw4=STORED kw5=AS kw6=DIRECTORIES
| kw1=ALTER kw2=TABLE tableIdentifier kw3=SET kw4=SKEWED kw5=LOCATION
| kw1=ALTER kw2=TABLE tableIdentifier kw3=EXCHANGE kw4=PARTITION
| kw1=ALTER kw2=TABLE tableIdentifier kw3=ARCHIVE kw4=PARTITION
| kw1=ALTER kw2=TABLE tableIdentifier kw3=UNARCHIVE kw4=PARTITION
| kw1=ALTER kw2=TABLE tableIdentifier kw3=TOUCH
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=COMPACT
| kw1=ALTER kw2=TABLE tableIdentifier partitionSpec? kw3=CONCATENATE
;

createTableHeader
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.util.Try

import org.antlr.v4.runtime.{ParserRuleContext, Token}

import org.apache.spark.sql.{AnalysisException, SaveMode}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.parser._
Expand Down Expand Up @@ -511,40 +511,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec))
}

// TODO: don't even bother parsing alter table commands related to bucketing and skewing

override def visitBucketTable(ctx: BucketTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... CLUSTERED BY ... INTO N BUCKETS")
}

override def visitUnclusterTable(ctx: UnclusterTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT CLUSTERED")
}

override def visitUnsortTable(ctx: UnsortTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SORTED")
}

override def visitSkewTable(ctx: SkewTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException("Operation not allowed: ALTER TABLE ... SKEWED BY ...")
}

override def visitUnskewTable(ctx: UnskewTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException("Operation not allowed: ALTER TABLE ... NOT SKEWED")
}

override def visitUnstoreTable(ctx: UnstoreTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... NOT STORED AS DIRECTORIES")
}

override def visitSetTableSkewLocations(
ctx: SetTableSkewLocationsContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... SET SKEWED LOCATION ...")
}

/**
* Create an [[AlterTableAddPartition]] command.
*
Expand All @@ -560,7 +526,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitAddTablePartition(
ctx: AddTablePartitionContext): LogicalPlan = withOrigin(ctx) {
if (ctx.VIEW != null) {
throw new AnalysisException(s"Operation not allowed: partitioned views")
throw new ParseException(s"Operation not allowed: partitioned views", ctx)
}
// Create partition spec to location mapping.
val specsAndLocs = if (ctx.partitionSpec.isEmpty) {
Expand All @@ -580,20 +546,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx.EXISTS != null)
}

/**
* Create an (Hive's) AlterTableExchangePartition command.
*
* For example:
* {{{
* ALTER TABLE table1 EXCHANGE PARTITION spec WITH TABLE table2;
* }}}
*/
override def visitExchangeTablePartition(
ctx: ExchangeTablePartitionContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... EXCHANGE PARTITION ...")
}

/**
* Create an [[AlterTableRenamePartition]] command
*
Expand Down Expand Up @@ -625,45 +577,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitDropTablePartitions(
ctx: DropTablePartitionsContext): LogicalPlan = withOrigin(ctx) {
if (ctx.VIEW != null) {
throw new AnalysisException(s"Operation not allowed: partitioned views")
throw new ParseException(s"Operation not allowed: partitioned views", ctx)
}
if (ctx.PURGE != null) {
throw new AnalysisException(s"Operation not allowed: PURGE")
throw new ParseException(s"Operation not allowed: PURGE", ctx)
}
AlterTableDropPartition(
visitTableIdentifier(ctx.tableIdentifier),
ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
ctx.EXISTS != null)
}

/**
* Create an (Hive's) AlterTableArchivePartition command
*
* For example:
* {{{
* ALTER TABLE table ARCHIVE PARTITION spec;
* }}}
*/
override def visitArchiveTablePartition(
ctx: ArchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... ARCHIVE PARTITION ...")
}

/**
* Create an (Hive's) AlterTableUnarchivePartition command
*
* For example:
* {{{
* ALTER TABLE table UNARCHIVE PARTITION spec;
* }}}
*/
override def visitUnarchiveTablePartition(
ctx: UnarchiveTablePartitionContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException(
"Operation not allowed: ALTER TABLE ... UNARCHIVE PARTITION ...")
}

/**
* Create an [[AlterTableSetFileFormat]] command
*
Expand Down Expand Up @@ -708,42 +632,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
visitLocationSpec(ctx.locationSpec))
}

/**
* Create an (Hive's) AlterTableTouch command
*
* For example:
* {{{
* ALTER TABLE table TOUCH [PARTITION spec];
* }}}
*/
override def visitTouchTable(ctx: TouchTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException("Operation not allowed: ALTER TABLE ... TOUCH ...")
}

/**
* Create an (Hive's) AlterTableCompact command
*
* For example:
* {{{
* ALTER TABLE table [PARTITION spec] COMPACT 'compaction_type';
* }}}
*/
override def visitCompactTable(ctx: CompactTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException("Operation not allowed: ALTER TABLE ... COMPACT ...")
}

/**
* Create an (Hive's) AlterTableMerge command
*
* For example:
* {{{
* ALTER TABLE table [PARTITION spec] CONCATENATE;
* }}}
*/
override def visitConcatenateTable(ctx: ConcatenateTableContext): LogicalPlan = withOrigin(ctx) {
throw new AnalysisException("Operation not allowed: ALTER TABLE ... CONCATENATE")
}

/**
* Create an [[AlterTableChangeCol]] command
*
Expand Down Expand Up @@ -857,12 +745,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitFailNativeCommand(
ctx: FailNativeCommandContext): LogicalPlan = withOrigin(ctx) {
val keywords = if (ctx.kws != null) {
Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3).filter(_ != null).map(_.getText).mkString(" ")
Seq(ctx.kws.kw1, ctx.kws.kw2, ctx.kws.kw3, ctx.kws.kw4, ctx.kws.kw5, ctx.kws.kw6)
.filter(_ != null).map(_.getText).mkString(" ")
} else {
// SET ROLE is the exception to the rule, because we handle this before other SET commands.
"SET ROLE"
}
throw new ParseException(s"Unsupported operation: $keywords", ctx)
throw new ParseException(s"Operation not allowed: $keywords", ctx)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.execution.command

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.PlanTest
Expand All @@ -31,7 +30,7 @@ class DDLCommandSuite extends PlanTest {
private val parser = new SparkSqlParser(new SQLConf)

private def assertUnsupported(sql: String): Unit = {
val e = intercept[AnalysisException] {
val e = intercept[ParseException] {
parser.parsePlan(sql)
}
assert(e.getMessage.toLowerCase.contains("operation not allowed"))
Expand Down Expand Up @@ -503,6 +502,21 @@ class DDLCommandSuite extends PlanTest {
"ALTER TABLE table_name PARTITION (dt='2008-08-08', country='us') CONCATENATE")
}

test("alter table: cluster by (not supported)") {
assertUnsupported(
"ALTER TABLE table_name CLUSTERED BY (col_name) SORTED BY (col2_name) INTO 3 BUCKETS")
assertUnsupported("ALTER TABLE table_name CLUSTERED BY (col_name) INTO 3 BUCKETS")
assertUnsupported("ALTER TABLE table_name NOT CLUSTERED")
assertUnsupported("ALTER TABLE table_name NOT SORTED")
}

test("alter table: skewed by (not supported)") {
assertUnsupported("ALTER TABLE table_name NOT SKEWED")
assertUnsupported("ALTER TABLE table_name NOT STORED AS DIRECTORIES")
assertUnsupported("ALTER TABLE table_name SET SKEWED LOCATION (col_name1=\"location1\"")
assertUnsupported("ALTER TABLE table_name SKEWED BY (key) ON (1,5,6) STORED AS DIRECTORIES")
}

test("alter table: change column name/type/position/comment") {
val sql1 = "ALTER TABLE table_name CHANGE col_old_name col_new_name INT"
val sql2 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class HiveDDLCommandSuite extends PlanTest {
val e = intercept[ParseException] {
parser.parsePlan(sql)
}
assert(e.getMessage.toLowerCase.contains("unsupported"))
assert(e.getMessage.toLowerCase.contains("operation not allowed"))
}

test("Test CTAS #1") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{SparkException, SparkFiles}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.hive._
Expand Down Expand Up @@ -68,8 +69,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
}

private def assertUnsupportedFeature(body: => Unit): Unit = {
val e = intercept[AnalysisException] { body }
assert(e.getMessage.toLowerCase.contains("unsupported operation"))
val e = intercept[ParseException] { body }
assert(e.getMessage.toLowerCase.contains("operation not allowed"))
}

test("SPARK-4908: concurrent hive native commands") {
Expand Down

0 comments on commit 337289d

Please sign in to comment.