[SPARK-17732][SQL] Revert ALTER TABLE DROP PARTITION should support comparators #16036

Closed · wants to merge 2 commits
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4

@@ -235,7 +235,11 @@ partitionSpecLocation
     ;
 
 partitionSpec
-    : PARTITION '(' expression (',' expression)* ')'
+    : PARTITION '(' partitionVal (',' partitionVal)* ')'
     ;
 
+partitionVal
+    : identifier (EQ constant)?
+    ;
+
 describeFuncName
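After this revert, a partition spec is again a comma-separated list of partitionVal entries, i.e. identifier or identifier = constant; comparator forms such as country < 'KR' are no longer part of the grammar. A minimal sketch of the accepted surface syntax (assumes a local Hive-enabled session; the table and partitions are illustrative):

    import org.apache.spark.sql.SparkSession

    object PartitionSpecSyntaxSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]")
          .appName("partition-spec-syntax")
          .enableHiveSupport()
          .getOrCreate()

        spark.sql(
          "CREATE TABLE IF NOT EXISTS sales(id INT) " +
            "PARTITIONED BY (country STRING, quarter STRING)")
        spark.sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter = '4')")
        // Equality pairs match partitionVal : identifier (EQ constant)? and parse:
        spark.sql("ALTER TABLE sales DROP IF EXISTS PARTITION (country = 'US', quarter = '4')")
        // A comparator such as PARTITION (country < 'KR') no longer parses and
        // would raise a ParseException here.
        spark.stop()
      }
    }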
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

@@ -194,15 +194,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
-    val parts = ctx.expression.asScala.map { pVal =>
-      expression(pVal) match {
-        case UnresolvedAttribute(name :: Nil) =>
-          name -> None
-        case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
-          name -> Option(constant.toString)
-        case _ =>
-          throw new ParseException("Invalid partition filter specification", ctx)
-      }
+    val parts = ctx.partitionVal.asScala.map { pVal =>
+      val name = pVal.identifier.getText
+      val value = Option(pVal.constant).map(visitStringConstant)
+      name -> value
     }
     // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
     // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
@@ -211,23 +206,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     parts.toMap
   }
 
-  /**
-   * Create a partition filter specification.
-   */
-  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
-    val parts = ctx.expression.asScala.map { pVal =>
-      expression(pVal) match {
-        case EqualNullSafe(_, _) =>
-          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
-        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
-          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
-        case _ =>
-          throw new ParseException("Invalid partition filter specification", ctx)
-      }
-    }
-    parts.reduceLeft(And)
-  }
-
   /**
    * Create a partition specification map without optional values.
    */
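The context comment above explains the duplicate-key guard that follows this hunk. A self-contained sketch of why that guard must run before toMap (names here are illustrative):

    object DuplicateKeySketch extends App {
      // toMap keeps only the last value for a repeated key, so without a guard
      // PARTITION(a='1', b='2', a='3') would silently become a -> '3'.
      val parts = Seq("a" -> Option("1"), "b" -> Option("2"), "a" -> Option("3"))
      println(parts.toMap)  // Map(a -> Some(3), b -> Some(2)): the first 'a' is lost

      // A stand-alone version of the duplicate check the comment refers to:
      val duplicates = parts.map(_._1).groupBy(identity).collect { case (k, vs) if vs.size > 1 => k }
      if (duplicates.nonEmpty) {
        println(s"Duplicate partition keys found: ${duplicates.mkString(", ")}")  // prints: a
      }
    }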
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

@@ -832,7 +832,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
+      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
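visitNonOptionalPartitionSpec (defined elsewhere in AstBuilder) adapts the optional-valued map produced by visitPartitionSpec to the Map[String, String] the command now expects. A hypothetical sketch of that conversion (the helper name and error type below are illustrative, not Spark's actual code):

    // Every key must carry a value, since AlterTableDropPartitionCommand now
    // takes a TablePartitionSpec, i.e. Map[String, String].
    def toNonOptionalSpec(spec: Map[String, Option[String]]): Map[String, String] =
      spec.map {
        case (key, Some(value)) => key -> value
        case (key, None) =>
          throw new IllegalArgumentException(s"Partition key '$key' is missing a value")
      }

    // usage: toNonOptionalSpec(Map("dt" -> Some("2008-08-08")))  ==> Map(dt -> 2008-08-08)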
sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala

@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
-import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -420,55 +419,27 @@
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[Expression],
+    specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
     purge: Boolean)
-  extends RunnableCommand with PredicateHelper {
-
-  private def isRangeComparison(expr: Expression): Boolean = {
-    expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
-  }
+  extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    specs.foreach { expr =>
-      expr.references.foreach { attr =>
-        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
-          throw new AnalysisException(s"${attr.name} is not a valid partition column " +
-            s"in table ${table.identifier.quotedString}.")
-        }
-      }
+    val normalizedSpecs = specs.map { spec =>
+      PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
     }
 
-    if (specs.exists(isRangeComparison)) {
-      val partitionSet = specs.flatMap { spec =>
-        val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
-        if (partitions.isEmpty && !ifExists) {
-          throw new AnalysisException(s"There is no partition for ${spec.sql}")
-        }
-        partitions
-      }.distinct
-      catalog.dropPartitions(
-        table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
-    } else {
-      val normalizedSpecs = specs.map { expr =>
-        val spec = splitConjunctivePredicates(expr).map {
-          case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
-        }.toMap
-        PartitioningUtils.normalizePartitionSpec(
-          spec,
-          table.partitionColumnNames,
-          table.identifier.quotedString,
-          resolver)
-      }
-      catalog.dropPartitions(
-        table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
-    }
+    catalog.dropPartitions(
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
     Seq.empty[Row]
   }
 
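The command now delegates validation to PartitioningUtils.normalizePartitionSpec. A minimal sketch of the assumed behavior, with a simplified signature: resolve each user-written key against the table's partition columns via the session resolver (case-insensitive by default) and rewrite it to the canonical column name, rejecting keys that match no partition column:

    def normalizeSpec(
        spec: Map[String, String],
        partColNames: Seq[String],
        tblName: String,
        resolver: (String, String) => Boolean): Map[String, String] =
      spec.map { case (key, value) =>
        val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
          throw new IllegalArgumentException(
            s"$key is not a valid partition column in table $tblName.")
        }
        normalizedKey -> value
      }

    // e.g. normalizeSpec(Map("A" -> "1"), Seq("a"), "dbx.tab1", _.equalsIgnoreCase(_))
    // returns Map(a -> 1), matching the case-insensitivity test in DDLSuite below.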
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

@@ -215,14 +215,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         if (overwrite.enabled) {
           val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
           if (deletedPartitions.nonEmpty) {
-            import org.apache.spark.sql.catalyst.expressions._
-            val expressions = deletedPartitions.map { specs =>
-              specs.map { case (key, value) =>
-                EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
-              }.reduceLeft(And)
-            }.toSeq
             AlterTableDropPartitionCommand(
-              l.catalogTable.get.identifier, expressions,
+              l.catalogTable.get.identifier, deletedPartitions.toSeq,
               ifExists = true, purge = true).run(t.sparkSession)
           }
         }
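A self-contained sketch of the overwrite bookkeeping above (values are illustrative): partitions that matched the overwrite initially but were not rewritten are dropped, and after this change their Map-based specs flow directly into AlterTableDropPartitionCommand instead of first being rebuilt as EqualTo/And expression trees:

    object OverwriteDropSketch extends App {
      val initialMatchingPartitions = Seq(Map("dt" -> "2016-01-01"), Map("dt" -> "2016-01-02"))
      val updatedPartitions = Set(Map("dt" -> "2016-01-02"))
      // Same set difference as in the rule above:
      val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
      println(deletedPartitions)  // Set(Map(dt -> 2016-01-01))
      // deletedPartitions.toSeq is now handed to AlterTableDropPartitionCommand as-is.
    }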
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala

@@ -21,7 +21,6 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -613,12 +612,8 @@ class DDLCommandSuite extends PlanTest {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
+        Map("dt" -> "2008-08-08", "country" -> "us"),
+        Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
       purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)
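For reference, the shape of DDL that such an expected plan corresponds to, reconstructed from the two spec maps (the actual statement lives elsewhere in the test; the table name is illustrative):

    val sql1: String =
      """ALTER TABLE table_name DROP IF EXISTS
        |PARTITION (dt='2008-08-08', country='us'),
        |PARTITION (dt='2009-09-09', country='uk')""".stripMargin
    // expected2_table corresponds to the same statement without IF EXISTS.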
sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala

@@ -1281,26 +1281,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val part2 = Map("a" -> "2", "b" -> "6")
     val part3 = Map("a" -> "3", "b" -> "7")
     val part4 = Map("a" -> "4", "b" -> "8")
+    val part5 = Map("a" -> "9", "b" -> "9")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
     createTablePartition(catalog, part2, tableIdent)
     createTablePartition(catalog, part3, tableIdent)
     createTablePartition(catalog, part4, tableIdent)
+    createTablePartition(catalog, part5, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3, part4))
+      Set(part1, part2, part3, part4, part5))
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
 
     // basic drop partition
     sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part5))
 
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
 
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1314,10 +1316,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
     // partition to drop does not exist when using IF EXISTS
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
 
     // partition spec in DROP PARTITION should be case insensitive by default
     sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part5))
+
+    // use int literal as partition value for int type partition column
+    sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
   }
 
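A small sketch of why the new int-literal case passes: TablePartitionSpec values are strings, and the parser converts the constant in a=9 with visitStringConstant, so quoted and unquoted literals normalize to the same spec:

    object IntLiteralSpecSketch extends App {
      val fromIntLiterals = Map("a" -> "9", "b" -> "9")  // parsed from PARTITION (a=9, b=9)
      val part5 = Map("a" -> "9", "b" -> "9")            // the catalog spec defined in the test
      assert(fromIntLiterals == part5)
      println("int and quoted literals produce the same TablePartitionSpec")
    }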
sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala

@@ -26,7 +26,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -226,108 +225,6 @@ class HiveDDLSuite
     }
   }
 
-  test("SPARK-17732: Drop partitions by filter") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      for (country <- Seq("US", "CA", "KR")) {
-        for (quarter <- 1 to 4) {
-          sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
-        }
-      }
-
-      sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=CA/quarter=1") ::
-        Row("country=CA/quarter=2") ::
-        Row("country=KR/quarter=1") ::
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=KR/quarter=4") ::
-        Row("country=US/quarter=1") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=3") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=KR/quarter=4") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=3") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
-      sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=3") :: Nil)
-
-      // According to the declarative partition spec definitions, this drops the union of target
-      // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
-      checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
-    }
-  }
-
-  test("SPARK-17732: Error handling for drop partitions by filter") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      val m = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
-      }.getMessage
-      assert(m.contains("unknown is not a valid partition column in table"))
-
-      val m2 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
-      }.getMessage
-      assert(m2.contains("unknown is not a valid partition column in table"))
-
-      val m3 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
-      }.getMessage
-      assert(m3.contains("'<=>' operator is not allowed in partition specification"))
-
-      val m4 = intercept[ParseException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
-      }.getMessage
-      assert(m4.contains("'<=>' operator is not allowed in partition specification"))
-
-      val m5 = intercept[ParseException] {
-        sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
-      }.getMessage
-      assert(m5.contains("Invalid partition filter specification"))
-
-      sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
-      val m6 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
-      }.getMessage
-      // The query is not executed because `PARTITION (quarter <= '2')` is invalid.
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=3") :: Nil)
-      assert(m6.contains("There is no partition for (`quarter` <= '2')"))
-    }
-  }
-
-  test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      val m = intercept[ParseException] {
-        sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
-      }.getMessage()
-      assert(m.contains("Invalid partition filter specification"))
-    }
-  }
-
   test("drop views") {
     withTable("tab1") {
       val tabName = "tab1"
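With the three filter tests removed, the post-revert expectation for comparator syntax can still be pinned down at parse level. A sketch in this suite's idiom (assumes the suite's sql helper and ScalaTest's intercept are in scope; this test is not part of the PR):

    import org.apache.spark.sql.catalyst.parser.ParseException

    test("comparators in DROP PARTITION no longer parse after the revert") {
      withTable("sales") {
        sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
        // The reverted grammar only admits identifier (EQ constant)? pairs,
        // so a comparator now fails at parse time rather than being planned:
        intercept[ParseException] {
          sql("ALTER TABLE sales DROP PARTITION (country < 'KR')")
        }
      }
    }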