diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index a35efd96060f..012758439835 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.{Logging, MessageWithContext}
@@ -35,8 +37,7 @@ import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{FileIndex, HadoopFsRelation, LogicalRelation, LogicalRelationWithTable}
-import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable}
-import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil
+import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, ExtractV2Table, FileTable, V2TableRefreshUtil}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -352,22 +353,35 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
     }
     needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
+      tryRebuildCacheEntry(spark, cd).foreach { entry =>
+        this.synchronized {
+          if (lookupCachedDataInternal(entry.plan).nonEmpty) {
+            logWarning("While recaching, data was already added to cache.")
+          } else {
+            cachedData = entry +: cachedData
+            CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
+              log"${MDC(DATAFRAME_CACHE_ENTRY, entry)}")
+          }
+        }
+      }
+    }
+  }
+
+  private def tryRebuildCacheEntry(
+      spark: SparkSession,
+      cd: CachedData): Option[CachedData] = {
+    try {
       val sessionWithConfigsOff = getOrCloneSessionWithConfigsOff(spark)
       val (newKey, newCache) = sessionWithConfigsOff.withActive {
         val refreshedPlan = V2TableRefreshUtil.refresh(cd.plan)
         val qe = sessionWithConfigsOff.sessionState.executePlan(refreshedPlan)
         qe.normalized -> InMemoryRelation(cd.cachedRepresentation.cacheBuilder, qe)
       }
-      val recomputedPlan = cd.copy(plan = newKey, cachedRepresentation = newCache)
-      this.synchronized {
-        if (lookupCachedDataInternal(recomputedPlan.plan).nonEmpty) {
-          logWarning("While recaching, data was already added to cache.")
-        } else {
-          cachedData = recomputedPlan +: cachedData
-          CacheManager.logCacheOperation(log"Re-cached Dataframe cache entry:" +
-            log"${MDC(DATAFRAME_CACHE_ENTRY, recomputedPlan)}")
-        }
-      }
+      Some(cd.copy(plan = newKey, cachedRepresentation = newCache))
+    } catch {
+      case NonFatal(e) =>
+        logWarning(log"Failed to recache query", e)
+        None
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index d41af4f1465e..950a116583cf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSel
 import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, TableInfo}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
+import org.apache.spark.sql.connector.catalog.TableChange
 import org.apache.spark.sql.connector.expressions.{ApplyTransform, GeneralScalarExpression, LiteralValue, Transform}
 import org.apache.spark.sql.connector.expressions.filter.{AlwaysFalse, AlwaysTrue}
 import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
@@ -1640,6 +1641,38 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  test("SPARK-54424: inability to refresh cache shouldn't fail writes") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, value INT, category STRING) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 10, 'A'), (2, 20, 'B'), (3, 30, 'A')")
+
+      // cache table
+      spark.table(t).cache()
+
+      // verify caching works as expected
+      assertCached(spark.table(t))
+      checkAnswer(
+        spark.table(t),
+        Seq(Row(1, 10, "A"), Row(2, 20, "B"), Row(3, 30, "A")))
+
+      // evolve table directly to mimic external changes
+      // these external changes make cached plan invalid (column is no longer there)
+      val change = TableChange.deleteColumn(Array("category"), false)
+      catalog("testcat").alterTable(ident, change)
+
+      // refresh table is supposed to trigger recaching
+      spark.sql(s"REFRESH TABLE $t")
+
+      // recaching is expected to fail but there should be no stale entries
+      assert(spark.sharedState.cacheManager.isEmpty)
+
+      // verify latest schema and data are propagated
+      checkAnswer(spark.table(t), Seq(Row(1, 10), Row(2, 20), Row(3, 30)))
+    }
+  }
+
   private def pinTable(catalogName: String, ident: Identifier, version: String): Unit = {
     catalog(catalogName) match {
       case inMemory: BasicInMemoryTableCatalog => inMemory.pinTable(ident, version)