[SPARK-14677][SQL] Make the max number of iterations configurable for Catalyst

## What changes were proposed in this pull request?
We currently hard code the max number of optimizer/analyzer iterations to 100. This patch makes it configurable. While I'm at it, I also added the SessionCatalog to the optimizer, so we can use information there in optimization.
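
For illustration only (not part of this patch): once merged, the cap can be tuned like any other SQL conf. The key is the one added to SQLConf in this diff; sqlContext and the value 50 below are placeholders.

// Hedged sketch: lower the analyzer/optimizer fixed-point limit from its default of 100.
sqlContext.setConf("spark.sql.optimizer.maxIterations", "50")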

## How was this patch tested?
Updated unit tests to reflect the change.

Author: Reynold Xin <rxin@databricks.com>

Closes #12434 from rxin/SPARK-14677.
rxin authored and yhuai committed Apr 16, 2016
1 parent b2dfa84 commit f4be094
Showing 9 changed files with 62 additions and 64 deletions.
@@ -19,46 +19,32 @@ package org.apache.spark.sql.catalyst

import org.apache.spark.sql.catalyst.analysis._

private[spark] trait CatalystConf {
/**
* Interface for configuration options used in the catalyst module.
*/
trait CatalystConf {
def caseSensitiveAnalysis: Boolean

def orderByOrdinal: Boolean
def groupByOrdinal: Boolean

def optimizerMaxIterations: Int

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
*/
def resolver: Resolver = {
if (caseSensitiveAnalysis) {
caseSensitiveResolution
} else {
caseInsensitiveResolution
}
if (caseSensitiveAnalysis) caseSensitiveResolution else caseInsensitiveResolution
}
}

/**
* A trivial conf that is empty. Used for testing when all
* relations are already filled in and the analyzer needs only to resolve attribute references.
*/
object EmptyConf extends CatalystConf {
override def caseSensitiveAnalysis: Boolean = {
throw new UnsupportedOperationException
}
override def orderByOrdinal: Boolean = {
throw new UnsupportedOperationException
}
override def groupByOrdinal: Boolean = {
throw new UnsupportedOperationException
}
}

/** A CatalystConf that can be used for local testing. */
case class SimpleCatalystConf(
caseSensitiveAnalysis: Boolean,
orderByOrdinal: Boolean = true,
groupByOrdinal: Boolean = true)

groupByOrdinal: Boolean = true,
optimizerMaxIterations: Int = 100)
extends CatalystConf {
}
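
A hedged sketch of the updated SimpleCatalystConf in use (values below are illustrative, not from this diff):

// Illustrative: the new field defaults to 100 and can be overridden per test.
val conf = SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 50)
assert(!conf.resolver("a", "A"))  // case-sensitive resolution, per the resolver method above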
@@ -39,14 +39,13 @@ import org.apache.spark.sql.types._
* Used for testing when all relations are already filled in and the analyzer needs only
* to resolve attribute references.
*/
object SimpleAnalyzer
extends SimpleAnalyzer(
EmptyFunctionRegistry,
object SimpleAnalyzer extends Analyzer(
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = true)),
new SimpleCatalystConf(caseSensitiveAnalysis = true))

class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
extends Analyzer(new SessionCatalog(new InMemoryCatalog, functionRegistry, conf), conf)

/**
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a
@@ -55,9 +54,13 @@ class SimpleAnalyzer(functionRegistry: FunctionRegistry, conf: CatalystConf)
class Analyzer(
catalog: SessionCatalog,
conf: CatalystConf,
maxIterations: Int = 100)
maxIterations: Int)
extends RuleExecutor[LogicalPlan] with CheckAnalysis {

def this(catalog: SessionCatalog, conf: CatalystConf) = {
this(catalog, conf, conf.optimizerMaxIterations)
}

def resolver: Resolver = {
if (conf.caseSensitiveAnalysis) {
caseSensitiveResolution
@@ -66,7 +69,7 @@ class Analyzer(
}
}

val fixedPoint = FixedPoint(maxIterations)
protected val fixedPoint = FixedPoint(maxIterations)

/**
* Override to provide additional rules for the "Resolution" batch.
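
A hedged sketch of the new auxiliary constructor in use, mirroring the SimpleAnalyzer wiring above (names and values are illustrative):

// Illustrative: the two-argument constructor reads conf.optimizerMaxIterations,
// so the analyzer's fixed point now follows the configuration.
val conf = new SimpleCatalystConf(caseSensitiveAnalysis = true, optimizerMaxIterations = 50)
val catalog = new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, conf)
val analyzer = new Analyzer(catalog, conf)  // fixedPoint = FixedPoint(50)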
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
import scala.annotation.tailrec
import scala.collection.immutable.HashSet

import org.apache.spark.sql.catalyst.{CatalystConf, EmptyConf}
import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, DistinctAggregationRewriter, EliminateSubqueryAliases, EmptyFunctionRegistry}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -36,9 +36,11 @@ import org.apache.spark.sql.types._
* Abstract class that all optimizers should inherit from; it contains the standard batches (extending
* optimizers can override this).
*/
abstract class Optimizer(
conf: CatalystConf,
sessionCatalog: SessionCatalog) extends RuleExecutor[LogicalPlan] {
abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf)
extends RuleExecutor[LogicalPlan] {

protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)

def batches: Seq[Batch] = {
// Technically some of the rules in Finish Analysis are not optimizer rules and belong more
// in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
@@ -59,12 +61,12 @@ abstract class Optimizer(
// since the other rules might make two separate Unions operators adjacent.
Batch("Union", Once,
CombineUnions) ::
Batch("Replace Operators", FixedPoint(100),
Batch("Replace Operators", fixedPoint,
ReplaceIntersectWithSemiJoin,
ReplaceDistinctWithAggregate) ::
Batch("Aggregate", FixedPoint(100),
Batch("Aggregate", fixedPoint,
RemoveLiteralFromGroupExpressions) ::
Batch("Operator Optimizations", FixedPoint(100),
Batch("Operator Optimizations", fixedPoint,
// Operator push down
SetOperationPushDown,
SamplePushDown,
@@ -95,11 +97,11 @@ abstract class Optimizer(
SimplifyCasts,
SimplifyCaseConversionExpressions,
EliminateSerialization) ::
Batch("Decimal Optimizations", FixedPoint(100),
Batch("Decimal Optimizations", fixedPoint,
DecimalAggregates) ::
Batch("Typed Filter Optimization", FixedPoint(100),
Batch("Typed Filter Optimization", fixedPoint,
EmbedSerializerInFilter) ::
Batch("LocalRelation", FixedPoint(100),
Batch("LocalRelation", fixedPoint,
ConvertToLocalRelation) ::
Batch("Subquery", Once,
OptimizeSubqueries) :: Nil
@@ -117,15 +119,19 @@ abstract class Optimizer(
}

/**
* Non-abstract representation of the standard Spark optimizing strategies
* An optimizer used in test code.
*
* To ensure extensibility, we leave the standard rules in the abstract optimizer, while
* specific rules go to the subclasses.
*/
object DefaultOptimizer
extends Optimizer(
EmptyConf,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf))
object SimpleTestOptimizer extends SimpleTestOptimizer

class SimpleTestOptimizer extends Optimizer(
new SessionCatalog(
new InMemoryCatalog,
EmptyFunctionRegistry,
new SimpleCatalystConf(caseSensitiveAnalysis = true)),
new SimpleCatalystConf(caseSensitiveAnalysis = true))

/**
* Pushes operations down into a Sample.
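
For orientation, this is how the test-only optimizer above is exercised by the suites further down (a hedged sketch; the plan is arbitrary):

// Illustrative: run the standard batches over a trivial one-row plan,
// as ExpressionEvalHelper does below.
val plan = Project(Alias(Literal(1), "one")() :: Nil, OneRowRelation)
val optimized = SimpleTestOptimizer.execute(plan)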
@@ -24,7 +24,7 @@ import org.scalatest.prop.GeneratorDrivenPropertyChecks
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.Utils
@@ -153,7 +153,7 @@ trait ExpressionEvalHelper extends GeneratorDrivenPropertyChecks {
expected: Any,
inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
val optimizedPlan = DefaultOptimizer.execute(plan)
val optimizedPlan = SimpleTestOptimizer.execute(plan)
checkEvaluationWithoutCodegen(optimizedPlan.expressions.head, expected, inputRow)
}

@@ -25,7 +25,7 @@ import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.catalyst.optimizer.DefaultOptimizer
import org.apache.spark.sql.catalyst.optimizer.SimpleTestOptimizer
import org.apache.spark.sql.catalyst.plans.logical.{OneRowRelation, Project}
import org.apache.spark.sql.types._

@@ -151,7 +151,7 @@ class MathFunctionsSuite extends SparkFunSuite with ExpressionEvalHelper {
expression: Expression,
inputRow: InternalRow = EmptyRow): Unit = {
val plan = Project(Alias(expression, s"Optimized($expression)")() :: Nil, OneRowRelation)
val optimizedPlan = DefaultOptimizer.execute(plan)
val optimizedPlan = SimpleTestOptimizer.execute(plan)
checkNaNWithoutCodegen(optimizedPlan.expressions.head, inputRow)
}

@@ -15,12 +15,9 @@
* limitations under the License.
*/

package org.apache.spark.sql.catalyst
package org.apache.spark.sql.catalyst.optimizer

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.analysis.EmptyFunctionRegistry
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

@@ -40,10 +37,7 @@ class OptimizerExtendableSuite extends SparkFunSuite {
* This class represents a dummy extended optimizer that takes the batches of the
* Optimizer and adds custom ones.
*/
class ExtendedOptimizer
extends Optimizer(
EmptyConf,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, EmptyConf)) {
class ExtendedOptimizer extends SimpleTestOptimizer {

// rules set to DummyRule, would not be executed anyways
val myBatches: Seq[Batch] = {
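
The same pattern sketched end to end (hypothetical names; the real suite uses DummyRule):

// Illustrative: a pass-through rule appended as an extra batch after the standard ones.
object PassThroughRule extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan
}

class MyExtendedOptimizer extends SimpleTestOptimizer {
  override def batches: Seq[Batch] = super.batches :+ Batch("Pass-through", Once, PassThroughRule)
}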
@@ -18,14 +18,16 @@
package org.apache.spark.sql.execution

import org.apache.spark.sql.ExperimentalMethods
import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
import org.apache.spark.sql.catalyst.optimizer.Optimizer
import org.apache.spark.sql.internal.SQLConf

class SparkOptimizer(
conf: CatalystConf,
sessionCatalog: SessionCatalog,
experimentalMethods: ExperimentalMethods) extends Optimizer(conf, sessionCatalog) {
catalog: SessionCatalog,
conf: SQLConf,
experimentalMethods: ExperimentalMethods)
extends Optimizer(catalog, conf) {

override def batches: Seq[Batch] = super.batches :+ Batch(
"User Provided Optimizers", FixedPoint(100), experimentalMethods.extraOptimizations: _*)
"User Provided Optimizers", fixedPoint, experimentalMethods.extraOptimizations: _*)
}
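
A hedged sketch of the user-facing path into that batch (sqlContext is a placeholder; ExperimentalMethods.extraOptimizations is the hook read above):

// Illustrative: a rule registered here runs in the "User Provided Optimizers" batch,
// which is now bounded by the configurable fixedPoint instead of a hard-coded 100.
sqlContext.experimental.extraOptimizations = Seq(
  new Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan })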
@@ -51,6 +51,11 @@ object SQLConf {

}

val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
.doc("The max number of iterations the optimizer and analyzer runs")
.intConf
.createWithDefault(100)

val ALLOW_MULTIPLE_CONTEXTS = SQLConfigBuilder("spark.sql.allowMultipleContexts")
.doc("When set to true, creating multiple SQLContexts/HiveContexts is allowed. " +
"When set to false, only one SQLContext/HiveContext is allowed to be created " +
@@ -473,6 +478,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

/** ************************ Spark SQL Params/Hints ******************* */

def optimizerMaxIterations: Int = getConf(OPTIMIZER_MAX_ITERATIONS)

def checkpointLocation: String = getConf(CHECKPOINT_LOCATION)

def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
@@ -80,7 +80,7 @@ private[sql] class SessionState(ctx: SQLContext) {
/**
* Logical query plan optimizer.
*/
lazy val optimizer: Optimizer = new SparkOptimizer(conf, catalog, experimentalMethods)
lazy val optimizer: Optimizer = new SparkOptimizer(catalog, conf, experimentalMethods)

/**
* Parser that extracts expressions, plans, table identifiers etc. from SQL texts.
