diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 4531fe4a0ebaf..df85c70c6cdea 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -235,7 +235,11 @@ partitionSpecLocation
     ;
 
 partitionSpec
-    : PARTITION '(' expression (',' expression)* ')'
+    : PARTITION '(' partitionVal (',' partitionVal)* ')'
+    ;
+
+partitionVal
+    : identifier (EQ constant)?
     ;
 
 describeFuncName
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 8e8d37407c02a..3fa7bf1cdbf16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -194,15 +194,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
    */
   override def visitPartitionSpec(
       ctx: PartitionSpecContext): Map[String, Option[String]] = withOrigin(ctx) {
-    val parts = ctx.expression.asScala.map { pVal =>
-      expression(pVal) match {
-        case UnresolvedAttribute(name :: Nil) =>
-          name -> None
-        case cmp @ EqualTo(UnresolvedAttribute(name :: Nil), constant: Literal) =>
-          name -> Option(constant.toString)
-        case _ =>
-          throw new ParseException("Invalid partition filter specification", ctx)
-      }
+    val parts = ctx.partitionVal.asScala.map { pVal =>
+      val name = pVal.identifier.getText
+      val value = Option(pVal.constant).map(visitStringConstant)
+      name -> value
     }
     // Before calling `toMap`, we check duplicated keys to avoid silently ignore partition values
     // in partition spec like PARTITION(a='1', b='2', a='3'). The real semantical check for
@@ -211,23 +206,6 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
     parts.toMap
   }
 
-  /**
-   * Create a partition filter specification.
-   */
-  def visitPartitionFilterSpec(ctx: PartitionSpecContext): Expression = withOrigin(ctx) {
-    val parts = ctx.expression.asScala.map { pVal =>
-      expression(pVal) match {
-        case EqualNullSafe(_, _) =>
-          throw new ParseException("'<=>' operator is not allowed in partition specification.", ctx)
-        case cmp @ BinaryComparison(UnresolvedAttribute(name :: Nil), constant: Literal) =>
-          cmp.withNewChildren(Seq(AttributeReference(name, StringType)(), constant))
-        case _ =>
-          throw new ParseException("Invalid partition filter specification", ctx)
-      }
-    }
-    parts.reduceLeft(And)
-  }
-
   /**
    * Create a partition specification map without optional values.
    */
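Note (editorial annotation, not part of the patch): with the restored `partitionVal` rule, a spec such as `PARTITION (a='1', b)` parses into a `Map[String, Option[String]]`, and the duplicate-key check described in the comment above must run before `toMap`. A minimal standalone sketch of that shape, with illustrative values:

```scala
// Sketch of visitPartitionSpec's output for PARTITION (a='1', b):
// each partitionVal yields a name and an optional stringified constant.
val parts: Seq[(String, Option[String])] = Seq("a" -> Some("1"), "b" -> None)

// Duplicated keys (e.g. PARTITION (a='1', b='2', a='3')) must be rejected,
// because parts.toMap would silently keep only the last value for "a".
val duplicates = parts.groupBy(_._1).collect { case (k, vs) if vs.size > 1 => k }
require(duplicates.isEmpty, s"Found duplicate keys: ${duplicates.mkString(", ")}")

val spec: Map[String, Option[String]] = parts.toMap
```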
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 0300bfe1ece39..5f89a229d6242 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -832,7 +832,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec),
+      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
       ctx.EXISTS != null,
       ctx.PURGE != null)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index d80b000bcc598..0f126d0200eff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -31,8 +31,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison}
-import org.apache.spark.sql.catalyst.expressions.{EqualTo, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.util.SerializableConfiguration
@@ -420,55 +419,27 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[Expression],
+    specs: Seq[TablePartitionSpec],
     ifExists: Boolean,
     purge: Boolean)
-  extends RunnableCommand with PredicateHelper {
-
-  private def isRangeComparison(expr: Expression): Boolean = {
-    expr.find(e => e.isInstanceOf[BinaryComparison] && !e.isInstanceOf[EqualTo]).isDefined
-  }
+  extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
     val table = catalog.getTableMetadata(tableName)
-    val resolver = sparkSession.sessionState.conf.resolver
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    specs.foreach { expr =>
-      expr.references.foreach { attr =>
-        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
-          throw new AnalysisException(s"${attr.name} is not a valid partition column " +
-            s"in table ${table.identifier.quotedString}.")
-        }
-      }
+    val normalizedSpecs = specs.map { spec =>
+      PartitioningUtils.normalizePartitionSpec(
+        spec,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
     }
 
-    if (specs.exists(isRangeComparison)) {
-      val partitionSet = specs.flatMap { spec =>
-        val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(spec)).map(_.spec)
-        if (partitions.isEmpty && !ifExists) {
-          throw new AnalysisException(s"There is no partition for ${spec.sql}")
-        }
-        partitions
-      }.distinct
-      catalog.dropPartitions(
-        table.identifier, partitionSet, ignoreIfNotExists = ifExists, purge = purge)
-    } else {
-      val normalizedSpecs = specs.map { expr =>
-        val spec = splitConjunctivePredicates(expr).map {
-          case BinaryComparison(AttributeReference(name, _, _, _), right) => name -> right.toString
-        }.toMap
-        PartitioningUtils.normalizePartitionSpec(
-          spec,
-          table.partitionColumnNames,
-          table.identifier.quotedString,
-          resolver)
-      }
-      catalog.dropPartitions(
-        table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
-    }
+    catalog.dropPartitions(
+      table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge)
 
     Seq.empty[Row]
   }
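Note (editorial annotation, not part of the patch): `AlterTableDropPartitionCommand` now accepts plain `TablePartitionSpec` maps and delegates column-name checking to `PartitioningUtils.normalizePartitionSpec`. A rough standalone sketch of that normalization; the function body is a simplified stand-in, and the exception type differs from Spark's `AnalysisException`:

```scala
// Simplified stand-in for PartitioningUtils.normalizePartitionSpec: resolve each
// user-written key against the table's partition columns via the session resolver
// (case-insensitive by default) and fail on keys that are not partition columns.
def normalizePartitionSpec(
    spec: Map[String, String],
    partColNames: Seq[String],
    tblName: String,
    resolver: (String, String) => Boolean): Map[String, String] =
  spec.map { case (key, value) =>
    val normalizedKey = partColNames.find(resolver(_, key)).getOrElse {
      throw new IllegalArgumentException(
        s"$key is not a valid partition column in table $tblName.")
    }
    normalizedKey -> value
  }

// PARTITION (A='1') against partition column "a" normalizes to Map("a" -> "1"):
val caseInsensitive = (a: String, b: String) => a.equalsIgnoreCase(b)
assert(normalizePartitionSpec(Map("A" -> "1"), Seq("a"), "t", caseInsensitive) == Map("a" -> "1"))
```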
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index e81512d1abf84..4f19a2d00b0e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -215,14 +215,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         if (overwrite.enabled) {
           val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
           if (deletedPartitions.nonEmpty) {
-            import org.apache.spark.sql.catalyst.expressions._
-            val expressions = deletedPartitions.map { specs =>
-              specs.map { case (key, value) =>
-                EqualTo(AttributeReference(key, StringType)(), Literal.create(value, StringType))
-              }.reduceLeft(And)
-            }.toSeq
             AlterTableDropPartitionCommand(
-              l.catalogTable.get.identifier, expressions,
+              l.catalogTable.get.identifier, deletedPartitions.toSeq,
               ifExists = true, purge = true).run(t.sparkSession)
           }
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 057528bef5084..d31e7aeb3a78a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -21,7 +21,6 @@ import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Literal}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -613,12 +612,8 @@ class DDLCommandSuite extends PlanTest {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2008-08-08", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("us", StringType))),
-        And(
-          EqualTo(AttributeReference("dt", StringType)(), Literal.create("2009-09-09", StringType)),
-          EqualTo(AttributeReference("country", StringType)(), Literal.create("uk", StringType)))),
+        Map("dt" -> "2008-08-08", "country" -> "us"),
+        Map("dt" -> "2009-09-09", "country" -> "uk")),
       ifExists = true,
       purge = false)
     val expected2_table = expected1_table.copy(ifExists = false)
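Note (editorial annotation, not part of the patch): `TablePartitionSpec` is an alias for `Map[String, String]`, so the overwrite path above can pass `deletedPartitions` straight through instead of rebuilding `EqualTo`/`And` expression trees, and the test expectation reduces to literal maps. Illustrative values only:

```scala
type TablePartitionSpec = Map[String, String]

// The specs DataSourceAnalysis now hands to AlterTableDropPartitionCommand directly:
val deletedPartitions: Set[TablePartitionSpec] =
  Set(Map("dt" -> "2008-08-08", "country" -> "us"),
      Map("dt" -> "2009-09-09", "country" -> "uk"))

// Previously each map was folded into And(EqualTo(attr, literal), ...) first;
// after this change the Seq of maps is the command's native input.
val specs: Seq[TablePartitionSpec] = deletedPartitions.toSeq
```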
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 10843e9ba5753..a602d750d73d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1281,26 +1281,28 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val part2 = Map("a" -> "2", "b" -> "6")
     val part3 = Map("a" -> "3", "b" -> "7")
     val part4 = Map("a" -> "4", "b" -> "8")
+    val part5 = Map("a" -> "9", "b" -> "9")
     createDatabase(catalog, "dbx")
     createTable(catalog, tableIdent)
     createTablePartition(catalog, part1, tableIdent)
     createTablePartition(catalog, part2, tableIdent)
     createTablePartition(catalog, part3, tableIdent)
     createTablePartition(catalog, part4, tableIdent)
+    createTablePartition(catalog, part5, tableIdent)
     assert(catalog.listPartitions(tableIdent).map(_.spec).toSet ==
-      Set(part1, part2, part3, part4))
+      Set(part1, part2, part3, part4, part5))
     if (isDatasourceTable) {
       convertToDatasourceTable(catalog, tableIdent)
     }
     // basic drop partition
     sql("ALTER TABLE dbx.tab1 DROP IF EXISTS PARTITION (a='4', b='8'), PARTITION (a='3', b='7')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part2, part5))
     // drop partitions without explicitly specifying database
     catalog.setCurrentDatabase("dbx")
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='2', b ='6')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
     // table to alter does not exist
     intercept[AnalysisException] {
@@ -1314,10 +1316,14 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
     // partition to drop does not exist when using IF EXISTS
     sql("ALTER TABLE tab1 DROP IF EXISTS PARTITION (a='300')")
-    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1))
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part1, part5))
 
     // partition spec in DROP PARTITION should be case insensitive by default
     sql("ALTER TABLE tab1 DROP PARTITION (A='1', B='5')")
+    assert(catalog.listPartitions(tableIdent).map(_.spec).toSet == Set(part5))
+
+    // use int literal as partition value for int type partition column
+    sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")
     assert(catalog.listPartitions(tableIdent).isEmpty)
   }
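Note (editorial annotation, not part of the patch): the new assertions exercise unquoted literals. Since `partitionVal` accepts any `constant` and the value is stringified via `visitStringConstant`, quoted and unquoted forms should yield the same spec; a sketch assuming a `SparkSession` named `spark` and the test's `tab1` table:

```scala
// Both statements resolve to the spec Map("a" -> "9", "b" -> "9");
// only one is actually needed, since the first drop removes the partition.
spark.sql("ALTER TABLE tab1 DROP PARTITION (a='9', b='9')")
// spark.sql("ALTER TABLE tab1 DROP PARTITION (a=9, b=9)")  // equivalent spec
```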
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 15e3927b755af..951e0704148b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.hive.HiveExternalCatalog
 import org.apache.spark.sql.hive.test.TestHiveSingleton
@@ -226,108 +225,6 @@ class HiveDDLSuite
     }
   }
 
-  test("SPARK-17732: Drop partitions by filter") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      for (country <- Seq("US", "CA", "KR")) {
-        for (quarter <- 1 to 4) {
-          sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
-        }
-      }
-
-      sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=CA/quarter=1") ::
-        Row("country=CA/quarter=2") ::
-        Row("country=KR/quarter=1") ::
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=KR/quarter=4") ::
-        Row("country=US/quarter=1") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=3") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (country < 'KR'), PARTITION (quarter <= '1')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=KR/quarter=4") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=3") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (country='KR', quarter='4')")
-      sql("ALTER TABLE sales DROP PARTITION (country='US', quarter='3')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=2") ::
-        Row("country=KR/quarter=3") ::
-        Row("country=US/quarter=2") ::
-        Row("country=US/quarter=4") :: Nil)
-
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=3") :: Nil)
-
-      // According to the declarative partition spec definitions, this drops the union of target
-      // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
-      checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
-    }
-  }
-
-  test("SPARK-17732: Error handling for drop partitions by filter") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      val m = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown = 'KR')")
-      }.getMessage
-      assert(m.contains("unknown is not a valid partition column in table"))
-
-      val m2 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown < 'KR')")
-      }.getMessage
-      assert(m2.contains("unknown is not a valid partition column in table"))
-
-      val m3 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown <=> 'KR')")
-      }.getMessage
-      assert(m3.contains("'<=>' operator is not allowed in partition specification"))
-
-      val m4 = intercept[ParseException] {
-        sql("ALTER TABLE sales DROP PARTITION (unknown <=> upper('KR'))")
-      }.getMessage
-      assert(m4.contains("'<=>' operator is not allowed in partition specification"))
-
-      val m5 = intercept[ParseException] {
-        sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter)")
-      }.getMessage
-      assert(m5.contains("Invalid partition filter specification"))
-
-      sql(s"ALTER TABLE sales ADD PARTITION (country = 'KR', quarter = '3')")
-      val m6 = intercept[AnalysisException] {
-        sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '2')")
-      }.getMessage
-      // The query is not executed because `PARTITION (quarter <= '2')` is invalid.
-      checkAnswer(sql("SHOW PARTITIONS sales"),
-        Row("country=KR/quarter=3") :: Nil)
-      assert(m6.contains("There is no partition for (`quarter` <= '2')"))
-    }
-  }
-
-  test("SPARK-17732: Partition filter is not allowed in ADD PARTITION") {
-    withTable("sales") {
-      sql("CREATE TABLE sales(id INT) PARTITIONED BY (country STRING, quarter STRING)")
-
-      val m = intercept[ParseException] {
-        sql("ALTER TABLE sales ADD PARTITION (country = 'US', quarter < '1')")
-      }.getMessage()
-      assert(m.contains("Invalid partition filter specification"))
-    }
-  }
-
   test("drop views") {
     withTable("tab1") {
       val tabName = "tab1"