From 63c50b8066a77506c6751710d5b5b5edb77ca933 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Sat, 10 Dec 2016 08:02:36 +0000
Subject: [PATCH] Add optimizer rule to reorder Filter predicates.

---
 .../sql/catalyst/optimizer/Optimizer.scala    |   1 +
 .../sql/catalyst/optimizer/expressions.scala  |  28 +++++
 .../ReorderPredicatesInFilterSuite.scala      | 104 ++++++++++++++++++
 3 files changed, 133 insertions(+)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderPredicatesInFilterSuite.scala

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 75d9997582aa6..5c36d2e28d063 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -103,6 +103,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       SimplifyConditionals,
       RemoveDispensableExpressions,
       SimplifyBinaryComparison,
+      ReorderPredicatesInFilter,
       PruneFilters,
       EliminateSorts,
       SimplifyCasts,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
index 6958398e03f70..2de44b96a1e3f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala
@@ -513,6 +513,34 @@ case class OptimizeCodegen(conf: CatalystConf) extends Rule[LogicalPlan] {
 }
 
 
+/**
+ * Reorders `Filter` predicates so that more expensive expressions such as UDFs are evaluated later.
+ */
+object ReorderPredicatesInFilter extends Rule[LogicalPlan] with PredicateHelper {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case f @ Filter(pred, child) =>
+      // Split the Filter predicate into conjunctive expressions.
+      val expressions = splitConjunctivePredicates(pred)
+      // The start index of the deterministic suffix; -1 if all expressions are deterministic.
+      var splitIndex = -1
+      (expressions.length - 1 to 0 by -1).foreach { idx =>
+        if (splitIndex == -1 && !expressions(idx).deterministic) {
+          splitIndex = idx + 1
+        }
+      }
+      if (splitIndex == expressions.length) {
+        // The last expression is non-deterministic, so there is no deterministic suffix to reorder.
+        f
+      } else {
+        val (nonDeterministics, deterministicExprs) = expressions.splitAt(splitIndex)
+        val (udfs, condExprs) = deterministicExprs.partition { e =>
+          e.find(_.isInstanceOf[ScalaUDF]).isDefined
+        }
+        Filter((nonDeterministics ++ condExprs ++ udfs).reduce(And), child)
+      }
+  }
+}
+
 /**
  * Removes [[Cast Casts]] that are unnecessary because the input is already the correct type.
  */
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderPredicatesInFilterSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderPredicatesInFilterSuite.scala
new file mode 100644
index 0000000000000..849b69b803c73
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReorderPredicatesInFilterSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.optimizer
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.PlanTest
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.RuleExecutor
+import org.apache.spark.sql.types.IntegerType
+
+class ReorderPredicatesInFilterSuite extends PlanTest with PredicateHelper {
+
+  object Optimize extends RuleExecutor[LogicalPlan] {
+    val batches =
+      Batch("ReorderPredicatesInFilter", Once,
+        ReorderPredicatesInFilter) :: Nil
+  }
+
+  val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
+
+  test("Reorder Filter deterministic predicates") {
+    val udf = ScalaUDF((x: Int, y: Int) => 1, IntegerType, Seq('a, 'b))
+
+    val originalQuery1 =
+      testRelation
+        .select('a, 'b).where(udf > 2 && 'a > 1 && 'b > 2)
+    val optimized1 = Optimize.execute(originalQuery1.analyze)
+    val correctAnswer1 =
+      testRelation
+        .select('a, 'b).where('a > 1 && 'b > 2 && udf > 2).analyze
+    comparePlans(optimized1, correctAnswer1)
+
+    val originalQuery2 =
+      testRelation
+        .select('a, 'b, 'c).where('c > 5 && udf > 2 && 'a > 1 && 'b > 2)
+    val optimized2 = Optimize.execute(originalQuery2.analyze)
+    val correctAnswer2 =
+      testRelation
+        .select('a, 'b, 'c).where('c > 5 && 'a > 1 && 'b > 2 && udf > 2).analyze
+    comparePlans(optimized2, correctAnswer2)
+
+    val originalQuery3 =
+      testRelation
+        .select('a, 'b, 'c).where(('c > 5 || udf > 2) && 'a > 1 && 'b > 2)
+    val optimized3 = Optimize.execute(originalQuery3.analyze)
+    val correctAnswer3 =
+      testRelation
+        .select('a, 'b, 'c).where('a > 1 && 'b > 2 && ('c > 5 || udf > 2)).analyze
+    comparePlans(optimized3, correctAnswer3)
+  }
+
+  test("Reorder Filter non-deterministic predicates") {
+    val udf = ScalaUDF((x: Int, y: Int) => 1, IntegerType, Seq('a, 'b))
+    // The UDF is before a non-deterministic expression, so we can't reorder it.
+    val originalQuery1 =
+      testRelation
+        .select('a, 'b, 'c).where('c > 5 && udf > 2 && Rand(0) > 0.1 && 'a > 1 && 'b > 2)
+    val optimized1 = Optimize.execute(originalQuery1.analyze)
+    val correctAnswer1 = originalQuery1.analyze
+    comparePlans(optimized1, correctAnswer1)
+
+    val originalQuery2 =
+      testRelation
+        .select('a, 'b).where((Rand(0) > 0.1 && udf > 2) && 'a > 1 && 'b > 2)
+    val optimized2 = Optimize.execute(originalQuery2.analyze)
+    val correctAnswer2 =
+      testRelation
+        .select('a, 'b).where(Rand(0) > 0.1 && 'a > 1 && 'b > 2 && udf > 2).analyze
+    comparePlans(optimized2, correctAnswer2)
+
+    // The UDF is in a disjunction with a non-deterministic expression, so we can't reorder it.
+    val originalQuery3 =
+      testRelation
+        .select('a, 'b).where((Rand(0) > 0.1 || udf > 2) && 'a > 1 && 'b > 2)
+    val optimized3 = Optimize.execute(originalQuery3.analyze)
+    val correctAnswer3 = originalQuery3.analyze
+    comparePlans(optimized3, correctAnswer3)
+
+    // The UDF sits between non-deterministic expressions, so we can't reorder it.
+    val originalQuery4 =
+      testRelation
+        .select('a, 'b).where(Rand(0) > 0.1 && udf > 2 && Rand(1) > 0.2 && 'a > 1 && 'b > 2)
+    val optimized4 = Optimize.execute(originalQuery4.analyze)
+    val correctAnswer4 = originalQuery4.analyze
+    comparePlans(optimized4, correctAnswer4)
+  }
+}
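
Usage sketch (illustrative, not part of the patch): assuming the rule added above and the Catalyst test DSL used in the suite, the snippet below shows the reordering on a hand-made relation and UDF; the relation, columns, and UDF are placeholder examples.

// Illustrative sketch only: apply ReorderPredicatesInFilter directly to a plan.
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.ScalaUDF
import org.apache.spark.sql.catalyst.optimizer.ReorderPredicatesInFilter
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.types.IntegerType

val relation = LocalRelation('a.int, 'b.int)
val udf = ScalaUDF((x: Int, y: Int) => 1, IntegerType, Seq('a, 'b))

// Original filter: udf(a, b) > 2 && a > 1 && b > 2
val plan = relation.where(udf > 2 && 'a > 1 && 'b > 2).analyze

// After the rule the cheap comparisons come first and the UDF is evaluated last:
// a > 1 && b > 2 && udf(a, b) > 2
val reordered = ReorderPredicatesInFilter(plan)

This is the same behavior the first test case in the suite asserts: deterministic, non-UDF conjuncts are moved in front of the UDF.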