Skip to content

Commit

Permalink
traverse only inner children of With, add temp view test
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-toth committed Feb 11, 2021
1 parent 1f13cfe commit f8564cb
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.analysis.{GlobalTempView, LocalTempView, Pe
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog, TemporaryViewRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, SubqueryExpression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View, With}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
Expand Down Expand Up @@ -560,29 +560,25 @@ object ViewHelper {
child.flatMap {
case UnresolvedRelation(nameParts, _, _) if catalog.isTempView(nameParts) =>
Seq(nameParts)
case w: With if !w.resolved => w.innerChildren.flatMap(collectTempViews)
case plan if !plan.resolved => plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempViews(e.plan)
case _ => Seq.empty
}) ++ plan.innerChildren.flatMap {
case p: LogicalPlan => collectTempViews(p)
case _ => Seq.empty
}
})
case _ => Seq.empty
}.distinct
}

def collectTempFunctions(child: LogicalPlan): Seq[String] = {
child.flatMap {
case w: With if !w.resolved => w.innerChildren.flatMap(collectTempFunctions)
case plan if !plan.resolved =>
plan.expressions.flatMap(_.flatMap {
case e: SubqueryExpression => collectTempFunctions(e.plan)
case e: UnresolvedFunction if (catalog.isTemporaryFunction(e.name)) =>
case e: UnresolvedFunction if catalog.isTemporaryFunction(e.name) =>
Seq(e.name.funcName)
case _ => Seq.empty
}) ++ plan.innerChildren.flatMap {
case p: LogicalPlan => collectTempFunctions(p)
case _ => Seq.empty
}
})
case _ => Seq.empty
}.distinct
}
Expand Down
25 changes: 23 additions & 2 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3944,13 +3944,34 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
}
}

test("SPARK-34421: Resolve temporary functions in views with CTEs") {
test("SPARK-34421: Resolve temporary functions and views in views with CTEs") {
withTempView("temp_view") {
spark.udf.register("temp_func", identity[Int](_))

sql("CREATE TEMPORARY VIEW temp_view AS WITH cte AS (SELECT temp_func(0)) SELECT * FROM cte")
sql(
s"""
|CREATE TEMPORARY VIEW temp_view AS
|WITH cte AS (
| SELECT temp_func(0)
|)
|SELECT * FROM cte
|""".stripMargin)
checkAnswer(sql("SELECT * FROM temp_view"), Row(0))
}
withTempView("temp_view") {
sql("CREATE TEMPORARY VIEW temp_view AS SELECT 0")

intercept[AnalysisException] {
sql(
"""
|CREATE VIEW view_on_temp_view AS
|WITH cte AS (
| SELECT * FROM temp_view
|)
|SELECT * FROM cte
|""".stripMargin)
}
}
}
}

Expand Down

0 comments on commit f8564cb

Please sign in to comment.