From d75b94c1abb4b60444a4191319c787a50a061bf9 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 23 Feb 2017 21:08:02 -0800
Subject: [PATCH 1/2] fix.

---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  5 +++--
 .../apache/spark/sql/catalyst/dsl/package.scala    |  2 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  4 ++--
 .../catalyst/optimizer/ColumnPruningSuite.scala    |  4 ++--
 .../optimizer/EliminateSubqueryAliasesSuite.scala  |  7 -------
 .../catalyst/optimizer/FilterPushdownSuite.scala   | 14 +++++++-------
 .../catalyst/optimizer/JoinOptimizationSuite.scala |  2 +-
 .../org/apache/spark/sql/DataFrameWriter.scala     |  7 ++++---
 .../execution/command/AnalyzeColumnCommand.scala   |  4 +---
 .../execution/command/AnalyzeTableCommand.scala    |  4 +---
 .../spark/sql/hive/execution/SQLQuerySuite.scala   |  4 ++--
 11 files changed, 24 insertions(+), 33 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index c477cb48d0b07..e36f0badd0a58 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -604,7 +604,7 @@ class Analyzer(
     def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
       case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) if child.resolved =>
-        i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+        i.copy(table = lookupTableFromCatalog(u).canonicalized)
       case u: UnresolvedRelation => resolveRelation(u)
     }
@@ -2338,7 +2338,8 @@ class Analyzer(
  */
 object EliminateSubqueryAliases extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case SubqueryAlias(_, child, _) => child
+    case SubqueryAlias(_, child, _) =>
+      child
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index c062e4e84bcdd..71d0c5cc5c0ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -377,7 +377,7 @@ package object dsl {
       RepartitionByExpression(exprs, logicalPlan, numPartitions = n)

     def analyze: LogicalPlan =
-      EliminateSubqueryAliases(analysis.SimpleAnalyzer.execute(logicalPlan))
+      analysis.SimpleAnalyzer.execute(logicalPlan).canonicalized
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 036da3ad2062f..3be2b30d0cd56 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -48,8 +48,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
   def batches: Seq[Batch] = {
     // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
     // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
-    // However, because we also use the analyzer to canonicalized queries (for view definition),
-    // we do not eliminate subqueries or compute current time in the analyzer.
+    // However, because we always do eager analysis in Dataset, we do not eliminate subqueries
+    // or compute current time in the analyzer.
     Batch("Finish Analysis", Once,
       EliminateSubqueryAliases,
       EliminateView,
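Note: the substance of the three main-source files above is a single substitution: every call to `EliminateSubqueryAliases(plan)` becomes `plan.canonicalized`. Below is a minimal sketch of why the two are expected to agree, assuming the 2.2-era Catalyst API visible in this diff (a three-argument `SubqueryAlias` and a public `canonicalized` on `LogicalPlan`); it is an illustration, not part of the patch.

```scala
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}

object AliasStrippingDemo extends App {
  val relation = LocalRelation('a.int, 'b.int)
  // Analysis wraps named relations in SubqueryAlias so qualified names resolve.
  val aliased = SubqueryAlias("t", relation, None)

  // The explicit rule walks the tree bottom-up and drops every alias node.
  assert(EliminateSubqueryAliases(aliased) == relation)

  // The patch relies on canonicalization doing the same, since SubqueryAlias
  // carries no semantics of its own (assumed behavior in this source tree).
  assert(aliased.canonicalized == relation)
}
```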
Batch("Finish Analysis", Once, EliminateSubqueryAliases, EliminateView, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index 5bd1bc80c3b8a..74d9e9e0d2fa8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -246,7 +246,7 @@ class ColumnPruningSuite extends PlanTest { x.select('a) .sortBy(SortOrder('a, Ascending)).analyze - comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer)) + comparePlans(optimized, correctAnswer) // push down invalid val originalQuery1 = { @@ -261,7 +261,7 @@ class ColumnPruningSuite extends PlanTest { .sortBy(SortOrder('a, Ascending)) .select('b).analyze - comparePlans(optimized1, analysis.EliminateSubqueryAliases(correctAnswer1)) + comparePlans(optimized1, correctAnswer1) } test("Column pruning on Window with useless aggregate functions") { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala index a8aeedbd62759..400701d99871a 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateSubqueryAliasesSuite.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases import org.apache.spark.sql.catalyst.dsl.expressions._ -import org.apache.spark.sql.catalyst.dsl.plans._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral import org.apache.spark.sql.catalyst.plans.PlanTest @@ -34,12 +33,6 @@ class EliminateSubqueryAliasesSuite extends PlanTest with PredicateHelper { val batches = Batch("EliminateSubqueryAliases", Once, EliminateSubqueryAliases) :: Nil } - private def assertEquivalent(e1: Expression, e2: Expression): Unit = { - val correctAnswer = Project(Alias(e2, "out")() :: Nil, OneRowRelation).analyze - val actual = Optimize.execute(Project(Alias(e1, "out")() :: Nil, OneRowRelation).analyze) - comparePlans(actual, correctAnswer) - } - private def afterOptimization(plan: LogicalPlan): LogicalPlan = { Optimize.execute(analysis.SimpleAnalyzer.execute(plan)) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 6feea4060f46a..a20a61c393a37 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -449,7 +449,7 @@ class FilterPushdownSuite extends PlanTest { } val optimized = Optimize.execute(originalQuery.analyze) - comparePlans(analysis.EliminateSubqueryAliases(originalQuery.analyze), optimized) + comparePlans(originalQuery.analyze, optimized) } test("joins: conjunctive predicates") { @@ -468,7 +468,7 @@ class FilterPushdownSuite extends PlanTest { left.join(right, condition = Some("x.b".attr === "y.b".attr)) .analyze - comparePlans(optimized, 
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 6feea4060f46a..a20a61c393a37 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -449,7 +449,7 @@ class FilterPushdownSuite extends PlanTest {
     }

     val optimized = Optimize.execute(originalQuery.analyze)
-    comparePlans(analysis.EliminateSubqueryAliases(originalQuery.analyze), optimized)
+    comparePlans(originalQuery.analyze, optimized)
   }

   test("joins: conjunctive predicates") {
@@ -468,7 +468,7 @@ class FilterPushdownSuite extends PlanTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze

-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+    comparePlans(optimized, correctAnswer)
   }

   test("joins: conjunctive predicates #2") {
@@ -487,7 +487,7 @@ class FilterPushdownSuite extends PlanTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze

-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+    comparePlans(optimized, correctAnswer)
   }

   test("joins: conjunctive predicates #3") {
@@ -511,7 +511,7 @@ class FilterPushdownSuite extends PlanTest {
         condition = Some("z.a".attr === "x.b".attr))
         .analyze

-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+    comparePlans(optimized, correctAnswer)
   }

   test("joins: push down where clause into left anti join") {
@@ -526,7 +526,7 @@ class FilterPushdownSuite extends PlanTest {
       x.where("x.a".attr > 10)
         .join(y, LeftAnti, Some("x.b".attr === "y.b".attr))
         .analyze
-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+    comparePlans(optimized, correctAnswer)
   }

   test("joins: only push down join conditions to the right of a left anti join") {
@@ -543,7 +543,7 @@ class FilterPushdownSuite extends PlanTest {
         LeftAnti,
         Some("x.b".attr === "y.b".attr && "x.a".attr > 10))
         .analyze
-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+    comparePlans(optimized, correctAnswer)
   }

   test("joins: only push down join conditions to the right of an existence join") {
@@ -561,7 +561,7 @@ class FilterPushdownSuite extends PlanTest {
         ExistenceJoin(fillerVal),
         Some("x.a".attr > 1))
         .analyze
-    comparePlans(optimized, analysis.EliminateSubqueryAliases(correctAnswer))
+    comparePlans(optimized, correctAnswer)
   }

   val testRelationWithArrayType = LocalRelation('a.int, 'b.int, 'c_arr.array(IntegerType))
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
index 65dd6225cea07..b949ad8a09cfe 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala
@@ -119,7 +119,7 @@ class JoinOptimizationSuite extends PlanTest {

     queryAnswers foreach { queryAnswerPair =>
       val optimized = Optimize.execute(queryAnswerPair._1.analyze)
-      comparePlans(optimized, analysis.EliminateSubqueryAliases(queryAnswerPair._2.analyze))
+      comparePlans(optimized, queryAnswerPair._2.analyze)
     }
   }
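Note: every `comparePlans` change in the two suites above is the same deletion. For context, here is a simplified stand-in for what such a comparison needs; `assertSamePlan` is a hypothetical helper written for illustration, not the real `PlanTest.comparePlans`, which additionally normalizes expression IDs before comparing.

```scala
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Hypothetical helper: both sides must already be in the same canonical,
// alias-free shape for a structural comparison to be meaningful.
def assertSamePlan(actual: LogicalPlan, expected: LogicalPlan): Unit = {
  require(
    actual.canonicalized == expected.canonicalized,
    s"Plans differ:\n$actual\n!=\n$expected")
}
```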
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 393925161fc7b..f31ec8da06456 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._

 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -353,8 +353,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         relation.catalogTable.identifier
       }

-    val tableRelation = df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
-    EliminateSubqueryAliases(tableRelation) match {
+    val tableRelation =
+      df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed.canonicalized
+    tableRelation match {
       // check if the table is a data source table (the relation is a BaseRelation).
       case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
         throw new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index d024a3673d4ba..7ceb5b46c392d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -40,8 +39,7 @@ case class AnalyzeColumnCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation =
-      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
+    val relation = sparkSession.table(tableIdentWithDB).queryExecution.analyzed.canonicalized

     // Compute total size
     val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 30b6cc7617cb3..f64f0a488a03e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -24,7 +24,6 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SessionState
@@ -41,8 +40,7 @@ case class AnalyzeTableCommand(
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation =
-      EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
+    val relation = sparkSession.table(tableIdentWithDB).queryExecution.analyzed.canonicalized

     relation match {
       case relation: CatalogRelation =>
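Note: the three sql/core call sites above share one pattern: analyze the table, canonicalize to peel off `SubqueryAlias` wrappers, then pattern-match on the underlying relation node. A sketch of that pattern follows; `describeTable` is a hypothetical helper written for illustration, not part of the patch.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogRelation
import org.apache.spark.sql.execution.datasources.LogicalRelation

def describeTable(spark: SparkSession, name: String): String = {
  // Without canonicalization the analyzed plan would be a SubqueryAlias
  // wrapping the relation, and neither case below would match.
  val relation = spark.table(name).queryExecution.analyzed.canonicalized
  relation match {
    case _: CatalogRelation => "catalog (Hive) table"
    case _: LogicalRelation => "data source table"
    case other              => s"unexpected plan: ${other.nodeName}"
  }
}
```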
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index faed8b504649f..576453def24f8 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.TestUtils
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, FunctionRegistry, NoSuchPartitionException}
+import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
@@ -518,7 +518,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
       withSQLConf(
         HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
         HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
-        relation = EliminateSubqueryAliases(spark.table(tableName).queryExecution.analyzed)
+        relation = spark.table(tableName).queryExecution.analyzed.canonicalized
       }

       val catalogTable = sessionState.catalog.getTableMetadata(TableIdentifier(tableName))

From 3cde705c6baa1e4a869149f3ca289a5c1e3a3000 Mon Sep 17 00:00:00 2001
From: Xiao Li
Date: Thu, 23 Feb 2017 21:23:05 -0800
Subject: [PATCH 2/2] clean

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index e36f0badd0a58..213cc0ab0caba 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -2338,8 +2338,7 @@ class Analyzer(
  */
 object EliminateSubqueryAliases extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-    case SubqueryAlias(_, child, _) =>
-      child
+    case SubqueryAlias(_, child, _) => child
   }
 }
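Note: PATCH 2/2 only reverts the incidental reformatting of the `EliminateSubqueryAliases` match arm introduced by PATCH 1/2; the rule itself is unchanged. The property the whole series leans on is that plan-equivalence checks already go through canonical forms, as in this sketch (assuming `sameResult` compares canonicalized plans in this source tree):

```scala
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, SubqueryAlias}

val relation = LocalRelation('a.int)
val aliased = SubqueryAlias("t", relation, None)

// An aliased plan and its child answer the same query, so canonical
// comparison treats them as equal.
assert(aliased.sameResult(relation))
```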