Skip to content

Commit

Permalink
[SPARK-39548][SQL] CreateView Command with a window clause query hit …
Browse files Browse the repository at this point in the history
…a wrong window definition not found issue

1. In the inline CTE code path, fix a bug that top down style unresolved window expression check leads to mis-clarification of a defined window expression.
2. Move unresolved window expression check in project to `CheckAnalysis`.

This bug fails a correct query.

No.

UT

Closes #36947 from amaliujia/improvewindow.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 4718d59)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
amaliujia authored and cloud-fan committed Jun 23, 2022
1 parent 725ce33 commit f5bc48b
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 10 deletions.
Expand Up @@ -444,7 +444,7 @@ class Analyzer(override val catalogManager: CatalogManager)
* Substitute child plan with WindowSpecDefinitions.
*/
object WindowsSubstitution extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsDownWithPruning(
def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUpWithPruning(
_.containsAnyPattern(WITH_WINDOW_DEFINITION, UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
case WithWindowDefinition(windowDefinitions, child) => child.resolveExpressions {
Expand All @@ -453,14 +453,6 @@ class Analyzer(override val catalogManager: CatalogManager)
throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowName))
WindowExpression(c, windowSpecDefinition)
}

case p @ Project(projectList, _) =>
projectList.foreach(_.transformDownWithPruning(
_.containsPattern(UNRESOLVED_WINDOW_EXPRESSION), ruleId) {
case UnresolvedWindowExpression(_, windowSpec) =>
throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
})
p
}
}

Expand Down
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.{BooleanSimplification, DecorrelateInnerQuery, InlineCTE}
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.UNRESOLVED_WINDOW_EXPRESSION
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsPartitionManagement}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
Expand Down Expand Up @@ -202,7 +203,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
failAnalysis("grouping_id() can only be used with GroupingSets/Cube/Rollup")

case e: Expression if e.children.exists(_.isInstanceOf[WindowFunction]) &&
!e.isInstanceOf[WindowExpression] =>
!e.isInstanceOf[WindowExpression] && e.resolved =>
val w = e.children.find(_.isInstanceOf[WindowFunction]).get
failAnalysis(s"Window function $w requires an OVER clause.")

Expand Down Expand Up @@ -477,6 +478,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.sql).mkString(",")}""".stripMargin)

case p @ Project(projectList, _) =>
projectList.foreach(_.transformDownWithPruning(
_.containsPattern(UNRESOLVED_WINDOW_EXPRESSION)) {
case UnresolvedWindowExpression(_, windowSpec) =>
throw QueryCompilationErrors.windowSpecificationNotDefinedError(windowSpec.name)
})

case j: Join if !j.duplicateResolved =>
val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
failAnalysis(
Expand Down
23 changes: 23 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
Expand Up @@ -4236,6 +4236,29 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
sql("SELECT * FROM testData, LATERAL (SELECT * FROM testData)").collect()
}
}

test("SPARK-39548: CreateView will make queries go into inline CTE code path thus" +
"trigger a mis-clarified `window definition not found` issue") {
sql(
"""
|create or replace temporary view test_temp_view as
|with step_1 as (
|select * , min(a) over w2 as min_a_over_w2 from
|(select 1 as a, 2 as b, 3 as c) window w2 as (partition by b order by c)) , step_2 as
|(
|select *, max(e) over w1 as max_a_over_w1
|from (select 1 as e, 2 as f, 3 as g)
|join step_1 on true
|window w1 as (partition by f order by g)
|)
|select *
|from step_2
|""".stripMargin)

checkAnswer(
sql("select * from test_temp_view"),
Row(1, 2, 3, 1, 2, 3, 1, 1))
}
}

case class Foo(bar: Option[String])

0 comments on commit f5bc48b

Please sign in to comment.