From bb992c2058863322a9183b2985806a87729e4168 Mon Sep 17 00:00:00 2001 From: Henry Robinson Date: Wed, 11 Apr 2018 20:44:36 -0700 Subject: [PATCH] [SPARK-23957][SQL] Remove redundant sort operators from subqueries ## What changes were proposed in this pull request? Subqueries (at least in SQL) have 'bag of tuples' semantics. Ordering them is therefore redundant (unless combined with a limit). This patch adds a new optimizer rule that removes sort operators that are directly below subqueries (or some combination of projection and filtering below a subquery). ## How was this patch tested? New unit tests. All sql unit tests pass. --- .../sql/catalyst/optimizer/Optimizer.scala | 30 ++++++- .../optimizer/RemoveSubquerySorts.scala | 78 +++++++++++++++++++ 2 files changed, 107 insertions(+), 1 deletion(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveSubquerySorts.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9a1bbc675e397..e023d4a02d2a8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -103,9 +103,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) (Batch("Eliminate Distinct", Once, EliminateDistinct) :: // Technically some of the rules in Finish Analysis are not optimizer rules and belong more // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime). - // However, because we also use the analyzer to canonicalized queries (for view definition), + // However, because we also use the analyzer to canonicalize queries (for view definition), // we do not eliminate subqueries or compute current time in the analyzer. Batch("Finish Analysis", Once, + // Must come before EliminateSubqueryAliases. + RemoveSubquerySorts, EliminateSubqueryAliases, EliminateView, ReplaceExpressions, @@ -307,6 +309,32 @@ object RemoveRedundantProject extends Rule[LogicalPlan] { } } +/** + * Remove [[Sort]] in subqueries that do not affect the set of rows produced, only their + * order. Subqueries produce unordered sets of rows so sorting their output is unnecessary. + */ +object RemoveSubquerySorts extends Rule[LogicalPlan] { + + /** + * Removes all [[Sort]] operators from a plan that are accessible from the root operator via + * 0 or more [[Project]], [[Filter]] or [[View]] operators. + */ + private def removeTopLevelSorts(plan: LogicalPlan): LogicalPlan = { + plan match { + case Sort(_, _, child) => removeTopLevelSorts(child) + case Project(fields, child) => Project(fields, removeTopLevelSorts(child)) + case Filter(condition, child) => Filter(condition, removeTopLevelSorts(child)) + case View(tbl, output, child) => View(tbl, output, removeTopLevelSorts(child)) + case _ => plan + } + } + + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case Subquery(child) => Subquery(removeTopLevelSorts(child)) + case SubqueryAlias(name, child) => SubqueryAlias(name, removeTopLevelSorts(child)) + } +} + /** * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveSubquerySorts.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveSubquerySorts.scala new file mode 100644 index 0000000000000..560a3d34dffcf --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveSubquerySorts.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, SimpleAnalyzer} +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.catalyst.rules._ + +class RemoveSubquerySortsSuite extends PlanTest { + + private object Optimize extends RuleExecutor[LogicalPlan] { + val batches = + Batch("Subqueries", Once, + RemoveSubquerySorts, + EliminateSubqueryAliases) :: Nil + } + + private val testRelation = LocalRelation('a.int, 'b.int, 'c.int) + + private def analyzeAndCompare(plan: LogicalPlan, correct: LogicalPlan) { + // We can't use the implicit analyze method, that tests usually use, for 'plan' + // because it explicitly calls EliminateSubqueryAliases. + comparePlans(Optimize.execute(SimpleAnalyzer.execute(plan)), correct.analyze) + } + + test("Remove top-level sort") { + val query = testRelation.orderBy('a.asc).subquery('x) + analyzeAndCompare(query, testRelation) + } + + test("Remove sort behind filter and project") { + val query = testRelation.orderBy('a.asc).where('a.attr > 10).select('b).subquery('x) + analyzeAndCompare(query, testRelation.where('a.attr > 10).select('b)) + } + + test("Remove sort below subquery that is not at root") { + val query = testRelation.orderBy('a.asc).subquery('x).groupBy('a)(sum('b)) + analyzeAndCompare(query, testRelation.groupBy('a)(sum('b))) + } + + test("Sorts with limits must not be removed from subqueries") { + val query = testRelation.orderBy('a.asc).limit(10).subquery('x) + analyzeAndCompare(query, testRelation.orderBy('a.asc).limit(10)) + } + + test("Remove more than one sort") { + val query = testRelation.orderBy('a.asc).orderBy('b.desc).subquery('x) + analyzeAndCompare(query, testRelation) + } + + test("Nested subqueries") { + val query = testRelation.orderBy('a.asc).subquery('x).orderBy('b.desc).subquery('y) + analyzeAndCompare(query, testRelation) + } + + test("Sorts below non-project / filter operators don't get removed") { + val query = testRelation.orderBy('a.asc).groupBy('a)(sum('b)).subquery('x) + analyzeAndCompare(query, testRelation.orderBy('a.asc).groupBy('a)(sum('b))) + } +}