[SPARK-14796][SQL] Add spark.sql.optimizer.inSetConversionThreshold config option.

## What changes were proposed in this pull request?

Currently, the `OptimizeIn` optimizer rule replaces an `In` expression with an `InSet` expression when the set contains more than 10 elements, a hard-coded constant.
This issue adds a configuration, `spark.sql.optimizer.inSetConversionThreshold`, to control that threshold.

After this PR, `OptimizeIn` is configurable:
```scala
scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#7 IN (1,2,3) AS (a IN (1, 2, 3))#8]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#7]
   +- Scan OneRowRelation[]

scala> sqlContext.setConf("spark.sql.optimizer.inSetConversionThreshold", "2")

scala> sql("select a in (1,2,3) from (select explode(array(1,2)) a) T").explain()
== Physical Plan ==
WholeStageCodegen
:  +- Project [a#16 INSET (1,2,3) AS (a IN (1, 2, 3))#17]
:     +- INPUT
+- Generate explode([1,2]), false, false, [a#16]
   +- Scan OneRowRelation[]
```
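Why the rewrite pays off: `In` evaluates membership by scanning its literal list for every input row, while `InSet` builds a `HashSet` once and answers each probe in expected constant time. A minimal, Spark-free sketch of that difference (the object and values below are illustrative, not Spark code):

```scala
import scala.collection.immutable.HashSet

// Illustrative sketch, not Spark code: contrast In-style list membership
// (linear scan per probe) with InSet-style hashed membership (expected O(1)).
object InVsInSet {
  def main(args: Array[String]): Unit = {
    val literals = (1 to 1000).toList
    val hSet = HashSet() ++ literals       // built once, as OptimizeIn does

    val value = 999
    val viaList = literals.contains(value) // O(n) scan, like In
    val viaSet  = hSet.contains(value)     // hashed lookup, like InSet
    assert(viaList == viaSet)
    println(s"$value IN (...) = $viaSet")
  }
}
```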

## How was this patch tested?

Passes the Jenkins tests (with a new test case).

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes #12562 from dongjoon-hyun/SPARK-14796.
dongjoon-hyun authored and rxin committed Apr 22, 2016
1 parent 0dcf9db commit 3647120
Showing 5 changed files with 40 additions and 7 deletions.
CatalystConf.scala:
```diff
@@ -29,6 +29,7 @@ trait CatalystConf {
   def groupByOrdinal: Boolean
 
   def optimizerMaxIterations: Int
+  def optimizerInSetConversionThreshold: Int
   def maxCaseBranchesForCodegen: Int
 
   /**
@@ -47,6 +48,7 @@ case class SimpleCatalystConf(
     orderByOrdinal: Boolean = true,
     groupByOrdinal: Boolean = true,
     optimizerMaxIterations: Int = 100,
+    optimizerInSetConversionThreshold: Int = 10,
     maxCaseBranchesForCodegen: Int = 20)
   extends CatalystConf {
 }
```
Optimizer.scala:
```diff
@@ -87,7 +87,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
       CombineUnions,
       // Constant folding and strength reduction
       NullPropagation,
-      OptimizeIn,
+      OptimizeIn(conf),
       ConstantFolding,
       LikeSimplification,
       BooleanSimplification,
@@ -682,10 +682,11 @@ object ConstantFolding extends Rule[LogicalPlan] {
  * Replaces [[In (value, seq[Literal])]] with optimized version[[InSet (value, HashSet[Literal])]]
  * which is much faster
  */
-object OptimizeIn extends Rule[LogicalPlan] {
+case class OptimizeIn(conf: CatalystConf) extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case q: LogicalPlan => q transformExpressionsDown {
-      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) && list.size > 10 =>
+      case In(v, list) if !list.exists(!_.isInstanceOf[Literal]) &&
+        list.size > conf.optimizerInSetConversionThreshold =>
         val hSet = list.map(e => e.eval(EmptyRow))
         InSet(v, HashSet() ++ hSet)
     }
```
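The core of the change is in the hunk above: the rule now carries the conf and compares the literal-list size against `conf.optimizerInSetConversionThreshold` rather than the hard-coded 10. A simplified, self-contained sketch of that thresholded rewrite (the `Expr` hierarchy here is a stand-in, not Spark's):

```scala
import scala.collection.immutable.HashSet

// Simplified stand-in for the rule: rewrite In(v, list) to InSet(v, set)
// only when the list is longer than the configured threshold.
object ThresholdedRewrite {
  sealed trait Expr
  case class In(value: String, list: Seq[Int]) extends Expr
  case class InSet(value: String, hset: Set[Int]) extends Expr

  def optimizeIn(e: Expr, threshold: Int): Expr = e match {
    case In(v, list) if list.size > threshold => InSet(v, HashSet() ++ list)
    case other => other // at or below the threshold: leave the expression alone
  }

  def main(args: Array[String]): Unit = {
    val e = In("a", Seq(1, 2, 3))
    println(optimizeIn(e, threshold = 10)) // unchanged: In(a,List(1, 2, 3))
    println(optimizeIn(e, threshold = 2))  // rewritten: InSet(a,Set(1, 2, 3))
  }
}
```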
ConstantFoldingSuite.scala:
```diff
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedExtractValue}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
@@ -33,7 +34,7 @@ class ConstantFoldingSuite extends PlanTest {
       Batch("AnalysisNodes", Once,
         EliminateSubqueryAliases) ::
       Batch("ConstantFolding", Once,
-        OptimizeIn,
+        OptimizeIn(SimpleCatalystConf(true)),
         ConstantFolding,
         BooleanSimplification) :: Nil
   }
```
OptimizeInSuite.scala:
```diff
@@ -17,11 +17,12 @@
 
 package org.apache.spark.sql.catalyst.optimizer
 
+import org.apache.spark.sql.catalyst.SimpleCatalystConf
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.types._
@@ -36,7 +37,7 @@ class OptimizeInSuite extends PlanTest {
         NullPropagation,
         ConstantFolding,
         BooleanSimplification,
-        OptimizeIn) :: Nil
+        OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))) :: Nil
   }
 
   val testRelation = LocalRelation('a.int, 'b.int, 'c.int)
@@ -128,4 +129,23 @@ class OptimizeInSuite extends PlanTest {
     comparePlans(optimized, correctAnswer)
   }
 
+  test("OptimizedIn test: Setting the threshold for turning Set into InSet.") {
+    val plan =
+      testRelation
+        .where(In(UnresolvedAttribute("a"), Seq(Literal(1), Literal(2), Literal(3))))
+        .analyze
+
+    val notOptimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true))(plan)
+    comparePlans(notOptimizedPlan, plan)
+
+    // Reduce the threshold to turning into InSet.
+    val optimizedPlan = OptimizeIn(SimpleCatalystConf(caseSensitiveAnalysis = true,
+      optimizerInSetConversionThreshold = 2))(plan)
+    optimizedPlan match {
+      case Filter(cond, _)
+        if cond.isInstanceOf[InSet] && cond.asInstanceOf[InSet].getHSet().size == 3 =>
+        // pass
+      case _ => fail("Unexpected result for OptimizedIn")
+    }
+  }
 }
```
SQLConf.scala:
```diff
@@ -54,10 +54,17 @@ object SQLConf {
 
   val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
     .internal()
-    .doc("The max number of iterations the optimizer and analyzer runs")
+    .doc("The max number of iterations the optimizer and analyzer runs.")
     .intConf
     .createWithDefault(100)
 
+  val OPTIMIZER_INSET_CONVERSION_THRESHOLD =
+    SQLConfigBuilder("spark.sql.optimizer.inSetConversionThreshold")
+      .internal()
+      .doc("The threshold of set size for InSet conversion.")
+      .intConf
+      .createWithDefault(10)
+
   val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
     .doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
       "When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -537,6 +544,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)
 
+  def optimizerInSetConversionThreshold: Int = getConf(OPTIMIZER_INSET_CONVERSION_THRESHOLD)
+
   def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)
 
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
```
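With the entry registered in `SQLConf`, the threshold can be tuned at runtime like any other SQLConf key. Two REPL-style ways to do so (the `SET` form is the usual SQL alternative to `setConf`, assumed here rather than exercised in this commit):

```scala
// Runtime tuning of the new option (Spark 2.0-era sqlContext API).
sqlContext.setConf("spark.sql.optimizer.inSetConversionThreshold", "2")

// Equivalent SQL form -- an assumption based on how SQLConf keys are
// generally exposed, not shown in this commit:
sql("SET spark.sql.optimizer.inSetConversionThreshold=2")
```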
