From 0a48ba3a75519bf40eaf3c2c3e039e6f4264b323 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 4 Jul 2017 19:47:49 -0700 Subject: [PATCH 1/3] fix. --- .../optimizer/CostBasedJoinReorder.scala | 7 ++-- .../sql/catalyst/optimizer/Optimizer.scala | 29 ++++++++------- .../optimizer/StarSchemaDetection.scala | 5 ++- .../sql/catalyst/optimizer/expressions.scala | 14 ++++---- .../spark/sql/catalyst/optimizer/joins.scala | 6 ++-- .../BinaryComparisonSimplificationSuite.scala | 2 +- .../BooleanSimplificationSuite.scala | 2 +- .../optimizer/CombiningLimitsSuite.scala | 2 +- .../optimizer/ConstantFoldingSuite.scala | 2 +- .../optimizer/DecimalAggregatesSuite.scala | 2 +- .../optimizer/EliminateMapObjectsSuite.scala | 2 +- .../optimizer/JoinOptimizationSuite.scala | 2 +- .../catalyst/optimizer/JoinReorderSuite.scala | 27 +++++++++++--- .../optimizer/LimitPushdownSuite.scala | 2 +- .../optimizer/OptimizeCodegenSuite.scala | 2 +- .../catalyst/optimizer/OptimizeInSuite.scala | 24 +++++++------ .../StarJoinCostBasedReorderSuite.scala | 36 ++++++++++++++----- .../optimizer/StarJoinReorderSuite.scala | 25 ++++++++++--- .../optimizer/complexTypesSuite.scala | 2 +- .../spark/sql/catalyst/plans/PlanTest.scala | 4 +-- 20 files changed, 129 insertions(+), 68 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala index 3a7543e2141e9..f37dbe139731e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -32,7 +32,10 @@ import org.apache.spark.sql.internal.SQLConf * We may have several join reorder algorithms in the future. This class is the entry of these * algorithms, and chooses which one to use. */ -case class CostBasedJoinReorder(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { +object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper { + + val conf = SQLConf.get + def apply(plan: LogicalPlan): LogicalPlan = { if (!conf.cboEnabled || !conf.joinReorderEnabled) { plan @@ -379,7 +382,7 @@ object JoinReorderDPFilters extends PredicateHelper { if (conf.joinReorderDPStarFilter) { // Compute the tables in a star-schema relationship. 
- val starJoin = StarSchemaDetection(conf).findStarJoins(items, conditions.toSeq) + val starJoin = StarSchemaDetection.findStarJoins(items, conditions.toSeq) val nonStarJoin = items.filterNot(starJoin.contains(_)) if (starJoin.nonEmpty && nonStarJoin.nonEmpty) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 946fa7bae0199..7eef65cb572f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -77,11 +77,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) Batch("Operator Optimizations", fixedPoint, Seq( // Operator push down PushProjectionThroughUnion, - ReorderJoin(conf), + ReorderJoin, EliminateOuterJoin, PushPredicateThroughJoin, PushDownPredicate, - LimitPushDown(conf), + LimitPushDown, ColumnPruning, InferFiltersFromConstraints, // Operator combine @@ -92,10 +92,10 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) CombineLimits, CombineUnions, // Constant folding and strength reduction - NullPropagation(conf), + NullPropagation, ConstantPropagation, FoldablePropagation, - OptimizeIn(conf), + OptimizeIn, ConstantFolding, ReorderAssociativeOperator, LikeSimplification, @@ -117,11 +117,11 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) CombineConcats) ++ extendedOperatorOptimizationRules: _*) :: Batch("Check Cartesian Products", Once, - CheckCartesianProducts(conf)) :: + CheckCartesianProducts) :: Batch("Join Reorder", Once, - CostBasedJoinReorder(conf)) :: + CostBasedJoinReorder) :: Batch("Decimal Optimizations", fixedPoint, - DecimalAggregates(conf)) :: + DecimalAggregates) :: Batch("Object Expressions Optimization", fixedPoint, EliminateMapObjects, CombineTypedFilters) :: @@ -129,7 +129,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) ConvertToLocalRelation, PropagateEmptyRelation) :: Batch("OptimizeCodegen", Once, - OptimizeCodegen(conf)) :: + OptimizeCodegen) :: Batch("RewriteSubquery", Once, RewritePredicateSubquery, CollapseProject) :: Nil @@ -288,7 +288,7 @@ object RemoveRedundantProject extends Rule[LogicalPlan] { /** * Pushes down [[LocalLimit]] beneath UNION ALL and beneath the streamed inputs of outer joins. */ -case class LimitPushDown(conf: SQLConf) extends Rule[LogicalPlan] { +object LimitPushDown extends Rule[LogicalPlan] { private def stripGlobalLimitIfPresent(plan: LogicalPlan): LogicalPlan = { plan match { @@ -1077,8 +1077,7 @@ object CombineLimits extends Rule[LogicalPlan] { * the join between R and S is not a cartesian product and therefore should be allowed. * The predicate R.r = S.s is not recognized as a join condition until the ReorderJoin rule. */ -case class CheckCartesianProducts(conf: SQLConf) - extends Rule[LogicalPlan] with PredicateHelper { +object CheckCartesianProducts extends Rule[LogicalPlan] with PredicateHelper { /** * Check if a join is a cartesian product. Returns true if * there are no join conditions involving references from both left and right. 
@@ -1090,7 +1089,7 @@ case class CheckCartesianProducts(conf: SQLConf) } def apply(plan: LogicalPlan): LogicalPlan = - if (conf.crossJoinEnabled) { + if (SQLConf.get.crossJoinEnabled) { plan } else plan transform { case j @ Join(left, right, Inner | LeftOuter | RightOuter | FullOuter, condition) @@ -1112,7 +1111,7 @@ case class CheckCartesianProducts(conf: SQLConf) * This uses the same rules for increasing the precision and scale of the output as * [[org.apache.spark.sql.catalyst.analysis.DecimalPrecision]]. */ -case class DecimalAggregates(conf: SQLConf) extends Rule[LogicalPlan] { +object DecimalAggregates extends Rule[LogicalPlan] { import Decimal.MAX_LONG_DIGITS /** Maximum number of decimal digits representable precisely in a Double */ @@ -1130,7 +1129,7 @@ case class DecimalAggregates(conf: SQLConf) extends Rule[LogicalPlan] { we.copy(windowFunction = ae.copy(aggregateFunction = Average(UnscaledValue(e)))) Cast( Divide(newAggExpr, Literal.create(math.pow(10.0, scale), DoubleType)), - DecimalType(prec + 4, scale + 4), Option(conf.sessionLocalTimeZone)) + DecimalType(prec + 4, scale + 4), Option(SQLConf.get.sessionLocalTimeZone)) case _ => we } @@ -1142,7 +1141,7 @@ case class DecimalAggregates(conf: SQLConf) extends Rule[LogicalPlan] { val newAggExpr = ae.copy(aggregateFunction = Average(UnscaledValue(e))) Cast( Divide(newAggExpr, Literal.create(math.pow(10.0, scale), DoubleType)), - DecimalType(prec + 4, scale + 4), Option(conf.sessionLocalTimeZone)) + DecimalType(prec + 4, scale + 4), Option(SQLConf.get.sessionLocalTimeZone)) case _ => ae } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala index ca729127e7d1d..d7fcf275facf9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala @@ -28,7 +28,10 @@ import org.apache.spark.sql.internal.SQLConf /** * Encapsulates star-schema detection logic. */ -case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper { +object StarSchemaDetection extends PredicateHelper { + + val conf = SQLConf.get + /** * Star schema consists of one or more fact tables referencing a number of dimension diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala index 66b8ca62e5e4c..6c83f4790004f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala @@ -173,12 +173,12 @@ object ReorderAssociativeOperator extends Rule[LogicalPlan] { * 2. Replaces [[In (value, seq[Literal])]] with optimized version * [[InSet (value, HashSet[Literal])]] which is much faster. 
*/ -case class OptimizeIn(conf: SQLConf) extends Rule[LogicalPlan] { +object OptimizeIn extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsDown { case expr @ In(v, list) if expr.inSetConvertible => val newList = ExpressionSet(list).toSeq - if (newList.size > conf.optimizerInSetConversionThreshold) { + if (newList.size > SQLConf.get.optimizerInSetConversionThreshold) { val hSet = newList.map(e => e.eval(EmptyRow)) InSet(v, HashSet() ++ hSet) } else if (newList.size < list.size) { @@ -414,7 +414,7 @@ object LikeSimplification extends Rule[LogicalPlan] { * equivalent [[Literal]] values. This rule is more specific with * Null value propagation from bottom to top of the expression tree. */ -case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] { +object NullPropagation extends Rule[LogicalPlan] { private def isNullLiteral(e: Expression): Boolean = e match { case Literal(null, _) => true case _ => false @@ -423,9 +423,9 @@ case class NullPropagation(conf: SQLConf) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transform { case q: LogicalPlan => q transformExpressionsUp { case e @ WindowExpression(Cast(Literal(0L, _), _, _), _) => - Cast(Literal(0L), e.dataType, Option(conf.sessionLocalTimeZone)) + Cast(Literal(0L), e.dataType, Option(SQLConf.get.sessionLocalTimeZone)) case e @ AggregateExpression(Count(exprs), _, _, _) if exprs.forall(isNullLiteral) => - Cast(Literal(0L), e.dataType, Option(conf.sessionLocalTimeZone)) + Cast(Literal(0L), e.dataType, Option(SQLConf.get.sessionLocalTimeZone)) case ae @ AggregateExpression(Count(exprs), _, false, _) if !exprs.exists(_.nullable) => // This rule should be only triggered when isDistinct field is false. ae.copy(aggregateFunction = Count(Literal(1))) @@ -552,14 +552,14 @@ object FoldablePropagation extends Rule[LogicalPlan] { /** * Optimizes expressions by replacing according to CodeGen configuration. */ -case class OptimizeCodegen(conf: SQLConf) extends Rule[LogicalPlan] { +object OptimizeCodegen extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case e: CaseWhen if canCodegen(e) => e.toCodegen() } private def canCodegen(e: CaseWhen): Boolean = { val numBranches = e.branches.size + e.elseValue.size - numBranches <= conf.maxCaseBranchesForCodegen + numBranches <= SQLConf.get.maxCaseBranchesForCodegen } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala index bb97e2c808b9f..edbeaf273fd6f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.internal.SQLConf * * If star schema detection is enabled, reorder the star join plans based on heuristics. */ -case class ReorderJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHelper { +object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper { /** * Join a list of plans together and push down the conditions into them. 
* @@ -87,8 +87,8 @@ case class ReorderJoin(conf: SQLConf) extends Rule[LogicalPlan] with PredicateHe def apply(plan: LogicalPlan): LogicalPlan = plan transform { case ExtractFiltersAndInnerJoins(input, conditions) if input.size > 2 && conditions.nonEmpty => - if (conf.starSchemaDetection && !conf.cboEnabled) { - val starJoinPlan = StarSchemaDetection(conf).reorderStarJoins(input, conditions) + if (SQLConf.get.starSchemaDetection && !SQLConf.get.cboEnabled) { + val starJoinPlan = StarSchemaDetection.reorderStarJoins(input, conditions) if (starJoinPlan.nonEmpty) { val rest = input.filterNot(starJoinPlan.contains(_)) createOrderedJoin(starJoinPlan ++ rest, conditions) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala index 2a04bd588dc1d..a313681eeb8f0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BinaryComparisonSimplificationSuite.scala @@ -33,7 +33,7 @@ class BinaryComparisonSimplificationSuite extends PlanTest with PredicateHelper Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: Batch("Constant Folding", FixedPoint(50), - NullPropagation(conf), + NullPropagation, ConstantFolding, BooleanSimplification, SimplifyBinaryComparison, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala index c6345b60b744b..56399f4831a6f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/BooleanSimplificationSuite.scala @@ -35,7 +35,7 @@ class BooleanSimplificationSuite extends PlanTest with PredicateHelper { Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: Batch("Constant Folding", FixedPoint(50), - NullPropagation(conf), + NullPropagation, ConstantFolding, BooleanSimplification, PruneFilters) :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala index ac71887c16f96..87ad81db11b64 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CombiningLimitsSuite.scala @@ -32,7 +32,7 @@ class CombiningLimitsSuite extends PlanTest { Batch("Combine Limit", FixedPoint(10), CombineLimits) :: Batch("Constant Folding", FixedPoint(10), - NullPropagation(conf), + NullPropagation, ConstantFolding, BooleanSimplification, SimplifyConditionals) :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala index 25c592b9c1dde..641c89873dcc4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala @@ -33,7 +33,7 @@ class ConstantFoldingSuite extends PlanTest { Batch("AnalysisNodes", Once, EliminateSubqueryAliases) 
:: Batch("ConstantFolding", Once, - OptimizeIn(conf), + OptimizeIn, ConstantFolding, BooleanSimplification) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala index cc4fb3a244a98..711294ed61928 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecimalAggregatesSuite.scala @@ -29,7 +29,7 @@ class DecimalAggregatesSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Decimal Optimizations", FixedPoint(100), - DecimalAggregates(conf)) :: Nil + DecimalAggregates) :: Nil } val testRelation = LocalRelation('a.decimal(2, 1), 'b.decimal(12, 1)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala index d4f37e2a5e877..157472c2293f9 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/EliminateMapObjectsSuite.scala @@ -31,7 +31,7 @@ class EliminateMapObjectsSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { val batches = { Batch("EliminateMapObjects", FixedPoint(50), - NullPropagation(conf), + NullPropagation, SimplifyCasts, EliminateMapObjects) :: Nil } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala index a6584aa5fbba7..2f30a78f03211 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala @@ -37,7 +37,7 @@ class JoinOptimizationSuite extends PlanTest { CombineFilters, PushDownPredicate, BooleanSimplification, - ReorderJoin(conf), + ReorderJoin, PushPredicateThroughJoin, ColumnPruning, CollapseProject) :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala index 71db4e2e0ec4d..2fb587d50a4cb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala @@ -24,25 +24,42 @@ import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.{CBO_ENABLED, JOIN_REORDER_ENABLED} class JoinReorderSuite extends PlanTest with StatsEstimationTestBase { - override val conf = new SQLConf().copy(CBO_ENABLED -> true, JOIN_REORDER_ENABLED -> true) - object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Operator Optimizations", FixedPoint(100), CombineFilters, PushDownPredicate, - ReorderJoin(conf), + ReorderJoin, PushPredicateThroughJoin, 
ColumnPruning, CollapseProject) :: Batch("Join Reorder", Once, - CostBasedJoinReorder(conf)) :: Nil + CostBasedJoinReorder) :: Nil + } + + var originalConfCBOEnabled = false + var originalConfJoinReorderEnabled = false + + override def beforeAll(): Unit = { + super.beforeAll() + originalConfCBOEnabled = conf.cboEnabled + originalConfJoinReorderEnabled = conf.joinReorderEnabled + conf.setConf(CBO_ENABLED, true) + conf.setConf(JOIN_REORDER_ENABLED, true) + } + + override def afterAll(): Unit = { + try { + conf.setConf(CBO_ENABLED, originalConfCBOEnabled) + conf.setConf(JOIN_REORDER_ENABLED, originalConfJoinReorderEnabled) + } finally { + super.afterAll() + } } /** Set up tables and columns for testing */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala index d8302dfc9462d..f50e2e86516f0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/LimitPushdownSuite.scala @@ -32,7 +32,7 @@ class LimitPushdownSuite extends PlanTest { Batch("Subqueries", Once, EliminateSubqueryAliases) :: Batch("Limit pushdown", FixedPoint(100), - LimitPushDown(conf), + LimitPushDown, CombineLimits, ConstantFolding, BooleanSimplification) :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala index 9dc6738ba04b3..b71067c0af3a1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeCodegenSuite.scala @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.rules._ class OptimizeCodegenSuite extends PlanTest { object Optimize extends RuleExecutor[LogicalPlan] { - val batches = Batch("OptimizeCodegen", Once, OptimizeCodegen(conf)) :: Nil + val batches = Batch("OptimizeCodegen", Once, OptimizeCodegen) :: Nil } protected def assertEquivalent(e1: Expression, e2: Expression): Unit = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala index d8937321ecb98..6a77580b29a21 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeInSuite.scala @@ -34,10 +34,10 @@ class OptimizeInSuite extends PlanTest { Batch("AnalysisNodes", Once, EliminateSubqueryAliases) :: Batch("ConstantFolding", FixedPoint(10), - NullPropagation(conf), + NullPropagation, ConstantFolding, BooleanSimplification, - OptimizeIn(conf)) :: Nil + OptimizeIn) :: Nil } val testRelation = LocalRelation('a.int, 'b.int, 'c.int) @@ -159,16 +159,20 @@ class OptimizeInSuite extends PlanTest { .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3)))) .analyze - val notOptimizedPlan = OptimizeIn(conf)(plan) - comparePlans(notOptimizedPlan, plan) + withSQLConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "10") { + val notOptimizedPlan = OptimizeIn(plan) + comparePlans(notOptimizedPlan, plan) + } // Reduce the threshold to turning into InSet. 
- val optimizedPlan = OptimizeIn(conf.copy(OPTIMIZER_INSET_CONVERSION_THRESHOLD -> 2))(plan) - optimizedPlan match { - case Filter(cond, _) - if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 => - // pass - case _ => fail("Unexpected result for OptimizedIn") + withSQLConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD.key -> "2") { + val optimizedPlan = OptimizeIn(plan) + optimizedPlan match { + case Filter(cond, _) + if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 => + // pass + case _ => fail("Unexpected result for OptimizedIn") + } } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala index a23d6266b2840..ada6e2a43ea0f 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala @@ -24,28 +24,46 @@ import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} -import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf._ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase { - override val conf = new SQLConf().copy( - CBO_ENABLED -> true, - JOIN_REORDER_ENABLED -> true, - JOIN_REORDER_DP_STAR_FILTER -> true) - object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Operator Optimizations", FixedPoint(100), CombineFilters, PushDownPredicate, - ReorderJoin(conf), + ReorderJoin, PushPredicateThroughJoin, ColumnPruning, CollapseProject) :: - Batch("Join Reorder", Once, - CostBasedJoinReorder(conf)) :: Nil + Batch("Join Reorder", Once, + CostBasedJoinReorder) :: Nil + } + + var originalConfCBOEnabled = false + var originalConfJoinReorderEnabled = false + var originalConfJoinReorderDPStarFilter = false + + override def beforeAll(): Unit = { + super.beforeAll() + originalConfCBOEnabled = conf.cboEnabled + originalConfJoinReorderEnabled = conf.joinReorderEnabled + originalConfJoinReorderDPStarFilter = conf.joinReorderDPStarFilter + conf.setConf(CBO_ENABLED, true) + conf.setConf(JOIN_REORDER_ENABLED, true) + conf.setConf(JOIN_REORDER_DP_STAR_FILTER, true) + } + + override def afterAll(): Unit = { + try { + conf.setConf(CBO_ENABLED, originalConfCBOEnabled) + conf.setConf(JOIN_REORDER_ENABLED, originalConfJoinReorderEnabled) + conf.setConf(JOIN_REORDER_DP_STAR_FILTER, originalConfJoinReorderDPStarFilter) + } finally { + super.afterAll() + } } private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala index 605c01b7220d1..777c5637201ed 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala @@ -24,19 +24,36 @@ import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest} import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan} 
import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan} -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.internal.SQLConf.{CASE_SENSITIVE, STARSCHEMA_DETECTION} +import org.apache.spark.sql.internal.SQLConf._ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase { - override val conf = new SQLConf().copy(CASE_SENSITIVE -> true, STARSCHEMA_DETECTION -> true) + var originalConfStarSchemaDetection = false + var originalConfCBOEnabled = true + + override def beforeAll(): Unit = { + super.beforeAll() + originalConfStarSchemaDetection = conf.starSchemaDetection + originalConfCBOEnabled = conf.cboEnabled + conf.setConf(STARSCHEMA_DETECTION, true) + conf.setConf(CBO_ENABLED, false) + } + + override def afterAll(): Unit = { + try { + conf.setConf(STARSCHEMA_DETECTION, originalConfStarSchemaDetection) + conf.setConf(CBO_ENABLED, originalConfCBOEnabled) + } finally { + super.afterAll() + } + } object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Operator Optimizations", FixedPoint(100), CombineFilters, PushDownPredicate, - ReorderJoin(conf), + ReorderJoin, PushPredicateThroughJoin, ColumnPruning, CollapseProject) :: Nil diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala index 0a18858350e1f..3634accf1ec21 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/complexTypesSuite.scala @@ -37,7 +37,7 @@ class ComplexTypesSuite extends PlanTest{ Batch("collapse projections", FixedPoint(10), CollapseProject) :: Batch("Constant Folding", FixedPoint(10), - NullPropagation(conf), + NullPropagation, ConstantFolding, BooleanSimplification, SimplifyConditionals, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index e9679d3361509..6fdb721e628b7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -31,8 +31,8 @@ import org.apache.spark.sql.internal.SQLConf */ trait PlanTest extends SparkFunSuite with PredicateHelper { - // TODO(gatorsmile): remove this from PlanTest and all the analyzer/optimizer rules - protected val conf = new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true) + // TODO(gatorsmile): remove this from PlanTest and all the analyzer rules + protected val conf = SQLConf.get /** * Since attribute references are given globally unique ids during analysis, From 092b41449a3fb12e32539f5733be49b2bdb9049f Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Tue, 4 Jul 2017 22:24:59 -0700 Subject: [PATCH 2/3] fix. 
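
This follow-up makes the conf lookup lazy: `val conf = SQLConf.get` is
evaluated once, when the singleton object is first initialized, which would
pin the rule to whichever session's conf happened to be active on the
initializing thread; `private def conf = SQLConf.get` re-reads the active
session's conf on every access. A minimal sketch of the difference (the
`ConfHolder` object below is hypothetical, for illustration only):

    import org.apache.spark.sql.internal.SQLConf

    object ConfHolder {
      // Evaluated exactly once, at object initialization:
      // freezes whatever conf was active at that moment.
      val frozen: SQLConf = SQLConf.get

      // Evaluated on every call: always the currently active session's conf.
      private def current: SQLConf = SQLConf.get

      def cboEnabled: Boolean = current.cboEnabled
    }
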
--- .../spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala | 2 +- .../spark/sql/catalyst/optimizer/StarSchemaDetection.scala | 3 +-- .../scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala | 2 +- 3 files changed, 3 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala index f37dbe139731e..db7baf6e9bc7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.internal.SQLConf */ object CostBasedJoinReorder extends Rule[LogicalPlan] with PredicateHelper { - val conf = SQLConf.get + private def conf = SQLConf.get def apply(plan: LogicalPlan): LogicalPlan = { if (!conf.cboEnabled || !conf.joinReorderEnabled) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala index d7fcf275facf9..1f20b7661489e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala @@ -30,8 +30,7 @@ import org.apache.spark.sql.internal.SQLConf */ object StarSchemaDetection extends PredicateHelper { - val conf = SQLConf.get - + private def conf = SQLConf.get /** * Star schema consists of one or more fact tables referencing a number of dimension diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala index 6fdb721e628b7..5389bf3389da4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/PlanTest.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.internal.SQLConf trait PlanTest extends SparkFunSuite with PredicateHelper { // TODO(gatorsmile): remove this from PlanTest and all the analyzer rules - protected val conf = SQLConf.get + protected def conf = SQLConf.get /** * Since attribute references are given globally unique ids during analysis, From f92b6f510fae05ba821973b1d3de1057b5b95527 Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 5 Jul 2017 08:19:05 -0700 Subject: [PATCH 3/3] fix. --- .../apache/spark/sql/catalyst/optimizer/Optimizer.scala | 7 +++---- .../spark/sql/execution/OptimizeMetadataOnlyQuery.scala | 8 +++----- .../org/apache/spark/sql/execution/SparkOptimizer.scala | 6 ++---- .../spark/sql/internal/BaseSessionStateBuilder.scala | 2 +- 4 files changed, 9 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7eef65cb572f8..d82af94dbffb7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -34,10 +34,10 @@ import org.apache.spark.sql.types._ * Abstract class all optimizers should inherit of, contains the standard batches (extending * Optimizers can override this. 
*/ -abstract class Optimizer(sessionCatalog: SessionCatalog, conf: SQLConf) +abstract class Optimizer(sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] { - protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations) + protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) def batches: Seq[Batch] = { Batch("Eliminate Distinct", Once, EliminateDistinct) :: @@ -178,8 +178,7 @@ class SimpleTestOptimizer extends Optimizer( new SessionCatalog( new InMemoryCatalog, EmptyFunctionRegistry, - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)), - new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true)) + new SQLConf().copy(SQLConf.CASE_SENSITIVE -> true))) /** * Remove redundant aliases from a query plan. A redundant alias is an alias that does not change diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala index 3c046ce494285..5cfad9126986b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala @@ -38,12 +38,10 @@ import org.apache.spark.sql.internal.SQLConf * 3. aggregate function on partition columns which have same result w or w/o DISTINCT keyword. * e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1. */ -case class OptimizeMetadataOnlyQuery( - catalog: SessionCatalog, - conf: SQLConf) extends Rule[LogicalPlan] { +case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = { - if (!conf.optimizerMetadataOnly) { + if (!SQLConf.get.optimizerMetadataOnly) { return plan } @@ -106,7 +104,7 @@ case class OptimizeMetadataOnlyQuery( val caseInsensitiveProperties = CaseInsensitiveMap(relation.tableMeta.storage.properties) val timeZoneId = caseInsensitiveProperties.get(DateTimeUtils.TIMEZONE_OPTION) - .getOrElse(conf.sessionLocalTimeZone) + .getOrElse(SQLConf.get.sessionLocalTimeZone) val partitionData = catalog.listPartitions(relation.tableMeta.identifier).map { p => InternalRow.fromSeq(partAttrs.map { attr => Cast(Literal(p.spec(attr.name)), attr.dataType, Option(timeZoneId)).eval() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala index 1de4f508b89a0..00ff4c8ac310b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkOptimizer.scala @@ -22,16 +22,14 @@ import org.apache.spark.sql.catalyst.catalog.SessionCatalog import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.execution.datasources.PruneFileSourcePartitions import org.apache.spark.sql.execution.python.ExtractPythonUDFFromAggregate -import org.apache.spark.sql.internal.SQLConf class SparkOptimizer( catalog: SessionCatalog, - conf: SQLConf, experimentalMethods: ExperimentalMethods) - extends Optimizer(catalog, conf) { + extends Optimizer(catalog) { override def batches: Seq[Batch] = (preOptimizationBatches ++ super.batches :+ - Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog, conf)) :+ + Batch("Optimize Metadata Only Query", Once, OptimizeMetadataOnlyQuery(catalog)) :+ Batch("Extract Python UDF from Aggregate", Once, ExtractPythonUDFFromAggregate) :+ Batch("Prune File Source Table Partitions", 
Once, PruneFileSourcePartitions)) ++ postHocOptimizationBatches :+ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala index 2532b2ddb72df..267f76217df84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala @@ -208,7 +208,7 @@ abstract class BaseSessionStateBuilder( * Note: this depends on the `conf`, `catalog` and `experimentalMethods` fields. */ protected def optimizer: Optimizer = { - new SparkOptimizer(catalog, conf, experimentalMethods) { + new SparkOptimizer(catalog, experimentalMethods) { override def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = super.extendedOperatorOptimizationRules ++ customOperatorOptimizationRules }
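
Taken together, the series converts optimizer rules that carried a `SQLConf`
constructor parameter into singleton objects that consult the active
session's conf on demand via `SQLConf.get`. A hedged before/after sketch of
the shape of such a rule (`MyRuleBefore`, `MyRuleAfter`, and their
placeholder logic are hypothetical, not part of the patch):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.internal.SQLConf

    // Before: every caller must thread a SQLConf through the constructor,
    // and the rule instance is pinned to that one conf.
    case class MyRuleBefore(conf: SQLConf) extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan =
        if (conf.cboEnabled) plan else plan // placeholder transformation
    }

    // After: a singleton that reads the active session's conf each time it
    // runs, so no conf needs to be passed around at construction time.
    object MyRuleAfter extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan =
        if (SQLConf.get.cboEnabled) plan else plan // placeholder transformation
    }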
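
On the test side, suites can no longer hand a private `SQLConf` to each rule,
so they mutate the shared session conf and must restore it afterwards, as
JoinReorderSuite and the star-join suites do above. A sketch of that
save-and-restore pattern (the `MyRuleSuite` name is hypothetical):

    import org.apache.spark.sql.catalyst.plans.PlanTest
    import org.apache.spark.sql.internal.SQLConf.CBO_ENABLED

    class MyRuleSuite extends PlanTest {
      private var originalCboEnabled = false

      override def beforeAll(): Unit = {
        super.beforeAll()
        originalCboEnabled = conf.cboEnabled // remember the shared value
        conf.setConf(CBO_ENABLED, true)      // enable the feature under test
      }

      override def afterAll(): Unit = {
        try {
          // Restore the original value so later suites are unaffected.
          conf.setConf(CBO_ENABLED, originalCboEnabled)
        } finally {
          super.afterAll()
        }
      }
    }

For a single test case, OptimizeInSuite instead scopes the change with
`withSQLConf(...)`, which restores the previous value automatically when the
block exits.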