From 51b022e515e2a30e26b7369c02e28758760d197d Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 22 Apr 2026 12:51:03 +0000 Subject: [PATCH 01/23] [SPARK-XXXXX][CONNECT] Add DSv2 CACHE TABLE cross-session tests for Spark Connect Add DataSourceV2CacheConnectSuite that tests CACHE TABLE behavior with two Connect sessions. Session 1 caches and reads; session 2 acts as an external writer. Uses SharedInMemoryTableCatalog so both sessions share the same underlying table data via a static ConcurrentHashMap. Tests cover all five CACHE TABLE scenarios from the design doc: - S1: external data write after CACHE TABLE - S2: session write then external write - S3: external schema change (ADD COLUMN) - S4: session schema change then external write - S5: external drop and recreate table - REFRESH TABLE and UNCACHE TABLE interactions Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 239 ++++++++++++++++++ .../sql/connect/test/RemoteSparkSession.scala | 4 + 2 files changed, 243 insertions(+) create mode 100644 sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala new file mode 100644 index 000000000000..419996f7d4dd --- /dev/null +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import org.apache.spark.sql.Row +import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession, SQLHelper} +import org.apache.spark.sql.connect.test.IntegrationTestUtils.isAssemblyJarsDirExists +import org.apache.spark.sql.connect.test.SparkConnectServerUtils + +/** + * DSv2 CACHE TABLE tests for Spark Connect using two sessions. + * + * Uses [[SharedInMemoryTableCatalog]] (configured as "sharedcat") so that + * both sessions share the same underlying table data via a static + * ConcurrentHashMap. Session 1 caches and reads; session 2 acts as an + * external writer. + * + * All sessions on the same server share one CacheManager (via SharedState). + * DSv2 writes trigger refreshCache() which recaches on the shared + * CacheManager. These tests document the observable behavior for each + * design doc CACHE scenario. 
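+ *
+ * For illustration, the S1 flow exercised below looks roughly like this
+ * (a sketch, not the literal test code):
+ * {{{
+ *   spark.sql("CACHE TABLE sharedcat.ns.tbl")                     // session 1 caches
+ *   session2.sql("INSERT INTO sharedcat.ns.tbl VALUES (2, 200)")  // external write
+ *   spark.sql("SELECT * FROM sharedcat.ns.tbl")                   // also sees (2, 200)
+ * }}}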
+ */ +class DataSourceV2CacheConnectSuite + extends QueryTest with RemoteSparkSession with SQLHelper { + + private val T = "sharedcat.ns.tbl" + + private def assumeCanRun(): Unit = { + assume(spark != null && isAssemblyJarsDirExists, + "Spark Connect server not available") + } + + private def setupTable(): Unit = { + spark.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") + spark.sql(s"INSERT INTO $T VALUES (1, 100)") + } + + private def withSession2(f: SparkSession => Unit): Unit = { + val session2 = SparkConnectServerUtils.createSparkSession() + try { + f(session2) + } finally { + session2.stop() + } + } + + // Scenario 1: external write after CACHE TABLE + // + // Session 1 caches the table, then session 2 inserts a row. + // Because both sessions share the same CacheManager, session 2's + // INSERT triggers refreshCache() which recaches the table. Session 1 + // sees the new data. + test("[S1] external data write after CACHE TABLE") { + assumeCanRun() + withTable(T) { + withSession2 { session2 => + setupTable() + spark.sql(s"CACHE TABLE $T") + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + // Session 2 writes to the same underlying table + session2.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + + // Shared CacheManager: session 2's write triggers refreshCache(), + // so session 1 sees the new data + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200))) + + spark.sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + // Scenario 2: session write then external write after CACHE TABLE + // + // Session 1 caches, then session 1 inserts (invalidates cache), + // then session 2 inserts. Both writes are visible due to shared + // CacheManager. + test("[S2] session write then external write after CACHE TABLE") { + assumeCanRun() + withTable(T) { + withSession2 { session2 => + setupTable() + spark.sql(s"CACHE TABLE $T") + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + // Session 1 writes (invalidates cache) + spark.sql(s"INSERT INTO $T VALUES (2, 200)") + + // Session 2 writes externally + session2.sql(s"INSERT INTO $T VALUES (3, 300)").collect() + + // Both writes visible due to shared CacheManager + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + + spark.sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + // Scenario 3: external schema change (add column + insert) + // + // Session 2 adds a column and inserts data with the new schema. + test("[S3] external schema change after CACHE TABLE") { + assumeCanRun() + withTable(T) { + withSession2 { session2 => + setupTable() + spark.sql(s"CACHE TABLE $T") + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + // Session 2 adds a column and inserts + session2.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() + session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + + // Schema change + write visible via shared CacheManager + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100, null), Row(2, 200, -1))) + + spark.sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + // Scenario 4: session schema change then external write + // + // Session 1 adds a column (invalidates cache), then session 2 inserts. 
+ test("[S4] session schema change then external write") { + assumeCanRun() + withTable(T) { + withSession2 { session2 => + setupTable() + spark.sql(s"CACHE TABLE $T") + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + // Session 1 evolves schema + spark.sql(s"ALTER TABLE $T ADD COLUMN new_col INT") + + // Session 2 inserts with new schema + session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + + // Both changes visible + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100, null), Row(2, 200, -1))) + + spark.sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + // Scenario 5: external drop and recreate table + // + // Session 2 drops and recreates the table with the same schema. + test("[S5] external drop and recreate table after CACHE TABLE") { + assumeCanRun() + withTable(T) { + withSession2 { session2 => + setupTable() + spark.sql(s"CACHE TABLE $T") + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + // Session 2 drops and recreates with same schema + session2.sql(s"DROP TABLE $T").collect() + session2.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + + // Recreated table is empty; cache was invalidated by drop + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq.empty) + + spark.sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + // REFRESH TABLE after external write + test("[S1] REFRESH TABLE picks up external write") { + assumeCanRun() + withTable(T) { + withSession2 { session2 => + setupTable() + spark.sql(s"CACHE TABLE $T") + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + // Session 2 writes + session2.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + + // Explicit REFRESH should pick up the external write + spark.sql(s"REFRESH TABLE $T") + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200))) + + spark.sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + // UNCACHE TABLE after external write + test("[S1] UNCACHE TABLE then fresh read sees external write") { + assumeCanRun() + withTable(T) { + withSession2 { session2 => + setupTable() + spark.sql(s"CACHE TABLE $T") + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) + + // Session 2 writes + session2.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + + // Uncache and re-read + spark.sql(s"UNCACHE TABLE IF EXISTS $T") + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200))) + } + } + } +} diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala index 8bd6c5cf0168..0bb8155de094 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala @@ -125,6 +125,10 @@ object SparkConnectServerUtils { Seq( // Use InMemoryTableCatalog for V2 writer tests "spark.sql.catalog.testcat=org.apache.spark.sql.connector.catalog.InMemoryTableCatalog", + // Use SharedInMemoryTableCatalog for cross-session cache tests + "spark.sql.catalog.sharedcat=" + + "org.apache.spark.sql.connector.catalog.SharedInMemoryTableCatalog", + "spark.sql.catalog.sharedcat.copyOnLoad=true", // Try to use the hive catalog, fallback to in-memory if it is not there. 
"spark.sql.catalogImplementation=" + catalogImplementation, // Make the server terminate reattachable streams every 1 second and 123 bytes, From b7d25fe50a55599d78dcdcf3c88cac1706e2ed9f Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 22 Apr 2026 12:57:55 +0000 Subject: [PATCH 02/23] Add spark.catalog.isCached assertions to verify cache state --- .../connect/DataSourceV2CacheConnectSuite.scala | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index 419996f7d4dd..6cc138c24a55 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -71,6 +71,7 @@ class DataSourceV2CacheConnectSuite withSession2 { session2 => setupTable() spark.sql(s"CACHE TABLE $T") + assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) // Session 2 writes to the same underlying table @@ -78,6 +79,7 @@ class DataSourceV2CacheConnectSuite // Shared CacheManager: session 2's write triggers refreshCache(), // so session 1 sees the new data + assert(spark.catalog.isCached(T)) checkAnswer( spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) @@ -98,15 +100,18 @@ class DataSourceV2CacheConnectSuite withSession2 { session2 => setupTable() spark.sql(s"CACHE TABLE $T") + assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - // Session 1 writes (invalidates cache) + // Session 1 writes (invalidates and recaches) spark.sql(s"INSERT INTO $T VALUES (2, 200)") + assert(spark.catalog.isCached(T)) // Session 2 writes externally session2.sql(s"INSERT INTO $T VALUES (3, 300)").collect() // Both writes visible due to shared CacheManager + assert(spark.catalog.isCached(T)) checkAnswer( spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) @@ -125,6 +130,7 @@ class DataSourceV2CacheConnectSuite withSession2 { session2 => setupTable() spark.sql(s"CACHE TABLE $T") + assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) // Session 2 adds a column and inserts @@ -150,9 +156,10 @@ class DataSourceV2CacheConnectSuite withSession2 { session2 => setupTable() spark.sql(s"CACHE TABLE $T") + assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - // Session 1 evolves schema + // Session 1 evolves schema (invalidates and recaches) spark.sql(s"ALTER TABLE $T ADD COLUMN new_col INT") // Session 2 inserts with new schema @@ -177,6 +184,7 @@ class DataSourceV2CacheConnectSuite withSession2 { session2 => setupTable() spark.sql(s"CACHE TABLE $T") + assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) // Session 2 drops and recreates with same schema @@ -184,6 +192,7 @@ class DataSourceV2CacheConnectSuite session2.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() // Recreated table is empty; cache was invalidated by drop + assert(!spark.catalog.isCached(T)) checkAnswer( spark.sql(s"SELECT * FROM $T"), Seq.empty) @@ -200,6 +209,7 @@ class DataSourceV2CacheConnectSuite withSession2 { session2 => setupTable() spark.sql(s"CACHE TABLE $T") + assert(spark.catalog.isCached(T)) 
checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) // Session 2 writes @@ -207,6 +217,7 @@ class DataSourceV2CacheConnectSuite // Explicit REFRESH should pick up the external write spark.sql(s"REFRESH TABLE $T") + assert(spark.catalog.isCached(T)) checkAnswer( spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) @@ -223,6 +234,7 @@ class DataSourceV2CacheConnectSuite withSession2 { session2 => setupTable() spark.sql(s"CACHE TABLE $T") + assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) // Session 2 writes @@ -230,6 +242,7 @@ class DataSourceV2CacheConnectSuite // Uncache and re-read spark.sql(s"UNCACHE TABLE IF EXISTS $T") + assert(!spark.catalog.isCached(T)) checkAnswer( spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) From a6d7db5f05a47a982feb4eba172ad9f2469afb25 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 22 Apr 2026 13:33:55 +0000 Subject: [PATCH 03/23] Add classic DataSourceV2CacheSuite for external session cache tests Two categories of tests following the SPARK-54022 pattern: 1. Catalog API tests: modify table directly via catalog API (bypasses CacheManager). Cached reads return pinned/stale data. Session writes invalidate and recache. 2. External session tests: second SparkSession (shared CacheManager) modifies table via SQL (triggers refreshCache). Tests verify the shared CacheManager behavior for all 5 design doc scenarios. Co-authored-by: Isaac --- .../connector/DataSourceV2CacheSuite.scala | 286 ++++++++++++++++++ 1 file changed, 286 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala new file mode 100644 index 000000000000..3859d6282742 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala @@ -0,0 +1,286 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import java.util + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{QueryTest, Row, SparkSession} +import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Implicits, Identifier, InMemoryTableCatalog, SharedInMemoryTableCatalog, TableWritePrivilege, TruncatableTable} +import org.apache.spark.sql.test.SharedSparkSession + +/** + * DSv2 CACHE TABLE tests using an external session to simulate cross-session + * writes. Follows the SPARK-54022 pattern: cache a table, modify it externally + * (via catalog API or a second session), and verify cache pinning behavior. 
+ * + * Uses [[SharedInMemoryTableCatalog]] so that both the primary and external + * sessions share the same underlying table data via a static map. + * + * External modifications via catalog API bypass the CacheManager, so cached + * reads return stale (pinned) data. External modifications via the second + * session's SQL go through the shared CacheManager, so cached reads see fresh + * data. + */ +class DataSourceV2CacheSuite extends QueryTest with SharedSparkSession { + + import CatalogV2Implicits._ + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.sharedcat", + classOf[SharedInMemoryTableCatalog].getName) + .set("spark.sql.catalog.sharedcat.copyOnLoad", "true") + .set("spark.sql.catalog.testcat", + classOf[InMemoryTableCatalog].getName) + .set("spark.sql.catalog.testcat.copyOnLoad", "true") + + private val T = "sharedcat.ns.tbl" + private val ident = Identifier.of(Array("ns"), "tbl") + + override def afterEach(): Unit = { + try { + SharedInMemoryTableCatalog.reset() + } finally { + super.afterEach() + } + } + + private def catalog(name: String): BasicInMemoryTableCatalog = { + spark.sessionState.catalogManager.catalog(name) + .asInstanceOf[BasicInMemoryTableCatalog] + } + + /** + * Creates a second [[SparkSession]] sharing the same [[SparkContext]] (and + * therefore the same [[SharedState]] and [[CacheManager]]). The external + * session can access the same tables via [[SharedInMemoryTableCatalog]]. + */ + private def withExtSession(f: SparkSession => Unit): Unit = { + val savedActive = SparkSession.getActiveSession + val savedDefault = SparkSession.getDefaultSession + val extSession = try { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + SparkSession.builder() + .sparkContext(spark.sparkContext) + .config("spark.sql.catalog.sharedcat", + classOf[SharedInMemoryTableCatalog].getName) + .config("spark.sql.catalog.sharedcat.copyOnLoad", "true") + .create() + } finally { + savedDefault.foreach(SparkSession.setDefaultSession) + savedActive.foreach(SparkSession.setActiveSession) + } + try { + f(extSession) + } finally { + extSession.close() + } + } + + private def setupTable(): Unit = { + sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") + sql(s"INSERT INTO $T VALUES (1, 100)") + } + + // ========================================================================= + // Catalog API external modifications (bypass CacheManager) + // These follow the SPARK-54022 pattern: modify the table directly via + // the catalog, which does NOT trigger refreshCache(). Cached reads + // should return the pinned (stale) data. 
+ // ========================================================================= + + test("[catalog-api] external truncate after CACHE TABLE pins state") { + withTable(T) { + setupTable() + sql(s"CACHE TABLE $T") + + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // Externally truncate via catalog API (bypasses CacheManager) + val tableCatalog = catalog("sharedcat").asTableCatalog + val table = tableCatalog + .loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) + table.asInstanceOf[TruncatableTable].truncateTable() + + // Cached reads still return pinned data + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // Session write invalidates and recaches + sql(s"INSERT INTO $T VALUES (10, 1000)") + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(10, 1000))) + } + } + + test("[catalog-api] external drop+recreate after CACHE TABLE") { + withTable(T) { + setupTable() + sql(s"CACHE TABLE $T") + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // Save schema before drop + val tableCatalog = catalog("sharedcat").asTableCatalog + val columns = tableCatalog.loadTable(ident).columns() + + // Externally drop and recreate via catalog API + tableCatalog.dropTable(ident) + tableCatalog.createTable( + ident, columns, Array.empty, new java.util.HashMap[String, String]()) + + // Cache was invalidated by the drop + assert(!spark.catalog.isCached(T)) + } + } + + // ========================================================================= + // External session SQL modifications (shared CacheManager) + // The second session shares the same CacheManager. SQL writes trigger + // refreshCache(), making external changes visible. + // ========================================================================= + + test("[ext-session] external data write after CACHE TABLE") { + withTable(T) { + withExtSession { ext => + setupTable() + sql(s"CACHE TABLE $T") + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // External session writes via SQL (triggers refreshCache) + ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + + // Shared CacheManager: external write triggers recache + assertCached(spark.table(T)) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200))) + + sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + test("[ext-session] session write then external write after CACHE TABLE") { + withTable(T) { + withExtSession { ext => + setupTable() + sql(s"CACHE TABLE $T") + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // Session 1 writes (invalidates and recaches) + sql(s"INSERT INTO $T VALUES (2, 200)") + assertCached(spark.table(T)) + + // External session writes + ext.sql(s"INSERT INTO $T VALUES (3, 300)").collect() + + // All writes visible + assertCached(spark.table(T)) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + + sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + test("[ext-session] external schema change after CACHE TABLE") { + withTable(T) { + withExtSession { ext => + setupTable() + sql(s"CACHE TABLE $T") + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // External session adds column and inserts + ext.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() + ext.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + + // Schema change visible via shared CacheManager + checkAnswer( + spark.sql(s"SELECT * FROM $T"), 
+ Seq(Row(1, 100, null), Row(2, 200, -1))) + + sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + test("[ext-session] external drop and recreate after CACHE TABLE") { + withTable(T) { + withExtSession { ext => + setupTable() + sql(s"CACHE TABLE $T") + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // External session drops and recreates + ext.sql(s"DROP TABLE $T").collect() + ext.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + + // Cache invalidated by drop; recreated table is empty + assert(!spark.catalog.isCached(T)) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq.empty) + } + } + } + + test("[ext-session] REFRESH TABLE picks up external write") { + withTable(T) { + withExtSession { ext => + setupTable() + sql(s"CACHE TABLE $T") + assertCached(spark.table(T)) + + ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + + sql(s"REFRESH TABLE $T") + assertCached(spark.table(T)) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200))) + + sql(s"UNCACHE TABLE IF EXISTS $T") + } + } + } + + test("[ext-session] UNCACHE TABLE then fresh read sees external write") { + withTable(T) { + withExtSession { ext => + setupTable() + sql(s"CACHE TABLE $T") + assertCached(spark.table(T)) + + ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + + sql(s"UNCACHE TABLE IF EXISTS $T") + assert(!spark.catalog.isCached(T)) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200))) + } + } + } +} From dc786ef6026fe10a81e47b16a32b2e97362b133c Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 22 Apr 2026 22:22:35 +0000 Subject: [PATCH 04/23] Add missing SharedInMemoryTableCatalog to fix compilation The DataSourceV2CacheSuite imports SharedInMemoryTableCatalog but the class was not included in this branch. Add it to fix CI. Co-authored-by: Isaac --- .../catalog/SharedInMemoryTableCatalog.scala | 47 +++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala new file mode 100644 index 000000000000..a3f41170748c --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +/** + * An InMemoryTableCatalog that shares table state across all instances. 
+ * This allows multiple SparkSessions (via newSession/cloneSession) to + * read and write the same tables, simulating a real shared metastore. + * + * Use this catalog for multi-session concurrency tests where one session + * writes and another session reads the same DSv2 table. + */ +class SharedInMemoryTableCatalog extends InMemoryTableCatalog { + override protected val tables: java.util.Map[Identifier, Table] = + SharedInMemoryTableCatalog.sharedTables + override protected val namespaces + : java.util.Map[List[String], Map[String, String]] = + SharedInMemoryTableCatalog.sharedNamespaces +} + +object SharedInMemoryTableCatalog { + val sharedTables = + new java.util.concurrent.ConcurrentHashMap[Identifier, Table]() + val sharedNamespaces = + new java.util.concurrent.ConcurrentHashMap[ + List[String], Map[String, String]]() + + def reset(): Unit = { + sharedTables.clear() + sharedNamespaces.clear() + } +} From a12151c5c95253d8760e62db75dd9c2d91ef6c9f Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 22 Apr 2026 22:52:12 +0000 Subject: [PATCH 05/23] Fix scalafmt formatting in DataSourceV2CacheConnectSuite Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 48 ++++++------------- 1 file changed, 15 insertions(+), 33 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index 6cc138c24a55..b9ed01e06df3 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -25,24 +25,20 @@ import org.apache.spark.sql.connect.test.SparkConnectServerUtils /** * DSv2 CACHE TABLE tests for Spark Connect using two sessions. * - * Uses [[SharedInMemoryTableCatalog]] (configured as "sharedcat") so that - * both sessions share the same underlying table data via a static - * ConcurrentHashMap. Session 1 caches and reads; session 2 acts as an - * external writer. + * Uses [[SharedInMemoryTableCatalog]] (configured as "sharedcat") so that both sessions share the + * same underlying table data via a static ConcurrentHashMap. Session 1 caches and reads; session + * 2 acts as an external writer. * - * All sessions on the same server share one CacheManager (via SharedState). - * DSv2 writes trigger refreshCache() which recaches on the shared - * CacheManager. These tests document the observable behavior for each - * design doc CACHE scenario. + * All sessions on the same server share one CacheManager (via SharedState). DSv2 writes trigger + * refreshCache() which recaches on the shared CacheManager. These tests document the observable + * behavior for each design doc CACHE scenario. 
*/ -class DataSourceV2CacheConnectSuite - extends QueryTest with RemoteSparkSession with SQLHelper { +class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession with SQLHelper { private val T = "sharedcat.ns.tbl" private def assumeCanRun(): Unit = { - assume(spark != null && isAssemblyJarsDirExists, - "Spark Connect server not available") + assume(spark != null && isAssemblyJarsDirExists, "Spark Connect server not available") } private def setupTable(): Unit = { @@ -80,9 +76,7 @@ class DataSourceV2CacheConnectSuite // Shared CacheManager: session 2's write triggers refreshCache(), // so session 1 sees the new data assert(spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200))) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -112,9 +106,7 @@ class DataSourceV2CacheConnectSuite // Both writes visible due to shared CacheManager assert(spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -138,9 +130,7 @@ class DataSourceV2CacheConnectSuite session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() // Schema change + write visible via shared CacheManager - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100, null), Row(2, 200, -1))) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100, null), Row(2, 200, -1))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -166,9 +156,7 @@ class DataSourceV2CacheConnectSuite session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() // Both changes visible - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100, null), Row(2, 200, -1))) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100, null), Row(2, 200, -1))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -193,9 +181,7 @@ class DataSourceV2CacheConnectSuite // Recreated table is empty; cache was invalidated by drop assert(!spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq.empty) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq.empty) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -218,9 +204,7 @@ class DataSourceV2CacheConnectSuite // Explicit REFRESH should pick up the external write spark.sql(s"REFRESH TABLE $T") assert(spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200))) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -243,9 +227,7 @@ class DataSourceV2CacheConnectSuite // Uncache and re-read spark.sql(s"UNCACHE TABLE IF EXISTS $T") assert(!spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200))) + checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) } } } From 031ee62ac328810a2193f6ba95e2e85846fbe650 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Thu, 23 Apr 2026 00:57:32 +0000 Subject: [PATCH 06/23] Fix ext-session tests: use spark.newSession() to share CacheManager SparkSession.builder().sparkContext(sc).create() creates a new SharedState with a separate CacheManager, so ext-session writes never refresh the primary session's cache. Additionally, extSession.close() stops the shared SparkContext, breaking all subsequent tests. 
Fix by using spark.newSession() which shares the same SharedState (and CacheManager), and does not stop the SparkContext on cleanup. Co-authored-by: Isaac --- .../connector/DataSourceV2CacheSuite.scala | 33 ++++++------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala index 3859d6282742..1bcae3dc57ab 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala @@ -66,31 +66,18 @@ class DataSourceV2CacheSuite extends QueryTest with SharedSparkSession { } /** - * Creates a second [[SparkSession]] sharing the same [[SparkContext]] (and - * therefore the same [[SharedState]] and [[CacheManager]]). The external - * session can access the same tables via [[SharedInMemoryTableCatalog]]. + * Creates a second [[SparkSession]] sharing the same [[SparkContext]], + * [[SharedState]], and [[CacheManager]]. Uses [[SparkSession.newSession]] + * so both sessions share the same CacheManager. Writes from the external + * session trigger refreshCache() on the shared CacheManager, making + * changes visible to cached reads in the primary session. + * + * The external session accesses the same tables via + * [[SharedInMemoryTableCatalog]] (configured in sparkConf). */ private def withExtSession(f: SparkSession => Unit): Unit = { - val savedActive = SparkSession.getActiveSession - val savedDefault = SparkSession.getDefaultSession - val extSession = try { - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - SparkSession.builder() - .sparkContext(spark.sparkContext) - .config("spark.sql.catalog.sharedcat", - classOf[SharedInMemoryTableCatalog].getName) - .config("spark.sql.catalog.sharedcat.copyOnLoad", "true") - .create() - } finally { - savedDefault.foreach(SparkSession.setDefaultSession) - savedActive.foreach(SparkSession.setActiveSession) - } - try { - f(extSession) - } finally { - extSession.close() - } + val extSession = spark.newSession() + f(extSession) } private def setupTable(): Unit = { From c48cb4fc965b83e9d7d4e4bb8ecf55d7472a0740 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:29:31 +0000 Subject: [PATCH 07/23] Trim PR to Connect-only S1-S5 cache tests, remove classic suite Remove DataSourceV2CacheSuite (classic) as this PR focuses on Connect tests only. Trim DataSourceV2CacheConnectSuite to strictly the five design doc scenarios (S1-S5), removing REFRESH TABLE and UNCACHE TABLE extra tests. Each test documents both the current behavior (shared CacheManager, external writes visible) and the proposed behavior (per-session caching, external writes pinned) in comments. 
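
For example, the S1 test's comment takes roughly this shape (illustrative
sketch; see the diff below for the exact wording):

    // Current behavior (shared CacheManager): session 2's INSERT triggers
    // refreshCache(), so session 1 sees the new data.
    //   Result: (1, 100), (2, 200)
    // Proposed behavior (per-session caching): external write does not
    // invalidate session 1's cache.
    //   Result: (1, 100)
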
Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 148 +++++----- .../connector/DataSourceV2CacheSuite.scala | 273 ------------------ 2 files changed, 69 insertions(+), 352 deletions(-) delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index b9ed01e06df3..82a075901c38 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -23,15 +23,23 @@ import org.apache.spark.sql.connect.test.IntegrationTestUtils.isAssemblyJarsDirE import org.apache.spark.sql.connect.test.SparkConnectServerUtils /** - * DSv2 CACHE TABLE tests for Spark Connect using two sessions. + * DSv2 CACHE TABLE tests for Spark Connect covering the five design doc + * scenarios from SPARK-54022 section [5] "CACHE TABLE impact on reads". * - * Uses [[SharedInMemoryTableCatalog]] (configured as "sharedcat") so that both sessions share the - * same underlying table data via a static ConcurrentHashMap. Session 1 caches and reads; session - * 2 acts as an external writer. + * Uses [[SharedInMemoryTableCatalog]] (configured as "sharedcat") so that + * both sessions share the same underlying table data via a static map. * - * All sessions on the same server share one CacheManager (via SharedState). DSv2 writes trigger - * refreshCache() which recaches on the shared CacheManager. These tests document the observable - * behavior for each design doc CACHE scenario. + * Session 1 (Connect client) caches and reads. Session 2 (another Connect + * client) acts as an external writer. Both clients connect to the same + * server and share its [[CacheManager]], so session 2's writes trigger + * refreshCache() on the shared CacheManager. + * + * The proposed behavior for DSv2 is cache pinning: external writes should + * NOT invalidate session 1's cache. The expected values in these tests + * reflect the current behavior where external writes ARE visible through + * the shared CacheManager. When per-session caching is implemented, the + * expected values for S1-S4 should be updated to match the proposed + * pinning behavior documented in each test's comments. */ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession with SQLHelper { @@ -55,12 +63,17 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi } } - // Scenario 1: external write after CACHE TABLE + // Scenario 1: external data write after CACHE TABLE + // + // Session 1 caches, then session 2 inserts a row. + // + // Current behavior (shared CacheManager): session 2's INSERT triggers + // refreshCache(), so session 1 sees the new data. + // Result: (1, 100), (2, 200) // - // Session 1 caches the table, then session 2 inserts a row. - // Because both sessions share the same CacheManager, session 2's - // INSERT triggers refreshCache() which recaches the table. Session 1 - // sees the new data. + // Proposed behavior (per-session caching): external write does not + // invalidate session 1's cache. 
+ // Result: (1, 100) test("[S1] external data write after CACHE TABLE") { assumeCanRun() withTable(T) { @@ -70,13 +83,12 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - // Session 2 writes to the same underlying table session2.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - // Shared CacheManager: session 2's write triggers refreshCache(), - // so session 1 sees the new data assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -85,9 +97,15 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // Scenario 2: session write then external write after CACHE TABLE // - // Session 1 caches, then session 1 inserts (invalidates cache), - // then session 2 inserts. Both writes are visible due to shared - // CacheManager. + // Session 1 caches, session 1 inserts (invalidates and recaches), + // then session 2 inserts. + // + // Current behavior: all three rows visible via shared CacheManager. + // Result: (1, 100), (2, 200), (3, 300) + // + // Proposed behavior: session 1's write recaches, but session 2's + // external write does not invalidate session 1's cache. + // Result: (1, 100), (2, 200) test("[S2] session write then external write after CACHE TABLE") { assumeCanRun() withTable(T) { @@ -97,25 +115,32 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - // Session 1 writes (invalidates and recaches) spark.sql(s"INSERT INTO $T VALUES (2, 200)") assert(spark.catalog.isCached(T)) - // Session 2 writes externally session2.sql(s"INSERT INTO $T VALUES (3, 300)").collect() - // Both writes visible due to shared CacheManager assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100), Row(2, 200), Row(3, 300))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } } } - // Scenario 3: external schema change (add column + insert) + // Scenario 3: external schema change after CACHE TABLE // // Session 2 adds a column and inserts data with the new schema. + // + // Current behavior: schema change + write visible via shared + // CacheManager. + // Result: (1, 100, null), (2, 200, -1) + // + // Proposed behavior: external schema change does not invalidate + // session 1's cache. Cache remains pinned to the original schema. 
+ // Result: (1, 100) test("[S3] external schema change after CACHE TABLE") { assumeCanRun() withTable(T) { @@ -125,12 +150,12 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - // Session 2 adds a column and inserts session2.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() - // Schema change + write visible via shared CacheManager - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100, null), Row(2, 200, -1))) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100, null), Row(2, 200, -1))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -140,6 +165,13 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // Scenario 4: session schema change then external write // // Session 1 adds a column (invalidates cache), then session 2 inserts. + // + // Current behavior: both changes visible via shared CacheManager. + // Result: (1, 100, null), (2, 200, -1) + // + // Proposed behavior: session 1's schema change recaches with new + // schema, but session 2's external write does not invalidate cache. + // Result: (1, 100, null) test("[S4] session schema change then external write") { assumeCanRun() withTable(T) { @@ -149,14 +181,13 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - // Session 1 evolves schema (invalidates and recaches) spark.sql(s"ALTER TABLE $T ADD COLUMN new_col INT") - // Session 2 inserts with new schema session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() - // Both changes visible - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100, null), Row(2, 200, -1))) + checkAnswer( + spark.sql(s"SELECT * FROM $T"), + Seq(Row(1, 100, null), Row(2, 200, -1))) spark.sql(s"UNCACHE TABLE IF EXISTS $T") } @@ -166,6 +197,13 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // Scenario 5: external drop and recreate table // // Session 2 drops and recreates the table with the same schema. + // + // Current behavior: cache invalidated by drop, recreated table is + // empty. + // Result: empty + // + // Proposed behavior: same as current (keep as is). 
+ // Result: empty test("[S5] external drop and recreate table after CACHE TABLE") { assumeCanRun() withTable(T) { @@ -175,59 +213,11 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - // Session 2 drops and recreates with same schema session2.sql(s"DROP TABLE $T").collect() session2.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - // Recreated table is empty; cache was invalidated by drop assert(!spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq.empty) - - spark.sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - // REFRESH TABLE after external write - test("[S1] REFRESH TABLE picks up external write") { - assumeCanRun() - withTable(T) { - withSession2 { session2 => - setupTable() - spark.sql(s"CACHE TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - - // Session 2 writes - session2.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - - // Explicit REFRESH should pick up the external write - spark.sql(s"REFRESH TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) - - spark.sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - // UNCACHE TABLE after external write - test("[S1] UNCACHE TABLE then fresh read sees external write") { - assumeCanRun() - withTable(T) { - withSession2 { session2 => - setupTable() - spark.sql(s"CACHE TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - - // Session 2 writes - session2.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - - // Uncache and re-read - spark.sql(s"UNCACHE TABLE IF EXISTS $T") - assert(!spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100), Row(2, 200))) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala deleted file mode 100644 index 1bcae3dc57ab..000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2CacheSuite.scala +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.sql.connector - -import java.util - -import org.apache.spark.SparkConf -import org.apache.spark.sql.{QueryTest, Row, SparkSession} -import org.apache.spark.sql.connector.catalog.{BasicInMemoryTableCatalog, CatalogV2Implicits, Identifier, InMemoryTableCatalog, SharedInMemoryTableCatalog, TableWritePrivilege, TruncatableTable} -import org.apache.spark.sql.test.SharedSparkSession - -/** - * DSv2 CACHE TABLE tests using an external session to simulate cross-session - * writes. Follows the SPARK-54022 pattern: cache a table, modify it externally - * (via catalog API or a second session), and verify cache pinning behavior. - * - * Uses [[SharedInMemoryTableCatalog]] so that both the primary and external - * sessions share the same underlying table data via a static map. - * - * External modifications via catalog API bypass the CacheManager, so cached - * reads return stale (pinned) data. External modifications via the second - * session's SQL go through the shared CacheManager, so cached reads see fresh - * data. - */ -class DataSourceV2CacheSuite extends QueryTest with SharedSparkSession { - - import CatalogV2Implicits._ - - override def sparkConf: SparkConf = super.sparkConf - .set("spark.sql.catalog.sharedcat", - classOf[SharedInMemoryTableCatalog].getName) - .set("spark.sql.catalog.sharedcat.copyOnLoad", "true") - .set("spark.sql.catalog.testcat", - classOf[InMemoryTableCatalog].getName) - .set("spark.sql.catalog.testcat.copyOnLoad", "true") - - private val T = "sharedcat.ns.tbl" - private val ident = Identifier.of(Array("ns"), "tbl") - - override def afterEach(): Unit = { - try { - SharedInMemoryTableCatalog.reset() - } finally { - super.afterEach() - } - } - - private def catalog(name: String): BasicInMemoryTableCatalog = { - spark.sessionState.catalogManager.catalog(name) - .asInstanceOf[BasicInMemoryTableCatalog] - } - - /** - * Creates a second [[SparkSession]] sharing the same [[SparkContext]], - * [[SharedState]], and [[CacheManager]]. Uses [[SparkSession.newSession]] - * so both sessions share the same CacheManager. Writes from the external - * session trigger refreshCache() on the shared CacheManager, making - * changes visible to cached reads in the primary session. - * - * The external session accesses the same tables via - * [[SharedInMemoryTableCatalog]] (configured in sparkConf). - */ - private def withExtSession(f: SparkSession => Unit): Unit = { - val extSession = spark.newSession() - f(extSession) - } - - private def setupTable(): Unit = { - sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") - sql(s"INSERT INTO $T VALUES (1, 100)") - } - - // ========================================================================= - // Catalog API external modifications (bypass CacheManager) - // These follow the SPARK-54022 pattern: modify the table directly via - // the catalog, which does NOT trigger refreshCache(). Cached reads - // should return the pinned (stale) data. 
- // ========================================================================= - - test("[catalog-api] external truncate after CACHE TABLE pins state") { - withTable(T) { - setupTable() - sql(s"CACHE TABLE $T") - - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // Externally truncate via catalog API (bypasses CacheManager) - val tableCatalog = catalog("sharedcat").asTableCatalog - val table = tableCatalog - .loadTable(ident, util.Set.of(TableWritePrivilege.DELETE)) - table.asInstanceOf[TruncatableTable].truncateTable() - - // Cached reads still return pinned data - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // Session write invalidates and recaches - sql(s"INSERT INTO $T VALUES (10, 1000)") - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(10, 1000))) - } - } - - test("[catalog-api] external drop+recreate after CACHE TABLE") { - withTable(T) { - setupTable() - sql(s"CACHE TABLE $T") - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // Save schema before drop - val tableCatalog = catalog("sharedcat").asTableCatalog - val columns = tableCatalog.loadTable(ident).columns() - - // Externally drop and recreate via catalog API - tableCatalog.dropTable(ident) - tableCatalog.createTable( - ident, columns, Array.empty, new java.util.HashMap[String, String]()) - - // Cache was invalidated by the drop - assert(!spark.catalog.isCached(T)) - } - } - - // ========================================================================= - // External session SQL modifications (shared CacheManager) - // The second session shares the same CacheManager. SQL writes trigger - // refreshCache(), making external changes visible. - // ========================================================================= - - test("[ext-session] external data write after CACHE TABLE") { - withTable(T) { - withExtSession { ext => - setupTable() - sql(s"CACHE TABLE $T") - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // External session writes via SQL (triggers refreshCache) - ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - - // Shared CacheManager: external write triggers recache - assertCached(spark.table(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200))) - - sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - test("[ext-session] session write then external write after CACHE TABLE") { - withTable(T) { - withExtSession { ext => - setupTable() - sql(s"CACHE TABLE $T") - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // Session 1 writes (invalidates and recaches) - sql(s"INSERT INTO $T VALUES (2, 200)") - assertCached(spark.table(T)) - - // External session writes - ext.sql(s"INSERT INTO $T VALUES (3, 300)").collect() - - // All writes visible - assertCached(spark.table(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200), Row(3, 300))) - - sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - test("[ext-session] external schema change after CACHE TABLE") { - withTable(T) { - withExtSession { ext => - setupTable() - sql(s"CACHE TABLE $T") - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // External session adds column and inserts - ext.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() - ext.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() - - // Schema change visible via shared CacheManager - checkAnswer( - spark.sql(s"SELECT * FROM $T"), 
- Seq(Row(1, 100, null), Row(2, 200, -1))) - - sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - test("[ext-session] external drop and recreate after CACHE TABLE") { - withTable(T) { - withExtSession { ext => - setupTable() - sql(s"CACHE TABLE $T") - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // External session drops and recreates - ext.sql(s"DROP TABLE $T").collect() - ext.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - - // Cache invalidated by drop; recreated table is empty - assert(!spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq.empty) - } - } - } - - test("[ext-session] REFRESH TABLE picks up external write") { - withTable(T) { - withExtSession { ext => - setupTable() - sql(s"CACHE TABLE $T") - assertCached(spark.table(T)) - - ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - - sql(s"REFRESH TABLE $T") - assertCached(spark.table(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200))) - - sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - test("[ext-session] UNCACHE TABLE then fresh read sees external write") { - withTable(T) { - withExtSession { ext => - setupTable() - sql(s"CACHE TABLE $T") - assertCached(spark.table(T)) - - ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - - sql(s"UNCACHE TABLE IF EXISTS $T") - assert(!spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200))) - } - } - } -} From 854c3e2dffe9253569f03ac67e7b02dd982a3236 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:37:13 +0000 Subject: [PATCH 08/23] Remove internal references and verbose current/proposed behavior comments Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 51 +++---------------- 1 file changed, 6 insertions(+), 45 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index 82a075901c38..4af4acc7d427 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -23,8 +23,8 @@ import org.apache.spark.sql.connect.test.IntegrationTestUtils.isAssemblyJarsDirE import org.apache.spark.sql.connect.test.SparkConnectServerUtils /** - * DSv2 CACHE TABLE tests for Spark Connect covering the five design doc - * scenarios from SPARK-54022 section [5] "CACHE TABLE impact on reads". + * DSv2 CACHE TABLE tests for Spark Connect covering five cross-session + * cache scenarios (S1 through S5). * * Uses [[SharedInMemoryTableCatalog]] (configured as "sharedcat") so that * both sessions share the same underlying table data via a static map. @@ -34,12 +34,10 @@ import org.apache.spark.sql.connect.test.SparkConnectServerUtils * server and share its [[CacheManager]], so session 2's writes trigger * refreshCache() on the shared CacheManager. * - * The proposed behavior for DSv2 is cache pinning: external writes should - * NOT invalidate session 1's cache. The expected values in these tests - * reflect the current behavior where external writes ARE visible through - * the shared CacheManager. When per-session caching is implemented, the - * expected values for S1-S4 should be updated to match the proposed - * pinning behavior documented in each test's comments. 
+ * Write operations ignore the cache: external writes should NOT + * invalidate session 1's cache. The expected values in these tests + * reflect the current behavior where external writes ARE visible + * through the shared CacheManager. */ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession with SQLHelper { @@ -66,14 +64,6 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // Scenario 1: external data write after CACHE TABLE // // Session 1 caches, then session 2 inserts a row. - // - // Current behavior (shared CacheManager): session 2's INSERT triggers - // refreshCache(), so session 1 sees the new data. - // Result: (1, 100), (2, 200) - // - // Proposed behavior (per-session caching): external write does not - // invalidate session 1's cache. - // Result: (1, 100) test("[S1] external data write after CACHE TABLE") { assumeCanRun() withTable(T) { @@ -99,13 +89,6 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // // Session 1 caches, session 1 inserts (invalidates and recaches), // then session 2 inserts. - // - // Current behavior: all three rows visible via shared CacheManager. - // Result: (1, 100), (2, 200), (3, 300) - // - // Proposed behavior: session 1's write recaches, but session 2's - // external write does not invalidate session 1's cache. - // Result: (1, 100), (2, 200) test("[S2] session write then external write after CACHE TABLE") { assumeCanRun() withTable(T) { @@ -133,14 +116,6 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // Scenario 3: external schema change after CACHE TABLE // // Session 2 adds a column and inserts data with the new schema. - // - // Current behavior: schema change + write visible via shared - // CacheManager. - // Result: (1, 100, null), (2, 200, -1) - // - // Proposed behavior: external schema change does not invalidate - // session 1's cache. Cache remains pinned to the original schema. - // Result: (1, 100) test("[S3] external schema change after CACHE TABLE") { assumeCanRun() withTable(T) { @@ -165,13 +140,6 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // Scenario 4: session schema change then external write // // Session 1 adds a column (invalidates cache), then session 2 inserts. - // - // Current behavior: both changes visible via shared CacheManager. - // Result: (1, 100, null), (2, 200, -1) - // - // Proposed behavior: session 1's schema change recaches with new - // schema, but session 2's external write does not invalidate cache. - // Result: (1, 100, null) test("[S4] session schema change then external write") { assumeCanRun() withTable(T) { @@ -197,13 +165,6 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi // Scenario 5: external drop and recreate table // // Session 2 drops and recreates the table with the same schema. - // - // Current behavior: cache invalidated by drop, recreated table is - // empty. - // Result: empty - // - // Proposed behavior: same as current (keep as is). 
- // Result: empty test("[S5] external drop and recreate table after CACHE TABLE") { assumeCanRun() withTable(T) { From 0d12617c17bc32c4671a838c0544d6553661d248 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:38:25 +0000 Subject: [PATCH 09/23] Add proper imports for java.util.concurrent.ConcurrentHashMap Co-authored-by: Isaac --- .../catalog/SharedInMemoryTableCatalog.scala | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala index a3f41170748c..c48a4caafff5 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.connector.catalog +import java.util +import java.util.concurrent.ConcurrentHashMap + /** * An InMemoryTableCatalog that shares table state across all instances. * This allows multiple SparkSessions (via newSession/cloneSession) to @@ -26,19 +29,15 @@ package org.apache.spark.sql.connector.catalog * writes and another session reads the same DSv2 table. */ class SharedInMemoryTableCatalog extends InMemoryTableCatalog { - override protected val tables: java.util.Map[Identifier, Table] = + override protected val tables: util.Map[Identifier, Table] = SharedInMemoryTableCatalog.sharedTables - override protected val namespaces - : java.util.Map[List[String], Map[String, String]] = + override protected val namespaces: util.Map[List[String], Map[String, String]] = SharedInMemoryTableCatalog.sharedNamespaces } object SharedInMemoryTableCatalog { - val sharedTables = - new java.util.concurrent.ConcurrentHashMap[Identifier, Table]() - val sharedNamespaces = - new java.util.concurrent.ConcurrentHashMap[ - List[String], Map[String, String]]() + val sharedTables = new ConcurrentHashMap[Identifier, Table]() + val sharedNamespaces = new ConcurrentHashMap[List[String], Map[String, String]]() def reset(): Unit = { sharedTables.clear() From 42ba22f27166f2190a5ee2864304ed45d57ef475 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:38:47 +0000 Subject: [PATCH 10/23] Rename withSession2/session2 to withExtSession/ext Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 32 +++++++++---------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index 4af4acc7d427..f503d355ff88 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -52,12 +52,12 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi spark.sql(s"INSERT INTO $T VALUES (1, 100)") } - private def withSession2(f: SparkSession => Unit): Unit = { - val session2 = SparkConnectServerUtils.createSparkSession() + private def withExtSession(f: SparkSession => Unit): Unit = { + val ext = SparkConnectServerUtils.createSparkSession() try { - f(session2) + f(ext) } finally { - session2.stop() + ext.stop() } } @@ -67,13 +67,13 @@ class 
DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S1] external data write after CACHE TABLE") { assumeCanRun() withTable(T) { - withSession2 { session2 => + withExtSession { ext => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - session2.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() assert(spark.catalog.isCached(T)) checkAnswer( @@ -92,7 +92,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S2] session write then external write after CACHE TABLE") { assumeCanRun() withTable(T) { - withSession2 { session2 => + withExtSession { ext => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) @@ -101,7 +101,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi spark.sql(s"INSERT INTO $T VALUES (2, 200)") assert(spark.catalog.isCached(T)) - session2.sql(s"INSERT INTO $T VALUES (3, 300)").collect() + ext.sql(s"INSERT INTO $T VALUES (3, 300)").collect() assert(spark.catalog.isCached(T)) checkAnswer( @@ -119,14 +119,14 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S3] external schema change after CACHE TABLE") { assumeCanRun() withTable(T) { - withSession2 { session2 => + withExtSession { ext => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - session2.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() - session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + ext.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() + ext.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() checkAnswer( spark.sql(s"SELECT * FROM $T"), @@ -143,7 +143,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S4] session schema change then external write") { assumeCanRun() withTable(T) { - withSession2 { session2 => + withExtSession { ext => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) @@ -151,7 +151,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi spark.sql(s"ALTER TABLE $T ADD COLUMN new_col INT") - session2.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + ext.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() checkAnswer( spark.sql(s"SELECT * FROM $T"), @@ -168,14 +168,14 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S5] external drop and recreate table after CACHE TABLE") { assumeCanRun() withTable(T) { - withSession2 { session2 => + withExtSession { ext => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - session2.sql(s"DROP TABLE $T").collect() - session2.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + ext.sql(s"DROP TABLE $T").collect() + ext.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() assert(!spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq.empty) From f713ae0171edc1b6596c24d59621a2963367d803 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:40:51 +0000 Subject: [PATCH 11/23] Rename ext to extSession Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 30 +++++++++---------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index f503d355ff88..8edda98ac1c5 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -53,11 +53,11 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi } private def withExtSession(f: SparkSession => Unit): Unit = { - val ext = SparkConnectServerUtils.createSparkSession() + val extSession = SparkConnectServerUtils.createSparkSession() try { - f(ext) + f(extSession) } finally { - ext.stop() + extSession.stop() } } @@ -67,13 +67,13 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S1] external data write after CACHE TABLE") { assumeCanRun() withTable(T) { - withExtSession { ext => + withExtSession { extSession => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - ext.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + extSession.sql(s"INSERT INTO $T VALUES (2, 200)").collect() assert(spark.catalog.isCached(T)) checkAnswer( @@ -92,7 +92,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S2] session write then external write after CACHE TABLE") { assumeCanRun() withTable(T) { - withExtSession { ext => + withExtSession { extSession => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) @@ -101,7 +101,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi spark.sql(s"INSERT INTO $T VALUES (2, 200)") assert(spark.catalog.isCached(T)) - ext.sql(s"INSERT INTO $T VALUES (3, 300)").collect() + extSession.sql(s"INSERT INTO $T VALUES (3, 300)").collect() assert(spark.catalog.isCached(T)) checkAnswer( @@ -119,14 +119,14 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S3] external schema change after CACHE TABLE") { assumeCanRun() withTable(T) { - withExtSession { ext => + withExtSession { extSession => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - ext.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() - ext.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + extSession.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() + extSession.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() checkAnswer( spark.sql(s"SELECT * FROM $T"), @@ -143,7 +143,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S4] session schema change then external write") { assumeCanRun() withTable(T) { - withExtSession { ext => + withExtSession { extSession => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) @@ -151,7 +151,7 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi spark.sql(s"ALTER TABLE $T ADD COLUMN new_col INT") - ext.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + extSession.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() checkAnswer( spark.sql(s"SELECT * FROM $T"), @@ -168,14 +168,14 @@ class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession wi test("[S5] external drop and recreate table after CACHE TABLE") 
{ assumeCanRun() withTable(T) { - withExtSession { ext => + withExtSession { extSession => setupTable() spark.sql(s"CACHE TABLE $T") assert(spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - ext.sql(s"DROP TABLE $T").collect() - ext.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + extSession.sql(s"DROP TABLE $T").collect() + extSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() assert(!spark.catalog.isCached(T)) checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq.empty) From 2ec98ebfcb25eb0eee856b5c66286a0f4e955c05 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 13:50:20 +0000 Subject: [PATCH 12/23] Rewrite cache tests: use in-process Connect server with catalog API external writes Replace the Connect client tests (which shared a CacheManager and didn't actually test cache pinning) with proper in-process SparkConnectServerTest tests. External writes now go through the catalog API (InMemoryBaseTable.withData), which bypasses the CacheManager entirely. This verifies that external writes are truly invisible to cached reads. Remove SharedInMemoryTableCatalog and RemoteSparkSession changes (no longer needed). The tests use testcat (InMemoryTableCatalog) directly. Co-authored-by: Isaac --- .../catalog/SharedInMemoryTableCatalog.scala | 46 ----- .../DataSourceV2CacheConnectSuite.scala | 185 ----------------- .../sql/connect/test/RemoteSparkSession.scala | 4 - .../DataSourceV2CacheConnectSuite.scala | 194 ++++++++++++++++++ 4 files changed, 194 insertions(+), 235 deletions(-) delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala delete mode 100644 sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala create mode 100644 sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala deleted file mode 100644 index c48a4caafff5..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog - -import java.util -import java.util.concurrent.ConcurrentHashMap - -/** - * An InMemoryTableCatalog that shares table state across all instances. - * This allows multiple SparkSessions (via newSession/cloneSession) to - * read and write the same tables, simulating a real shared metastore. 
- * - * Use this catalog for multi-session concurrency tests where one session - * writes and another session reads the same DSv2 table. - */ -class SharedInMemoryTableCatalog extends InMemoryTableCatalog { - override protected val tables: util.Map[Identifier, Table] = - SharedInMemoryTableCatalog.sharedTables - override protected val namespaces: util.Map[List[String], Map[String, String]] = - SharedInMemoryTableCatalog.sharedNamespaces -} - -object SharedInMemoryTableCatalog { - val sharedTables = new ConcurrentHashMap[Identifier, Table]() - val sharedNamespaces = new ConcurrentHashMap[List[String], Map[String, String]]() - - def reset(): Unit = { - sharedTables.clear() - sharedNamespaces.clear() - } -} diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala deleted file mode 100644 index 8edda98ac1c5..000000000000 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connect - -import org.apache.spark.sql.Row -import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession, SQLHelper} -import org.apache.spark.sql.connect.test.IntegrationTestUtils.isAssemblyJarsDirExists -import org.apache.spark.sql.connect.test.SparkConnectServerUtils - -/** - * DSv2 CACHE TABLE tests for Spark Connect covering five cross-session - * cache scenarios (S1 through S5). - * - * Uses [[SharedInMemoryTableCatalog]] (configured as "sharedcat") so that - * both sessions share the same underlying table data via a static map. - * - * Session 1 (Connect client) caches and reads. Session 2 (another Connect - * client) acts as an external writer. Both clients connect to the same - * server and share its [[CacheManager]], so session 2's writes trigger - * refreshCache() on the shared CacheManager. - * - * Write operations ignore the cache: external writes should NOT - * invalidate session 1's cache. The expected values in these tests - * reflect the current behavior where external writes ARE visible - * through the shared CacheManager. 
- */ -class DataSourceV2CacheConnectSuite extends QueryTest with RemoteSparkSession with SQLHelper { - - private val T = "sharedcat.ns.tbl" - - private def assumeCanRun(): Unit = { - assume(spark != null && isAssemblyJarsDirExists, "Spark Connect server not available") - } - - private def setupTable(): Unit = { - spark.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") - spark.sql(s"INSERT INTO $T VALUES (1, 100)") - } - - private def withExtSession(f: SparkSession => Unit): Unit = { - val extSession = SparkConnectServerUtils.createSparkSession() - try { - f(extSession) - } finally { - extSession.stop() - } - } - - // Scenario 1: external data write after CACHE TABLE - // - // Session 1 caches, then session 2 inserts a row. - test("[S1] external data write after CACHE TABLE") { - assumeCanRun() - withTable(T) { - withExtSession { extSession => - setupTable() - spark.sql(s"CACHE TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - - extSession.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - - assert(spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200))) - - spark.sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - // Scenario 2: session write then external write after CACHE TABLE - // - // Session 1 caches, session 1 inserts (invalidates and recaches), - // then session 2 inserts. - test("[S2] session write then external write after CACHE TABLE") { - assumeCanRun() - withTable(T) { - withExtSession { extSession => - setupTable() - spark.sql(s"CACHE TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - - spark.sql(s"INSERT INTO $T VALUES (2, 200)") - assert(spark.catalog.isCached(T)) - - extSession.sql(s"INSERT INTO $T VALUES (3, 300)").collect() - - assert(spark.catalog.isCached(T)) - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100), Row(2, 200), Row(3, 300))) - - spark.sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - // Scenario 3: external schema change after CACHE TABLE - // - // Session 2 adds a column and inserts data with the new schema. - test("[S3] external schema change after CACHE TABLE") { - assumeCanRun() - withTable(T) { - withExtSession { extSession => - setupTable() - spark.sql(s"CACHE TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - - extSession.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect() - extSession.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() - - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100, null), Row(2, 200, -1))) - - spark.sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - // Scenario 4: session schema change then external write - // - // Session 1 adds a column (invalidates cache), then session 2 inserts. - test("[S4] session schema change then external write") { - assumeCanRun() - withTable(T) { - withExtSession { extSession => - setupTable() - spark.sql(s"CACHE TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - - spark.sql(s"ALTER TABLE $T ADD COLUMN new_col INT") - - extSession.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() - - checkAnswer( - spark.sql(s"SELECT * FROM $T"), - Seq(Row(1, 100, null), Row(2, 200, -1))) - - spark.sql(s"UNCACHE TABLE IF EXISTS $T") - } - } - } - - // Scenario 5: external drop and recreate table - // - // Session 2 drops and recreates the table with the same schema. 
- test("[S5] external drop and recreate table after CACHE TABLE") { - assumeCanRun() - withTable(T) { - withExtSession { extSession => - setupTable() - spark.sql(s"CACHE TABLE $T") - assert(spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq(Row(1, 100))) - - extSession.sql(s"DROP TABLE $T").collect() - extSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - - assert(!spark.catalog.isCached(T)) - checkAnswer(spark.sql(s"SELECT * FROM $T"), Seq.empty) - } - } - } -} diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala index 0bb8155de094..8bd6c5cf0168 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/test/RemoteSparkSession.scala @@ -125,10 +125,6 @@ object SparkConnectServerUtils { Seq( // Use InMemoryTableCatalog for V2 writer tests "spark.sql.catalog.testcat=org.apache.spark.sql.connector.catalog.InMemoryTableCatalog", - // Use SharedInMemoryTableCatalog for cross-session cache tests - "spark.sql.catalog.sharedcat=" + - "org.apache.spark.sql.connector.catalog.SharedInMemoryTableCatalog", - "spark.sql.catalog.sharedcat.copyOnLoad=true", // Try to use the hive catalog, fallback to in-memory if it is not there. "spark.sql.catalogImplementation=" + catalogImplementation, // Make the server terminate reattachable streams every 1 second and 123 bytes, diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala new file mode 100644 index 000000000000..fe179ba57d26 --- /dev/null +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import java.util.Collections + +import org.apache.spark.SparkConf +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, Identifier, InMemoryBaseTable, InMemoryTableCatalog, TableChange} +import org.apache.spark.sql.types.{IntegerType, StructType} + +/** + * DSv2 CACHE TABLE tests for Spark Connect covering five cross-session + * cache scenarios (S1 through S5). + * + * Uses an in-process Connect server ([[SparkConnectServerTest]]) so that + * the test can access the server's catalog directly. 
A Connect client + * performs cache and SQL operations; external writes go through the + * catalog API ([[InMemoryBaseTable.withData]]), which bypasses the + * [[CacheManager]]. This simulates a truly external writer whose + * changes are invisible to cached reads. + */ +class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) + .set("spark.sql.catalog.testcat.copyOnLoad", "true") + + private val T = "testcat.ns1.ns2.tbl" + private val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + + private def catalogTestcat: InMemoryTableCatalog = + spark.sessionState.catalogManager.catalog("testcat").asInstanceOf[InMemoryTableCatalog] + + private def setupTable(): Unit = { + spark.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") + spark.sql(s"INSERT INTO $T VALUES (1, 100)") + } + + // Scenario 1: external write after CACHE TABLE is invisible (cache pinned). + // Scenario 2: session write invalidates cache; subsequent external write + // is again invisible. + test("[S1+S2] CACHE TABLE pins state; session write invalidates, external does not") { + withSession { connectSession => + withTable(T) { + // create table and cache via Connect + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // S1: external writer adds (2, 200) via direct catalog API + // (bypasses this session's CacheManager) + val schema = StructType.fromDDL("id INT, salary INT") + val extTable = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) + + // cache is pinned, external write invisible + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // S2: session write via Connect invalidates the cache entry + connectSession.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100), Row(2, 200))) + + // external writer adds (3, 300) via direct catalog API + val extTable2 = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable2.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300)))) + + // cache is re-pinned, external write invisible + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100), Row(2, 200))) + + // REFRESH TABLE picks up all external changes + connectSession.sql(s"REFRESH TABLE $T").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + } + } + } + + // Scenario 3: external schema change after CACHE TABLE. + // Cache stays pinned at original 2-column schema. 
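+  // Note on the external ALTER used below (descriptive only):
+  // TableChange.addColumn(fieldNames, dataType, isNullable) is the catalog-API
+  // counterpart of "ALTER TABLE ... ADD COLUMN new_column INT"; the column is
+  // added as nullable, which is why the pre-existing row reads back as
+  // (1, 100, null) once REFRESH TABLE picks the change up.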
+ test("[S3] cached table pinned against external schema change") { + withSession { connectSession => + withTable(T) { + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // external schema change via catalog API + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + catalogTestcat.alterTable(ident, addCol) + + // external writer adds (2, 200, -1) + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // cache stays pinned at original 2-column schema + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // REFRESH TABLE picks up external schema change and data + connectSession.sql(s"REFRESH TABLE $T").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + } + + // Scenario 4: session schema change invalidates cache; subsequent external + // write is invisible. + test("[S4] session schema change invalidates cache, external write invisible") { + withSession { connectSession => + withTable(T) { + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // session schema change via Connect: invalidates cache, rebuilds with new schema + connectSession.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100, null))) + + // external writer adds (2, 200, -1) via catalog API + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // external write invisible: cache still shows (1, 100, null) + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100, null))) + + // REFRESH TABLE picks up external write + connectSession.sql(s"REFRESH TABLE $T").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) + } + } + } + + // Scenario 5: external drop and recreate with same schema. 
+ test("[S5] cached table after external drop and recreate sees empty table") { + withSession { connectSession => + withTable(T) { + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + assertCached(spark.table(T)) + checkAnswer(spark.table(T), Seq(Row(1, 100))) + + // external drop and recreate via catalog API + catalogTestcat.dropTable(ident) + catalogTestcat.createTable( + ident, + Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType)), + Array.empty, + Collections.emptyMap[String, String]) + + // query sees the new empty table + checkAnswer(spark.table(T), Seq.empty) + } + } + } +} From 17cb01d6d3a1499086fe895730825c80c2a3fcf6 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 14:19:08 +0000 Subject: [PATCH 13/23] Fix tests: use server session for assertions, write-privilege loadTable Use getServerSession() after first RPC to access the isolated server session for assertCached/checkAnswer. Use loadTable with write privileges to get the original table (not a copy) for external writes. Fix S1+S2 expected values to account for both external and session rows. All 4 tests pass. Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 254 ++++++++++-------- 1 file changed, 135 insertions(+), 119 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index fe179ba57d26..4f6cd65c2702 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -22,7 +22,8 @@ import java.util.Collections import org.apache.spark.SparkConf import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, Identifier, InMemoryBaseTable, InMemoryTableCatalog, TableChange} +import org.apache.spark.sql.classic +import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, Identifier, InMemoryBaseTable, InMemoryTableCatalog, TableChange, TableWritePrivilege} import org.apache.spark.sql.types.{IntegerType, StructType} /** @@ -45,57 +46,62 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { private val T = "testcat.ns1.ns2.tbl" private val ident = Identifier.of(Array("ns1", "ns2"), "tbl") - private def catalogTestcat: InMemoryTableCatalog = - spark.sessionState.catalogManager.catalog("testcat").asInstanceOf[InMemoryTableCatalog] - - private def setupTable(): Unit = { - spark.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") - spark.sql(s"INSERT INTO $T VALUES (1, 100)") - } + /** Get the catalog from the server-side session. */ + private def serverCatalog(serverSession: classic.SparkSession): InMemoryTableCatalog = + serverSession.sessionState.catalogManager + .catalog("testcat").asInstanceOf[InMemoryTableCatalog] // Scenario 1: external write after CACHE TABLE is invisible (cache pinned). // Scenario 2: session write invalidates cache; subsequent external write // is again invisible. 
test("[S1+S2] CACHE TABLE pins state; session write invalidates, external does not") { withSession { connectSession => - withTable(T) { - // create table and cache via Connect - connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() - connectSession.sql(s"CACHE TABLE $T").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // S1: external writer adds (2, 200) via direct catalog API - // (bypasses this session's CacheManager) - val schema = StructType.fromDDL("id INT, salary INT") - val extTable = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] - extTable.withData(Array( - new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) - - // cache is pinned, external write invisible - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // S2: session write via Connect invalidates the cache entry - connectSession.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100), Row(2, 200))) - - // external writer adds (3, 300) via direct catalog API - val extTable2 = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] - extTable2.withData(Array( - new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300)))) - - // cache is re-pinned, external write invisible - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100), Row(2, 200))) - - // REFRESH TABLE picks up all external changes - connectSession.sql(s"REFRESH TABLE $T").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) - } + // create table and cache via Connect + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + // get server session after first RPC establishes it + val serverSession = getServerSession(connectSession) + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + + // S1: external writer adds (2, 200) via direct catalog API + // (bypasses this session's CacheManager) + val schema = StructType.fromDDL("id INT, salary INT") + val cat = serverCatalog(serverSession) + val extTable = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + .asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) + + // cache is pinned, external write invisible + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + + // S2: session write via Connect invalidates the cache entry. + // After recache, the table contains (1, 100) + external (2, 200) + session (2, 200). 
+ connectSession.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(2, 200))) + + // external writer adds (3, 300) via direct catalog API + val extTable2 = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + .asInstanceOf[InMemoryBaseTable] + extTable2.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300)))) + + // cache is re-pinned, external write invisible + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(2, 200))) + + // REFRESH TABLE picks up all external changes + connectSession.sql(s"REFRESH TABLE $T").collect() + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), + Seq(Row(1, 100), Row(2, 200), Row(2, 200), Row(3, 300))) + + connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect() + connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() } } @@ -103,32 +109,36 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // Cache stays pinned at original 2-column schema. test("[S3] cached table pinned against external schema change") { withSession { connectSession => - withTable(T) { - connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() - connectSession.sql(s"CACHE TABLE $T").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // external schema change via catalog API - val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) - catalogTestcat.alterTable(ident, addCol) - - // external writer adds (2, 200, -1) - val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") - val extTable = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] - extTable.withData(Array( - new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) - - // cache stays pinned at original 2-column schema - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // REFRESH TABLE picks up external schema change and data - connectSession.sql(s"REFRESH TABLE $T").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) - } + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + val serverSession = getServerSession(connectSession) + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + + // external schema change via catalog API + val cat = serverCatalog(serverSession) + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + cat.alterTable(ident, addCol) + + // external writer adds (2, 200, -1) + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + .asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // cache stays pinned at original 2-column schema + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + + // REFRESH TABLE picks up external schema change and data + connectSession.sql(s"REFRESH TABLE $T").collect() + assertCached(serverSession.table(T)) + 
checkAnswer(serverSession.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) + + connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect() + connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() } } @@ -136,59 +146,65 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // write is invisible. test("[S4] session schema change invalidates cache, external write invisible") { withSession { connectSession => - withTable(T) { - connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() - connectSession.sql(s"CACHE TABLE $T").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // session schema change via Connect: invalidates cache, rebuilds with new schema - connectSession.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100, null))) - - // external writer adds (2, 200, -1) via catalog API - val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") - val extTable = catalogTestcat.loadTable(ident).asInstanceOf[InMemoryBaseTable] - extTable.withData(Array( - new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) - - // external write invisible: cache still shows (1, 100, null) - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100, null))) - - // REFRESH TABLE picks up external write - connectSession.sql(s"REFRESH TABLE $T").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) - } + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + val serverSession = getServerSession(connectSession) + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + + // session schema change via Connect: invalidates cache, rebuilds with new schema + connectSession.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect() + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100, null))) + + // external writer adds (2, 200, -1) via catalog API + val cat = serverCatalog(serverSession) + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + .asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // external write invisible: cache still shows (1, 100, null) + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100, null))) + + // REFRESH TABLE picks up external write + connectSession.sql(s"REFRESH TABLE $T").collect() + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) + + connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect() + connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() } } // Scenario 5: external drop and recreate with same schema. 
test("[S5] cached table after external drop and recreate sees empty table") { withSession { connectSession => - withTable(T) { - connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() - connectSession.sql(s"CACHE TABLE $T").collect() - assertCached(spark.table(T)) - checkAnswer(spark.table(T), Seq(Row(1, 100))) - - // external drop and recreate via catalog API - catalogTestcat.dropTable(ident) - catalogTestcat.createTable( - ident, - Array( - Column.create("id", IntegerType), - Column.create("salary", IntegerType)), - Array.empty, - Collections.emptyMap[String, String]) - - // query sees the new empty table - checkAnswer(spark.table(T), Seq.empty) - } + connectSession.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + connectSession.sql(s"INSERT INTO $T VALUES (1, 100)").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + val serverSession = getServerSession(connectSession) + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + + // external drop and recreate via catalog API + val cat = serverCatalog(serverSession) + cat.dropTable(ident) + cat.createTable( + ident, + Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType)), + Array.empty, + Collections.emptyMap[String, String]) + + // query sees the new empty table + checkAnswer(serverSession.table(T), Seq.empty) + + connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() } } } From d6aadc6e56cf8827d7de3042e7d184d54d64777b Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 14:37:05 +0000 Subject: [PATCH 14/23] S2: use UNCACHE+re-CACHE instead of INSERT to transition to scenario 2 Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 25 +++++++++---------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index 4f6cd65c2702..7b308434112e 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -78,27 +78,26 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100))) - // S2: session write via Connect invalidates the cache entry. - // After recache, the table contains (1, 100) + external (2, 200) + session (2, 200). - connectSession.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + // S2: UNCACHE + re-CACHE picks up external write, then session + // write invalidates and recaches. 
+ connectSession.sql(s"UNCACHE TABLE $T").collect() + connectSession.sql(s"CACHE TABLE $T").collect() + assertCached(serverSession.table(T)) + checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200))) + + connectSession.sql(s"INSERT INTO $T VALUES (3, 300)").collect() assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(2, 200))) + checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) - // external writer adds (3, 300) via direct catalog API + // external writer adds (4, 400) via direct catalog API val extTable2 = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) .asInstanceOf[InMemoryBaseTable] extTable2.withData(Array( - new BufferedRows(Seq.empty, schema).withRow(InternalRow(3, 300)))) + new BufferedRows(Seq.empty, schema).withRow(InternalRow(4, 400)))) // cache is re-pinned, external write invisible assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(2, 200))) - - // REFRESH TABLE picks up all external changes - connectSession.sql(s"REFRESH TABLE $T").collect() - assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), - Seq(Row(1, 100), Row(2, 200), Row(2, 200), Row(3, 300))) + checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect() connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() From 5c91dcd3204355a44e32792ff9838304d925f855 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Mon, 27 Apr 2026 22:12:15 +0000 Subject: [PATCH 15/23] Fix scalafmt formatting in DataSourceV2CacheConnectSuite Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index 7b308434112e..bd9ab0604fba 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -27,15 +27,14 @@ import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, Identifier, import org.apache.spark.sql.types.{IntegerType, StructType} /** - * DSv2 CACHE TABLE tests for Spark Connect covering five cross-session - * cache scenarios (S1 through S5). + * DSv2 CACHE TABLE tests for Spark Connect covering five cross-session cache scenarios (S1 + * through S5). * - * Uses an in-process Connect server ([[SparkConnectServerTest]]) so that - * the test can access the server's catalog directly. A Connect client - * performs cache and SQL operations; external writes go through the - * catalog API ([[InMemoryBaseTable.withData]]), which bypasses the - * [[CacheManager]]. This simulates a truly external writer whose - * changes are invisible to cached reads. + * Uses an in-process Connect server ([[SparkConnectServerTest]]) so that the test can access the + * server's catalog directly. A Connect client performs cache and SQL operations; external writes + * go through the catalog API ([[InMemoryBaseTable.withData]]), which bypasses the + * [[CacheManager]]. This simulates a truly external writer whose changes are invisible to cached + * reads. 
*/ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { @@ -49,7 +48,8 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { /** Get the catalog from the server-side session. */ private def serverCatalog(serverSession: classic.SparkSession): InMemoryTableCatalog = serverSession.sessionState.catalogManager - .catalog("testcat").asInstanceOf[InMemoryTableCatalog] + .catalog("testcat") + .asInstanceOf[InMemoryTableCatalog] // Scenario 1: external write after CACHE TABLE is invisible (cache pinned). // Scenario 2: session write invalidates cache; subsequent external write @@ -69,10 +69,10 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // (bypasses this session's CacheManager) val schema = StructType.fromDDL("id INT, salary INT") val cat = serverCatalog(serverSession) - val extTable = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + val extTable = cat + .loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) .asInstanceOf[InMemoryBaseTable] - extTable.withData(Array( - new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) + extTable.withData(Array(new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) // cache is pinned, external write invisible assertCached(serverSession.table(T)) @@ -90,10 +90,10 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) // external writer adds (4, 400) via direct catalog API - val extTable2 = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + val extTable2 = cat + .loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) .asInstanceOf[InMemoryBaseTable] - extTable2.withData(Array( - new BufferedRows(Seq.empty, schema).withRow(InternalRow(4, 400)))) + extTable2.withData(Array(new BufferedRows(Seq.empty, schema).withRow(InternalRow(4, 400)))) // cache is re-pinned, external write invisible assertCached(serverSession.table(T)) @@ -122,10 +122,11 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // external writer adds (2, 200, -1) val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") - val extTable = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + val extTable = cat + .loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) .asInstanceOf[InMemoryBaseTable] - extTable.withData(Array( - new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + extTable.withData( + Array(new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) // cache stays pinned at original 2-column schema assertCached(serverSession.table(T)) @@ -160,10 +161,11 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // external writer adds (2, 200, -1) via catalog API val cat = serverCatalog(serverSession) val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") - val extTable = cat.loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) + val extTable = cat + .loadTable(ident, java.util.Set.of(TableWritePrivilege.INSERT)) .asInstanceOf[InMemoryBaseTable] - extTable.withData(Array( - new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + extTable.withData( + Array(new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) // external write invisible: cache still shows (1, 100, null) assertCached(serverSession.table(T)) @@ -194,9 +196,7 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { 
cat.dropTable(ident) cat.createTable( ident, - Array( - Column.create("id", IntegerType), - Column.create("salary", IntegerType)), + Array(Column.create("id", IntegerType), Column.create("salary", IntegerType)), Array.empty, Collections.emptyMap[String, String]) From 6299d29b22da177548d3dfd0562b67ed7561a551 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 29 Apr 2026 14:15:48 +0000 Subject: [PATCH 16/23] Empty commit to retrigger CI Co-authored-by: Isaac From 2759b809c58e2594bd3983eaf4cc13688cfb0908 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 29 Apr 2026 17:22:48 +0000 Subject: [PATCH 17/23] Add client-side Connect read assertions to CacheConnectSuite Add assertRows() calls that read data through the Connect client (connectSession.sql("SELECT * FROM T").collect()) alongside the existing server-side assertions (checkAnswer(serverSession.table(T))). This verifies the full Connect round-trip for cached data reads, not just server-side cache state. Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index bd9ab0604fba..922485c799ae 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -35,6 +35,10 @@ import org.apache.spark.sql.types.{IntegerType, StructType} * go through the catalog API ([[InMemoryBaseTable.withData]]), which bypasses the * [[CacheManager]]. This simulates a truly external writer whose changes are invisible to cached * reads. + * + * Each scenario validates both server-side cache state (via [[assertCached]] and [[checkAnswer]] + * on the server session) and the client-side Connect round-trip (via [[assertRows]] on the + * Connect session). */ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { @@ -45,6 +49,13 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { private val T = "testcat.ns1.ns2.tbl" private val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + /** Assert that rows collected through the Connect client match expected rows (order-agnostic). */ + private def assertRows(actual: Array[Row], expected: Seq[Row]): Unit = { + assert( + actual.map(_.toString()).toSet == expected.map(_.toString()).toSet, + s"Expected ${expected.mkString(", ")} but got ${actual.mkString(", ")}") + } + /** Get the catalog from the server-side session. 
*/ private def serverCatalog(serverSession: classic.SparkSession): InMemoryTableCatalog = serverSession.sessionState.catalogManager @@ -64,6 +75,7 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { val serverSession = getServerSession(connectSession) assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // S1: external writer adds (2, 200) via direct catalog API // (bypasses this session's CacheManager) @@ -77,6 +89,7 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // cache is pinned, external write invisible assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // S2: UNCACHE + re-CACHE picks up external write, then session // write invalidates and recaches. @@ -84,10 +97,14 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { connectSession.sql(s"CACHE TABLE $T").collect() assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100), Row(2, 200))) connectSession.sql(s"INSERT INTO $T VALUES (3, 300)").collect() assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + assertRows( + connectSession.sql(s"SELECT * FROM $T").collect(), + Seq(Row(1, 100), Row(2, 200), Row(3, 300))) // external writer adds (4, 400) via direct catalog API val extTable2 = cat @@ -98,6 +115,9 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // cache is re-pinned, external write invisible assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) + assertRows( + connectSession.sql(s"SELECT * FROM $T").collect(), + Seq(Row(1, 100), Row(2, 200), Row(3, 300))) connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect() connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() @@ -114,6 +134,7 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { val serverSession = getServerSession(connectSession) assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // external schema change via catalog API val cat = serverCatalog(serverSession) @@ -131,11 +152,16 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // cache stays pinned at original 2-column schema assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + // Connect client also sees the pinned 2-column cached data + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // REFRESH TABLE picks up external schema change and data connectSession.sql(s"REFRESH TABLE $T").collect() assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) + assertRows( + connectSession.sql(s"SELECT * FROM $T").collect(), + Seq(Row(1, 100, null), Row(2, 200, -1))) connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect() connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() @@ -152,11 +178,13 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { val serverSession = getServerSession(connectSession) assertCached(serverSession.table(T)) 
checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // session schema change via Connect: invalidates cache, rebuilds with new schema connectSession.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect() assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100, null))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100, null))) // external writer adds (2, 200, -1) via catalog API val cat = serverCatalog(serverSession) @@ -170,11 +198,15 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // external write invisible: cache still shows (1, 100, null) assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100, null))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100, null))) // REFRESH TABLE picks up external write connectSession.sql(s"REFRESH TABLE $T").collect() assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) + assertRows( + connectSession.sql(s"SELECT * FROM $T").collect(), + Seq(Row(1, 100, null), Row(2, 200, -1))) connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect() connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() @@ -190,6 +222,7 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { val serverSession = getServerSession(connectSession) assertCached(serverSession.table(T)) checkAnswer(serverSession.table(T), Seq(Row(1, 100))) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // external drop and recreate via catalog API val cat = serverCatalog(serverSession) @@ -202,6 +235,7 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // query sees the new empty table checkAnswer(serverSession.table(T), Seq.empty) + assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq.empty) connectSession.sql(s"DROP TABLE IF EXISTS $T").collect() } From eece243fd6acc6b32a4e2ab6c0ae97092b1c0a3d Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 29 Apr 2026 19:38:23 +0000 Subject: [PATCH 18/23] Replace server-side checkAnswer with client-side assertRows Read data through connectSession (the Connect client) instead of serverSession (classic) for all data assertions. Only assertCached remains on serverSession since cache plan internals are not exposed through Connect. Co-authored-by: Isaac --- .../DataSourceV2CacheConnectSuite.scala | 21 +++---------------- 1 file changed, 3 insertions(+), 18 deletions(-) diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala index 922485c799ae..e8789bbe8b98 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala @@ -36,9 +36,9 @@ import org.apache.spark.sql.types.{IntegerType, StructType} * [[CacheManager]]. This simulates a truly external writer whose changes are invisible to cached * reads. * - * Each scenario validates both server-side cache state (via [[assertCached]] and [[checkAnswer]] - * on the server session) and the client-side Connect round-trip (via [[assertRows]] on the - * Connect session). 
+ * Each scenario validates server-side cache state (via [[assertCached]] on the server session, + * since cache plan internals are not exposed through Connect) and reads data through the Connect + * client (via [[assertRows]]) to simulate the real client experience. */ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { @@ -74,7 +74,6 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // get server session after first RPC establishes it val serverSession = getServerSession(connectSession) assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100))) assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // S1: external writer adds (2, 200) via direct catalog API @@ -88,7 +87,6 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // cache is pinned, external write invisible assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100))) assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // S2: UNCACHE + re-CACHE picks up external write, then session @@ -96,12 +94,10 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { connectSession.sql(s"UNCACHE TABLE $T").collect() connectSession.sql(s"CACHE TABLE $T").collect() assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200))) assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100), Row(2, 200))) connectSession.sql(s"INSERT INTO $T VALUES (3, 300)").collect() assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) assertRows( connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) @@ -114,7 +110,6 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // cache is re-pinned, external write invisible assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) assertRows( connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100), Row(2, 200), Row(3, 300))) @@ -133,7 +128,6 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { connectSession.sql(s"CACHE TABLE $T").collect() val serverSession = getServerSession(connectSession) assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100))) assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // external schema change via catalog API @@ -151,14 +145,11 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { // cache stays pinned at original 2-column schema assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100))) - // Connect client also sees the pinned 2-column cached data assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100))) // REFRESH TABLE picks up external schema change and data connectSession.sql(s"REFRESH TABLE $T").collect() assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), Seq(Row(1, 100, null), Row(2, 200, -1))) assertRows( connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100, null), Row(2, 200, -1))) @@ -177,13 +168,11 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest { connectSession.sql(s"CACHE TABLE $T").collect() val serverSession = getServerSession(connectSession) assertCached(serverSession.table(T)) - checkAnswer(serverSession.table(T), 
      Seq(Row(1, 100)))
     assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100)))
 
     // session schema change via Connect: invalidates cache, rebuilds with new schema
     connectSession.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect()
     assertCached(serverSession.table(T))
-    checkAnswer(serverSession.table(T), Seq(Row(1, 100, null)))
     assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100, null)))
 
     // external writer adds (2, 200, -1) via catalog API
@@ -197,13 +186,11 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest {
     // external write invisible: cache still shows (1, 100, null)
     assertCached(serverSession.table(T))
-    checkAnswer(serverSession.table(T), Seq(Row(1, 100, null)))
     assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq(Row(1, 100, null)))
     // REFRESH TABLE picks up external write
     connectSession.sql(s"REFRESH TABLE $T").collect()
     assertCached(serverSession.table(T))
-    checkAnswer(serverSession.table(T), Seq(Row(1, 100, null), Row(2, 200, -1)))
     assertRows(
       connectSession.sql(s"SELECT * FROM $T").collect(),
       Seq(Row(1, 100, null), Row(2, 200, -1)))
     connectSession.sql(s"UNCACHE TABLE IF EXISTS $T").collect()
     connectSession.sql(s"DROP TABLE IF EXISTS $T").collect()

From 31f270b7f3a41a5e7ca0453c238a32b549aedc7e Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Wed, 29 Apr 2026 23:02:30 +0000
Subject: [PATCH 19/23] Fix scalafmt formatting in DataSourceV2CacheConnectSuite

Co-authored-by: Isaac
---
 .../spark/sql/connect/DataSourceV2CacheConnectSuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala
index e8789bbe8b98..9475ac17c5bd 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala
@@ -49,7 +49,9 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest {
   private val T = "testcat.ns1.ns2.tbl"
   private val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
 
-  /** Assert that rows collected through the Connect client match expected rows (order-agnostic). */
+  /**
+   * Assert that rows collected through the Connect client match expected rows (order-agnostic).
+   */
   private def assertRows(actual: Array[Row], expected: Seq[Row]): Unit = {
     assert(
       actual.map(_.toString()).toSet == expected.map(_.toString()).toSet,

From ce6c0c9af5a3a3a3b503acb695a896b480ac9743 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Thu, 30 Apr 2026 12:42:20 +0000
Subject: [PATCH 20/23] Add schema assertion to external drop/recreate cache
 test

Verify that after external drop/recreate, the table schema is preserved as
(id, salary).
Co-authored-by: Isaac
---
 .../spark/sql/connect/DataSourceV2CacheConnectSuite.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala
index 9475ac17c5bd..3c34e264fa53 100644
--- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala
+++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2CacheConnectSuite.scala
@@ -221,8 +221,10 @@ class DataSourceV2CacheConnectSuite extends SparkConnectServerTest {
       Array.empty,
       Collections.emptyMap[String, String])
 
-    // query sees the new empty table
-    assertRows(connectSession.sql(s"SELECT * FROM $T").collect(), Seq.empty)
+    // query sees the new empty table with preserved schema
+    val result = connectSession.sql(s"SELECT * FROM $T")
+    assertRows(result.collect(), Seq.empty)
+    assert(result.schema.fieldNames.toSeq == Seq("id", "salary"))
 
     connectSession.sql(s"DROP TABLE IF EXISTS $T").collect()
   }

From b79dc6b88670c0e3ec216f5f0c26595ef6a75ed2 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Thu, 30 Apr 2026 18:22:37 +0000
Subject: [PATCH 21/23] Retrigger CI: flaky pyspark-connect test (unrelated)

Co-authored-by: Isaac

From 1ddf67a809177c5adb28155a4782b424325f3018 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Thu, 30 Apr 2026 22:25:54 +0000
Subject: [PATCH 22/23] Retrigger CI

From 694e9d17f0d1b7dab26cc6b50e4c750bab4ea374 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Fri, 1 May 2026 19:02:08 +0000
Subject: [PATCH 23/23] Empty commit to retrigger CI

Co-authored-by: Isaac
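
Editor's note (not part of any patch above): after patch 18, every scenario repeats the same
pair of checks, assertCached on the classic server-side session followed by an order-agnostic
read through the Connect client. A possible follow-up would fold the pair into one helper
inside the suite. The sketch below is only illustrative: the assertCachedAndRows name and its
parameter types are assumptions, and it simply reuses the assertRows, assertCached and
getServerSession helpers exactly as they appear in the diffs above.

  // Hypothetical helper (sketch only, not part of this patch series): one call per
  // scenario step instead of the repeated assertCached + assertRows pair. Assumes it
  // lives inside DataSourceV2CacheConnectSuite, so Row, assertCached, assertRows and
  // getServerSession are already in scope; the session parameter type should match
  // whatever the suite's existing helpers accept for the Connect client session.
  private def assertCachedAndRows(
      connectSession: SparkSession,
      tableName: String,
      expected: Seq[Row]): Unit = {
    // Cache plan internals are only visible on the classic server-side session.
    val serverSession = getServerSession(connectSession)
    assertCached(serverSession.table(tableName))
    // Data is read through the Connect client to exercise the full round-trip.
    assertRows(connectSession.sql(s"SELECT * FROM $tableName").collect(), expected)
  }

With such a helper, a step like the S1 check could shrink to a single call, e.g.
assertCachedAndRows(connectSession, T, Seq(Row(1, 100))).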