Commit

bug fix
DazhuangSu committed Nov 14, 2017
1 parent 20f658a commit 85fdb46
Showing 6 changed files with 61 additions and 40 deletions.
@@ -314,6 +314,13 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
   }
 
+  /**
+   * Create a partition spec map and a partition filter expression from a partition spec context.
+   */
+  protected def visitPartition(ctx: PartitionSpecContext): (Map[String, String], Expression) = {
+    (visitNonOptionalPartitionSpec(ctx), visitPartitionFilterSpec(ctx))
+  }
+
   /**
    * Convert a constant of any type into a string. This is typically used in DDL commands, and its
    * main purpose is to prevent slight differences due to back to back conversions i.e.:
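For a PARTITION clause that only pins columns with `=`, the filter half of the pair is null, which is what the updated DDLParserSuite below expects; a clause with comparators such as country < 'KR' carries a non-null catalyst Expression instead. A minimal sketch of the grounded equality case (the values come from this commit's parser test):

    import org.apache.spark.sql.catalyst.expressions.Expression

    // Pair produced for PARTITION (dt='2008-08-08', country='us'): a populated
    // spec map and a null filter, because the clause has no comparators.
    val pair: (Map[String, String], Expression) =
      (Map("dt" -> "2008-08-08", "country" -> "us"), null)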
@@ -916,8 +916,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     }
     AlterTableDropPartitionCommand(
       visitTableIdentifier(ctx.tableIdentifier),
-      ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec).filter(_ != null),
-      ctx.partitionSpec.asScala.map(visitPartitionFilterSpec).filter(_ != null),
+      ctx.partitionSpec.asScala.map(visitPartition),
       ifExists = ctx.EXISTS != null,
       purge = ctx.PURGE != null,
       retainData = false)
@@ -515,8 +515,7 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    specs: Seq[TablePartitionSpec],
-    exprs: Seq[Expression] = Seq.empty[Expression],
+    partitions: Seq[(TablePartitionSpec, Expression)],
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
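Callers now pass one (spec, expression) pair per PARTITION clause. A hedged sketch of constructing the command directly: the equality pair mirrors the updated DDLParserSuite below, while the comparator pair built with the catalyst expression DSL (and its empty spec map) is an illustrative assumption rather than code from this commit:

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.dsl.expressions._
    import org.apache.spark.sql.execution.command.AlterTableDropPartitionCommand

    // One pair per PARTITION clause: an exact spec with a null filter, or a
    // filter expression ('country < "KR" builds a LessThan over an unresolved
    // attribute) with an assumed-empty spec map.
    val cmd = AlterTableDropPartitionCommand(
      TableIdentifier("sales"),
      Seq(
        (Map("country" -> "US", "quarter" -> "1"), null),
        (Map.empty[String, String], 'country < "KR")),
      ifExists = true,
      purge = false,
      retainData = false)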
@@ -529,39 +528,38 @@ case class AlterTableDropPartitionCommand(
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")
 
-    exprs.foreach { expr =>
-      expr.references.foreach { attr =>
-        if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
-          throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.")
-        }
-      }
-    }
-
-    val partitionSet = exprs.flatMap { expr =>
-      val partitions = catalog.listPartitionsByFilter(table.identifier, Seq(expr)).map(_.spec)
-      if (partitions.isEmpty && !ifExists) {
-        throw new AnalysisException(s"There is no partition for ${expr.sql}")
-      }
-      partitions
-    }.distinct
-
-    val normalizedSpecs = specs.map { spec =>
-      PartitioningUtils.normalizePartitionSpec(
-        spec,
-        table.partitionColumnNames,
-        table.identifier.quotedString,
-        sparkSession.sessionState.conf.resolver)
-    }.filter(_.size != 0)
-
-    val toDrop = {
+    val toDrop = partitions.flatMap { partition =>
+      val normalizedSpecs = PartitioningUtils.normalizePartitionSpec(
+        partition._1,
+        table.partitionColumnNames,
+        table.identifier.quotedString,
+        sparkSession.sessionState.conf.resolver)
+
+      val partitionSet = {
+        if (partition._2 != null) {
+          partition._2.references.foreach { attr =>
+            if (!table.partitionColumnNames.exists(resolver(_, attr.name))) {
+              throw new AnalysisException(s"${attr.name} is not a valid partition column " + s"in table ${table.identifier.quotedString}.")
+            }
+          }
+          val partitions = catalog.listPartitionsByFilter(
+            table.identifier, Seq(partition._2)).map(_.spec)
+          if (partitions.isEmpty && !ifExists) {
+            throw new AnalysisException(s"There is no partition for ${partition._2.sql}")
+          }
+          partitions
+        } else {
+          Seq.empty[TablePartitionSpec]
+        }
+      }.distinct
 
       if (normalizedSpecs.isEmpty && partitionSet.isEmpty) {
         Seq.empty[TablePartitionSpec]
       } else if (normalizedSpecs.isEmpty && !partitionSet.isEmpty) {
         partitionSet
       } else if (!normalizedSpecs.isEmpty && partitionSet.isEmpty) {
-        normalizedSpecs
+        Seq(normalizedSpecs)
       } else {
-        partitionSet.intersect(normalizedSpecs)
+        partitionSet.intersect(normalizedSpecs.toSeq)
       }
     }
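The loop above reduces each clause to a four-way case: nothing resolvable, filter matches only, exact spec only, or both, in which case the filter's matches are intersected with the exact spec; the flatMap then unions the clauses. A standalone sketch of that per-clause rule, using plain maps for TablePartitionSpec, an Option for the possibly-empty spec, and a precomputed list standing in for catalog.listPartitionsByFilter (all simplifying assumptions):

    // Model of one PARTITION clause's resolution in the command above.
    def resolveClause(
        spec: Option[Map[String, String]],            // normalized equality part
        matchedByFilter: Seq[Map[String, String]]     // catalog matches for the
      ): Seq[Map[String, String]] =                   // comparator expression
      (spec, matchedByFilter) match {
        case (None, Seq())      => Seq.empty                  // nothing to drop
        case (None, matched)    => matched                    // filter only
        case (Some(s), Seq())   => Seq(s)                     // exact spec only
        case (Some(s), matched) => matched.intersect(Seq(s))  // both: intersect
      }

The statement-level behavior is then a flatMap of this rule over all clauses, which is why multiple PARTITION clauses in one DROP act as a union.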
@@ -25,7 +25,7 @@ import org.apache.spark.internal.io.FileCommitProtocol
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.util.SchemaUtils
@@ -128,7 +128,7 @@ case class InsertIntoHadoopFsRelationCommand(
           val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
           if (deletedPartitions.nonEmpty) {
             AlterTableDropPartitionCommand(
-              catalogTable.get.identifier, deletedPartitions.toSeq,
+              catalogTable.get.identifier, deletedPartitions.map(x => (x, null)).toSeq,
               ifExists = true, purge = false,
               retainData = true /* already deleted */).run(sparkSession)
           }
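Dynamic-partition overwrite only ever drops exact, fully-specified partitions, so the call site adapts mechanically: each deleted spec is paired with a null filter expression. A small illustration with made-up specs:

    import org.apache.spark.sql.catalyst.expressions.Expression

    // Hypothetical partitions deleted by an overwrite; pairing each exact
    // spec with a null filter mirrors deletedPartitions.map(x => (x, null)).
    val deletedPartitions = Set(Map("part" -> "1"), Map("part" -> "2"))
    val pairs: Seq[(Map[String, String], Expression)] =
      deletedPartitions.map(x => (x, null: Expression)).toSeq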
@@ -29,18 +29,18 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans
 import org.apache.spark.sql.catalyst.dsl.plans.DslLogicalPlan
-import org.apache.spark.sql.catalyst.expressions.{JsonTuple, Literal}
+import org.apache.spark.sql.catalyst.expressions.JsonTuple
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
-import org.apache.spark.sql.catalyst.plans.logical.{Generate, InsertIntoDir, LogicalPlan}
-import org.apache.spark.sql.catalyst.plans.logical.{Project, ScriptTransformation}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.SparkSqlParser
 import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.test.SharedSQLContext
 
 
 class DDLParserSuite extends PlanTest with SharedSQLContext {
   private lazy val parser = new SparkSqlParser(new SQLConf)
 
@@ -826,8 +826,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
     val expected1_table = AlterTableDropPartitionCommand(
       tableIdent,
       Seq(
-        Map("dt" -> "2008-08-08", "country" -> "us"),
-        Map("dt" -> "2009-09-09", "country" -> "uk")),
+        (Map("dt" -> "2008-08-08", "country" -> "us"), null),
+        (Map("dt" -> "2009-09-09", "country" -> "uk"), null)),
       ifExists = true,
       purge = false,
       retainData = false)
@@ -500,13 +500,30 @@ class HiveDDLSuite
     withTable("sales") {
       sql("CREATE TABLE sales (id INT) PARTITIONED BY (country STRING, quarter STRING)")
 
-      for (country <- Seq("US", "CA", "KR")) {
-        for (quarter <- 1 to 4) {
+      for (country <- Seq("AU", "US", "CA", "KR")) {
+        for (quarter <- 1 to 5) {
           sql(s"ALTER TABLE sales ADD PARTITION (country = '$country', quarter = '$quarter')")
         }
       }
 
+      sql("ALTER TABLE sales DROP PARTITION (country < 'KR', quarter > '2')")
+      checkAnswer(sql("SHOW PARTITIONS sales"),
+        Row("country=AU/quarter=1") ::
+        Row("country=AU/quarter=2") ::
+        Row("country=CA/quarter=1") ::
+        Row("country=CA/quarter=2") ::
+        Row("country=KR/quarter=1") ::
+        Row("country=KR/quarter=2") ::
+        Row("country=KR/quarter=3") ::
+        Row("country=KR/quarter=4") ::
+        Row("country=KR/quarter=5") ::
+        Row("country=US/quarter=1") ::
+        Row("country=US/quarter=2") ::
+        Row("country=US/quarter=3") ::
+        Row("country=US/quarter=4") ::
+        Row("country=US/quarter=5") :: Nil)
+
       sql("ALTER TABLE sales DROP PARTITION (country < 'CA'), PARTITION (quarter = '5')")
       checkAnswer(sql("SHOW PARTITIONS sales"),
         Row("country=CA/quarter=1") ::
         Row("country=CA/quarter=2") ::
@@ -536,13 +553,13 @@ class HiveDDLSuite
         Row("country=US/quarter=2") ::
         Row("country=US/quarter=4") :: Nil)
 
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 2), PARTITION (quarter >= '4')")
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= '2'), PARTITION (quarter >= '4')")
       checkAnswer(sql("SHOW PARTITIONS sales"),
         Row("country=KR/quarter=3") :: Nil)
 
       // According to the declarative partition spec definitions, this drops the union of target
       // partitions without exceptions. Hive raises exceptions because it handles them sequentially.
-      sql("ALTER TABLE sales DROP PARTITION (quarter <= 4), PARTITION (quarter <= '3')")
+      sql("ALTER TABLE sales DROP PARTITION (quarter <= '4'), PARTITION (quarter <= '3')")
      checkAnswer(sql("SHOW PARTITIONS sales"), Nil)
     }
   }
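The last two statements target overlapping partition sets on purpose: the command resolves every PARTITION clause before dropping anything, so the statement drops the union of the matches, while Hive fails because it processes clauses one at a time and the second clause finds its partitions already gone. A toy model of that difference (specs and matches invented for illustration):

    // Over the sole remaining partition country=KR/quarter=3, both clauses
    // match it (string comparison, as in the test above).
    val byQuarterLe4 = Seq(Map("country" -> "KR", "quarter" -> "3"))
    val byQuarterLe3 = Seq(Map("country" -> "KR", "quarter" -> "3"))

    // Resolve-first-then-drop: the union is computed up front (deduplicated
    // here for clarity), so the overlap is harmless and the drop happens once.
    val toDrop = (byQuarterLe4 ++ byQuarterLe3).distinct
    assert(toDrop == Seq(Map("country" -> "KR", "quarter" -> "3")))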
