From 6acb460381c96fe71f807f94bb617f3928f41694 Mon Sep 17 00:00:00 2001
From: 10129659
Date: Thu, 27 Sep 2018 09:04:20 +0800
Subject: [PATCH 1/2] In the PruneFileSourcePartitions optimizer, replace the
 nonPartitionOps field with true in the And(partitionOps, nonPartitionOps) so
 that the partition can be pruned

---
 .../PruneFileSourcePartitions.scala           | 25 +++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 16b2367bfdd5c..1cf842a9b6221 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.types.BooleanType
 
 private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
@@ -39,21 +40,31 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
             _,
             _))
         if filters.nonEmpty && fsRelation.partitionSchemaOption.isDefined =>
+
+      val sparkSession = fsRelation.sparkSession
+      val partitionColumns =
+        logicalRelation.resolve(
+          partitionSchema, sparkSession.sessionState.analyzer.resolver)
+      val partitionSet = AttributeSet(partitionColumns)
       // The attribute name of predicate could be different than the one in schema in case of
       // case insensitive, we should change them to match the one in schema, so we donot need to
       // worry about case sensitivity anymore.
       val normalizedFilters = filters.map { e =>
-        e transform {
+        e transformUp {
           case a: AttributeReference =>
             a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
+          // Replace the nonPartitionOps field with true in the And(partitionOps, nonPartitionOps)
+          // so that the partition can be pruned
+          case and @And(left, right) =>
+            val leftPartition = left.references.filter(partitionSet.contains(_))
+            val rightPartition = right.references.filter(partitionSet.contains(_))
+            if (leftPartition.size == left.references.size && rightPartition.size == 0) {
+              and.withNewChildren(Seq(left, Literal(true, BooleanType)))
+            } else if (leftPartition.size == 0 && rightPartition.size == right.references.size) {
+              and.withNewChildren(Seq(Literal(true, BooleanType), right))
+            } else and
         }
       }
-
-      val sparkSession = fsRelation.sparkSession
-      val partitionColumns =
-        logicalRelation.resolve(
-          partitionSchema, sparkSession.sessionState.analyzer.resolver)
-      val partitionSet = AttributeSet(partitionColumns)
       val partitionKeyFilters =
         ExpressionSet(normalizedFilters
           .filterNot(SubqueryExpression.hasSubquery(_))

From a13fe239ec2f44cdb09a5b66d50e018b55dc8ee6 Mon Sep 17 00:00:00 2001
From: 10129659
Date: Mon, 15 Oct 2018 14:32:29 +0800
Subject: [PATCH 2/2] Replace size == 0 checks with isEmpty

---
 .../execution/datasources/PruneFileSourcePartitions.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
index 1cf842a9b6221..d9e079b83f56c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PruneFileSourcePartitions.scala
@@ -55,12 +55,12 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
             a.withName(logicalRelation.output.find(_.semanticEquals(a)).get.name)
           // Replace the nonPartitionOps field with true in the And(partitionOps, nonPartitionOps)
           // so that the partition can be pruned
-          case and @And(left, right) =>
+          case and @ And(left, right) =>
             val leftPartition = left.references.filter(partitionSet.contains(_))
             val rightPartition = right.references.filter(partitionSet.contains(_))
-            if (leftPartition.size == left.references.size && rightPartition.size == 0) {
+            if (leftPartition.size == left.references.size && rightPartition.isEmpty) {
              and.withNewChildren(Seq(left, Literal(true, BooleanType)))
-            } else if (leftPartition.size == 0 && rightPartition.size == right.references.size) {
+            } else if (leftPartition.isEmpty && rightPartition.size == right.references.size) {
               and.withNewChildren(Seq(Literal(true, BooleanType), right))
             } else and
         }
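
The new `case and @ And(left, right)` branch only rewrites conjuncts whose two sides separate
cleanly into partition-only and non-partition-only references. The following is a minimal,
self-contained sketch of that rewrite; Expr, Attr, EqualTo, And, Or and TrueLiteral here are
illustrative stand-ins, not the Catalyst classes the patch actually touches.

// Toy expression types; stand-ins for Catalyst's Expression, AttributeReference and Literal.
sealed trait Expr { def references: Set[String] }
case class Attr(name: String) extends Expr { val references: Set[String] = Set(name) }
case class EqualTo(attr: Attr, value: Any) extends Expr { val references: Set[String] = attr.references }
case class And(left: Expr, right: Expr) extends Expr { val references: Set[String] = left.references ++ right.references }
case class Or(left: Expr, right: Expr) extends Expr { val references: Set[String] = left.references ++ right.references }
case object TrueLiteral extends Expr { val references: Set[String] = Set.empty }

object PartitionOnlyRewrite {
  // Bottom-up rewrite, mirroring the patch: when one side of an And references only
  // partition columns and the other side references no partition columns at all,
  // the non-partition side is replaced with a literal true.
  def rewrite(e: Expr, partitionCols: Set[String]): Expr = e match {
    case And(l, r) =>
      val (nl, nr) = (rewrite(l, partitionCols), rewrite(r, partitionCols))
      val leftPartition = nl.references.filter(partitionCols.contains)
      val rightPartition = nr.references.filter(partitionCols.contains)
      if (leftPartition.size == nl.references.size && rightPartition.isEmpty) {
        And(nl, TrueLiteral)
      } else if (leftPartition.isEmpty && rightPartition.size == nr.references.size) {
        And(TrueLiteral, nr)
      } else And(nl, nr)
    case Or(l, r) => Or(rewrite(l, partitionCols), rewrite(r, partitionCols))
    case other => other
  }
}

object Demo extends App {
  // Table assumed to be partitioned by column "p"; "c" is a data column.
  val pred = Or(
    And(EqualTo(Attr("p"), 1), EqualTo(Attr("c"), "a")),
    And(EqualTo(Attr("p"), 2), EqualTo(Attr("c"), "b")))
  // Prints Or(And(EqualTo(Attr(p),1),TrueLiteral), And(EqualTo(Attr(p),2),TrueLiteral)),
  // whose references are {p} only, so it now passes a subsetOf(partitionSet) check.
  println(PartitionOnlyRewrite.rewrite(pred, partitionCols = Set("p")))
}

In the rule itself the rewritten predicate is only used to decide which partitions to list;
the Filter above the relation still applies the original condition, so dropping the
non-partition conjunct from the pruning predicate does not change query results.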