Commit

Fix
maropu committed Sep 9, 2018
1 parent 4e72f7e commit 52506f1
Showing 3 changed files with 29 additions and 84 deletions.

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala

@@ -306,8 +306,7 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
     }
     // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
     // running the command. The type is not relevant, it is replaced during the real resolution
-    val partition =
-      AttributeReference(pFilter.identifier().getText, StringType)()
+    val partition = UnresolvedAttribute(pFilter.identifier().getText)
     val value = Literal(visitStringConstant(pFilter.constant()))
     val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
     buildComparison(partition, value, operator)
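
For context on this one-line change: AttributeReference(name, StringType)() mints an attribute that is already resolved, with a fresh exprId and a type hard-coded to StringType, whereas UnresolvedAttribute(name) defers both binding and typing to the analyzer, which resolves the name against the target relation only when the command runs. A minimal sketch of the difference (illustrative only, not part of the commit):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.types.StringType

// Resolved on the spot: fresh exprId, type fixed to StringType up front.
val eager = AttributeReference("p", StringType)()
assert(eager.resolved)

// Deferred: the analyzer later binds the name and type against a plan.
val deferred = UnresolvedAttribute("p")
assert(!deferred.resolved)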

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -20,14 +20,13 @@ package org.apache.spark.sql.execution
 import java.util.Locale

 import scala.collection.JavaConverters._

 import org.antlr.v4.runtime.{ParserRuleContext, Token}
 import org.antlr.v4.runtime.tree.TerminalNode

 import org.apache.spark.sql.SaveMode
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.sql.catalyst.expressions.{And, Expression, Or}
 import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -925,9 +924,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
     if (ctx.VIEW != null) {
       operationNotAllowed("ALTER VIEW ... DROP PARTITION", ctx)
     }
+    val tableIdent = visitTableIdentifier(ctx.tableIdentifier)
+    val conditions = ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec(_).reduce(And)).reduce(Or)
     AlterTableDropPartitionCommand(
-      visitTableIdentifier(ctx.tableIdentifier),
-      ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec),
+      tableIdent,
+      Filter(conditions, UnresolvedRelation(tableIdent)),
       ifExists = ctx.EXISTS != null,
       purge = ctx.PURGE != null,
       retainData = false)
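
The shape of the condition this builds is easiest to see on a concrete statement such as ALTER TABLE t DROP PARTITION (p = '1', q < '10'), PARTITION (p = '2'): the comparisons inside one PARTITION clause are AND-ed, and the clauses themselves are OR-ed. A hand-built sketch of the equivalent expression tree (the specs value below stands in for what visitDropPartitionSpec would return; it is not the commit's code):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, LessThan, Literal, Or}

// One inner Seq per PARTITION clause, one comparison per partition column.
val specs: Seq[Seq[Expression]] = Seq(
  Seq(EqualTo(UnresolvedAttribute("p"), Literal("1")),
      LessThan(UnresolvedAttribute("q"), Literal("10"))),
  Seq(EqualTo(UnresolvedAttribute("p"), Literal("2"))))

// AND within a clause, OR across clauses, mirroring the parser change above.
val condition = specs.map(_.reduce[Expression](And(_, _))).reduce(Or(_, _))
// condition is ((p = '1') AND (q < '10')) OR (p = '2')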

sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -22,18 +22,17 @@ import java.util.Locale
 import scala.collection.{GenMap, GenSeq}
 import scala.collection.parallel.ForkJoinTaskSupport
 import scala.util.control.NonFatal

 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
@@ -523,89 +522,37 @@ case class AlterTableRenamePartitionCommand(
  */
 case class AlterTableDropPartitionCommand(
     tableName: TableIdentifier,
-    partitionsFilters: Seq[Seq[Expression]],
+    partitionFilter: LogicalPlan,
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean)
-  extends RunnableCommand {
+  extends RunnableCommand with PredicateHelper {
+
+  override def innerChildren: Seq[LogicalPlan] = partitionFilter :: Nil

   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
     val table = catalog.getTableMetadata(tableName)
-    val partitionColumns = table.partitionColumnNames
-    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
     DDLUtils.verifyAlterTableType(catalog, table, isView = false)
     DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

-    val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
-      if (hasComplexFilters(filtersSpec)) {
-        generatePartitionSpec(filtersSpec,
-          partitionColumns,
-          partitionAttributes,
-          table.identifier,
-          catalog,
-          sparkSession.sessionState.conf.resolver,
-          timeZone,
-          ifExists)
-      } else {
-        val partitionSpec = filtersSpec.map {
-          case EqualTo(key: Attribute, Literal(value, StringType)) =>
-            key.name -> value.toString
-        }.toMap
-        PartitioningUtils.normalizePartitionSpec(
-          partitionSpec,
-          partitionColumns,
-          table.identifier.quotedString,
-          sparkSession.sessionState.conf.resolver) :: Nil
-      }
+    val resolvedSpec = sparkSession.sessionState.analyzer.executeAndCheck(partitionFilter) match {
+      case Filter(conditions, _) =>
+        val filters = splitDisjunctivePredicates(conditions).flatMap { filter =>
+          splitConjunctivePredicates(filter)
+        }
+        // Resolve TablePartitionSpec based on the resolved expressions?
+        ...
     }

     catalog.dropPartitions(
-      table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+      table.identifier, resolvedSpec, ignoreIfNotExists = ifExists, purge = purge,
       retainData = retainData)

     CommandUtils.updateTableStats(sparkSession, table)

     Seq.empty[Row]
   }

-  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
-    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
-  }
-
-  def generatePartitionSpec(
-      partitionFilterSpec: Seq[Expression],
-      partitionColumns: Seq[String],
-      partitionAttributes: Map[String, Attribute],
-      tableIdentifier: TableIdentifier,
-      catalog: SessionCatalog,
-      resolver: Resolver,
-      timeZone: Option[String],
-      ifExists: Boolean): Seq[TablePartitionSpec] = {
-    val filters = partitionFilterSpec.map { pFilter =>
-      pFilter.transform {
-        // Resolve the partition attributes
-        case partitionCol: Attribute =>
-          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
-            partitionCol.name,
-            partitionColumns,
-            tableIdentifier.quotedString,
-            resolver)
-          partitionAttributes(normalizedPartition)
-      }.transform {
-        // Cast the partition value to the data type of the corresponding partition attribute
-        case cmp @ BinaryComparison(partitionAttr, value)
-            if !partitionAttr.dataType.sameType(value.dataType) =>
-          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
-      }
-    }
-    val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters)
-    if (partitions.isEmpty && !ifExists) {
-      throw new AnalysisException(s"There is no partition for ${filters.reduceLeft(And).sql}")
-    }
-    partitions.map(_.spec)
-  }
 }
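
The rewritten run body leans on PredicateHelper to undo exactly the construction the parser performs: splitDisjunctivePredicates recovers one predicate per original PARTITION clause, and splitConjunctivePredicates then recovers the per-column comparisons inside each clause. A small standalone sketch of that round trip (Splitter is a hypothetical harness whose only purpose is to expose the protected helpers):

import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._

// Hypothetical harness: the split methods on PredicateHelper are protected.
object Splitter extends PredicateHelper {
  def disjuncts(e: Expression): Seq[Expression] = splitDisjunctivePredicates(e)
  def conjuncts(e: Expression): Seq[Expression] = splitConjunctivePredicates(e)
}

val (p, q) = (UnresolvedAttribute("p"), UnresolvedAttribute("q"))
// (p = '1' AND q = '10') OR p = '2': the shape the parser now produces.
val cond = Or(And(EqualTo(p, Literal("1")), EqualTo(q, Literal("10"))), EqualTo(p, Literal("2")))

val perClause = Splitter.disjuncts(cond)           // Seq(p = '1' AND q = '10', p = '2')
val perColumn = perClause.map(Splitter.conjuncts)  // Seq(Seq(p = '1', q = '10'), Seq(p = '2'))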


@@ -617,17 +564,15 @@ object AlterTableDropPartitionCommand {
     ifExists: Boolean,
     purge: Boolean,
     retainData: Boolean): AlterTableDropPartitionCommand = {
-    AlterTableDropPartitionCommand(tableName,
-      specs.map(tablePartitionToPartitionFilters),
-      ifExists,
-      purge,
-      retainData)
+    val conditions = specs.map(tablePartitionToPartitionFilter)
+    val filter = Filter(conditions.reduce(Or), UnresolvedRelation(tableName))
+    AlterTableDropPartitionCommand(tableName, filter, ifExists, purge, retainData)
   }

-  def tablePartitionToPartitionFilters(spec: TablePartitionSpec): Seq[Expression] = {
-    spec.map {
-      case (key, value) => EqualTo(AttributeReference(key, StringType)(), Literal(value))
-    }.toSeq
+  private def tablePartitionToPartitionFilter(spec: TablePartitionSpec): Expression = {
+    spec.map { case (key, value) =>
+      EqualTo(AttributeReference(key, StringType)(), Literal(value))
+    }.reduce(And)
   }
 }
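
Round-tripping a classic partition spec through the new helper ties the two files together. A sketch of the filter the apply overload now builds for PARTITION (ds='2018-09-09', hr='10') (hand-expanded; the Map literal stands in for a parsed TablePartitionSpec):

import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, EqualTo, Expression, Literal}
import org.apache.spark.sql.types.StringType

// TablePartitionSpec is a Map[String, String] of partition column -> value.
val spec = Map("ds" -> "2018-09-09", "hr" -> "10")

// One EqualTo per column, AND-ed, as in tablePartitionToPartitionFilter;
// apply then ORs the per-spec filters and wraps the result in a Filter node
// over UnresolvedRelation(tableName) for the analyzer to resolve.
val perSpecFilter = spec.map { case (key, value) =>
  EqualTo(AttributeReference(key, StringType)(), Literal(value))
}.reduce[Expression](And(_, _))
// perSpecFilter is (ds = '2018-09-09') AND (hr = '10')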

