@@ -189,8 +189,6 @@ class Analyzer(
TypeCoercion.typeCoercionRules(conf) ++
extendedResolutionRules : _*),
Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*),
Batch("View", Once,
AliasViewChild(conf)),
Batch("Nondeterministic", Once,
PullOutNondeterministic),
Batch("UDF", Once,
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
@@ -300,6 +301,48 @@ trait CheckAnalysis extends PredicateHelper {
}
}

// If the view output doesn't have the same number of columns as either the child
// output or the query column names, throw an AnalysisException.
// If the view's child output can't be up-cast to the view output,
// throw an AnalysisException as well.
case v @ View(desc, output, child) if child.resolved && output != child.output =>
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
if (output.length != queryColumnNames.length) {
// If the view output doesn't have the same number of columns with the query column
// names, throw an AnalysisException.
throw new AnalysisException(
s"The view output ${output.mkString("[", ",", "]")} doesn't have the same" +
"number of columns with the query column names " +
s"${queryColumnNames.mkString("[", ",", "]")}")
}
val resolver = SQLConf.get.resolver
queryColumnNames.map { colName =>
child.output.find { attr =>
resolver(attr.name, colName)
}.getOrElse(throw new AnalysisException(
s"Attribute with name '$colName' is not found in " +
s"'${child.output.map(_.name).mkString("(", ",", ")")}'"))
}
} else {
child.output
}

output.zip(queryOutput).foreach {
case (attr, originAttr) if !attr.dataType.sameType(originAttr.dataType) =>
// The dataType of the output attributes may not be the same as that of the view
// output, so we should cast the attribute to the dataType of the view output
// attribute. Will throw an AnalysisException if the cast can't be performed or
// might truncate.
if (Cast.mayTruncate(originAttr.dataType, attr.dataType) ||
!Cast.canCast(originAttr.dataType, attr.dataType)) {
throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} " +
"as it may truncate\n")
}
case _ =>
}

case _ => // Fallbacks to the following checks
}
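Note: a minimal sketch of the scenario the new up-cast check rejects, assuming a parquet table that is recreated with a wider column type underneath an existing view (the table and view names are made up for illustration):

    // The view records column `a` as INT at creation time.
    spark.sql("CREATE TABLE t (a INT) USING parquet")
    spark.sql("CREATE VIEW v AS SELECT a FROM t")

    // Recreate the underlying table with a wider type; the view metadata still
    // expects INT for `a`.
    spark.sql("DROP TABLE t")
    spark.sql("CREATE TABLE t (a BIGINT) USING parquet")

    // Resolving the view now requires a BIGINT -> INT cast, which may truncate,
    // so analysis fails with an error like
    // "Cannot up cast `a` from bigint to int as it may truncate".
    spark.sql("SELECT * FROM v").show()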

@@ -17,8 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast}
import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, View}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.internal.SQLConf
@@ -28,7 +27,12 @@ import org.apache.spark.sql.internal.SQLConf
*/

/**
* Make sure that a view's child plan produces the view's output attributes. We try to wrap the
* This rule has two goals:
*
* 1. Removes [[View]] operators from the plan. The operator is kept until the end of the analysis
* stage because we want to see which part of an analyzed logical plan is generated from a view.
*
* 2. Makes sure that a view's child plan produces the view's output attributes. We try to wrap the
* child by:
* 1. Generate the `queryOutput` by:
* 1.1. If the query column names are defined, map the column names to attributes in the child
@@ -41,27 +45,29 @@ import org.apache.spark.sql.internal.SQLConf
* 2. Map the `queryOutput` to view output by index, if the corresponding attributes don't match,
* try to up cast and alias the attribute in `queryOutput` to the attribute in the view output.
* 3. Add a Project over the child, with the new output generated by the previous steps.
* If the view output doesn't have the same number of columns as either the child output or
* the query column names, throw an AnalysisException.
*
* Once the plan reaches this rule, `CheckAnalysis` has already done the necessary checks on the
* number of columns between the view output and the child output or the query column names.
* `CheckAnalysis` has also verified that the cast from the view's child to the Project is an up-cast.
*
* This should be only done after the batch of Resolution, because the view attributes are not
* completely resolved during the batch of Resolution.
*/
case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupport {
override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
object EliminateView extends Rule[LogicalPlan] with CastSupport {
override def conf: SQLConf = SQLConf.get

override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
// The child has different output attributes from the View operator. Adds a Project over
// the child of the view.
case v @ View(desc, output, child) if child.resolved && output != child.output =>
val resolver = conf.resolver
val queryColumnNames = desc.viewQueryColumnNames
val queryOutput = if (queryColumnNames.nonEmpty) {
// If the view output doesn't have the same number of columns with the query column names,
// throw an AnalysisException.
if (output.length != queryColumnNames.length) {
throw new AnalysisException(
s"The view output ${output.mkString("[", ",", "]")} doesn't have the same number of " +
s"columns with the query column names ${queryColumnNames.mkString("[", ",", "]")}")
}
// Find the attribute that has the expected attribute name in an attribute list; the names
// are compared using conf.resolver.
// `CheckAnalysis` already guarantees that the expected attribute can be found.
desc.viewQueryColumnNames.map { colName =>
findAttributeByName(colName, child.output, resolver)
child.output.find(attr => resolver(attr.name, colName)).get
}
} else {
// For views created before Spark 2.2.0, the view text is already fully qualified, so the plan
@@ -70,52 +76,17 @@ case class AliasViewChild(conf: SQLConf) extends Rule[LogicalPlan] with CastSupp
}
// Map the attributes in the query output to the attributes in the view output by index.
val newOutput = output.zip(queryOutput).map {
case (attr, originAttr) if attr != originAttr =>
// The dataType of the output attributes may not be the same as that of the view
// output, so we should cast the attribute to the dataType of the view output attribute.
// Will throw an AnalysisException if the cast can't be performed or might truncate.
if (Cast.mayTruncate(originAttr.dataType, attr.dataType)) {
throw new AnalysisException(s"Cannot up cast ${originAttr.sql} from " +
s"${originAttr.dataType.catalogString} to ${attr.dataType.catalogString} as it " +
s"may truncate\n")
} else {
Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
}
case (attr, originAttr) if !attr.semanticEquals(originAttr) =>
// `CheckAnalysis` already guarantees that the cast is an up-cast.
Alias(cast(originAttr, attr.dataType), attr.name)(exprId = attr.exprId,
qualifier = attr.qualifier, explicitMetadata = Some(attr.metadata))
case (_, originAttr) => originAttr
}
v.copy(child = Project(newOutput, child))
}
Project(newOutput, child)

/**
* Find the attribute that has the expected attribute name in an attribute list; the names
* are compared using conf.resolver.
* If the expected attribute is not found, throw an AnalysisException.
*/
private def findAttributeByName(
name: String,
attrs: Seq[Attribute],
resolver: Resolver): Attribute = {
attrs.find { attr =>
resolver(attr.name, name)
}.getOrElse(throw new AnalysisException(
s"Attribute with name '$name' is not found in " +
s"'${attrs.map(_.name).mkString("(", ",", ")")}'"))
}
}

/**
* Removes [[View]] operators from the plan. The operator is kept until the end of the analysis
* stage because we want to see which part of an analyzed logical plan is generated from a view.
*/
object EliminateView extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
// The child should have the same output attributes as the View operator, so we simply
// remove the View operator.
case View(_, output, child) =>
assert(output == child.output,
s"The output of the child ${child.output.mkString("[", ",", "]")} is different from the " +
s"view output ${output.mkString("[", ",", "]")}")
case View(_, _, child) =>
child
}
}
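Note: with AliasViewChild folded into it, EliminateView now both adds the aliasing Project and strips the View node. The View operator therefore survives the entire analysis stage and is only removed when EliminateView runs in the optimizer's "Finish Analysis" batch. One way to observe this from a REPL, as a sketch that assumes a valid view `v` exists:

    import org.apache.spark.sql.catalyst.plans.logical.View

    val qe = spark.sql("SELECT * FROM v").queryExecution
    // The analyzed plan still carries the View node, so we can tell which
    // subtree of the plan came from a view...
    assert(qe.analyzed.collect { case v: View => v }.nonEmpty)
    // ...while the optimized plan keeps only the (possibly Project-wrapped) child.
    assert(qe.optimizedPlan.collect { case v: View => v }.isEmpty)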
@@ -472,6 +472,9 @@ case class View(
output: Seq[Attribute],
child: LogicalPlan) extends LogicalPlan with MultiInstanceRelation {

@transient
override lazy val references: AttributeSet = AttributeSet.empty

override lazy val resolved: Boolean = child.resolved

override def children: Seq[LogicalPlan] = child :: Nil
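Note: reporting an empty reference set here is the functional core of the SPARK-28156 fix: a View's output attributes are definitions exposed to the parent plan, not inputs consumed from elsewhere, so counting them as references could make two deduplicated instances of the same view look different during cache matching in a self-join (see the test below). A quick sanity check, as a sketch assuming a view `v` exists:

    import org.apache.spark.sql.catalyst.plans.logical.View

    val plan = spark.table("v").queryExecution.analyzed
    // Every View node in the analyzed plan now reports no references.
    assert(plan.collect { case v: View => v }.forall(_.references.isEmpty))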
31 changes: 31 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.aggregate
import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, SortAggregateExec}
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.execution.datasources.FilePartition
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, CartesianProductExec, SortMergeJoinExec}
import org.apache.spark.sql.functions._
@@ -3003,6 +3004,36 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
}
}
}

test("SPARK-28156: self-join should not miss cached view") {
withTable("table1") {
withView("table1_vw") {
val df = Seq.tabulate(5) { x => (x, x + 1, x + 2, x + 3) }.toDF("a", "b", "c", "d")
df.write.mode("overwrite").format("orc").saveAsTable("table1")
sql("drop view if exists table1_vw")
sql("create view table1_vw as select * from table1")

val cachedView = sql("select a, b, c, d from table1_vw")

cachedView.createOrReplaceTempView("cachedview")
cachedView.persist()

val queryDf = sql(
s"""select leftside.a, leftside.b
|from cachedview leftside
|join cachedview rightside
|on leftside.a = rightside.a
""".stripMargin)

val inMemoryTableScan = queryDf.queryExecution.executedPlan.collect {
case i: InMemoryTableScanExec => i
}
assert(inMemoryTableScan.size == 2)
checkAnswer(queryDf, Row(0, 1) :: Row(1, 2) :: Row(2, 3) :: Row(3, 4) :: Row(4, 5) :: Nil)
}
}

}
}

case class Foo(bar: Option[String])