From 4ae2b2b257c8440340b5db760309da5587dccffb Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 28 Jun 2017 00:44:51 +0800 Subject: [PATCH] remove QueryPlan.preCanonicalized --- .../sql/catalyst/catalog/interface.scala | 23 +++++++++++-------- .../spark/sql/catalyst/plans/QueryPlan.scala | 13 ++++------- .../sql/execution/DataSourceScanExec.scala | 8 +++++-- .../datasources/LogicalRelation.scala | 5 +++- 4 files changed, 27 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index b63bef9193332..da50b0e7e8e42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -27,7 +27,8 @@ import com.google.common.base.Objects import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation -import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, Literal} +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Cast, ExprId, Literal} +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.catalyst.util.quoteIdentifier @@ -425,15 +426,17 @@ case class CatalogRelation( Objects.hashCode(tableMeta.identifier, output) } - override def preCanonicalized: LogicalPlan = copy(tableMeta = CatalogTable( - identifier = tableMeta.identifier, - tableType = tableMeta.tableType, - storage = CatalogStorageFormat.empty, - schema = tableMeta.schema, - partitionColumnNames = tableMeta.partitionColumnNames, - bucketSpec = 
tableMeta.bucketSpec, - createTime = -1 - )) + override lazy val canonicalized: LogicalPlan = copy( + tableMeta = tableMeta.copy( + storage = CatalogStorageFormat.empty, + createTime = -1 + ), + dataCols = dataCols.zipWithIndex.map { + case (attr, index) => attr.withExprId(ExprId(index)) + }, + partitionCols = partitionCols.zipWithIndex.map { + case (attr, index) => attr.withExprId(ExprId(index + dataCols.length)) + }) override def computeStats: Statistics = { // For data source tables, we will create a `LogicalRelation` and won't call this method, for diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 01b3da3f7c482..7addbaaa9afa5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -188,12 +188,13 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT * Plans where `this.canonicalized == other.canonicalized` will always evaluate to the same * result. * - * Some nodes should overwrite this to provide proper canonicalize logic. + * Some nodes should overwrite this to provide proper canonicalize logic, but they should remove + * expressions' cosmetic variations themselves. */ lazy val canonicalized: PlanType = { val canonicalizedChildren = children.map(_.canonicalized) var id = -1 - preCanonicalized.mapExpressions { + mapExpressions { case a: Alias => id += 1 // As the root of the expression, Alias will always take an arbitrary exprId, we need to @@ -206,18 +207,12 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT // Top level `AttributeReference` may also be used for output like `Alias`, we should // normalize the epxrId too.
id += 1 - ar.withExprId(ExprId(id)) + ar.withExprId(ExprId(id)).canonicalized case other => QueryPlan.normalizeExprId(other, allAttributes) }.withNewChildren(canonicalizedChildren) } - /** - * Do some simple transformation on this plan before canonicalizing. Implementations can override - * this method to provide customized canonicalize logic without rewriting the whole logic. - */ - protected def preCanonicalized: PlanType = this - /** * Returns true when the given query plan will return the same results as this query plan. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 74fc23a52a141..a0def68d88e0d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -138,8 +138,12 @@ case class RowDataSourceScanExec( } // Only care about `relation` and `metadata` when canonicalizing. 
- override def preCanonicalized: SparkPlan = - copy(rdd = null, outputPartitioning = null, metastoreTableIdentifier = None) + override lazy val canonicalized: SparkPlan = + copy( + output.map(QueryPlan.normalizeExprId(_, output)), + rdd = null, + outputPartitioning = null, + metastoreTableIdentifier = None) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index c1b2895f1747e..6ba190b9e5dcf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.datasources import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.catalog.CatalogTable import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference} +import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics} import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.util.Utils @@ -43,7 +44,9 @@ case class LogicalRelation( } // Only care about relation when canonicalizing. - override def preCanonicalized: LogicalPlan = copy(catalogTable = None) + override lazy val canonicalized: LogicalPlan = copy( + output = output.map(QueryPlan.normalizeExprId(_, output)), + catalogTable = None) @transient override def computeStats: Statistics = { catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(