@@ -80,7 +80,8 @@ object SimpleAnalyzer extends Analyzer(
FunctionRegistry.builtin,
TableFunctionRegistry.builtin) {
override def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = {}
})) {
}),
RelationCache.empty) {
override def resolver: Resolver = caseSensitiveResolution
}

@@ -285,11 +286,14 @@ object Analyzer {
* Provides a logical query plan analyzer, which translates [[UnresolvedAttribute]]s and
* [[UnresolvedRelation]]s into fully typed objects using information in a [[SessionCatalog]].
*/
class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor[LogicalPlan]
class Analyzer(
override val catalogManager: CatalogManager,
private[sql] val sharedRelationCache: RelationCache = RelationCache.empty)
extends RuleExecutor[LogicalPlan]
with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper {

private val v1SessionCatalog: SessionCatalog = catalogManager.v1SessionCatalog
private val relationResolution = new RelationResolution(catalogManager)
private val relationResolution = new RelationResolution(catalogManager, sharedRelationCache)
private val functionResolution = new FunctionResolution(catalogManager, relationResolution)

override protected def validatePlanChanges(
@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

private[sql] trait RelationCache {

  [Review comment, @aokolnychyi (author), Nov 19, 2025]: This can be any relation cache in the future.

def lookup(nameParts: Seq[String], resolver: Resolver): Option[LogicalPlan]
}

private[sql] object RelationCache {
val empty: RelationCache = (_, _) => None
}
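
A minimal, hypothetical in-memory implementation of the trait above, for illustration only (not part of this PR). The class name and entries map are invented; it assumes a package under org.apache.spark.sql so the private[sql] trait is visible, and it matches name parts the same way CacheManager does further down (resolver.tupled over zipped name parts).

// Illustrative sketch, not PR code.
import org.apache.spark.sql.catalyst.analysis.{RelationCache, Resolver}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

class MapBackedRelationCache(entries: Map[Seq[String], LogicalPlan]) extends RelationCache {
  override def lookup(nameParts: Seq[String], resolver: Resolver): Option[LogicalPlan] =
    entries.collectFirst {
      case (cachedName, plan)
          if cachedName.length == nameParts.length &&
            cachedName.zip(nameParts).forall(resolver.tupled) =>
        plan
    }
}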
@@ -46,7 +46,9 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._

class RelationResolution(override val catalogManager: CatalogManager)
class RelationResolution(
override val catalogManager: CatalogManager,
sharedRelationCache: RelationCache)
extends DataTypeErrorsBase
with Logging
with LookupCatalog
@@ -118,36 +120,62 @@ class RelationResolution(override val catalogManager: CatalogManager)
val planId = u.getTagValue(LogicalPlan.PLAN_ID_TAG)
relationCache
.get(key)
.map { cache =>
val cachedRelation = cache.transform {
case multi: MultiInstanceRelation =>
val newRelation = multi.newInstance()
newRelation.copyTagsFrom(multi)
newRelation
}
cloneWithPlanId(cachedRelation, planId)
}
.map(adaptCachedRelation(_, planId))
.orElse {
val writePrivilegesString =
Option(u.options.get(UnresolvedRelation.REQUIRED_WRITE_PRIVILEGES))
val table =
CatalogV2Util.loadTable(catalog, ident, finalTimeTravelSpec, writePrivilegesString)
val loaded = createRelation(
val writePrivileges = u.options.get(UnresolvedRelation.REQUIRED_WRITE_PRIVILEGES)
val finalOptions = u.clearWritePrivileges.options
val table = CatalogV2Util.loadTable(
catalog,
ident,
table,
u.clearWritePrivileges.options,
u.isStreaming,
finalTimeTravelSpec
)
loaded.foreach(relationCache.update(key, _))
loaded.map(cloneWithPlanId(_, planId))
}
finalTimeTravelSpec,
Option(writePrivileges))

val sharedRelationCacheMatch = for {
t <- table
if finalTimeTravelSpec.isEmpty && writePrivileges == null && !u.isStreaming
cached <- lookupSharedRelationCache(catalog, ident, t)
} yield {
val updatedRelation = cached.copy(options = finalOptions)
val nameParts = ident.toQualifiedNameParts(catalog)
val aliasedRelation = SubqueryAlias(nameParts, updatedRelation)
relationCache.update(key, aliasedRelation)
adaptCachedRelation(aliasedRelation, planId)
}

sharedRelationCacheMatch.orElse {
val loaded = createRelation(
catalog,
ident,
table,
finalOptions,
u.isStreaming,
finalTimeTravelSpec)
loaded.foreach(relationCache.update(key, _))
loaded.map(cloneWithPlanId(_, planId))
}
}
case _ => None
}
}
}

private def lookupSharedRelationCache(
catalog: CatalogPlugin,
ident: Identifier,
table: Table): Option[DataSourceV2Relation] = {
CatalogV2Util.lookupCachedRelation(sharedRelationCache, catalog, ident, table, conf)
}

private def adaptCachedRelation(cached: LogicalPlan, planId: Option[Long]): LogicalPlan = {
val plan = cached transform {
case multi: MultiInstanceRelation =>
val newRelation = multi.newInstance()
newRelation.copyTagsFrom(multi)
newRelation
}
cloneWithPlanId(plan, planId)
}

private def createRelation(
catalog: CatalogPlugin,
ident: Identifier,
@@ -302,6 +302,7 @@ object HybridAnalyzer {
resolverGuard = new ResolverGuard(legacyAnalyzer.catalogManager),
resolver = new Resolver(
catalogManager = legacyAnalyzer.catalogManager,
sharedRelationCache = legacyAnalyzer.sharedRelationCache,
extensions = legacyAnalyzer.singlePassResolverExtensions,
metadataResolverExtensions = legacyAnalyzer.singlePassMetadataResolverExtensions,
externalRelationResolution = Some(legacyAnalyzer.getRelationResolution)
@@ -26,6 +26,7 @@ import org.apache.spark.sql.catalyst.analysis.{
AnalysisErrorAt,
FunctionResolution,
MultiInstanceRelation,
RelationCache,
RelationResolution,
ResolvedInlineTable,
UnresolvedHaving,
@@ -71,6 +72,7 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
*/
class Resolver(
catalogManager: CatalogManager,
sharedRelationCache: RelationCache = RelationCache.empty,
override val extensions: Seq[ResolverExtension] = Seq.empty,
metadataResolverExtensions: Seq[ResolverExtension] = Seq.empty,
externalRelationResolution: Option[RelationResolution] = None)
@@ -81,8 +83,9 @@ class Resolver(
private val cteRegistry = new CteRegistry
private val subqueryRegistry = new SubqueryRegistry
private val identifierAndCteSubstitutor = new IdentifierAndCteSubstitutor
private val relationResolution =
externalRelationResolution.getOrElse(Resolver.createRelationResolution(catalogManager))
private val relationResolution = externalRelationResolution.getOrElse {
Resolver.createRelationResolution(catalogManager, sharedRelationCache)
}
private val functionResolution = new FunctionResolution(catalogManager, relationResolution)
private val expressionResolver = new ExpressionResolver(this, functionResolution, planLogger)
private val aggregateResolver = new AggregateResolver(this, expressionResolver)
@@ -788,7 +791,9 @@ object Resolver {
/**
* Create a new instance of the [[RelationResolution]].
*/
def createRelationResolution(catalogManager: CatalogManager): RelationResolution = {
new RelationResolution(catalogManager)
def createRelationResolution(
catalogManager: CatalogManager,
sharedRelationCache: RelationCache = RelationCache.empty): RelationResolution = {
new RelationResolution(catalogManager, sharedRelationCache)
}
}
@@ -40,7 +40,6 @@ import org.apache.spark.sql.connector.write.RowLevelOperation.Command.{DELETE, M
import org.apache.spark.sql.errors.DataTypeErrors.toSQLType
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table}
import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, AtomicType, BooleanType, DataType, IntegerType, MapType, MetadataBuilder, StringType, StructField, StructType}
import org.apache.spark.util.ArrayImplicits._
@@ -689,12 +688,7 @@ case class ReplaceTableAsSelect(
extends V2CreateTableAsSelectPlan {

override def markAsAnalyzed(ac: AnalysisContext): LogicalPlan = {
// RTAS may drop and recreate table before query execution, breaking self-references
// refresh and pin versions here to read from original table versions instead of
// newly created empty table that is meant to serve as target for append/overwrite

[Review thread]
Reviewer (Contributor): why can we remove this now?
@aokolnychyi (author), Nov 20, 2025: We simply moved this to the exec node, as I no longer have access to SparkSession in catalyst. We can't do this refresh without checking cached relations, as it may potentially hit the metastore and move the version inconsistently.

val refreshedQuery = V2TableRefreshUtil.refresh(query, versionedOnly = true)
val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery)
copy(query = pinnedQuery, isAnalyzed = true)
copy(isAnalyzed = true)
}
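
To make the exchange above concrete: a rough, hypothetical sketch of where the removed refresh/pin step is assumed to live now, i.e. in a physical (exec) node where a SparkSession is available. Only V2TableRefreshUtil.refresh(session, plan) is taken from this diff (see the CacheManager hunk below); the helper name and the pinVersions signature are carried over from the removed code and may differ in the actual exec node.

// Hypothetical placement of the removed step; not the PR's actual exec-node code.
private def prepareRtasQuery(session: SparkSession, query: LogicalPlan): LogicalPlan = {
  // A session-aware refresh can consult cached relations first, so it does not
  // hit the metastore and move the pinned versions inconsistently.
  val refreshed = V2TableRefreshUtil.refresh(session, query)
  // Pin versions so self-references keep reading the original table rather than
  // the newly created empty table that serves as the append/overwrite target.
  V2TableRefreshUtil.pinVersions(refreshed)
}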

override def withPartitioning(rewritten: Seq[Transform]): V2CreateTablePlan = {
@@ -25,7 +25,7 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.CurrentUserContext
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, TimeTravelSpec}
import org.apache.spark.sql.catalyst.analysis.{AsOfTimestamp, AsOfVersion, NamedRelation, NoSuchDatabaseException, NoSuchFunctionException, NoSuchTableException, RelationCache, TimeTravelSpec}
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.catalyst.expressions.{Expression, Literal, V2ExpressionUtils}
import org.apache.spark.sql.catalyst.plans.logical.{SerdeInfo, TableSpec}
@@ -36,6 +36,7 @@ import org.apache.spark.sql.connector.catalog.constraints.Constraint
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.connector.expressions.{ClusterByTransform, LiteralValue, Transform}
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, Metadata, MetadataBuilder, StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
@@ -497,6 +498,27 @@ private[sql] object CatalogV2Util {
loadTable(catalog, ident).map(DataSourceV2Relation.create(_, Some(catalog), Some(ident)))
}

def isSameTable(
rel: DataSourceV2Relation,
catalog: CatalogPlugin,
ident: Identifier,
table: Table): Boolean = {
rel.catalog.contains(catalog) && rel.identifier.contains(ident) && rel.table.id == table.id
}

def lookupCachedRelation(
cache: RelationCache,
catalog: CatalogPlugin,
ident: Identifier,
table: Table,
conf: SQLConf): Option[DataSourceV2Relation] = {
val nameParts = ident.toQualifiedNameParts(catalog)
val cached = cache.lookup(nameParts, conf.resolver)
cached.collect {
case r: DataSourceV2Relation if isSameTable(r, catalog, ident, table) => r
}
}

def isSessionCatalog(catalog: CatalogPlugin): Boolean = {
catalog.name().equalsIgnoreCase(CatalogManager.SESSION_CATALOG_NAME)
}
@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.{Logging, MessageWithContext}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.analysis.V2TableReference
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.expressions.{Attribute, SubqueryExpression}
@@ -30,13 +31,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPla
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.classic.{Dataset, SparkSession}
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.IdentifierHelper
import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{IdentifierHelper, MultipartIdentifierHelper}
import org.apache.spark.sql.connector.catalog.Identifier
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2CatalogAndIdentifier, ExtractV2Table, FileTable, V2TableRefreshUtil}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -240,31 +242,51 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
name: Seq[String],
conf: SQLConf,
includeTimeTravel: Boolean): Boolean = {
def isSameName(nameInCache: Seq[String]): Boolean = {
nameInCache.length == name.length && nameInCache.zip(name).forall(conf.resolver.tupled)
}
isMatchedTableOrView(plan, name, conf.resolver, includeTimeTravel)
}

private def isMatchedTableOrView(
plan: LogicalPlan,
name: Seq[String],
resolver: Resolver,
includeTimeTravel: Boolean): Boolean = {

EliminateSubqueryAliases(plan) match {
case LogicalRelationWithTable(_, Some(catalogTable)) =>
isSameName(catalogTable.identifier.nameParts)
isSameName(name, catalogTable.identifier.nameParts, resolver)

case DataSourceV2Relation(_, _, Some(catalog), Some(v2Ident), _, timeTravelSpec) =>
val nameInCache = v2Ident.toQualifiedNameParts(catalog)
isSameName(nameInCache) && (includeTimeTravel || timeTravelSpec.isEmpty)
isSameName(name, nameInCache, resolver) && (includeTimeTravel || timeTravelSpec.isEmpty)

case r: V2TableReference =>
isSameName(r.identifier.toQualifiedNameParts(r.catalog))
isSameName(name, r.identifier.toQualifiedNameParts(r.catalog), resolver)

case v: View =>
isSameName(v.desc.identifier.nameParts)
isSameName(name, v.desc.identifier.nameParts, resolver)

case HiveTableRelation(catalogTable, _, _, _, _) =>
isSameName(catalogTable.identifier.nameParts)
isSameName(name, catalogTable.identifier.nameParts, resolver)

case _ => false
}
}

private def isSameName(
name: Seq[String],
catalog: CatalogPlugin,
ident: Identifier,
resolver: Resolver): Boolean = {
isSameName(name, ident.toQualifiedNameParts(catalog), resolver)
}

private def isSameName(
name: Seq[String],
nameInCache: Seq[String],
resolver: Resolver): Boolean = {
nameInCache.length == name.length && nameInCache.zip(name).forall(resolver.tupled)
}

private def uncacheByCondition(
spark: SparkSession,
isMatchedPlan: LogicalPlan => Boolean,
@@ -354,7 +376,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
cd.cachedRepresentation.cacheBuilder.clearCache()
val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
val (newKey, newCache) = sessionWithConfigsOff.withActive {
val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
val refreshedPlan = V2TableRefreshUtil.refresh(sessionWithConfigsOff, cd.plan)
val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
}
@@ -371,6 +393,35 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
}
}

private[sql] def lookupCachedTable(

[Review thread]
Reviewer (Member): let's make it more straightforward: lookupCachedTableByName
Reviewer (Member): @aokolnychyi This is my last comment for this PR
@aokolnychyi (author): @gengliangwang, I tested the rename locally, but I'm not sure it would make the code any clearer. In fact, it only creates inconsistencies with other methods that accept a name in their args but don't have an xxxByName suffix. I feel lookupCachedTable(name, resolver) is already pretty clear. What do you think?

name: Seq[String],
resolver: Resolver): Option[LogicalPlan] = {
val cachedRelations = findCachedRelations(name, resolver)
cachedRelations match {
case cachedRelation +: _ =>
CacheManager.logCacheOperation(
log"Relation cache hit for table ${MDC(TABLE_NAME, name.quoted)}")
Some(cachedRelation)

[Review thread]
Reviewer (Contributor): Hmm, we just return the first match? Shall we use the scan with the latest table version?
@aokolnychyi (author), Oct 29, 2025: Well, we shouldn't have multiple matching relations after this change, but cachedData is an IndexedSeq to which we always prepend entries (so newer entries are at the beginning of the sequence). We don't know which version of the table is newer because the versions are strings; in Iceberg, for instance, they are random UUIDs. That said, this piece should always take the newest matching entry, but we expect to only have one.

case _ =>
None
}
}
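
The ordering assumption in the thread above is easy to see in isolation. A standalone snippet (not PR code) showing why the first match in a prepend-ordered IndexedSeq is the most recently cached entry:

// Standalone illustration: entries are prepended, so `cachedRelation +: _` above
// binds the newest matching entry.
var cachedData: IndexedSeq[String] = IndexedSeq.empty
cachedData = "older entry" +: cachedData
cachedData = "newer entry" +: cachedData
cachedData match {
  case first +: _ => assert(first == "newer entry")
  case _ => ()
}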

private def findCachedRelations(
name: Seq[String],
resolver: Resolver): Seq[LogicalPlan] = {
cachedData.flatMap { cd =>
val plan = EliminateSubqueryAliases(cd.plan)
plan match {
case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
if isSameName(name, catalog, ident, resolver) && r.timeTravelSpec.isEmpty =>
Some(r)
case _ =>
None
}
}
}
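
For orientation, a hedged sketch of how these lookups could feed the analyzer's new sharedRelationCache parameter. This wiring is not part of this diff; `spark` and `catalogManager` are assumed to be in scope, and it relies on RelationCache being usable as a SAM (as RelationCache.empty already demonstrates) plus private[sql] access from sql/core.

// Hypothetical glue, not from this PR: adapt the CacheManager lookup into the
// analysis.RelationCache SAM so the analyzer can reuse cached relations.
val sharedCache: RelationCache = (nameParts, resolver) =>
  spark.sharedState.cacheManager.lookupCachedTable(nameParts, resolver)

val analyzer = new Analyzer(catalogManager, sharedCache)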

/**
* Optionally returns cached data for the given [[Dataset]]
*/