[SPARK-17732][SQL] Revert ALTER TABLE DROP PARTITION should support comparators

## What changes were proposed in this pull request?

#15704 fails when an int literal is used in `DROP PARTITION`, so it has already been reverted in branch-2.1.

This PR reverts it in the master branch and adds a regression test, to make sure the master branch stays healthy.
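For illustration, a minimal sketch of the failing case (hypothetical table and column names; assumes a Spark 2.x `SparkSession` named `spark` with Hive support):

```scala
// Hypothetical reproduction of the bug being reverted. Partition values are
// stored as strings in the catalog, so an unquoted int literal in
// DROP PARTITION must still match the stored value "1".
spark.sql("CREATE TABLE t (data STRING) PARTITIONED BY (p INT)")
spark.sql("ALTER TABLE t ADD PARTITION (p = 1)")

// With #15704 this statement failed; after this revert it drops the partition.
spark.sql("ALTER TABLE t DROP PARTITION (p = 1)")
```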

## How was this patch tested?

A new regression test in `DDLSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16036 from cloud-fan/revert.
cloud-fan authored and hvanhovell committed Nov 28, 2016
1 parent 38e2982 commit d31ff9b
Showing 8 changed files with 34 additions and 189 deletions. (File names below are inferred from the hunk contents.)
SqlBase.g4

```diff
@@ -235,7 +235,11 @@ partitionSpecLocation
     ;
 
 partitionSpec
-    : PARTITION '(' expression (',' expression)* ')'
+    : PARTITION '(' partitionVal (',' partitionVal)* ')'
     ;
 
+partitionVal
+    : identifier (EQ constant)?
+    ;
+
 describeFuncName
```
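With the reverted `partitionVal` rule, a partition spec is a list of `identifier (EQ constant)?` entries, so comparator syntax no longer parses. A hedged sketch of the before/after behavior (hypothetical table `t`, assumes a `SparkSession` named `spark`):

```scala
// Accepted: key/constant pairs (the value is optional per the grammar).
spark.sql("ALTER TABLE t DROP PARTITION (a = 1, b = 'x')")

// Rejected after this revert: comparators were only valid under the
// expression-based rule that #15704 introduced.
spark.sql("ALTER TABLE t DROP PARTITION (a < 1)")  // now a parse error
```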
AstBuilder.scala

```diff
@@ -194,15 +194,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
-    val parts = ctx.expression.asScala.map { pVal =>
-      expression(pVal) match {
-        case UnresolvedAttribute(name :: Nil) =>
-          name -> None
-        case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
-          name -> Option(constant.toString)
-        case _ =>
-          throw new ParseException("Invalid partition filter specification", ctx)
-      }
+    val parts = ctx.partitionVal.asScala.map { pVal =>
+      val name = pVal.identifier.getText
+      val value = Option(pVal.constant).map(visitStringConstant)
+      name -> value
     }
     // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
     // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
@@ -211,23 +206,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     parts.toMap
   }
 
-  /**
-   * Create a partition filter specification.
-   */
-  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
-    val parts = ctx.expression.asScala.map { pVal =>
-      expression(pVal) match {
-        case EqualNullSafe(_, _) =>
-          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
-        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
-          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
-        case _ =>
-          throw new ParseException("Invalid partition filter specification", ctx)
-      }
-    }
-    parts.reduceLeft(And)
-  }
-
   /**
    * Create a partition specification map without optional values.
    */
```
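The reverted `visitPartitionSpec` renders every constant to a string via `visitStringConstant`, which is what makes unquoted int literals line up with catalog partition values. A sketch of the resulting map for a hypothetical spec:

```scala
// PARTITION (ds = '2017-01-01', hr = 1, dummy)  ==>  (illustrative sketch)
val spec: Map[String, Option[String]] =
  Map(
    "ds"    -> Some("2017-01-01"),
    "hr"    -> Some("1"),   // int literal 1 stringified by visitStringConstant
    "dummy" -> None)        // value optional per the partitionVal grammar
```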
SparkSqlParser.scala

```diff
@@ -832,7 +832,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
+      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
```
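`visitNonOptionalPartitionSpec` (presumably defined alongside `visitPartitionSpec` in `AstBuilder`; its body is collapsed out of this diff) reuses `visitPartitionSpec` but requires every key to carry a value. A hedged standalone sketch of that behavior, not the actual helper:

```scala
// Sketch: DROP PARTITION needs concrete values, so keys without one are
// rejected. (The real helper raises a parser error; a plain
// IllegalArgumentException stands in here to keep the sketch self-contained.)
def toNonOptionalSpec(spec: Map[String, Option[String]]): Map[String, String] =
  spec.map {
    case (key, Some(value)) => key -> value
    case (key, None) =>
      throw new IllegalArgumentException(s"Found an empty partition key '$key'.")
  }
```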
ddl.scala

```diff
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
-import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -420,55 +419,27 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[Expression],
+    specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
     purge: Boolean)
-  extends RunnableCommand with PredicateHelper {
-
-  private def isRangeComparison(expr: Expression): Boolean = {
-    expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
-  }
+  extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    specs.foreach { expr =>
-      expr.references.foreach { attr =>
-        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
-          throw new AnalysisException(s"${attr.name} is not a valid partition column " +
-            s"in table ${table.identifier.quotedString}.")
-        }
-      }
-    }
-
-    if (specs.exists(isRangeComparison)) {
-      val partitionSet = specs.flatMap { spec =>
-        val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
-        if (partitions.isEmpty && !ifExists) {
-          throw new AnalysisException(s"There is no partition for ${spec.sql}")
-        }
-        partitions
-      }.distinct
-      catalog.dropPartitions(
-        table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
-    } else {
-      val normalizedSpecs = specs.map { expr =>
-        val spec = splitConjunctivePredicates(expr).map {
-          case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
-        }.toMap
-        PartitioningUtils.normalizePartitionSpec(
-          spec,
-          table.partitionColumnNames,
-          table.identifier.quotedString,
-          resolver)
-      }
-      catalog.dropPartitions(
-        table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
-    }
+    val normalizedSpecs = specs.map { spec =>
+      PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+    }
+
+    catalog.dropPartitions(
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
     Seq.empty[Row]
   }
 
```
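`normalizePartitionSpec` is what keeps `DROP PARTITION` case-insensitive after the revert. A minimal standalone re-implementation of the idea (not the actual `PartitioningUtils` signature; a plain exception stands in for Spark's `AnalysisException`):

```scala
// Sketch: match user-supplied keys against the table's partition columns with
// the session resolver and rewrite them to the canonical column name.
def normalizeSpec(
    spec: Map[String, String],
    partColNames: Seq[String],
    tblName: String,
    resolver: (String, String) => Boolean): Map[String, String] =
  spec.map { case (key, value) =>
    val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
      throw new IllegalArgumentException(
        s"$key is not a valid partition column in table $tblName.")
    }
    normalizedKey -> value
  }

// Usage: normalizeSpec(Map("A" -> "1"), Seq("a"), "dbx.tab1",
//   (a, b) => a.equalsIgnoreCase(b)) returns Map("a" -> "1")
```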
DataSourceStrategy.scala

```diff
@@ -215,14 +215,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       if (overwrite.enabled) {
         val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
         if (deletedPartitions.nonEmpty) {
-          import org.apache.spark.sql.catalyst.expressions._
-          val expressions = deletedPartitions.map { specs =>
-            specs.map { case (key, value) =>
-              EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
-            }.reduceLeft(And)
-          }.toSeq
           AlterTableDropPartitionCommand(
-            l.catalogTable.get.identifier, expressions,
+            l.catalogTable.get.identifier, deletedPartitions.toSeq,
             ifExists = true, purge = true).run(t.sparkSession)
         }
       }
```
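The overwrite path already holds the deleted partitions as `TablePartitionSpec` maps, so with the revert it can pass them straight through instead of folding each map into a conjunction of equality predicates. For reference, a hedged sketch of the fold the removed code performed (hypothetical values):

```scala
import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.StringType

// One partition spec folded into (dt = '2008-08-08') AND (country = 'us').
val spec = Map("dt" -> "2008-08-08", "country" -> "us")
val predicates: Seq[Expression] = spec.toSeq.map { case (key, value) =>
  EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
}
val folded: Expression = predicates.reduceLeft(And)
```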
DDLCommandSuite.scala

```diff
@@ -21,7 +21,6 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -613,12 +612,8 @@ class DDLCommandSuite extends PlanTest {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
+        Map("dt" -> "2008-08-08", "country" -> "us"),
+        Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
       purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)
```
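The SQL this expected plan corresponds to is collapsed out of the diff; presumably it is the classic Hive-style two-spec drop, along the lines of:

```scala
// Hedged reconstruction of the statement under test (the actual test body is
// not shown in this diff):
val sql1_table =
  """ALTER TABLE table_name DROP IF EXISTS
    |PARTITION (dt='2008-08-08', country='us'),
    |PARTITION (dt='2009-09-09', country='uk')""".stripMargin
```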
DDLSuite.scala

```diff
@@ -1281,26 +1281,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val part2 = Map("a" -> "2", "b" -> "6")
     val part3 = Map("a" -> "3", "b" -> "7")
     val part4 = Map("a" -> "4", "b" -> "8")
+    val part5 = Map("a" -> "9", "b" -> "9")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
     createTablePartition(catalog, part2, tableIdent)
     createTablePartition(catalog, part3, tableIdent)
     createTablePartition(catalog, part4, tableIdent)
+    createTablePartition(catalog, part5, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3, part4))
+      Set(part1, part2, part3, part4, part5))
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
 
     // basic drop partition
     sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part5))
 
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
 
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1314,10 +1316,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
     // partition to drop does not exist when using IF EXISTS
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
 
     // partition spec in DROP PARTITION should be case insensitive by default
     sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part5))
+
+    // use int literal as partition value for int type partition column
+    sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
   }
 
```
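This is the regression test the PR adds: the unquoted literal `9` is stringified by the parser and matches the stored spec `part5 = Map("a" -> "9", "b" -> "9")`. An end-to-end flavor of the same check (hedged; the suite drives the session catalog directly rather than going through DDL like this):

```scala
spark.sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")    // unquoted ints now accepted
// equivalent to the quoted form:
spark.sql("ALTER TABLE tab1 DROP PARTITION (a='9', b='9')")
```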
HiveDDLSuite.scala

```diff
@@ -26,7 +26,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -226,108 +225,6 @@ class HiveDDLSuite
     }
   }
 
-  test("SPARK-17732: Drop partitions by filter") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      for (country <- Seq("US", "CA", "KR")) {
-        for (quarter <- 1 to 4) {
-          sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
-        }
-      }
-
-      sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=CA/quarter=1") ::
-        Row("country=CA/quarter=2") ::
-        Row("country=KR/quarter=1") ::
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=KR/quarter=4") ::
-        Row("country=US/quarter=1") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=3") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=KR/quarter=4") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=3") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
-      sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=3") :: Nil)
-
-      // According to the declarative partition spec definitions, this drops the union of target
-      // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
-      checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
-    }
-  }
-
-  test("SPARK-17732: Error handling for drop partitions by filter") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      val m = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
-      }.getMessage
-      assert(m.contains("unknown is not a valid partition column in table"))
-
-      val m2 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
-      }.getMessage
-      assert(m2.contains("unknown is not a valid partition column in table"))
-
-      val m3 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
-      }.getMessage
-      assert(m3.contains("'<=>' operator is not allowed in partition specification"))
-
-      val m4 = intercept[ParseException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
-      }.getMessage
-      assert(m4.contains("'<=>' operator is not allowed in partition specification"))
-
-      val m5 = intercept[ParseException] {
-        sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
-      }.getMessage
-      assert(m5.contains("Invalid partition filter specification"))
-
-      sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
-      val m6 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
-      }.getMessage
-      // The query is not executed because `PARTITION (quarter <= '2')` is invalid.
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=3") :: Nil)
-      assert(m6.contains("There is no partition for (`quarter` <= '2')"))
-    }
-  }
-
-  test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      val m = intercept[ParseException] {
-        sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
-      }.getMessage()
-      assert(m.contains("Invalid partition filter specification"))
-    }
-  }
-
   test("drop views") {
     withTable("tab1") {
       val tabName = "tab1"
```
