From 85fdb4663865a5998788860c4ec152bcaee3a894 Mon Sep 17 00:00:00 2001
From: Dylan Su
Date: Tue, 14 Nov 2017 14:48:51 +0800
Subject: [PATCH] bug fix

---
 .../sql/catalyst/parser/AstBuilder.scala | 7 +++
 .../spark/sql/execution/SparkSqlParser.scala | 3 +-
 .../spark/sql/execution/command/ddl.scala | 52 +++++++++----------
 .../InsertIntoHadoopFsRelationCommand.scala | 4 +-
 .../execution/command/DDLParserSuite.scala | 10 ++--
 .../sql/hive/execution/HiveDDLSuite.scala | 25 +++++++--
 6 files changed, 61 insertions(+), 40 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index f79633e0140ab..4b1cf7b9a1050 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -314,6 +314,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a (partition spec, partition filter expression) pair from a partition spec context.
+   */
+  protected def visitPartition(ctx: PartitionSpecContext): (Map[String, String], Expression) = {
+    (visitNonOptionalPartitionSpec(ctx), visitPartitionFilterSpec(ctx))
+  }
+
   /**
    * Convert a constant of any type into a string. This is typically used in DDL commands, and its
    * main purpose is to prevent slight differences due to back to back conversions i.e.:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index a58b92b2b4de0..910a1a76af80e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -916,8 +916,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec).filter(_ != null),
-      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec).filter(_ != null),
+      ctx.partitionSpec.asScala.map(visitPartition),
       ifExists = ctx.EXISTS != null,
       purge = ctx.PURGE != null,
       retainData = false)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 08060c6f0aeaa..f1f9bcfbc0496 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -515,8 +515,7 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
-    exprs: Seq[Expression] = Seq.empty[Expression],
+    partitions: Seq[(TablePartitionSpec, Expression)],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
@@ -529,39 +528,38 @@ case class AlterTableDropPartitionCommand(
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    exprs.foreach { expr =>
-      expr.references.foreach { attr =>
-        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
-          throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.")
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2 != null) {
+          partition._2.references.foreach { attr =>
+            if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
+              throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.")
+            }
+          }
+          val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(partition._2)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
+            throw new AnalysisException(s"There is no partition for ${partition._2.sql}")
+          }
+          partitions
+        } else {
+          Seq.empty[TablePartitionSpec]
         }
-      }
-    }
-
-    val partitionSet = exprs.flatMap { expr =>
-      val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(expr)).map(_.spec)
-      if (partitions.isEmpty && !ifExists) {
-        throw new AnalysisException(s"There is no partition for ${expr.sql}")
-      }
-      partitions
-    }.distinct
-
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
-    }.filter(_.size != 0)
+      }.distinct
 
-    val toDrop = {
       if (normalizedSpecs.isEmpty && partitionSet.isEmpty) {
         Seq.empty[TablePartitionSpec]
       } else if (normalizedSpecs.isEmpty && !partitionSet.isEmpty) {
         partitionSet
       } else if (!normalizedSpecs.isEmpty && partitionSet.isEmpty) {
-        normalizedSpecs
+        Seq(normalizedSpecs)
       } else {
-        partitionSet.intersect(normalizedSpecs)
+        partitionSet.intersect(Seq(normalizedSpecs))
       }
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 675bee85bf61e..ac0fef5f60e74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -25,7 +25,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.util.SchemaUtils
@@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand(
             val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
             if (deletedPartitions.nonEmpty) {
               AlterTableDropPartitionCommand(
-                catalogTable.get.identifier, deletedPartitions.toSeq,
+                catalogTable.get.identifier, deletedPartitions.map(x => (x, null)).toSeq,
                 ifExists = true, purge = false,
                 retainData = true /* already deleted */).run(sparkSession)
             }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
index 41af956efaaf7..801ca65bb9f6d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLParserSuite.scala
@@ -29,11 +29,10 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
-import org.apache.spark.sql.catalyst.expressions.{JsonTuple, Literal}
+import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -41,6 +40,7 @@ import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 
 
+
 class DDLParserSuite extends PlanTest with SharedSQLContext {
   private lazy val parser = new SparkSqlParser(new SQLConf)
 
@@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
+        (Map("dt" -> "2008-08-08", "country" -> "us"), null),
+        (Map("dt" -> "2009-09-09", "country" -> "uk"), null)),
       ifExists = true,
       purge = false,
       retainData = false)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index fad85d957a79e..2ecfd168ed3d3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -500,13 +500,30 @@ class HiveDDLSuite
     withTable("sales") {
       sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)")
 
-      for (country <- Seq("US", "CA", "KR")) {
-        for (quarter <- 1 to 4) {
+      for (country <- Seq("AU", "US", "CA", "KR")) {
+        for (quarter <- 1 to 5) {
           sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
         }
       }
 
       sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=AU/quarter=1") ::
+        Row("country=AU/quarter=2") ::
+        Row("country=CA/quarter=1") ::
+        Row("country=CA/quarter=2") ::
+        Row("country=KR/quarter=1") ::
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=KR/quarter=5") ::
+        Row("country=US/quarter=1") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") ::
+        Row("country=US/quarter=5") :: Nil)
+
+      sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')")
       checkAnswer(sql("SHOW PARTITIONS sales"),
         Row("country=CA/quarter=1") ::
         Row("country=CA/quarter=2") ::
@@ -536,13 +553,13 @@ class HiveDDLSuite
         Row("country=US/quarter=2") ::
         Row("country=US/quarter=4") :: Nil)
 
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')")
      checkAnswer(sql("SHOW PARTITIONS sales"),
        Row("country=KR/quarter=3") :: Nil)

      // According to the declarative partition spec definitions, this drops the union of target
      // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')")
       checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
     }
   }
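
Note (not part of the patch): a minimal sketch of how the reworked command can be exercised once the
patch is applied. The table and column names are illustrative only; the parser setup mirrors the one
used in DDLParserSuite above.

  import org.apache.spark.sql.execution.SparkSqlParser
  import org.apache.spark.sql.internal.SQLConf

  object DropPartitionSketch {
    def main(args: Array[String]): Unit = {
      val parser = new SparkSqlParser(new SQLConf)
      // A constant spec and a comparator filter can now be mixed in a single statement;
      // each PARTITION clause contributes one (partition spec, filter expression) entry
      // to AlterTableDropPartitionCommand.partitions.
      val plan = parser.parsePlan(
        "ALTER TABLE sales DROP IF EXISTS PARTITION (country = 'US'), PARTITION (quarter > '2')")
      println(plan)
    }
  }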