@@ -692,7 +692,7 @@ case class ReplaceTableAsSelect(
     // RTAS may drop and recreate table before query execution, breaking self-references
     // refresh and pin versions here to read from original table versions instead of
     // newly created empty table that is meant to serve as target for append/overwrite
-    val refreshedQuery = V2TableRefreshUtil.refreshVersions(query)
+    val refreshedQuery = V2TableRefreshUtil.refresh(query, versionedOnly = true)
     val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery)
     copy(query = pinnedQuery, isAnalyzed = true)
   }
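To make the self-reference problem concrete, here is a minimal sketch of the scenario this hunk guards against. The catalog name, the `USING foo` provider, and the `InMemoryCatalog` class name are assumptions borrowed from the test suites later in this PR, not part of the change itself:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical repro of the self-referencing RTAS case: without refreshing and
// pinning versions at analysis time, the source scan could resolve against the
// freshly re-created (empty) target instead of the original table contents.
object RtasSelfReference {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      // assumed FQCN of the test-only catalog used by the suites in this PR
      .config("spark.sql.catalog.testcat",
        "org.apache.spark.sql.connector.catalog.InMemoryCatalog")
      .getOrCreate()

    spark.sql("CREATE TABLE testcat.ns.t (id BIGINT) USING foo")
    spark.sql("INSERT INTO testcat.ns.t VALUES (1), (2)")

    // RTAS drops and recreates t before running the query; refreshing and
    // pinning versions ensures the SELECT still reads the original rows
    spark.sql("REPLACE TABLE testcat.ns.t USING foo AS SELECT id + 1 AS id FROM testcat.ns.t")

    spark.table("testcat.ns.t").show() // expected: 2 and 3, not an empty table
  }
}
```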
@@ -133,6 +133,8 @@ case class DataSourceV2Relation(

   def autoSchemaEvolution(): Boolean =
     table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
+
+  def isVersioned: Boolean = table.currentVersion != null
 }

 /**
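For intuition, a relation is "versioned" when its connector table reports a non-null current version. A minimal sketch of such a table follows; it assumes `Table.currentVersion` is a String-valued hook that defaults to null, and the names and values are illustrative:

```scala
import java.util
import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Illustrative connector table that reports a version, which is what makes
// DataSourceV2Relation.isVersioned return true for its scans.
class VersionedDemoTable(snapshotId: Long) extends Table {
  override def name(): String = "demo.versioned_table"
  override def schema(): StructType = new StructType().add("id", "long")
  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)
  // Non-null version => the relation can be refreshed and pinned by
  // V2TableRefreshUtil (assumed signature, based on usage in this PR).
  override def currentVersion(): String = snapshotId.toString
}
```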
@@ -40,7 +40,7 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
   def pinVersions(plan: LogicalPlan): LogicalPlan = {
     plan transform {
       case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
-          if r.table.currentVersion != null && r.timeTravelSpec.isEmpty =>
+          if r.isVersioned && r.timeTravelSpec.isEmpty =>
         val tableName = V2TableUtil.toQualifiedName(catalog, ident)
         val version = r.table.currentVersion
         logDebug(s"Pinning table version for $tableName to $version")
@@ -49,21 +49,25 @@
   }

   /**
-   * Refreshes table metadata for all versioned tables in the plan.
+   * Refreshes table metadata for tables in the plan.
    *
    * This method reloads table metadata from the catalog and validates:
    * - Table identity: Ensures table ID has not changed
    * - Data columns: Verifies captured columns match the current schema
    * - Metadata columns: Checks metadata column consistency
    *
+   * Tables with time travel specifications are skipped as they reference a specific point
+   * in time and don't have to be refreshed.
+   *
    * @param plan the logical plan to refresh
+   * @param versionedOnly indicates whether to refresh only versioned tables
    * @return plan with refreshed table metadata
    */
-  def refreshVersions(plan: LogicalPlan): LogicalPlan = {
+  def refresh(plan: LogicalPlan, versionedOnly: Boolean = false): LogicalPlan = {
     val cache = mutable.HashMap.empty[(TableCatalog, Identifier), Table]
     plan transform {
       case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
-          if r.table.currentVersion != null && r.timeTravelSpec.isEmpty =>
+          if (r.isVersioned || !versionedOnly) && r.timeTravelSpec.isEmpty =>
         val currentTable = cache.getOrElseUpdate((catalog, ident), {
           val tableName = V2TableUtil.toQualifiedName(catalog, ident)
           logDebug(s"Refreshing table metadata for $tableName")
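The per-plan `cache` above means each (catalog, identifier) pair is loaded from the catalog at most once per traversal, even when the same table appears several times in the plan. A self-contained sketch of that `getOrElseUpdate` pattern, with simplified stand-in types:

```scala
import scala.collection.mutable

// Simplified stand-ins for (TableCatalog, Identifier) and the loaded Table.
object LoadOncePerPlan {
  type CatalogAndIdent = (String, String)

  def main(args: Array[String]): Unit = {
    val cache = mutable.HashMap.empty[CatalogAndIdent, String]

    def loadTable(id: CatalogAndIdent): String = {
      println(s"loading $id") // runs once per distinct table
      s"table@${id._2}"
    }

    // the same table referenced twice resolves through a single load
    val refs = Seq(("testcat", "ns.t"), ("testcat", "ns.t"), ("testcat", "ns.u"))
    val resolved = refs.map(id => cache.getOrElseUpdate(id, loadTable(id)))
    println(resolved.mkString(", "))
  }
}
```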
@@ -36,6 +36,7 @@ import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
+import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -352,11 +353,12 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
-      val newCache = sessionWithConfigsOff.withActive {
-        val qe = sessionWithConfigsOff.sessionState.executePlan(cd.plan)
-        InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
+      val (newKey, newCache) = sessionWithConfigsOff.withActive {
+        val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
+        val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
+        qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
       }
-      val recomputedPlan = cd.copy(cachedRepresentation = newCache)
+      val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache)
       this.synchronized {
         if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
           logWarning("While recaching, data was already added to cache.")
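The important detail in this hunk is that the rebuilt cache entry is keyed by the refreshed, normalized plan rather than the stale one. A toy sketch of why the key and the cached representation must move together; the types are deliberately simplified and are not the actual CacheManager structures:

```scala
// Toy model: a cache entry pairs a lookup key (the plan) with cached data.
final case class CachedData(plan: String, cachedRepresentation: String)

object RecacheKeySketch {
  def main(args: Array[String]): Unit = {
    var entry = CachedData(plan = "Scan t, version=v1", cachedRepresentation = "rdd(v1)")

    // the table advances to v2; refreshing yields a new normalized plan
    val newKey = "Scan t, version=v2"
    val newCache = "rdd(v2)"

    // pre-fix behavior replaced only cachedRepresentation, leaving the v1 key
    // behind, so lookups built from the current table state would miss;
    // the fix updates key and representation together:
    entry = entry.copy(plan = newKey, cachedRepresentation = newCache)

    assert(entry.plan == "Scan t, version=v2") // future lookups now hit
    println(entry)
  }
}
```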
@@ -207,7 +207,7 @@ class QueryExecution(
   // there may be delay between analysis and subsequent phases
   // therefore, refresh captured table versions to reflect latest data
   private val lazyTableVersionsRefreshed = LazyTry {
-    V2TableRefreshUtil.refreshVersions(commandExecuted)
+    V2TableRefreshUtil.refresh(commandExecuted, versionedOnly = true)
   }

   private[sql] def tableVersionsRefreshed: LogicalPlan = lazyTableVersionsRefreshed.get
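`LazyTry` defers the refresh until the plan is actually consumed, which matters because analysis and execution can be far apart in time. A minimal sketch of the compute-once-on-first-access behavior relied on here, using a plain `lazy val` as a stand-in for the internal utility:

```scala
object LazyRefreshSketch {
  def main(args: Array[String]): Unit = {
    // stand-in for LazyTry { V2TableRefreshUtil.refresh(...) }
    lazy val tableVersionsRefreshed: String = {
      println("refreshing captured table versions") // printed exactly once
      "refreshed plan"
    }

    println("analysis finished; refresh has not run yet")
    println(tableVersionsRefreshed) // first access triggers the refresh
    println(tableVersionsRefreshed) // later accesses reuse the memoized result
  }
}
```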
@@ -406,11 +406,12 @@ object InMemoryRelation {
   def apply(cacheBuilder: CachedRDDBuilder, qe: QueryExecution): InMemoryRelation = {
     val optimizedPlan = qe.optimizedPlan
     val serializer = cacheBuilder.serializer
-    val newBuilder = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
-      cacheBuilder.copy(cachedPlan = serializer.convertToColumnarPlanIfPossible(qe.executedPlan))
+    val newCachedPlan = if (serializer.supportsColumnarInput(optimizedPlan.output)) {
+      serializer.convertToColumnarPlanIfPossible(qe.executedPlan)
     } else {
-      cacheBuilder.copy(cachedPlan = qe.executedPlan)
+      qe.executedPlan
     }
+    val newBuilder = cacheBuilder.copy(cachedPlan = newCachedPlan, logicalPlan = qe.logical)
Review comment on lines +409 to +414:

@viirya (Member, Nov 18, 2025):
Is this related to refreshing the cache of DSv2 tables? This looks no different from the previous newCachedPlan except for logicalPlan = qe.logical. Is that related to the issue?

Author (Contributor):
Yeah, it is formatting. The real change is logicalPlan = qe.logical.
     val relation = new InMemoryRelation(
       newBuilder.cachedPlan.output, newBuilder, optimizedPlan.outputOrdering)
     relation.statsOfPlanToCache = optimizedPlan.stats
@@ -68,6 +68,7 @@ class CachedTableSuite extends QueryTest with SQLTestUtils

   override def sparkConf: SparkConf = super.sparkConf
     .set("spark.sql.catalog.testcat", classOf[InMemoryCatalog].getName)
+    .set("spark.sql.catalog.testcat.copyOnLoad", "true")

Author (Contributor):
This config is important. A bunch of tests in this suite worked only because we reused the same table instance everywhere. With this config, there were test failures before this PR.

   setupTestData()
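The flag's exact semantics are internal to the test catalog, but the gist is that each loadTable call hands back a fresh Table instance, so nothing may rely on object identity across loads. A self-contained sketch of that distinction (assumed from context, not the actual InMemoryCatalog code):

```scala
object CopyOnLoadSketch {
  final case class DemoTable(name: String, rows: Vector[Long])

  // Every load returns a distinct instance with equal content when
  // copyOnLoad is enabled; otherwise the same instance is reused.
  class DemoCatalog(copyOnLoad: Boolean) {
    private val state = DemoTable("ns.t", Vector(1L, 2L))
    def loadTable(): DemoTable = if (copyOnLoad) state.copy() else state
  }

  def main(args: Array[String]): Unit = {
    val catalog = new DemoCatalog(copyOnLoad = true)
    val t1 = catalog.loadTable()
    val t2 = catalog.loadTable()
    println(t1 == t2) // true: structurally equal
    println(t1 eq t2) // false: identity-based reuse would silently break
  }
}
```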
@@ -1592,6 +1592,30 @@ class DataSourceV2DataFrameSuite
     }
   }

+  test("cached DSv2 table DataFrame is refreshed and reused after insert") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id bigint, data string) USING foo")
+      val df1 = Seq((1L, "a"), (2L, "b")).toDF("id", "data")
+      df1.write.insertInto(t)
+
+      // cache DataFrame pointing to table
+      val readDF1 = spark.table(t)
+      readDF1.cache()
+      assertCached(readDF1)
+      checkAnswer(readDF1, Seq(Row(1L, "a"), Row(2L, "b")))
+
+      // insert more data, invalidating and refreshing cache entry
+      val df2 = Seq((3L, "c"), (4L, "d")).toDF("id", "data")
+      df2.write.insertInto(t)
+
+      // verify underlying plan is recached and picks up new data
+      val readDF2 = spark.table(t)
+      assertCached(readDF2)
+      checkAnswer(readDF2, Seq(Row(1L, "a"), Row(2L, "b"), Row(3L, "c"), Row(4L, "d")))
+    }
+  }
+
   private def pinTable(catalogName: String, ident: Identifier, version: String): Unit = {
     catalog(catalogName) match {
       case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, version)