From 2ec6e802ced95f2adc66ba306503761a1f492853 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 19 Nov 2025 15:02:00 -0800 Subject: [PATCH 1/2] [SPARK-54424][SQL] Failures during recaching must not fail operations --- .../spark/sql/execution/CacheManager.scala | 40 ++++++++++++++----- .../spark/sql/execution/QueryExecution.scala | 23 +++++++++-- .../datasources/v2/V2TableRefreshUtil.scala | 22 ---------- .../v2/WriteToDataSourceV2Exec.scala | 14 ++++--- .../DataSourceV2DataFrameSuite.scala | 33 +++++++++++++++ 5 files changed, 90 insertions(+), 42 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 5a38751b61e1..c10dfc93545f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import scala.util.control.NonFatal + import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.internal.{Logging, MessageWithContext} @@ -374,22 +376,38 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { } needToRecache.foreach { cd => cd.cachedRepresentation.cacheBuilder.clearCache() + tryRebuildCacheEntry(spark, cd).foreach { entry => + this.synchronized { + if (lookupCachedDataInternal(entry.plan).nonEmpty) { + logWarning("While recaching, data was already added to cache.") + } else { + cachedData = entry +: cachedData + CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" + + log"${MDC(DATAFRAME_CACHE_ENTRY, entry)}") + } + } + } + } + } + + private def tryRebuildCacheEntry( + spark: SparkSession, + cd: CachedData): Option[CachedData] = { + try { val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) val (newKey, newCache) = sessionWithConfigsOff.withActive { val refreshedPlan = V2TableRefreshUtil.refresh(sessionWithConfigsOff, cd.plan) - val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan) + val qe = QueryExecution.create( + sessionWithConfigsOff, + refreshedPlan, + refreshPhaseEnabled = false) qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe) } - val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache) - this.synchronized { - if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) { - logWarning("While recaching, data was already added to cache.") - } else { - cachedData = recomputedPlan +: cachedData - CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" + - log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}") - } - } + Some(cd.copy(plan = newKey, cachedRepresentation = newCache)) + } catch { + case NonFatal(e) => + logWarning(log"Failed to recache query", e) + None } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 26d2078791aa..3e0aef962e71 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -66,7 +66,8 @@ class QueryExecution( val logical: LogicalPlan, val tracker: QueryPlanningTracker = new QueryPlanningTracker, val mode: CommandExecutionMode.Value = CommandExecutionMode.ALL, - val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup) extends Logging { + val shuffleCleanupMode: ShuffleCleanupMode = DoNotCleanup, + val 
refreshPhaseEnabled: Boolean = true) extends Logging { val id: Long = QueryExecution.nextExecutionId @@ -178,7 +179,7 @@ class QueryExecution( // for eagerly executed commands we mark this place as beginning of execution. tracker.setReadyForExecution() val qe = new QueryExecution(sparkSession, p, mode = mode, - shuffleCleanupMode = shuffleCleanupMode) + shuffleCleanupMode = shuffleCleanupMode, refreshPhaseEnabled = refreshPhaseEnabled) val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") { SQLExecution.withNewExecutionId(qe, Some(name)) { qe.executedPlan.executeCollect() @@ -207,7 +208,11 @@ class QueryExecution( // there may be delay between analysis and subsequent phases // therefore, refresh captured table versions to reflect latest data private val lazyTableVersionsRefreshed = LazyTry { - V2TableRefreshUtil.refresh(sparkSession, commandExecuted, versionedOnly = true) + if (refreshPhaseEnabled) { + V2TableRefreshUtil.refresh(sparkSession, commandExecuted, versionedOnly = true) + } else { + commandExecuted + } } private[sql] def tableVersionsRefreshed: LogicalPlan = lazyTableVersionsRefreshed.get @@ -569,6 +574,18 @@ object QueryExecution { private def nextExecutionId: Long = _nextExecutionId.getAndIncrement + private[execution] def create( + sparkSession: SparkSession, + logical: LogicalPlan, + refreshPhaseEnabled: Boolean = true): QueryExecution = { + new QueryExecution( + sparkSession, + logical, + mode = CommandExecutionMode.ALL, + shuffleCleanupMode = determineShuffleCleanupMode(sparkSession.sessionState.conf), + refreshPhaseEnabled = refreshPhaseEnabled) + } + /** * Construct a sequence of rules that are used to prepare a planned [[SparkPlan]] for execution. * These rules will make sure subqueries are planned, make sure the data partitioning and ordering diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala index 945ab122d54e..151329de9e6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala @@ -21,7 +21,6 @@ import scala.collection.mutable import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.SQLConfHelper -import org.apache.spark.sql.catalyst.analysis.AsOfVersion import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan} import org.apache.spark.sql.classic.SparkSession import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog, V2TableUtil} @@ -32,27 +31,6 @@ import org.apache.spark.sql.util.SchemaValidationMode.ALLOW_NEW_FIELDS import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging { - /** - * Pins table versions for all versioned tables in the plan. - * - * This method captures the current version of each versioned table by adding time travel - * specifications. Tables that already have time travel specifications or are not versioned - * are left unchanged. 
- * - * @param plan the logical plan to pin versions for - * @return plan with pinned table versions - */ - def pinVersions(plan: LogicalPlan): LogicalPlan = { - plan transform { - case r @ ExtractV2CatalogAndIdentifier(catalog, ident) - if r.isVersioned && r.timeTravelSpec.isEmpty => - val tableName = V2TableUtil.toQualifiedName(catalog, ident) - val version = r.table.version - logDebug(s"Pinning table version for $tableName to $version") - r.copy(timeTravelSpec = Some(AsOfVersion(version))) - } - } - /** * Refreshes table metadata for tables in the plan. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala index 6c574be91ebf..75915d97ba4b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2Exec.scala @@ -33,7 +33,7 @@ import org.apache.spark.sql.connector.expressions.Transform import org.apache.spark.sql.connector.metric.CustomMetric import org.apache.spark.sql.connector.write.{BatchWrite, DataWriter, DataWriterFactory, DeltaWrite, DeltaWriter, MergeSummaryImpl, PhysicalWriteInfoImpl, RowLevelOperationTable, Write, WriterCommitMessage, WriteSummary} import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors} -import org.apache.spark.sql.execution.{SparkPlan, SQLExecution, UnaryExecNode} +import org.apache.spark.sql.execution.{QueryExecution, SparkPlan, SQLExecution, UnaryExecNode} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.execution.joins.BaseJoinExec import org.apache.spark.sql.execution.metric.{CustomMetrics, SQLMetric, SQLMetrics} @@ -177,7 +177,6 @@ case class ReplaceTableAsSelectExec( query, versionedOnly = true, schemaValidationMode = PROHIBIT_CHANGES) - val pinnedQuery = V2TableRefreshUtil.pinVersions(refreshedQuery) if (catalog.tableExists(ident)) { invalidateCache(catalog, ident) catalog.dropTable(ident) @@ -185,13 +184,15 @@ case class ReplaceTableAsSelectExec( throw QueryCompilationErrors.cannotReplaceMissingTableError(ident) } val tableInfo = new TableInfo.Builder() - .withColumns(getV2Columns(pinnedQuery.schema, catalog.useNullableQuerySchema)) + .withColumns(getV2Columns(refreshedQuery.schema, catalog.useNullableQuerySchema)) .withPartitions(partitioning.toArray) .withProperties(properties.asJava) .build() val table = Option(catalog.createTable(ident, tableInfo)) .getOrElse(catalog.loadTable(ident, Set(TableWritePrivilege.INSERT).asJava)) - writeToTable(catalog, table, writeOptions, ident, pinnedQuery, overwrite = true) + writeToTable( + catalog, table, writeOptions, ident, refreshedQuery, + overwrite = true, refreshPhaseEnabled = false) } } @@ -764,7 +765,8 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec { writeOptions: Map[String, String], ident: Identifier, query: LogicalPlan, - overwrite: Boolean): Seq[InternalRow] = { + overwrite: Boolean, + refreshPhaseEnabled: Boolean = true): Seq[InternalRow] = { Utils.tryWithSafeFinallyAndFailureCallbacks({ val relation = DataSourceV2Relation.create(table, Some(catalog), Some(ident)) val writeCommand = if (overwrite) { @@ -772,7 +774,7 @@ private[v2] trait V2CreateTableAsSelectBaseExec extends LeafV2CommandExec { } else { AppendData.byPosition(relation, query, writeOptions) } - val qe = 
session.sessionState.executePlan(writeCommand) + val qe = QueryExecution.create(session, writeCommand, refreshPhaseEnabled) qe.assertCommandExecuted() DataSourceV2Utils.commitStagedChanges(sparkContext, table, metrics) Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index d8d68f576e4e..a547a74a064b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSel import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, TableInfo} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} +import org.apache.spark.sql.connector.catalog.TableChange import org.apache.spark.sql.connector.catalog.TableWritePrivilege import org.apache.spark.sql.connector.catalog.TruncatableTable import org.apache.spark.sql.connector.expressions.{ApplyTransform, GeneralScalarExpression, LiteralValue, Transform} @@ -1808,6 +1809,38 @@ class DataSourceV2DataFrameSuite } } + test("SPARK-54424: inability to refresh cache shouldn't fail writes") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo") + sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')") + + // cache table + spark.table(t).cache() + + // verify caching works as expected + assertCached(spark.table(t)) + checkAnswer( + spark.table(t), + Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A"))) + + // evolve table directly to mimic external changes + // these external changes make cached plan invalid (column is no longer there) + val change = TableChange.deleteColumn(Array("category"), false) + catalog("testcat").alterTable(ident, change) + + // refresh table is supposed to trigger recaching + spark.sql(s"REFRESH TABLE $t") + + // recaching is expected to fail but there should be no stale entries + assert(spark.sharedState.cacheManager.isEmpty) + + // verify latest schema and data are propagated + checkAnswer(spark.table(t), Seq(Row(1, 10), Row(2, 20), Row(3, 30))) + } + } + private def pinTable(catalogName: String, ident: Identifier, version: String): Unit = { catalog(catalogName) match { case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, version) From b7e949b474c58f64441f702aba535023ad1fe704 Mon Sep 17 00:00:00 2001 From: Anton Okolnychyi Date: Wed, 26 Nov 2025 13:38:57 -0800 Subject: [PATCH 2/2] Change approach --- .../spark/sql/execution/CacheManager.scala | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index c10dfc93545f..931bd703af76 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -390,23 +390,27 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper { } } - private def tryRebuildCacheEntry( - spark: SparkSession, - cd: CachedData): 
Option[CachedData] = { - try { - val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) - val (newKey, newCache) = sessionWithConfigsOff.withActive { - val refreshedPlan = V2TableRefreshUtil.refresh(sessionWithConfigsOff, cd.plan) + private def tryRebuildCacheEntry(spark: SparkSession, cd: CachedData): Option[CachedData] = { + val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark) + sessionWithConfigsOff.withActive { + tryRefreshPlan(sessionWithConfigsOff, cd.plan).map { refreshedPlan => val qe = QueryExecution.create( sessionWithConfigsOff, refreshedPlan, refreshPhaseEnabled = false) - qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe) + val newKey = qe.normalized + val newCache = InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe) + cd.copy(plan = newKey, cachedRepresentation = newCache) } - Some(cd.copy(plan = newKey, cachedRepresentation = newCache)) + } + } + + private def tryRefreshPlan(spark: SparkSession, plan: LogicalPlan): Option[LogicalPlan] = { + try { + Some(V2TableRefreshUtil.refresh(spark, plan)) } catch { case NonFatal(e) => - logWarning(log"Failed to recache query", e) + logWarning(log"Failed to refresh plan", e) None } }
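
For illustration only, a minimal standalone Scala sketch of the pattern the two commits converge on: the cache-entry rebuild is wrapped so that any NonFatal failure is logged and mapped to None, and the triggering operation (REFRESH TABLE, a write, etc.) proceeds instead of propagating the error. All names below (Plan, CacheEntry, tryRebuild) are hypothetical stand-ins, not Spark's actual classes.

// Illustrative sketch only; hypothetical types, not Spark internals.
import scala.util.control.NonFatal

object RecacheSketch {
  // Stand-ins for a logical plan and a cache entry keyed by that plan.
  final case class Plan(name: String)
  final case class CacheEntry(plan: Plan)

  // Refreshing may throw, e.g. when the underlying table changed incompatibly
  // (a column referenced by the cached plan no longer exists).
  def refreshPlan(plan: Plan): Plan =
    if (plan.name == "stale") throw new IllegalStateException("column no longer exists")
    else plan

  // Mirrors the patch's tryRefreshPlan/tryRebuildCacheEntry shape:
  // swallow non-fatal failures, log, and return None instead of failing the caller.
  def tryRebuild(entry: CacheEntry): Option[CacheEntry] =
    try Some(entry.copy(plan = refreshPlan(entry.plan)))
    catch {
      case NonFatal(e) =>
        println(s"Failed to rebuild cache entry: ${e.getMessage}")
        None
    }

  def main(args: Array[String]): Unit = {
    val entries = Seq(CacheEntry(Plan("ok")), CacheEntry(Plan("stale")))
    // The operation completes even though one rebuild fails; only the
    // successfully rebuilt entries are re-registered in the cache.
    val recached = entries.flatMap(tryRebuild)
    println(s"Re-registered ${recached.size} of ${entries.size} entries")
  }
}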