Skip to content

Commit a9251b6

Browse files
committed
[SPARK-53924] Reload DSv2 tables in views created using plans on each view access
1 parent 7599b2f commit a9251b6

File tree

10 files changed

+521
-2
lines changed

10 files changed

+521
-2
lines changed

common/utils/src/main/resources/error/error-conditions.json

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2128,6 +2128,15 @@
21282128
],
21292129
"sqlState" : "42000"
21302130
},
2131+
"INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION" : {
2132+
"message" : [
2133+
"View <viewName> plan references table <tableName> whose <colType> columns changed since the view plan was initially captured.",
2134+
"Column changes:",
2135+
"<errors>",
2136+
"This indicates the table has evolved and the view based on the plan must be recreated."
2137+
],
2138+
"sqlState" : "51024"
2139+
},
21312140
"INCOMPATIBLE_COLUMN_TYPE" : {
21322141
"message" : [
21332142
"<operator> can only be performed on tables with compatible column types. The <columnOrdinalNumber> column of the <tableOrdinalNumber> table is <dataType1> type which is not compatible with <dataType2> at the same column of the first table.<hint>."

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,9 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
12241224
case u: UnresolvedRelation =>
12251225
resolveRelation(u).map(resolveViews(_, u.options)).getOrElse(u)
12261226

1227+
case r: TableReference =>
1228+
relationResolution.resolveReference(r)
1229+
12271230
case r @ RelationTimeTravel(u: UnresolvedRelation, timestamp, version)
12281231
if timestamp.forall(ts => ts.resolved && !SubqueryExpression.hasSubquery(ts)) =>
12291232
val timeTravelSpec = TimeTravelSpec.create(timestamp, version, conf.sessionLocalTimeZone)

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,44 @@ class RelationResolution(override val catalogManager: CatalogManager)
225225
}
226226
}
227227

228+
/**
 * Resolves a [[TableReference]] into a logical plan.
 *
 * The relation is obtained through `getOrLoadRelation` and then handed to `cloneWithPlanId`
 * together with the value of `LogicalPlan.PLAN_ID_TAG` read from the reference (if any).
 *
 * @param ref the table reference to resolve
 * @return the plan produced by `cloneWithPlanId` for the resolved relation
 */
def resolveReference(ref: TableReference): LogicalPlan = {
  cloneWithPlanId(
    getOrLoadRelation(ref),
    ref.getTagValue(LogicalPlan.PLAN_ID_TAG))
}
233+
234+
/**
 * Returns the relation for the given reference, consulting `relationCache` first.
 *
 * On a cache hit the cached plan is adapted to the reference via `adaptCachedRelation`.
 * On a miss the relation is loaded with `loadRelation`, stored in `relationCache` under
 * the key derived from the reference's catalog and identifier, and returned as is.
 */
private def getOrLoadRelation(ref: TableReference): LogicalPlan = {
  val cacheKey = toCacheKey(ref.catalog, ref.identifier)
  relationCache.get(cacheKey).map(adaptCachedRelation(_, ref)).getOrElse {
    val loaded = loadRelation(ref)
    relationCache.update(cacheKey, loaded)
    loaded
  }
}
245+
246+
/**
 * Loads the referenced table from its catalog, validates it against the reference and
 * wraps the resulting relation in a [[SubqueryAlias]] named after the qualified table name.
 *
 * Validation is delegated to `TableReferenceUtils.validateLoadedTable`, which throws when
 * the loaded table is not compatible with what the reference captured.
 */
private def loadRelation(ref: TableReference): LogicalPlan = {
  val loadedTable = ref.catalog.loadTable(ref.identifier)
  TableReferenceUtils.validateLoadedTable(loadedTable, ref)
  SubqueryAlias(
    ref.identifier.toQualifiedNameParts(ref.catalog),
    ref.toRelation(loadedTable))
}
252+
253+
/**
 * Adapts a cached plan so that it can be used in place of the given reference.
 *
 * Every [[DataSourceV2Relation]] in the cached plan that points to the same catalog and
 * identifier as the reference is rewritten to expose the reference's output attributes
 * and options.
 *
 * @param cached the plan found in the relation cache
 * @param ref the reference being resolved
 * @return the cached plan with matching relations rewritten
 * @throws org.apache.spark.sql.AnalysisException (via `validateLoadedTable`) if the cached
 *         table's columns are incompatible with the columns captured by the reference
 */
private def adaptCachedRelation(cached: LogicalPlan, ref: TableReference): LogicalPlan = {
  cached transform {
    case r: DataSourceV2Relation if matchesReference(r, ref) =>
      // The cached table was not necessarily loaded on behalf of this reference, so it must
      // pass the same compatibility check as a freshly loaded table before the captured
      // output attributes are forced onto it.
      TableReferenceUtils.validateLoadedTable(r.table, ref)
      r.copy(output = ref.output, options = ref.options)
  }
}
259+
260+
/**
 * Checks whether a relation points to the same table as the reference, i.e. both its
 * catalog and its identifier are defined and equal to those of the reference.
 */
private def matchesReference(
    relation: DataSourceV2Relation,
    ref: TableReference): Boolean = {
  relation.catalog.exists(_ == ref.catalog) &&
    relation.identifier.exists(_ == ref.identifier)
}
265+
228266
private def isResolvingView: Boolean = AnalysisContext.get.catalogAndNamespace.nonEmpty
229267

230268
private def isReferredTempViewName(nameParts: Seq[String]): Boolean = {
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.catalyst.analysis
19+
20+
import org.apache.spark.SparkException
21+
import org.apache.spark.sql.catalyst.SQLConfHelper
22+
import org.apache.spark.sql.catalyst.analysis.TableReference.Context
23+
import org.apache.spark.sql.catalyst.analysis.TableReference.TableInfo
24+
import org.apache.spark.sql.catalyst.analysis.TableReference.TemporaryViewContext
25+
import org.apache.spark.sql.catalyst.expressions.AttributeReference
26+
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
27+
import org.apache.spark.sql.catalyst.plans.logical.Statistics
28+
import org.apache.spark.sql.catalyst.util.truncatedString
29+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper
30+
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
31+
import org.apache.spark.sql.connector.catalog.Column
32+
import org.apache.spark.sql.connector.catalog.Identifier
33+
import org.apache.spark.sql.connector.catalog.MetadataColumn
34+
import org.apache.spark.sql.connector.catalog.Table
35+
import org.apache.spark.sql.connector.catalog.TableCatalog
36+
import org.apache.spark.sql.connector.catalog.V2TableUtil
37+
import org.apache.spark.sql.errors.QueryCompilationErrors
38+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
39+
import org.apache.spark.sql.util.CaseInsensitiveStringMap
40+
import org.apache.spark.util.ArrayImplicits._
41+
42+
/**
 * A leaf plan node that refers to a table by catalog and identifier and carries the
 * options, output attributes and column information of the relation it was created from.
 *
 * @param catalog the catalog used to load the table
 * @param identifier the identifier of the table within the catalog
 * @param options the options to pass to the relation built by [[toRelation]]
 * @param info the column and metadata column definitions recorded for the table
 * @param output the output attributes exposed by this node
 * @param context describes where this reference is used (e.g. a temporary view)
 */
case class TableReference private (
    catalog: TableCatalog,
    identifier: Identifier,
    options: CaseInsensitiveStringMap,
    info: TableInfo,
    output: Seq[AttributeReference],
    context: Context)
  extends LeafNode with MultiInstanceRelation with NamedRelation {

  /** The qualified table name as rendered by `V2TableUtil.toQualifiedName`. */
  override def name: String = V2TableUtil.toQualifiedName(catalog, identifier)

  /** Returns a copy of this node in which every output attribute is a new instance. */
  override def newInstance(): TableReference = {
    val freshOutput = output.map(attr => attr.newInstance())
    copy(output = freshOutput)
  }

  /** This node reports `Statistics.DUMMY` as its statistics. */
  override def computeStats(): Statistics = Statistics.DUMMY

  /** Renders the node as `TableReference[<output>] <name>`, truncating the output list. */
  override def simpleString(maxFields: Int): String = {
    val renderedOutput = truncatedString(output, "[", ", ", "]", maxFields)
    s"TableReference$renderedOutput $name"
  }

  /**
   * Builds a [[DataSourceV2Relation]] over the given table that reuses the output
   * attributes, catalog, identifier and options of this reference.
   */
  def toRelation(table: Table): DataSourceV2Relation = {
    val catalogOpt = Some(catalog)
    val identifierOpt = Some(identifier)
    DataSourceV2Relation(table, output, catalogOpt, identifierOpt, options)
  }
}
68+
69+
object TableReference {

  /** Column and metadata column definitions recorded when a reference is created. */
  case class TableInfo(
      columns: Seq[Column],
      metadataColumns: Seq[MetadataColumn])

  /** Describes where a reference is used; temporary views are the only known case here. */
  sealed trait Context
  case class TemporaryViewContext(viewName: Seq[String]) extends Context

  /**
   * Creates a reference for a relation that appears in the plan of a temporary view.
   *
   * @param relation the relation to replace; its catalog and identifier must be defined
   * @param viewName the name parts of the temporary view
   */
  def createForTempView(relation: DataSourceV2Relation, viewName: Seq[String]): TableReference = {
    create(relation, TemporaryViewContext(viewName))
  }

  /**
   * Builds a reference from the relation's catalog, identifier, options and output,
   * records the relation's table columns and metadata columns, and copies the relation's
   * tree node tags onto the new node.
   */
  private def create(relation: DataSourceV2Relation, context: Context): TableReference = {
    // callers are expected to pass relations with a defined catalog and identifier
    val tableCatalog = relation.catalog.get.asTableCatalog
    val tableIdent = relation.identifier.get
    val capturedInfo = TableInfo(
      columns = relation.table.columns.toImmutableArraySeq,
      metadataColumns = V2TableUtil.extractMetadataColumns(relation))
    val ref = TableReference(
      tableCatalog,
      tableIdent,
      relation.options,
      capturedInfo,
      relation.output,
      context)
    ref.copyTagsFrom(relation)
    ref
  }
}
96+
97+
object TableReferenceUtils extends SQLConfHelper {

  /**
   * Validates a loaded table against the information captured in the reference.
   *
   * The check that is applied depends on the reference's context. A context that is not
   * handled here results in an internal error.
   *
   * @param table the table that was loaded for the reference
   * @param ref the reference holding the captured column information
   */
  def validateLoadedTable(table: Table, ref: TableReference): Unit = ref.context match {
    case tempViewCtx: TemporaryViewContext =>
      validateLoadedTableInTempView(table, ref, tempViewCtx)
    case other =>
      throw SparkException.internalError(s"Unknown table ref context: ${other.getClass.getName}")
  }

  /**
   * Validates the table's data columns first and its metadata columns second, throwing
   * the corresponding analysis error for the first group that reports problems.
   */
  private def validateLoadedTableInTempView(
      table: Table,
      ref: TableReference,
      ctx: TemporaryViewContext): Unit = {
    val qualifiedName = ref.identifier.toQualifiedNameParts(ref.catalog)

    // data columns are checked (and reported) before metadata columns are looked at
    val columnErrors = V2TableUtil.validateCapturedColumns(table, ref.info.columns)
    if (columnErrors.nonEmpty) {
      throw QueryCompilationErrors.columnsChangedAfterViewWithPlanCreation(
        ctx.viewName,
        qualifiedName,
        columnErrors)
    }

    val metadataColumnErrors =
      V2TableUtil.validateCapturedMetadataColumns(table, ref.info.metadataColumns)
    if (metadataColumnErrors.nonEmpty) {
      throw QueryCompilationErrors.metadataColumnsChangedAfterViewWithPlanCreation(
        ctx.viewName,
        qualifiedName,
        metadataColumnErrors)
    }
  }
}

sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/V2TableUtil.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ private[sql] object V2TableUtil extends SQLConfHelper {
9797
}
9898

9999
// extracts original column info for all metadata attributes in relation
100-
private def extractMetadataColumns(relation: DataSourceV2Relation): Seq[MetadataColumn] = {
100+
def extractMetadataColumns(relation: DataSourceV2Relation): Seq[MetadataColumn] = {
101101
val metaAttrs = relation.output.filter(_.isMetadataCol)
102102
if (metaAttrs.nonEmpty) {
103103
val metaCols = metadataColumns(relation.table)

sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4466,4 +4466,30 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
44664466
)
44674467
)
44684468
}
4469+
4470+
/**
 * Error raised when the data columns of a table referenced by a view plan changed in an
 * incompatible way after the plan was captured.
 *
 * @param viewName name parts of the view
 * @param tableName name parts of the referenced table
 * @param errors descriptions of the individual column changes
 */
def columnsChangedAfterViewWithPlanCreation(
    viewName: Seq[String],
    tableName: Seq[String],
    errors: Seq[String]): Throwable = {
  incompatibleColumnChangesAfterViewWithPlanCreation(viewName, tableName, "data", errors)
}

/**
 * Error raised when the metadata columns of a table referenced by a view plan changed in
 * an incompatible way after the plan was captured.
 *
 * @param viewName name parts of the view
 * @param tableName name parts of the referenced table
 * @param errors descriptions of the individual column changes
 */
def metadataColumnsChangedAfterViewWithPlanCreation(
    viewName: Seq[String],
    tableName: Seq[String],
    errors: Seq[String]): Throwable = {
  incompatibleColumnChangesAfterViewWithPlanCreation(viewName, tableName, "metadata", errors)
}

// Shared builder for INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION so that both
// public variants always populate the same set of message parameters.
private def incompatibleColumnChangesAfterViewWithPlanCreation(
    viewName: Seq[String],
    tableName: Seq[String],
    colType: String,
    errors: Seq[String]): Throwable = {
  new AnalysisException(
    errorClass = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
    messageParameters = Map(
      "viewName" -> toSQLId(viewName),
      "tableName" -> toSQLId(tableName),
      "colType" -> colType,
      // each error goes on its own line, prefixed with "- "
      "errors" -> errors.mkString("\n- ", "\n- ", "")))
}
44694495
}

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2222
import org.apache.spark.internal.{Logging, MessageWithContext}
2323
import org.apache.spark.internal.LogKeys._
2424
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
25+
import org.apache.spark.sql.catalyst.analysis.TableReference
2526
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
2627
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
2728
import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
@@ -250,6 +251,9 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
250251
val nameInCache = v2Ident.toQualifiedNameParts(catalog)
251252
isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)
252253

254+
case r: TableReference =>
255+
isSameName(r.identifier.toQualifiedNameParts(r.catalog))
256+
253257
case v: View =>
254258
isSameName(v.desc.identifier.nameParts)
255259

sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ import org.apache.spark.internal.Logging
2727
import org.apache.spark.sql.{Row, SparkSession}
2828
import org.apache.spark.sql.catalyst.{CapturesConfig, SQLConfHelper, TableIdentifier}
2929
import org.apache.spark.sql.catalyst.analysis.{AnalysisContext, GlobalTempView, LocalTempView, SchemaEvolution, SchemaUnsupported, ViewSchemaMode, ViewType}
30+
import org.apache.spark.sql.catalyst.analysis.TableReference
3031
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, TemporaryViewRelation}
3132
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, SubqueryExpression, VariableReference}
3233
import org.apache.spark.sql.catalyst.plans.logical.{AnalysisOnlyCommand, CreateTempView, CTEInChildren, CTERelationDef, LogicalPlan, Project, View, WithCTE}
3334
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
3435
import org.apache.spark.sql.classic.ClassicConversions.castToImpl
3536
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.NamespaceHelper
3637
import org.apache.spark.sql.errors.QueryCompilationErrors
38+
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
3739
import org.apache.spark.sql.internal.StaticSQLConf
3840
import org.apache.spark.sql.types.{MetadataBuilder, StructType}
3941
import org.apache.spark.sql.util.SchemaUtils
@@ -733,7 +735,17 @@ object ViewHelper extends SQLConfHelper with Logging with CapturesConfig {
733735
} else {
734736
TemporaryViewRelation(
735737
prepareTemporaryViewStoringAnalyzedPlan(name, aliasedPlan, defaultCollation),
736-
Some(aliasedPlan))
738+
Some(prepareTemporaryViewPlan(name, aliasedPlan)))
739+
}
740+
}
741+
742+
/**
 * Prepares the plan stored for a temporary view by replacing every [[DataSourceV2Relation]]
 * that has a catalog and an identifier and no time travel spec with a [[TableReference]]
 * created for this view. All other nodes are left unchanged.
 *
 * @param viewName the identifier of the temporary view
 * @param plan the plan to store for the view
 */
private def prepareTemporaryViewPlan(
    viewName: TableIdentifier,
    plan: LogicalPlan): LogicalPlan = {
  // only relations that can be looked up again by catalog and identifier are replaced;
  // relations with a time travel spec are kept as they are
  def isReplaceable(rel: DataSourceV2Relation): Boolean = {
    rel.catalog.nonEmpty && rel.identifier.nonEmpty && rel.timeTravelSpec.isEmpty
  }

  plan.transform {
    case rel: DataSourceV2Relation if isReplaceable(rel) =>
      TableReference.createForTempView(rel, viewName.nameParts)
  }
}
739751

sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2457,6 +2457,63 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
24572457
}
24582458
}
24592459

2460+
test("SPARK-53924: insert into DSv2 table invalidates cache of SQL temp views with plans") {
2461+
val t = "testcat.tbl"
2462+
withTable(t, "v") {
2463+
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
2464+
sql(s"CREATE TABLE $t (id int, data string) USING foo")
2465+
sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
2466+
2467+
// create and cache SQL temp view
2468+
sql(s"CREATE TEMPORARY VIEW v AS SELECT id FROM $t")
2469+
sql("SELECT * FROM v").cache()
2470+
2471+
// verify view is cached
2472+
assertCached(sql("SELECT * FROM v"))
2473+
checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
2474+
2475+
// insert data into base table
2476+
sql(s"INSERT INTO $t VALUES (3, 'c'), (4, 'd')")
2477+
2478+
// verify cache was refreshed and will pick up new data
2479+
checkCacheLoading(sql(s"SELECT * FROM v"), isLoaded = false)
2480+
2481+
// verify view is recached correctly
2482+
assertCached(sql("SELECT * FROM v"))
2483+
checkAnswer(
2484+
sql("SELECT * FROM v"),
2485+
Seq(Row(1), Row(2), Row(3), Row(4)))
2486+
}
2487+
}
2488+
}
2489+
2490+
test("SPARK-53924: uncache DSv2 table using SQL uncaches SQL temp views with plans") {
2491+
val t = "testcat.tbl"
2492+
withTable(t, "v") {
2493+
withSQLConf(SQLConf.STORE_ANALYZED_PLAN_FOR_VIEW.key -> "true") {
2494+
sql(s"CREATE TABLE $t (id int, data string) USING foo")
2495+
sql(s"INSERT INTO $t VALUES (1, 'a'), (2, 'b')")
2496+
2497+
// cache table
2498+
sql(s"CACHE TABLE $t")
2499+
assertCached(sql(s"SELECT * FROM $t"))
2500+
checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, "a"), Row(2, "b")))
2501+
2502+
// create and cache SQL temp view
2503+
sql(s"CREATE TEMPORARY VIEW v AS SELECT id FROM $t")
2504+
sql("SELECT * FROM v").cache()
2505+
assertCached(sql("SELECT * FROM v"))
2506+
checkAnswer(sql("SELECT * FROM v"), Seq(Row(1), Row(2)))
2507+
2508+
// uncache table must invalidate view cache (cascading)
2509+
sql(s"UNCACHE TABLE $t")
2510+
2511+
// verify view is not cached anymore
2512+
assertNotCached(sql("SELECT * FROM v"))
2513+
}
2514+
}
2515+
}
2516+
24602517
test("uncache persistent table via catalog API") {
24612518
withTable("tbl1") {
24622519
sql("CREATE TABLE tbl1 (name STRING, age INT) USING parquet")

0 commit comments

Comments
 (0)