[SPARK-30326][SQL] Raise exception if analyzer exceeds max iterations
### What changes were proposed in this pull request?
Enhance the RuleExecutor strategy so it can take different actions when rule execution exceeds the maximum number of iterations, and raise an exception if the analyzer exceeds its maximum iterations.
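
As a rough, self-contained sketch of the idea (toy names, not Spark's actual `RuleExecutor` internals): a strategy now carries the iteration limit, a flag saying whether exceeding it is fatal, and the config key to suggest in the message, and the executor either throws or warns accordingly.

```scala
// Toy mirror of the enhanced strategy contract; the object, the run loop, and the
// config key mentioned below are illustrative, not Spark's implementation.
object StrategyDemo {
  abstract class Strategy {
    def maxIterations: Int
    def errorOnExceed: Boolean = false      // new: throw instead of warn when the limit is exceeded
    def maxIterationsSetting: String = null // new: config key to mention in the message
  }

  case class FixedPoint(
      override val maxIterations: Int,
      override val errorOnExceed: Boolean = false,
      override val maxIterationsSetting: String = null) extends Strategy

  // Runs `step` until it reports convergence or the limit is hit, then either
  // throws (analyzer-style) or warns (optimizer-style).
  def run(batchName: String, strategy: Strategy)(step: () => Boolean): Unit = {
    var iteration = 0
    var converged = false
    while (!converged && iteration < strategy.maxIterations) {
      converged = step()
      iteration += 1
    }
    if (!converged) {
      val ending = Option(strategy.maxIterationsSetting)
        .map(key => s", please set '$key' to a larger value.")
        .getOrElse(".")
      val message = s"Max iterations ($iteration) reached for batch $batchName$ending"
      if (strategy.errorOnExceed) throw new IllegalStateException(message)
      else println(s"WARN: $message")
    }
  }

  def main(args: Array[String]): Unit = {
    // Optimizer-style strategy: only warns and keeps the last plan.
    run("Operator Optimization", FixedPoint(3))(() => false)
    // Analyzer-style strategy: fails fast and points at the (assumed) config key.
    try {
      run("Resolution", FixedPoint(3, errorOnExceed = true,
        maxIterationsSetting = "spark.sql.analyzer.maxIterations"))(() => false)
    } catch {
      case e: IllegalStateException => println(s"ERROR: ${e.getMessage}")
    }
  }
}
```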

### Why are the changes needed?
Currently, both the analyzer and the optimizer only log a warning message when rule execution exceeds the maximum number of iterations. They should behave differently: the analyzer should raise an exception to indicate that the plan is still not fixed after the maximum number of iterations, while the optimizer should just log a warning and keep the current plan. This became more feasible after SPARK-30138 was introduced.
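
For illustration only, a minimal sketch (not part of this commit) of how a user could raise the limits if the new analyzer exception ever surfaces; the key names are assumed to be the ones backing `SQLConf.ANALYZER_MAX_ITERATIONS` and `SQLConf.OPTIMIZER_MAX_ITERATIONS`.

```scala
// Sketch only: bumping the iteration limits from user code. The config keys
// below are assumptions, not confirmed by this commit.
import org.apache.spark.sql.SparkSession

object RaiseIterationLimits {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("iteration-limits").getOrCreate()
    // With this change the analyzer throws once its limit is exceeded, so raise the limit ...
    spark.conf.set("spark.sql.analyzer.maxIterations", "200")
    // ... while the optimizer still only logs a warning and keeps the current plan.
    spark.conf.set("spark.sql.optimizer.maxIterations", "200")
    spark.stop()
  }
}
```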

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added a test in AnalysisSuite.

Closes #26977 from Eric5553/EnhanceMaxIterations.

Authored-by: Eric Wu <492960551@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Eric5553 authored and cloud-fan committed Feb 10, 2020
1 parent 5a24060 commit b2011a2
Showing 4 changed files with 60 additions and 7 deletions.
@@ -176,7 +176,15 @@ class Analyzer(
 
   def resolver: Resolver = conf.resolver
 
-  protected val fixedPoint = FixedPoint(maxIterations)
+  /**
+   * If the plan cannot be resolved within maxIterations, analyzer will throw exception to inform
+   * user to increase the value of SQLConf.ANALYZER_MAX_ITERATIONS.
+   */
+  protected val fixedPoint =
+    FixedPoint(
+      maxIterations,
+      errorOnExceed = true,
+      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
 
   /**
    * Override to provide additional rules for the "Resolution" batch.

@@ -53,7 +53,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
       "PartitionPruning",
       "Extract Python UDFs")
 
-  protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
+  protected def fixedPoint =
+    FixedPoint(
+      SQLConf.get.optimizerMaxIterations,
+      maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
 
   /**
    * Defines the default rule batches in the Optimizer.

@@ -45,7 +45,17 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    * An execution strategy for rules that indicates the maximum number of executions. If the
    * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
    */
-  abstract class Strategy { def maxIterations: Int }
+  abstract class Strategy {
+
+    /** The maximum number of executions. */
+    def maxIterations: Int
+
+    /** Whether to throw exception when exceeding the maximum number. */
+    def errorOnExceed: Boolean = false
+
+    /** The key of SQLConf setting to tune maxIterations */
+    def maxIterationsSetting: String = null
+  }
 
   /** A strategy that is run once and idempotent. */
   case object Once extends Strategy { val maxIterations = 1 }

@@ -54,7 +64,10 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    * A strategy that runs until fix point or maxIterations times, whichever comes first.
    * Especially, a FixedPoint(1) batch is supposed to run only once.
    */
-  case class FixedPoint(maxIterations: Int) extends Strategy
+  case class FixedPoint(
+    override val maxIterations: Int,
+    override val errorOnExceed: Boolean = false,
+    override val maxIterationsSetting: String = null) extends Strategy
 
   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)

@@ -155,8 +168,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
         if (iteration > batch.strategy.maxIterations) {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
-            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
-            if (Utils.isTesting) {
+            val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
+              "."
+            } else {
+              s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
+            }
+            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
+              s"$endingMsg"
+            if (Utils.isTesting || batch.strategy.errorOnExceed) {
               throw new TreeNodeException(curPlan, message, null)
             } else {
               logWarning(message)
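
For reference, the branch above yields messages of the following shape; the batch name, limit, and config key here are illustrative values, with the key assumed to be the analyzer's setting.

```scala
// Illustrative message shapes only (values are made up for the example).
val withSetting = "Max iterations (100) reached for batch Resolution, " +
  "please set 'spark.sql.analyzer.maxIterations' to a larger value."
// When maxIterationsSetting is null, the message simply ends with a period:
val withoutSetting = "Max iterations (100) reached for batch Resolution."
```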

@@ -25,9 +25,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan

@@ -745,4 +746,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       CollectMetrics("evt1", sumWithFilter :: Nil, testRelation),
       "aggregates with filter predicate are not allowed" :: Nil)
   }
+
+  test("Analysis exceed max iterations") {
+    // RuleExecutor only throw exception or log warning when the rule is supposed to run
+    // more than once.
+    val maxIterations = 2
+    val conf = new SQLConf().copy(SQLConf.ANALYZER_MAX_ITERATIONS -> maxIterations)
+    val testAnalyzer = new Analyzer(
+      new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf)
+
+    val plan = testRelation2.select(
+      $"a" / Literal(2) as "div1",
+      $"a" / $"b" as "div2",
+      $"a" / $"c" as "div3",
+      $"a" / $"d" as "div4",
+      $"e" / $"e" as "div5")
+
+    val message = intercept[TreeNodeException[LogicalPlan]] {
+      testAnalyzer.execute(plan)
+    }.getMessage
+    assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
+      s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value."))
+  }
 }
