From b2011a295bd78b3693a516e049e90250366b8f52 Mon Sep 17 00:00:00 2001
From: Eric Wu <492960551@qq.com>
Date: Mon, 10 Feb 2020 23:41:39 +0800
Subject: [PATCH] [SPARK-30326][SQL] Raise exception if analyzer exceeds max
 iterations

### What changes were proposed in this pull request?
Enhance the RuleExecutor strategy to take different actions when the maximum
number of iterations is exceeded, and raise an exception if the analyzer
exceeds it.

### Why are the changes needed?
Currently, both the analyzer and the optimizer just log a warning message when
rule execution exceeds the maximum number of iterations, but they should
behave differently: the analyzer should raise an exception to indicate that
the plan is still not fixed after the maximum number of iterations, while the
optimizer should only log the warning and keep the current plan. This became
more feasible after SPARK-30138 was introduced.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Added a test in AnalysisSuite.

Closes #26977 from Eric5553/EnhanceMaxIterations.

Authored-by: Eric Wu <492960551@qq.com>
Signed-off-by: Wenchen Fan
---
 .../sql/catalyst/analysis/Analyzer.scala      | 10 ++++++-
 .../sql/catalyst/optimizer/Optimizer.scala    |  5 +++-
 .../sql/catalyst/rules/RuleExecutor.scala     | 27 ++++++++++++++++---
 .../sql/catalyst/analysis/AnalysisSuite.scala | 25 ++++++++++++++++-
 4 files changed, 60 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 75f1aa7185ef3..ce82b3b567b54 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -176,7 +176,15 @@ class Analyzer(
 
   def resolver: Resolver = conf.resolver
 
-  protected val fixedPoint = FixedPoint(maxIterations)
+  /**
+   * If the plan cannot be resolved within maxIterations, analyzer will throw exception to inform
+   * user to increase the value of SQLConf.ANALYZER_MAX_ITERATIONS.
+   */
+  protected val fixedPoint =
+    FixedPoint(
+      maxIterations,
+      errorOnExceed = true,
+      maxIterationsSetting = SQLConf.ANALYZER_MAX_ITERATIONS.key)
 
   /**
    * Override to provide additional rules for the "Resolution" batch.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 935d62015afa1..08acac18f48bb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -53,7 +53,10 @@ abstract class Optimizer(catalogManager: CatalogManager)
     "PartitionPruning",
     "Extract Python UDFs")
 
-  protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)
+  protected def fixedPoint =
+    FixedPoint(
+      SQLConf.get.optimizerMaxIterations,
+      maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
 
   /**
    * Defines the default rule batches in the Optimizer.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 287ae0e8e9f67..da5242bee28e1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -45,7 +45,17 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    * An execution strategy for rules that indicates the maximum number of executions. If the
    * execution reaches fix point (i.e. converge) before maxIterations, it will stop.
    */
-  abstract class Strategy { def maxIterations: Int }
+  abstract class Strategy {
+
+    /** The maximum number of executions. */
+    def maxIterations: Int
+
+    /** Whether to throw exception when exceeding the maximum number. */
+    def errorOnExceed: Boolean = false
+
+    /** The key of SQLConf setting to tune maxIterations */
+    def maxIterationsSetting: String = null
+  }
 
   /** A strategy that is run once and idempotent. */
   case object Once extends Strategy { val maxIterations = 1 }
@@ -54,7 +64,10 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
    * A strategy that runs until fix point or maxIterations times, whichever comes first.
    * Especially, a FixedPoint(1) batch is supposed to run only once.
    */
-  case class FixedPoint(maxIterations: Int) extends Strategy
+  case class FixedPoint(
+    override val maxIterations: Int,
+    override val errorOnExceed: Boolean = false,
+    override val maxIterationsSetting: String = null) extends Strategy
 
   /** A batch of rules. */
   protected case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
@@ -155,8 +168,14 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
         if (iteration > batch.strategy.maxIterations) {
           // Only log if this is a rule that is supposed to run more than once.
           if (iteration != 2) {
-            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}"
-            if (Utils.isTesting) {
+            val endingMsg = if (batch.strategy.maxIterationsSetting == null) {
+              "."
+            } else {
+              s", please set '${batch.strategy.maxIterationsSetting}' to a larger value."
+            }
+            val message = s"Max iterations (${iteration - 1}) reached for batch ${batch.name}" +
+              s"$endingMsg"
+            if (Utils.isTesting || batch.strategy.errorOnExceed) {
               throw new TreeNodeException(curPlan, message, null)
             } else {
               logWarning(message)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
index c747d394b1bc2..d38513319388b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala
@@ -25,9 +25,10 @@ import org.scalatest.Matchers
 
 import org.apache.spark.api.python.PythonEvalType
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Count, Sum}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
@@ -745,4 +746,26 @@ class AnalysisSuite extends AnalysisTest with Matchers {
       CollectMetrics("evt1", sumWithFilter :: Nil, testRelation),
       "aggregates with filter predicate are not allowed" :: Nil)
   }
+
+  test("Analysis exceed max iterations") {
+    // RuleExecutor only throw exception or log warning when the rule is supposed to run
+    // more than once.
+    val maxIterations = 2
+    val conf = new SQLConf().copy(SQLConf.ANALYZER_MAX_ITERATIONS -> maxIterations)
+    val testAnalyzer = new Analyzer(
+      new SessionCatalog(new InMemoryCatalog, FunctionRegistry.builtin, conf), conf)
+
+    val plan = testRelation2.select(
+      $"a" / Literal(2) as "div1",
+      $"a" / $"b" as "div2",
+      $"a" / $"c" as "div3",
+      $"a" / $"d" as "div4",
+      $"e" / $"e" as "div5")
+
+    val message = intercept[TreeNodeException[LogicalPlan]] {
+      testAnalyzer.execute(plan)
+    }.getMessage
+    assert(message.startsWith(s"Max iterations ($maxIterations) reached for batch Resolution, " +
+      s"please set '${SQLConf.ANALYZER_MAX_ITERATIONS.key}' to a larger value."))
+  }
 }
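
To make the new strategy semantics concrete outside the patch, here is a minimal, self-contained Scala sketch. Only `FixedPoint`, `errorOnExceed`, `maxIterationsSetting`, and the shape of the error message mirror the patched `RuleExecutor` API; the driver loop, the `Int` stand-in for a plan, and the `FixedPointSketch`/`run` names are hypothetical simplifications for illustration, not Spark code.

```scala
// Standalone sketch of the errorOnExceed semantics added by this patch.
// FixedPoint, errorOnExceed, and maxIterationsSetting mirror the patched
// RuleExecutor API; the rest (Int "plan", run, FixedPointSketch) is a
// hypothetical simplification for illustration only.
object FixedPointSketch {

  abstract class Strategy {
    def maxIterations: Int
    def errorOnExceed: Boolean = false
    def maxIterationsSetting: String = null
  }

  case class FixedPoint(
      override val maxIterations: Int,
      override val errorOnExceed: Boolean = false,
      override val maxIterationsSetting: String = null) extends Strategy

  // Applies `rule` until the "plan" stops changing or the iteration budget is
  // exhausted, then either throws (analyzer-style) or warns (optimizer-style).
  def run(strategy: Strategy, rule: Int => Int, plan: Int): Int = {
    var cur = plan
    var iteration = 1
    var continue = true
    while (continue) {
      val next = rule(cur)
      if (next == cur) {
        continue = false // reached fixed point before the budget ran out
      } else if (iteration >= strategy.maxIterations) {
        // Same message shape as the patch: point at the tuning knob if known.
        val ending = Option(strategy.maxIterationsSetting)
          .map(k => s", please set '$k' to a larger value.").getOrElse(".")
        val message = s"Max iterations (${strategy.maxIterations}) reached$ending"
        if (strategy.errorOnExceed) throw new RuntimeException(message)
        else println(s"WARN: $message") // keep the current (partial) plan
        cur = next
        continue = false
      } else {
        cur = next
        iteration += 1
      }
    }
    cur
  }

  def main(args: Array[String]): Unit = {
    val rule = (plan: Int) => math.min(plan + 1, 10) // converges at 10
    // Optimizer-style: warns and keeps the partially rewritten plan.
    println(run(FixedPoint(3,
      maxIterationsSetting = "spark.sql.optimizer.maxIterations"), rule, 0))
    // Analyzer-style: errorOnExceed = true turns the warning into an exception.
    try {
      run(FixedPoint(3, errorOnExceed = true,
        maxIterationsSetting = "spark.sql.analyzer.maxIterations"), rule, 0)
    } catch {
      case e: RuntimeException => println(s"ERROR: ${e.getMessage}")
    }
  }
}
```

With `errorOnExceed = true`, as the `Analyzer` now passes to its `FixedPoint`, a non-converging batch fails loudly and the message directs the user to the relevant config key, while the `Optimizer` keeps the previous warn-and-continue behavior.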