From 80326efd4d55e598dbc9e6399f159dc74dc2c2a9 Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Fri, 10 Mar 2017 19:10:32 +0800
Subject: [PATCH 1/4] restrict the nested level of a view.

---
 .../spark/sql/catalyst/analysis/Analyzer.scala |  6 +++++-
 .../spark/sql/execution/SQLViewSuite.scala     | 18 ++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index a3764d8c843dd..8fdd00f1a1a94 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -60,7 +60,6 @@ object SimpleAnalyzer extends Analyzer(
  *                        current catalog database.
  * @param nestedViewLevel The nested level in the view resolution, this enables us to limit the
  *                        depth of nested views.
- * TODO Limit the depth of nested views.
  */
 case class AnalysisContext(
     defaultDatabase: Option[String] = None,
@@ -598,6 +597,11 @@ class Analyzer(
       case view @ View(desc, _, child) if !child.resolved =>
         // Resolve all the UnresolvedRelations and Views in the child.
         val newChild = AnalysisContext.withAnalysisContext(desc.viewDefaultDatabase) {
+          if (AnalysisContext.get.nestedViewLevel > conf.maxNestedViewDepth) {
+            view.failAnalysis(s"The nested level of view ${view.desc.identifier} has exceeded " +
+              s"${conf.maxNestedViewDepth}, terminate the view resolution to avoid further " +
+              "errors.")
+          }
           execute(child)
         }
         view.copy(child = newChild)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 2ca2206bb9d44..64bd1948cbed5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -644,4 +644,22 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
         "-> `default`.`view2` -> `default`.`view1`)"))
     }
   }
+
+  test("restrict the nested level of a view") {
+    val viewNames = Array.range(0, 11).map(idx => s"view$idx")
+    withView(viewNames: _*) {
+      sql("CREATE VIEW view0 AS SELECT * FROM jt")
+      Array.range(0, 10).foreach { idx =>
+        sql(s"CREATE VIEW view${idx + 1} AS SELECT * FROM view$idx")
+      }
+
+      withSQLConf("spark.sql.view.maxNestedViewDepth" -> "10") {
+        val e = intercept[AnalysisException] {
+          sql("SELECT * FROM view10")
+        }.getMessage
+        assert(e.contains("The nested level of view `default`.`view0` has exceeded 10, " +
+          "terminate the view resolution to avoid further errors."))
+      }
+    }
+  }
 }

From 6e078f5afb5e897539139a63f4328ea931f55d66 Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Sat, 11 Mar 2017 07:45:18 +0800
Subject: [PATCH 2/4] update error message.

---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala   | 2 +-
 .../scala/org/apache/spark/sql/execution/SQLViewSuite.scala | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 8fdd00f1a1a94..5118301f8a9f5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -599,7 +599,7 @@ class Analyzer(
         val newChild = AnalysisContext.withAnalysisContext(desc.viewDefaultDatabase) {
           if (AnalysisContext.get.nestedViewLevel > conf.maxNestedViewDepth) {
             view.failAnalysis(s"The nested level of view ${view.desc.identifier} has exceeded " +
-              s"${conf.maxNestedViewDepth}, terminate the view resolution to avoid further " +
+              s"${conf.maxNestedViewDepth}. Terminate the view resolution to avoid further " +
               "errors.")
           }
           execute(child)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 64bd1948cbed5..95c4f259645db 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -657,8 +657,8 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
         val e = intercept[AnalysisException] {
           sql("SELECT * FROM view10")
         }.getMessage
-        assert(e.contains("The nested level of view `default`.`view0` has exceeded 10, " +
-          "terminate the view resolution to avoid further errors."))
+        assert(e.contains("The nested level of view `default`.`view0` has exceeded 10. " +
+          "Terminate the view resolution to avoid further errors."))
       }
     }
   }

From bb02c38c523b4259c6c731e3ee34a932b03f6da1 Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Tue, 14 Mar 2017 12:00:14 +0800
Subject: [PATCH 3/4] update messages and test for a corner case.

---
 .../spark/sql/catalyst/analysis/Analyzer.scala    | 15 ++++++++-------
 .../org/apache/spark/sql/internal/SQLConf.scala   | 13 +++++++++++++
 .../apache/spark/sql/execution/SQLViewSuite.scala | 11 +++++++++--
 3 files changed, 30 insertions(+), 9 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5118301f8a9f5..68a4746a54d96 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -58,12 +58,12 @@ object SimpleAnalyzer extends Analyzer(
  *
  * @param defaultDatabase The default database used in the view resolution, this overrules the
  *                        current catalog database.
- * @param nestedViewLevel The nested level in the view resolution, this enables us to limit the
+ * @param nestedViewDepth The nested depth in the view resolution, this enables us to limit the
  *                        depth of nested views.
 */
 case class AnalysisContext(
     defaultDatabase: Option[String] = None,
-    nestedViewLevel: Int = 0)
+    nestedViewDepth: Int = 0)
 
 object AnalysisContext {
   private val value = new ThreadLocal[AnalysisContext]() {
@@ -76,7 +76,7 @@ object AnalysisContext {
   def withAnalysisContext[A](database: Option[String])(f: => A): A = {
     val originContext = value.get()
     val context = AnalysisContext(defaultDatabase = database,
-      nestedViewLevel = originContext.nestedViewLevel + 1)
+      nestedViewDepth = originContext.nestedViewDepth + 1)
     set(context)
     try f finally { set(originContext) }
   }
@@ -597,10 +597,11 @@ class Analyzer(
       case view @ View(desc, _, child) if !child.resolved =>
         // Resolve all the UnresolvedRelations and Views in the child.
         val newChild = AnalysisContext.withAnalysisContext(desc.viewDefaultDatabase) {
-          if (AnalysisContext.get.nestedViewLevel > conf.maxNestedViewDepth) {
-            view.failAnalysis(s"The nested level of view ${view.desc.identifier} has exceeded " +
-              s"${conf.maxNestedViewDepth}. Terminate the view resolution to avoid further " +
-              "errors.")
+          if (AnalysisContext.get.nestedViewDepth > conf.maxNestedViewDepth) {
+            view.failAnalysis(s"The depth of view ${view.desc.identifier} exceeds the maximum " +
+              s"view resolution depth (${conf.maxNestedViewDepth}). Analysis is aborted to " +
+              "avoid errors. Increase the value of spark.sql.view.maxNestedViewDepth to work " +
+              "around this.")
           }
           execute(child)
         }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 315bedb12e716..9583f2e87f843 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -571,6 +571,19 @@ object SQLConf {
     .booleanConf
     .createWithDefault(true)
 
+  val MAX_NESTED_VIEW_DEPTH =
+    buildConf("spark.sql.view.maxNestedViewDepth")
+      .internal()
+      .doc("The maximum depth of a view reference in a nested view. A nested view may reference " +
+        "other nested views; the dependencies are organized in a directed acyclic graph (DAG). " +
+        "However, the DAG depth may become too large and cause unexpected behavior. This " +
+        "configuration puts a limit on this: when the depth of a view exceeds this value during " +
+        "analysis, we terminate the resolution to avoid potential errors.")
+      .intConf
+      .checkValue(depth => depth > 0, "The maximum depth of a view reference in a nested view " +
+        "must be positive.")
+      .createWithDefault(100)
+
   val STREAMING_FILE_COMMIT_PROTOCOL_CLASS =
     buildConf("spark.sql.streaming.commitProtocolClass")
       .internal()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
index 95c4f259645db..d32716c18ddfb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewSuite.scala
@@ -657,9 +657,16 @@ abstract class SQLViewSuite extends QueryTest with SQLTestUtils {
         val e = intercept[AnalysisException] {
           sql("SELECT * FROM view10")
         }.getMessage
-        assert(e.contains("The nested level of view `default`.`view0` has exceeded 10. " +
-          "Terminate the view resolution to avoid further errors."))
+        assert(e.contains("The depth of view `default`.`view0` exceeds the maximum view " +
+          "resolution depth (10). Analysis is aborted to avoid errors. Increase the value " +
+          "of spark.sql.view.maxNestedViewDepth to work around this."))
       }
+
+      val e = intercept[IllegalArgumentException] {
+        withSQLConf("spark.sql.view.maxNestedViewDepth" -> "0") {}
+      }.getMessage
+      assert(e.contains("The maximum depth of a view reference in a nested view must be " +
+        "positive."))
     }
   }
 }

From d28b6767c2bf51e1a30363c3326e0f8a019bce5b Mon Sep 17 00:00:00 2001
From: jiangxingbo
Date: Wed, 15 Mar 2017 00:36:42 +0800
Subject: [PATCH 4/4] code rebase

---
 .../org/apache/spark/sql/catalyst/SimpleCatalystConf.scala | 3 ++-
 .../scala/org/apache/spark/sql/internal/SQLConf.scala      | 2 ++
 2 files changed, 4 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
index 746f84459de26..0d4903e03bf5a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SimpleCatalystConf.scala
@@ -41,7 +41,8 @@ case class SimpleCatalystConf(
     override val joinReorderEnabled: Boolean = false,
     override val joinReorderDPThreshold: Int = 12,
     override val warehousePath: String = "/user/hive/warehouse",
-    override val sessionLocalTimeZone: String = TimeZone.getDefault().getID)
+    override val sessionLocalTimeZone: String = TimeZone.getDefault().getID,
+    override val maxNestedViewDepth: Int = 100)
   extends SQLConf {
 
   override def clone(): SimpleCatalystConf = this.copy()
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 9583f2e87f843..8f65672d5a839 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -945,6 +945,8 @@ class SQLConf extends Serializable with Logging {
 
   def joinReorderDPThreshold: Int = getConf(SQLConf.JOIN_REORDER_DP_THRESHOLD)
 
+  def maxNestedViewDepth: Int = getConf(SQLConf.MAX_NESTED_VIEW_DEPTH)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
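
Taken together, the four patches implement a single mechanism: a thread-local
AnalysisContext whose depth counter is incremented around each recursive view
resolution step and compared against the configurable maximum
(spark.sql.view.maxNestedViewDepth). The sketch below distills that control
flow in isolation so it can be read and run outside Spark. It is an
illustrative toy under assumed names: NestedViewDepthSketch, ResolutionContext,
withIncrementedDepth and resolveView are invented stand-ins for
AnalysisContext, withAnalysisContext and the analyzer's view resolution, not
Spark's actual API.

// Toy model of the depth guard added by these patches (invented names).
object NestedViewDepthSketch {

  // Stand-in for AnalysisContext: per-thread state carried through resolution.
  case class ResolutionContext(nestedViewDepth: Int = 0)

  private val context = new ThreadLocal[ResolutionContext] {
    override def initialValue(): ResolutionContext = ResolutionContext()
  }

  // Stand-in for conf.maxNestedViewDepth (the patches default it to 100).
  val maxNestedViewDepth = 10

  // Run `body` with the depth incremented by one, restoring the previous
  // context afterwards; this is the same pattern as withAnalysisContext.
  def withIncrementedDepth[A](body: => A): A = {
    val origin = context.get()
    context.set(origin.copy(nestedViewDepth = origin.nestedViewDepth + 1))
    try body finally context.set(origin)
  }

  // A toy "view" that references another view `remaining` more times; the
  // guard fires once the accumulated depth exceeds the configured maximum.
  def resolveView(remaining: Int): String = withIncrementedDepth {
    if (context.get().nestedViewDepth > maxNestedViewDepth) {
      throw new IllegalStateException(
        s"view resolution depth exceeds the maximum ($maxNestedViewDepth)")
    }
    if (remaining == 0) "resolved" else resolveView(remaining - 1)
  }

  def main(args: Array[String]): Unit = {
    println(resolveView(5))  // resolves: depth never exceeds 10
    try resolveView(20)      // rejected: depth reaches 11
    catch {
      case e: IllegalStateException => println(s"rejected: ${e.getMessage}")
    }
  }
}

The try/finally restore is the load-bearing detail: when a view references
several sibling views, each sibling is resolved at the parent's depth plus one
rather than at an ever-growing count, so the limit measures true nesting depth
instead of the total number of views visited.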