[SPARK-14922][SPARK-17732][SPARK-23866][SQL] Support partition filters in ALTER TABLE DROP PARTITION #20999
@@ -293,6 +293,28 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    }
  }

  /**
   * Create a partition specification map with filters.
   */
  override def visitDropPartitionSpec(
      ctx: DropPartitionSpecContext): Seq[Expression] = {
Reviewer: nit: can we move …
    withOrigin(ctx) {
      ctx.dropPartitionVal().asScala.map { pFilter =>
        if (pFilter.identifier() == null || pFilter.constant() == null ||
Reviewer: no chance.
Author: I am not sure. The other two conditions can definitely be true, but I am not sure about this one. I think it is safer to check it.
Reviewer: I saw no null check in the other partition spec, so I assumed the same held here: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala#L274
            pFilter.comparisonOperator() == null) {
          throw new ParseException(s"Invalid partition spec: ${pFilter.getText}", ctx)
Reviewer: Can you add tests for this exception in …
Author: sure, will do ASAP, thanks.
        }
        // We cannot use UnresolvedAttribute because resolution is performed after Analysis, when
        // running the command. The type is not relevant, it is replaced during the real resolution
        val partition =
          AttributeReference(pFilter.identifier().getText, StringType)()
Reviewer: Looks weird, and why can we use …
Author: Well, the answer is in …
Reviewer: Let me have more time to check this behaviour.
Reviewer: For example, how about this approach? (you tried already?) 52506f1. It added an unresolved logical plan (an input relation and filters) for …
Author: Your approach has the same issue, i.e. it would fail because the …
Author: I may be missing something here, so sorry if I am not understanding something, but I think the issue is that the analyzer is called anyway before the … I think the alternative here is to add a rule to the analyzer for this, but it seems overkill to me.
Reviewer: yea, if you put …
Author: oh, now I see, sorry. What about then having a …
Reviewer: Ya, looks good to me. But I'm not sure which one is the right approach, so we'd better wait for other reviewers' comments here, too. cc: @gatorsmile @viirya
Author: sure, thanks @maropu.
        val value = Literal(visitStringConstant(pFilter.constant()))
Author: sorry, I'd need to do …
Reviewer: ok, thanks for the check. It's ok to keep the current one.
||
val operator = pFilter.comparisonOperator().getChild(0).asInstanceOf[TerminalNode] | ||
buildComparison(partition, value, operator) | ||
} | ||
} | ||
} | ||
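As a rough, self-contained illustration of what this visitor produces (using hypothetical stand-in classes, not Spark's real Catalyst `Expression` hierarchy), each `column <op> value` entry in the DROP PARTITION clause becomes one comparison expression over an attribute and a string literal:

```scala
// Hypothetical stand-ins for Catalyst's Expression classes, for illustration only.
sealed trait Expression
case class AttributeReference(name: String) extends Expression
case class Literal(value: String) extends Expression
case class EqualTo(left: Expression, right: Expression) extends Expression
case class LessThan(left: Expression, right: Expression) extends Expression

object DropPartitionSpecSketch {
  // Mirrors the shape of visitDropPartitionSpec: one comparison per partition filter,
  // with an error for anything that is not a supported operator.
  def toExpression(column: String, operator: String, value: String): Expression =
    operator match {
      case "=" | "==" => EqualTo(AttributeReference(column), Literal(value))
      case "<"        => LessThan(AttributeReference(column), Literal(value))
      case other =>
        throw new IllegalArgumentException(s"Invalid partition spec operator: $other")
    }
}
```

For example, a clause like `DROP PARTITION (dt < '2018-01-01')` would map to `LessThan(AttributeReference("dt"), Literal("2018-01-01"))` in this sketch.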

  /**
   * Convert a constant of any type into a string. This is typically used in DDL commands, and its
   * main purpose is to prevent slight differences due to back to back conversions i.e.:

@@ -1015,6 +1037,23 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
    val left = expression(ctx.left)
    val right = expression(ctx.right)
    val operator = ctx.comparisonOperator().getChild(0).asInstanceOf[TerminalNode]
    buildComparison(left, right, operator)
  }

  /**
   * Creates a comparison expression. The following comparison operators are supported:
   * - Equal: '=' or '=='
   * - Null-safe Equal: '<=>'
Reviewer: Seems we can't support null-safe equality because it is not supported by Hive metastore partition predicate pushdown. See HiveShim.scala.
   * - Not Equal: '<>' or '!='
   * - Less than: '<'
   * - Less than or Equal: '<='
   * - Greater than: '>'
   * - Greater than or Equal: '>='
   */
Reviewer: Hive also supports all the comparators above?
Author: yes, it does.
  private def buildComparison(
      left: Expression,
      right: Expression,
      operator: TerminalNode): Expression = {
    operator.getSymbol.getType match {
      case SqlBaseParser.EQ =>
        EqualTo(left, right)
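The diff is truncated after the first case. As a hedged sketch of the complete dispatch (hypothetical code that uses plain operator strings in place of ANTLR token types, covering exactly the operators listed in the doc comment above):

```scala
// Hypothetical stand-in for a Catalyst comparison expression, for illustration only.
case class Comparison(op: String, left: String, right: String)

// Sketch of buildComparison's structure: one branch per supported operator token.
def buildComparison(left: String, right: String, operator: String): Comparison =
  operator match {
    case "=" | "=="  => Comparison("=", left, right)   // Equal
    case "<>" | "!=" => Comparison("<>", left, right)  // Not Equal
    case "<"         => Comparison("<", left, right)   // Less than
    case "<="        => Comparison("<=", left, right)  // Less than or Equal
    case ">"         => Comparison(">", left, right)   // Greater than
    case ">="        => Comparison(">=", left, right)  // Greater than or Equal
    case other =>
      throw new IllegalArgumentException(s"Unsupported comparison operator: $other")
  }
```

Note that null-safe equality (`<=>`) is deliberately absent, matching the reviewer's point that the Hive metastore cannot push down such predicates.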
@@ -927,7 +927,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    }
    AlterTableDropPartitionCommand(
      visitTableIdentifier(ctx.tableIdentifier),
-     ctx.partitionSpec.asScala.map(visitNonOptionalPartitionSpec),
+     ctx.dropPartitionSpec().asScala.map(visitDropPartitionSpec),
Reviewer: Can you update the comment at spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala, line 916 in 01c3dfa?
Author: mmmh, I am not sure how to update it. The only difference is that …
      ifExists = ctx.EXISTS != null,
      purge = ctx.PURGE != null,
      retainData = false)
@@ -29,10 +29,10 @@ import org.apache.hadoop.mapred.{FileInputFormat, JobConf}

import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, Resolver}
+import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.catalyst.expressions._
Reviewer: too many imports?
Author: not sure what you mean here. The list of imports would be very long, as I use EqualTo, And, Literal, Cast, BinaryComparison, etc. I can list them all, but I am not sure it is worth it. What do you think?
Reviewer: I just wanted to check whether your IDE had wrongly folded this import. It's ok.
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat

@@ -521,35 +521,112 @@ case class AlterTableRenamePartitionCommand(
 */
case class AlterTableDropPartitionCommand(
Reviewer: Shall we make the table relation a child? Then we can resolve the …
Author: I thought about that. The point is that we have to check anyway that the attributes specified are the partitioning ones, so I am not sure it is worth running the whole set of analyzer rules for something we have to handle ourselves regardless.
Reviewer: But it's also weird to use …
Author: sure, I will. Thanks.
    tableName: TableIdentifier,
-   specs: Seq[TablePartitionSpec],
+   partitionsFilters: Seq[Seq[Expression]],
    ifExists: Boolean,
    purge: Boolean,
    retainData: Boolean)
  extends RunnableCommand {

  override def run(sparkSession: SparkSession): Seq[Row] = {
    val catalog = sparkSession.sessionState.catalog
    val timeZone = Option(sparkSession.sessionState.conf.sessionLocalTimeZone)
    val table = catalog.getTableMetadata(tableName)
    val partitionColumns = table.partitionColumnNames
    val partitionAttributes = table.partitionSchema.toAttributes.map(a => a.name -> a).toMap
    DDLUtils.verifyAlterTableType(catalog, table, isView = false)
    DDLUtils.verifyPartitionProviderIsHive(sparkSession, table, "ALTER TABLE DROP PARTITION")

-   val normalizedSpecs = specs.map { spec =>
-     PartitioningUtils.normalizePartitionSpec(
-       spec,
-       table.partitionColumnNames,
-       table.identifier.quotedString,
-       sparkSession.sessionState.conf.resolver)
+   val resolvedSpecs = partitionsFilters.flatMap { filtersSpec =>
+     if (hasComplexFilters(filtersSpec)) {
+       generatePartitionSpec(filtersSpec,
+         partitionColumns,
+         partitionAttributes,
+         table.identifier,
+         catalog,
+         sparkSession.sessionState.conf.resolver,
+         timeZone,
+         ifExists)
+     } else {
+       val partitionSpec = filtersSpec.map {
+         case EqualTo(key: Attribute, Literal(value, StringType)) =>
+           key.name -> value.toString
+       }.toMap
+       PartitioningUtils.normalizePartitionSpec(
+         partitionSpec,
+         partitionColumns,
+         table.identifier.quotedString,
+         sparkSession.sessionState.conf.resolver) :: Nil
+     }
    }

Reviewer: Should check resolvedSpecs here and throw an error message if the total resolved spec is empty.
    catalog.dropPartitions(
Reviewer: does hive have an API to drop partitions with a predicate? I think the current approach is very inefficient with non-equal partition predicates.
Author: unfortunately, no. I checked https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java but I could find none.
Reviewer: So the implementation here is similar to how hive implements it?
Author: Yes, this is my understanding. You can check …
-     table.identifier, normalizedSpecs, ignoreIfNotExists = ifExists, purge = purge,
+     table.identifier, resolvedSpecs, ignoreIfNotExists = ifExists, purge = purge,
      retainData = retainData)

    CommandUtils.updateTableStats(sparkSession, table)

    Seq.empty[Row]
  }

  def hasComplexFilters(partitionFilterSpec: Seq[Expression]): Boolean = {
    partitionFilterSpec.exists(!_.isInstanceOf[EqualTo])
  }
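The fast path in `run` applies only when every filter is a plain equality; anything else goes through `generatePartitionSpec` and a metastore listing. A self-contained sketch of that check, with a hypothetical mini expression type standing in for Catalyst's:

```scala
// Hypothetical stand-ins for Catalyst partition-filter expressions, for illustration only.
sealed trait PartFilter
case class Eq(column: String, value: String) extends PartFilter
case class Gt(column: String, value: String) extends PartFilter

// A spec is "complex" as soon as any filter is not a plain equality,
// mirroring partitionFilterSpec.exists(!_.isInstanceOf[EqualTo]).
def hasComplexFilters(spec: Seq[PartFilter]): Boolean =
  spec.exists(!_.isInstanceOf[Eq])
```

With only equalities, the spec can be turned directly into a `Map[String, String]` partition spec; a single `Gt` (or any other comparison) forces the filter-based resolution path.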

  def generatePartitionSpec(
      partitionFilterSpec: Seq[Expression],
      partitionColumns: Seq[String],
      partitionAttributes: Map[String, Attribute],
      tableIdentifier: TableIdentifier,
      catalog: SessionCatalog,
      resolver: Resolver,
      timeZone: Option[String],
      ifExists: Boolean): Seq[TablePartitionSpec] = {
    val filters = partitionFilterSpec.map { pFilter =>
      pFilter.transform {
        // Resolve the partition attributes
        case partitionCol: Attribute =>
          val normalizedPartition = PartitioningUtils.normalizePartitionColumn(
            partitionCol.name,
            partitionColumns,
            tableIdentifier.quotedString,
            resolver)
          partitionAttributes(normalizedPartition)
      }.transform {
        // Cast the partition value to the data type of the corresponding partition attribute
        case cmp @ BinaryComparison(partitionAttr, value)
            if !partitionAttr.dataType.sameType(value.dataType) =>
          cmp.withNewChildren(Seq(partitionAttr, Cast(value, partitionAttr.dataType, timeZone)))
Reviewer: hmm, have you tested …
Author: Yes, please see the tests here: https://github.com/apache/spark/pull/20999/files/a964d2a7def5aed04bd362b3000b36583c0ba272#diff-b7094baa12601424a5d19cb930e3402fR663. Notice that value is always a …
Reviewer: The predicates are not actually converted to Hive's partition predicates. If it can't convert the predicates, …
Author: ah nice catch, thanks!
      }
    }
    val partitions = catalog.listPartitionsByFilter(tableIdentifier, filters)
    if (partitions.isEmpty && !ifExists) {
      throw new AnalysisException(s"There is no partition for ${filters.reduceLeft(And).sql}")
    }
    partitions.map(_.spec)
  }
}

object AlterTableDropPartitionCommand {

  def fromSpecs(
      tableName: TableIdentifier,
      specs: Seq[TablePartitionSpec],
      ifExists: Boolean,
      purge: Boolean,
      retainData: Boolean): AlterTableDropPartitionCommand = {
    AlterTableDropPartitionCommand(tableName,
      specs.map(tablePartitionToPartitionFilters),
      ifExists,
      purge,
      retainData)
  }

  def tablePartitionToPartitionFilters(spec: TablePartitionSpec): Seq[Expression] = {
    spec.map {
      case (key, value) => EqualTo(AttributeReference(key, StringType)(), Literal(value))
    }.toSeq
  }
}
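The `fromSpecs` companion keeps the old map-based call sites working by converting a classic partition spec map into equality filters. A self-contained sketch of that conversion under a hypothetical stand-in type (not the real Catalyst classes):

```scala
// Hypothetical stand-in for EqualTo(AttributeReference(key), Literal(value)),
// for illustration only.
case class EqFilter(column: String, value: String)

// Mirrors tablePartitionToPartitionFilters: each (key -> value) entry in the
// partition spec becomes one equality filter.
def tablePartitionToPartitionFilters(spec: Map[String, String]): Seq[EqFilter] =
  spec.map { case (key, value) => EqFilter(key, value) }.toSeq
```

So a spec like `Map("dt" -> "2008-08-08", "country" -> "us")` becomes two equality filters, which is exactly the non-complex case handled by the fast path in `run`.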
@@ -861,7 +861,8 @@ class DDLParserSuite extends PlanTest with SharedSQLContext {
    assertUnsupported(sql2_view)

    val tableIdent = TableIdentifier("table_name", None)
-   val expected1_table = AlterTableDropPartitionCommand(
+   val expected1_table = AlterTableDropPartitionCommand.fromSpecs(
Reviewer: Can you add test cases to check that the parser accepts the comparators added by this PR?
Author: sure, will do, thanks.
      tableIdent,
      Seq(
        Map("dt" -> "2008-08-08", "country" -> "us"),
Reviewer: It has to be in this format? partCol1 > 2. How about 2 > partCol1?
Author: yes, in Hive it has to be like this. 2 > partCol1 is not supported by Hive.
Reviewer: Hive also throws antlr errors for the case 2 > partCol1?
Author: Hive does throw an error in that case; do you mean to ask whether it is a parsing error or another kind of exception?
Reviewer: yea, yes. I like user-understandable error messages.
Author: Hive throws this parser exception: … so yes, it is analogous to this one.
Reviewer: thanks for the check. I still like meaningful messages though; we should wait for other reviewers' comments.