From 1c5da4a4ae414c6cdaf47fa0f6fe8d0c16d2ae1a Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Tue, 21 Apr 2026 20:26:00 +0000 Subject: [PATCH 1/5] [SPARK-XXXXX][CONNECT][TESTS] Add Connect temp view refresh tests Co-authored-by: Isaac --- .../catalog/CachingInMemoryTableCatalog.scala | 55 +++++++++ .../connector/catalog/InMemoryBaseTable.scala | 33 +++++- .../catalog/NullIdInMemoryTableCatalog.scala | 50 ++++++++ .../catalog/SharedInMemoryTableCatalog.scala | 47 ++++++++ .../DataSourceV2ConnectTempViewSuite.scala | 65 +++++++++++ .../DataSourceV2RefreshConnectTestBase.scala | 86 ++++++++++++++ .../spark/sql/classic/DataFrameWriter.scala | 7 ++ .../spark/sql/classic/DataFrameWriterV2.scala | 8 ++ .../DataSourceV2TableRefreshTestBase.scala | 110 ++++++++++++++++++ 9 files changed, 455 insertions(+), 6 deletions(-) create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala create mode 100644 sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala create mode 100644 sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala new file mode 100644 index 000000000000..abcb04615d24 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +import java.util.concurrent.ConcurrentHashMap + +/** + * An InMemoryTableCatalog that simulates a caching connector like + * Iceberg's CachingCatalog. On first loadTable, returns a fresh + * copy. On subsequent loads, returns the CACHED (stale) copy, + * making external changes invisible. + * + * Session writes go through the SQL path which modifies the + * original table and invalidates, but direct catalog API + * modifications are not visible until the cache is cleared. + * + * Call [[CachingInMemoryTableCatalog.clearCache()]] to simulate + * cache expiration (like Iceberg's 30-second TTL). 
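+ *
+ * A minimal sketch of the intended usage, assuming the catalog is
+ * registered as "cachingcat" (as DataSourceV2TableRefreshTestBase does):
+ * {{{
+ *   spark.sql("CREATE TABLE cachingcat.ns.tbl (id INT) USING foo")
+ *   spark.table("cachingcat.ns.tbl").collect() // first load populates the cache
+ *   // ... changes made through the catalog API are invisible here ...
+ *   CachingInMemoryTableCatalog.clearCache()   // simulate TTL expiry
+ *   spark.table("cachingcat.ns.tbl").collect() // next load sees fresh state
+ * }}}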
+ */ +class CachingInMemoryTableCatalog extends InMemoryTableCatalog { + import CachingInMemoryTableCatalog._ + + override def loadTable(ident: Identifier): Table = { + cachedTables.computeIfAbsent(cacheKey(name, ident), _ => { + super.loadTable(ident) + }) + } + + private def cacheKey( + catalog: String, ident: Identifier): String = { + s"$catalog.${ident.toString}" + } +} + +object CachingInMemoryTableCatalog { + private val cachedTables = + new ConcurrentHashMap[String, Table]() + + def clearCache(): Unit = cachedTables.clear() +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index fd2c0f6e9c2e..53c2278fd636 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -380,24 +380,45 @@ abstract class InMemoryBaseTable( def alterTableWithData( data: Array[BufferedRows], newSchema: StructType): InMemoryBaseTable = { + val newFieldNames = newSchema.fieldNames.toSet data.foreach { bufferedRow => val oldSchema = bufferedRow.schema + // Shrink rows when columns are dropped: remove values for fields + // not in newSchema. This ensures drop+re-add same column returns + // null (reader sees column missing in writeSchema). Leave all other + // evolution (additions, type widening, defaults) to the reader. + val keepFields = oldSchema.fields.zipWithIndex.filter { + case (f, _) => newFieldNames.contains(f.name) + } + val needsShrink = keepFields.length < oldSchema.length + val effectiveSchema = if (needsShrink) { + StructType(keepFields.map(_._1)) + } else { + oldSchema + } + bufferedRow.rows.foreach { row => - // handle partition evolution by re-keying all data - val key = getKey(row, newSchema) + val effectiveRow = if (needsShrink) { + new GenericInternalRow(keepFields.map { case (f, i) => + row.get(i, f.dataType) + }) + } else { + row + } + val key = getKey(effectiveRow, newSchema) dataMap += dataMap.get(key) .map { splits => val newSplits = if ((splits.last.rows.size >= numRowsPerSplit) || - (splits.last.schema != oldSchema)) { - splits :+ new BufferedRows(key, oldSchema) + (splits.last.schema != effectiveSchema)) { + splits :+ new BufferedRows(key, effectiveSchema) } else { splits } - newSplits.last.withRow(row) + newSplits.last.withRow(effectiveRow) key -> newSplits } .getOrElse(key -> Seq( - new BufferedRows(key, oldSchema).withRow(row))) + new BufferedRows(key, effectiveSchema).withRow(effectiveRow))) addPartitionKey(key) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala new file mode 100644 index 000000000000..29ae09f08c55 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +/** + * An InMemoryTableCatalog that creates tables WITHOUT table IDs + * (id() returns null). This simulates connectors that don't + * support table identity tracking. + * + * When table ID is null, the validateTableIdentity check in + * V2TableRefreshUtil is skipped entirely, meaning drop/recreate + * of a table is NOT detected as an error. + */ +class NullIdInMemoryTableCatalog extends InMemoryTableCatalog { + + override def createTable( + ident: Identifier, + info: TableInfo): Table = { + val created = super.createTable(ident, info) + // Replace with a null-ID wrapper + val nullIdTable = created match { + case t: InMemoryTable => + new InMemoryTable( + t.name, + t.columns(), + t.partitioning, + t.properties, + t.constraints, + id = null) // null table ID + case other => other + } + tables.put(ident, nullIdTable) + nullIdTable + } +} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala new file mode 100644 index 000000000000..a3f41170748c --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector.catalog + +/** + * An InMemoryTableCatalog that shares table state across all instances. + * This allows multiple SparkSessions (via newSession/cloneSession) to + * read and write the same tables, simulating a real shared metastore. + * + * Use this catalog for multi-session concurrency tests where one session + * writes and another session reads the same DSv2 table. 
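+ *
+ * A minimal sketch, assuming the catalog is registered as "sharedcat"
+ * (as DataSourceV2TableRefreshTestBase does):
+ * {{{
+ *   val s2 = spark.newSession()
+ *   spark.sql("CREATE TABLE sharedcat.ns.tbl (id INT) USING foo")
+ *   spark.sql("INSERT INTO sharedcat.ns.tbl VALUES (1)")
+ *   s2.table("sharedcat.ns.tbl").collect() // sees the row written by `spark`
+ *   SharedInMemoryTableCatalog.reset()     // clear shared state between tests
+ * }}}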
+ */ +class SharedInMemoryTableCatalog extends InMemoryTableCatalog { + override protected val tables: java.util.Map[Identifier, Table] = + SharedInMemoryTableCatalog.sharedTables + override protected val namespaces + : java.util.Map[List[String], Map[String, String]] = + SharedInMemoryTableCatalog.sharedNamespaces +} + +object SharedInMemoryTableCatalog { + val sharedTables = + new java.util.concurrent.ConcurrentHashMap[Identifier, Table]() + val sharedNamespaces = + new java.util.concurrent.ConcurrentHashMap[ + List[String], Map[String, String]]() + + def reset(): Unit = { + sharedTables.clear() + sharedNamespaces.clear() + } +} diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala new file mode 100644 index 000000000000..c238f44e0a59 --- /dev/null +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import org.apache.spark.sql.Row + +/** + * Design doc Section [1] in Connect: Temp views with stored plans. + * + * In Connect, SQL temp views (CREATE OR REPLACE TEMP VIEW ... AS SELECT *) + * re-analyze on each access. Column names captured at creation time constrain + * which schema changes are tolerated: data writes and column additions succeed, + * but column removal, rename, type change, and drop+add different type fail. 
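+ *
+ * For example, against the suite's table `testcat.ns1.ns2.tbl` (a sketch of
+ * the pattern the generated tests exercise):
+ * {{{
+ *   spark.sql("CREATE OR REPLACE TEMP VIEW tmp AS SELECT * FROM testcat.ns1.ns2.tbl")
+ *   spark.sql("ALTER TABLE testcat.ns1.ns2.tbl ADD COLUMN bonus INT")
+ *   spark.sql("SELECT * FROM tmp").collect() // ok: column additions are tolerated
+ *   spark.sql("ALTER TABLE testcat.ns1.ns2.tbl DROP COLUMN salary")
+ *   spark.sql("SELECT * FROM tmp").collect() // fails: a captured column is gone
+ * }}}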
+ */ +class DataSourceV2ConnectTempViewSuite extends DataSourceV2RefreshConnectTestBase { + + // Section 1: SQL Temp View x All Modifications + mods.foreach { mod => + test(s"[S1] SQL temp view: ${mod.name}") { + assumeCanRun() + withTable(T) { + setupTable() + spark.sql(s"CREATE OR REPLACE TEMP VIEW tmp AS SELECT * FROM $T") + checkAnswer(spark.sql("SELECT * FROM tmp"), Seq(Row(1, 100))) + mod.fn(T) + if (mod.sqlViewOk) { + spark.sql("SELECT * FROM tmp").collect() + } else { + assertThrows[Exception] { + spark.sql("SELECT * FROM tmp").collect() + } + } + } + } + } + + test("[connect edge] createOrReplaceTempView + schema change") { + assumeCanRun() + withTable(T) { + setupTable() + spark.sql(s"CREATE OR REPLACE TEMP VIEW tv AS SELECT * FROM $T") + checkAnswer(spark.sql("SELECT * FROM tv"), Seq(Row(1, 100))) + spark.sql(s"ALTER TABLE $T ADD COLUMN bonus INT") + spark.sql(s"INSERT INTO $T VALUES (2, 200, 50)") + // SQL view re-analyzes: SELECT * picks up new column + val r = spark.sql("SELECT * FROM tv").collect() + assert(r.length == 2) + } + } +} diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala new file mode 100644 index 000000000000..e12681111151 --- /dev/null +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession, SQLHelper} +import org.apache.spark.sql.connect.test.IntegrationTestUtils.isAssemblyJarsDirExists + +/** + * Shared infrastructure for DSv2 table refresh and pinning tests in Connect mode. + * + * In Connect, every action re-analyzes the plan on the server: + * - No stale QueryExecution (collect is NOT pinned) + * - Schema changes are picked up on every access + * - Type widening, column rename, column removal all succeed for DataFrames + * - Joins/unions always see consistent latest version + */ +trait DataSourceV2RefreshConnectTestBase + extends QueryTest with RemoteSparkSession with SQLHelper { + + protected val T = "testcat.ns1.ns2.tbl" + + protected def setupTable(): Unit = { + spark.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") + spark.sql(s"INSERT INTO $T VALUES (1, 100)") + } + + protected def assumeCanRun(): Unit = { + assume(spark != null && isAssemblyJarsDirExists, "Spark Connect server not available") + } + + // In Connect, SQL temp views (SELECT *) capture column names. + // Removing/renaming/retyping a captured column fails on re-analysis. + // Adding columns or data is fine. 
+ // In Connect, DataFrames re-analyze on every action. ALL mods succeed for DFs. + case class Mod(name: String, fn: String => Unit, sqlViewOk: Boolean, dfOk: Boolean) + + protected val mods: Seq[Mod] = Seq( + Mod("data write", + t => spark.sql(s"INSERT INTO $t VALUES (2, 200)"), + sqlViewOk = true, dfOk = true), + Mod("column addition", + t => spark.sql(s"ALTER TABLE $t ADD COLUMN new_col INT"), + sqlViewOk = true, dfOk = true), + Mod("column removal", + t => spark.sql(s"ALTER TABLE $t DROP COLUMN salary"), + sqlViewOk = false, dfOk = true), + Mod("column rename", + t => spark.sql(s"ALTER TABLE $t RENAME COLUMN salary TO pay"), + sqlViewOk = false, dfOk = true), + Mod("type widening INT to BIGINT", + t => spark.sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE BIGINT"), + sqlViewOk = false, dfOk = true), + Mod("drop+add column same type", + t => { + spark.sql(s"ALTER TABLE $t DROP COLUMN salary") + spark.sql(s"ALTER TABLE $t ADD COLUMN salary INT") + }, + sqlViewOk = true, dfOk = true), + Mod("drop+add column different type", + t => { + spark.sql(s"ALTER TABLE $t DROP COLUMN salary") + spark.sql(s"ALTER TABLE $t ADD COLUMN salary STRING") + }, + sqlViewOk = false, dfOk = true), + Mod("drop/recreate table", + t => { + spark.sql(s"DROP TABLE $t") + spark.sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + }, + sqlViewOk = true, dfOk = true)) +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala index f0359b33f431..9720ac8a6b98 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala @@ -43,6 +43,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap +import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES import org.apache.spark.util.ArrayImplicits._ /** @@ -587,6 +588,12 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram * user-registered callback functions. */ private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = { + // Validate source DF's captured table versions before the + // new QE's analyzer re-resolves them. Defense-in-depth for + // cases where name-based resolution could silently adapt. 
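+    // versionedOnly = true limits the check to relations that captured a
+    // table version at analysis time; PROHIBIT_CHANGES turns any detected
+    // schema drift into an error instead of silently adapting the plan.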
+ V2TableRefreshUtil.refresh( + session, df.queryExecution.analyzed, + versionedOnly = true, PROHIBIT_CHANGES) val qe = new QueryExecution(session, command, df.queryExecution.tracker, shuffleCleanupModeOpt = Some(QueryExecution.determineShuffleCleanupMode(session.sessionState.conf))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala index 169822db96c2..0743ed96bd10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala @@ -33,8 +33,10 @@ import org.apache.spark.sql.connector.catalog.TableWritePrivilege._ import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution +import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.IntegerType +import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES /** * Interface used to write a [[org.apache.spark.sql.classic.Dataset]] to external storage using @@ -227,6 +229,12 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) * callback functions. */ private def runCommand(command: LogicalPlan): Unit = { + // Validate the source DF's captured table versions BEFORE + // the new QE's analyzer re-resolves them. Use the analyzed + // plan (not logical) because logical is still unresolved. + V2TableRefreshUtil.refresh( + sparkSession, df.queryExecution.analyzed, + versionedOnly = true, PROHIBIT_CHANGES) val qe = new QueryExecution(sparkSession, command, df.queryExecution.tracker, shuffleCleanupModeOpt = Some(QueryExecution.determineShuffleCleanupMode(sparkSession.sessionState.conf))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala new file mode 100644 index 000000000000..a43fda2cd275 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connector + +import org.apache.spark.SparkConf +import org.apache.spark.sql._ +import org.apache.spark.sql.connector.catalog._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession + +/** + * Shared infrastructure for DSv2 table refresh and pinning test suites. 
+ * + * Provides catalog setup (sharedcat, cachingcat, nullidcat), cleanup, + * and helper methods for table creation and external session simulation. + */ +trait DataSourceV2TableRefreshTestBase + extends QueryTest with SharedSparkSession { + + // Error condition constants + protected val COL_MISMATCH = + "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH" + protected val ID_MISMATCH = + "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH" + protected val VIEW_PLAN_CHANGED = + "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION" + + override protected def checkErrorIgnorableParameters + : Map[String, Set[String]] = + super.checkErrorIgnorableParameters ++ Map( + COL_MISMATCH -> Set("tableName", "errors"), + ID_MISMATCH -> + Set("tableName", "capturedTableId", "currentTableId"), + VIEW_PLAN_CHANGED -> + Set("viewName", "tableName", "colType", "errors")) + + override protected def sparkConf: SparkConf = super.sparkConf + .set(SQLConf.ANSI_ENABLED, true) + .set("spark.sql.catalog.sharedcat", + classOf[SharedInMemoryTableCatalog].getName) + .set("spark.sql.catalog.sharedcat.copyOnLoad", "true") + .set("spark.sql.catalog.cachingcat", + classOf[CachingInMemoryTableCatalog].getName) + .set("spark.sql.catalog.cachingcat.copyOnLoad", "true") + .set("spark.sql.catalog.nullidcat", + classOf[NullIdInMemoryTableCatalog].getName) + .set("spark.sql.catalog.nullidcat.copyOnLoad", "true") + + override def afterEach(): Unit = { + SharedInMemoryTableCatalog.reset() + CachingInMemoryTableCatalog.clearCache() + try { + spark.sessionState.catalogManager.reset() + } finally { + super.afterEach() + } + } + + protected val T = "sharedcat.ns.tbl" + protected val T2 = "sharedcat.ns.tbl2" + + protected def setupTable(): Unit = { + sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") + sql(s"INSERT INTO $T VALUES (1, 100)") + } + + protected val CT = "cachingcat.ns.tbl" + + protected def setupCachingTable(): Unit = { + sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo") + sql(s"INSERT INTO $CT VALUES (1, 100)") + } + + /** + * Creates a SparkSession with a SEPARATE CacheManager (separate SharedState) + * but the same SparkContext and catalog configs. SharedInMemoryTableCatalog + * tables are shared via the companion object, so the external session sees + * the same table data. This simulates a truly external writer (different JVM + * in production) whose writes do NOT invalidate Session 1's CacheManager. 
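+   *
+   * A minimal sketch of the intended pattern:
+   * {{{
+   *   setupTable()
+   *   val df = spark.table(T)                           // session 1 analyzes
+   *   extSession.sql(s"INSERT INTO $T VALUES (2, 200)") // external write
+   *   df.collect() // session 1's CacheManager was never invalidated
+   * }}}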
+ */ + protected def extSession: SparkSession = { + val savedActive = SparkSession.getActiveSession + val savedDefault = SparkSession.getDefaultSession + try { + SparkSession.clearActiveSession() + SparkSession.clearDefaultSession() + SparkSession.builder() + .sparkContext(spark.sparkContext) + .create() + } finally { + savedDefault.foreach(s => SparkSession.setDefaultSession(s)) + savedActive.foreach(s => SparkSession.setActiveSession(s)) + } + } +} From 84db7fd7fa4828f19a85cb8b2cbb2531c538a841 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Wed, 22 Apr 2026 00:34:01 +0000 Subject: [PATCH 2/5] Fix scalafmt formatting in connect test files --- .../DataSourceV2ConnectTempViewSuite.scala | 8 +-- .../DataSourceV2RefreshConnectTestBase.scala | 60 ++++++++++++------- 2 files changed, 43 insertions(+), 25 deletions(-) diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala index c238f44e0a59..e2faa4c9ed1f 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala @@ -22,10 +22,10 @@ import org.apache.spark.sql.Row /** * Design doc Section [1] in Connect: Temp views with stored plans. * - * In Connect, SQL temp views (CREATE OR REPLACE TEMP VIEW ... AS SELECT *) - * re-analyze on each access. Column names captured at creation time constrain - * which schema changes are tolerated: data writes and column additions succeed, - * but column removal, rename, type change, and drop+add different type fail. + * In Connect, SQL temp views (CREATE OR REPLACE TEMP VIEW ... AS SELECT *) re-analyze on each + * access. Column names captured at creation time constrain which schema changes are tolerated: + * data writes and column additions succeed, but column removal, rename, type change, and drop+add + * different type fail. */ class DataSourceV2ConnectTempViewSuite extends DataSourceV2RefreshConnectTestBase { diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala index e12681111151..cd00d61b1674 100644 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala +++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala @@ -24,13 +24,15 @@ import org.apache.spark.sql.connect.test.IntegrationTestUtils.isAssemblyJarsDirE * Shared infrastructure for DSv2 table refresh and pinning tests in Connect mode. 
* * In Connect, every action re-analyzes the plan on the server: - * - No stale QueryExecution (collect is NOT pinned) - * - Schema changes are picked up on every access - * - Type widening, column rename, column removal all succeed for DataFrames - * - Joins/unions always see consistent latest version + * - No stale QueryExecution (collect is NOT pinned) + * - Schema changes are picked up on every access + * - Type widening, column rename, column removal all succeed for DataFrames + * - Joins/unions always see consistent latest version */ trait DataSourceV2RefreshConnectTestBase - extends QueryTest with RemoteSparkSession with SQLHelper { + extends QueryTest + with RemoteSparkSession + with SQLHelper { protected val T = "testcat.ns1.ns2.tbl" @@ -50,37 +52,53 @@ trait DataSourceV2RefreshConnectTestBase case class Mod(name: String, fn: String => Unit, sqlViewOk: Boolean, dfOk: Boolean) protected val mods: Seq[Mod] = Seq( - Mod("data write", + Mod( + "data write", t => spark.sql(s"INSERT INTO $t VALUES (2, 200)"), - sqlViewOk = true, dfOk = true), - Mod("column addition", + sqlViewOk = true, + dfOk = true), + Mod( + "column addition", t => spark.sql(s"ALTER TABLE $t ADD COLUMN new_col INT"), - sqlViewOk = true, dfOk = true), - Mod("column removal", + sqlViewOk = true, + dfOk = true), + Mod( + "column removal", t => spark.sql(s"ALTER TABLE $t DROP COLUMN salary"), - sqlViewOk = false, dfOk = true), - Mod("column rename", + sqlViewOk = false, + dfOk = true), + Mod( + "column rename", t => spark.sql(s"ALTER TABLE $t RENAME COLUMN salary TO pay"), - sqlViewOk = false, dfOk = true), - Mod("type widening INT to BIGINT", + sqlViewOk = false, + dfOk = true), + Mod( + "type widening INT to BIGINT", t => spark.sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE BIGINT"), - sqlViewOk = false, dfOk = true), - Mod("drop+add column same type", + sqlViewOk = false, + dfOk = true), + Mod( + "drop+add column same type", t => { spark.sql(s"ALTER TABLE $t DROP COLUMN salary") spark.sql(s"ALTER TABLE $t ADD COLUMN salary INT") }, - sqlViewOk = true, dfOk = true), - Mod("drop+add column different type", + sqlViewOk = true, + dfOk = true), + Mod( + "drop+add column different type", t => { spark.sql(s"ALTER TABLE $t DROP COLUMN salary") spark.sql(s"ALTER TABLE $t ADD COLUMN salary STRING") }, - sqlViewOk = false, dfOk = true), - Mod("drop/recreate table", + sqlViewOk = false, + dfOk = true), + Mod( + "drop/recreate table", t => { spark.sql(s"DROP TABLE $t") spark.sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") }, - sqlViewOk = true, dfOk = true)) + sqlViewOk = true, + dfOk = true)) } From d13fa353b8c2b096de01d0520099ebdaa359f9be Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Mon, 27 Apr 2026 21:03:24 +0000 Subject: [PATCH 3/5] Replace multi-file Connect test with clean 11-test single-file suite Remove all irrelevant files (catalog helpers, DataFrameWriter changes, base traits). Add DataSourceV2TempViewConnectSuite with 11 tests matching the classic PR structure exactly. 
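
Each of the 11 tests follows the same basic shape (a sketch; serverSession
is the server-side handle obtained via getServerSession):

    s.table(T).filter("salary < 999").createOrReplaceTempView("v")
    // mutate the table via session SQL or the direct catalog API
    checkAnswer(serverSession.table("v"), ...) // or checkError(...)
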
Co-authored-by: Isaac --- .../catalog/CachingInMemoryTableCatalog.scala | 55 --- .../connector/catalog/InMemoryBaseTable.scala | 33 +- .../catalog/NullIdInMemoryTableCatalog.scala | 50 --- .../catalog/SharedInMemoryTableCatalog.scala | 47 --- .../DataSourceV2ConnectTempViewSuite.scala | 65 ---- .../DataSourceV2RefreshConnectTestBase.scala | 104 ------ .../DataSourceV2TempViewConnectSuite.scala | 347 ++++++++++++++++++ .../spark/sql/classic/DataFrameWriter.scala | 7 - .../spark/sql/classic/DataFrameWriterV2.scala | 8 - .../DataSourceV2TableRefreshTestBase.scala | 110 ------ 10 files changed, 353 insertions(+), 473 deletions(-) delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala delete mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala delete mode 100644 sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala delete mode 100644 sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala create mode 100644 sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala delete mode 100644 sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala deleted file mode 100644 index abcb04615d24..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala +++ /dev/null @@ -1,55 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog - -import java.util.concurrent.ConcurrentHashMap - -/** - * An InMemoryTableCatalog that simulates a caching connector like - * Iceberg's CachingCatalog. On first loadTable, returns a fresh - * copy. On subsequent loads, returns the CACHED (stale) copy, - * making external changes invisible. - * - * Session writes go through the SQL path which modifies the - * original table and invalidates, but direct catalog API - * modifications are not visible until the cache is cleared. - * - * Call [[CachingInMemoryTableCatalog.clearCache()]] to simulate - * cache expiration (like Iceberg's 30-second TTL). 
- */ -class CachingInMemoryTableCatalog extends InMemoryTableCatalog { - import CachingInMemoryTableCatalog._ - - override def loadTable(ident: Identifier): Table = { - cachedTables.computeIfAbsent(cacheKey(name, ident), _ => { - super.loadTable(ident) - }) - } - - private def cacheKey( - catalog: String, ident: Identifier): String = { - s"$catalog.${ident.toString}" - } -} - -object CachingInMemoryTableCatalog { - private val cachedTables = - new ConcurrentHashMap[String, Table]() - - def clearCache(): Unit = cachedTables.clear() -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 53c2278fd636..fd2c0f6e9c2e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -380,45 +380,24 @@ abstract class InMemoryBaseTable( def alterTableWithData( data: Array[BufferedRows], newSchema: StructType): InMemoryBaseTable = { - val newFieldNames = newSchema.fieldNames.toSet data.foreach { bufferedRow => val oldSchema = bufferedRow.schema - // Shrink rows when columns are dropped: remove values for fields - // not in newSchema. This ensures drop+re-add same column returns - // null (reader sees column missing in writeSchema). Leave all other - // evolution (additions, type widening, defaults) to the reader. - val keepFields = oldSchema.fields.zipWithIndex.filter { - case (f, _) => newFieldNames.contains(f.name) - } - val needsShrink = keepFields.length < oldSchema.length - val effectiveSchema = if (needsShrink) { - StructType(keepFields.map(_._1)) - } else { - oldSchema - } - bufferedRow.rows.foreach { row => - val effectiveRow = if (needsShrink) { - new GenericInternalRow(keepFields.map { case (f, i) => - row.get(i, f.dataType) - }) - } else { - row - } - val key = getKey(effectiveRow, newSchema) + // handle partition evolution by re-keying all data + val key = getKey(row, newSchema) dataMap += dataMap.get(key) .map { splits => val newSplits = if ((splits.last.rows.size >= numRowsPerSplit) || - (splits.last.schema != effectiveSchema)) { - splits :+ new BufferedRows(key, effectiveSchema) + (splits.last.schema != oldSchema)) { + splits :+ new BufferedRows(key, oldSchema) } else { splits } - newSplits.last.withRow(effectiveRow) + newSplits.last.withRow(row) key -> newSplits } .getOrElse(key -> Seq( - new BufferedRows(key, effectiveSchema).withRow(effectiveRow))) + new BufferedRows(key, oldSchema).withRow(row))) addPartitionKey(key) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala deleted file mode 100644 index 29ae09f08c55..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/NullIdInMemoryTableCatalog.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog - -/** - * An InMemoryTableCatalog that creates tables WITHOUT table IDs - * (id() returns null). This simulates connectors that don't - * support table identity tracking. - * - * When table ID is null, the validateTableIdentity check in - * V2TableRefreshUtil is skipped entirely, meaning drop/recreate - * of a table is NOT detected as an error. - */ -class NullIdInMemoryTableCatalog extends InMemoryTableCatalog { - - override def createTable( - ident: Identifier, - info: TableInfo): Table = { - val created = super.createTable(ident, info) - // Replace with a null-ID wrapper - val nullIdTable = created match { - case t: InMemoryTable => - new InMemoryTable( - t.name, - t.columns(), - t.partitioning, - t.properties, - t.constraints, - id = null) // null table ID - case other => other - } - tables.put(ident, nullIdTable) - nullIdTable - } -} diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala deleted file mode 100644 index a3f41170748c..000000000000 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/SharedInMemoryTableCatalog.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector.catalog - -/** - * An InMemoryTableCatalog that shares table state across all instances. - * This allows multiple SparkSessions (via newSession/cloneSession) to - * read and write the same tables, simulating a real shared metastore. - * - * Use this catalog for multi-session concurrency tests where one session - * writes and another session reads the same DSv2 table. 
- */ -class SharedInMemoryTableCatalog extends InMemoryTableCatalog { - override protected val tables: java.util.Map[Identifier, Table] = - SharedInMemoryTableCatalog.sharedTables - override protected val namespaces - : java.util.Map[List[String], Map[String, String]] = - SharedInMemoryTableCatalog.sharedNamespaces -} - -object SharedInMemoryTableCatalog { - val sharedTables = - new java.util.concurrent.ConcurrentHashMap[Identifier, Table]() - val sharedNamespaces = - new java.util.concurrent.ConcurrentHashMap[ - List[String], Map[String, String]]() - - def reset(): Unit = { - sharedTables.clear() - sharedNamespaces.clear() - } -} diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala deleted file mode 100644 index e2faa4c9ed1f..000000000000 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2ConnectTempViewSuite.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connect - -import org.apache.spark.sql.Row - -/** - * Design doc Section [1] in Connect: Temp views with stored plans. - * - * In Connect, SQL temp views (CREATE OR REPLACE TEMP VIEW ... AS SELECT *) re-analyze on each - * access. Column names captured at creation time constrain which schema changes are tolerated: - * data writes and column additions succeed, but column removal, rename, type change, and drop+add - * different type fail. 
- */ -class DataSourceV2ConnectTempViewSuite extends DataSourceV2RefreshConnectTestBase { - - // Section 1: SQL Temp View x All Modifications - mods.foreach { mod => - test(s"[S1] SQL temp view: ${mod.name}") { - assumeCanRun() - withTable(T) { - setupTable() - spark.sql(s"CREATE OR REPLACE TEMP VIEW tmp AS SELECT * FROM $T") - checkAnswer(spark.sql("SELECT * FROM tmp"), Seq(Row(1, 100))) - mod.fn(T) - if (mod.sqlViewOk) { - spark.sql("SELECT * FROM tmp").collect() - } else { - assertThrows[Exception] { - spark.sql("SELECT * FROM tmp").collect() - } - } - } - } - } - - test("[connect edge] createOrReplaceTempView + schema change") { - assumeCanRun() - withTable(T) { - setupTable() - spark.sql(s"CREATE OR REPLACE TEMP VIEW tv AS SELECT * FROM $T") - checkAnswer(spark.sql("SELECT * FROM tv"), Seq(Row(1, 100))) - spark.sql(s"ALTER TABLE $T ADD COLUMN bonus INT") - spark.sql(s"INSERT INTO $T VALUES (2, 200, 50)") - // SQL view re-analyzes: SELECT * picks up new column - val r = spark.sql("SELECT * FROM tv").collect() - assert(r.length == 2) - } - } -} diff --git a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala deleted file mode 100644 index cd00d61b1674..000000000000 --- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/DataSourceV2RefreshConnectTestBase.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connect - -import org.apache.spark.sql.connect.test.{QueryTest, RemoteSparkSession, SQLHelper} -import org.apache.spark.sql.connect.test.IntegrationTestUtils.isAssemblyJarsDirExists - -/** - * Shared infrastructure for DSv2 table refresh and pinning tests in Connect mode. - * - * In Connect, every action re-analyzes the plan on the server: - * - No stale QueryExecution (collect is NOT pinned) - * - Schema changes are picked up on every access - * - Type widening, column rename, column removal all succeed for DataFrames - * - Joins/unions always see consistent latest version - */ -trait DataSourceV2RefreshConnectTestBase - extends QueryTest - with RemoteSparkSession - with SQLHelper { - - protected val T = "testcat.ns1.ns2.tbl" - - protected def setupTable(): Unit = { - spark.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") - spark.sql(s"INSERT INTO $T VALUES (1, 100)") - } - - protected def assumeCanRun(): Unit = { - assume(spark != null && isAssemblyJarsDirExists, "Spark Connect server not available") - } - - // In Connect, SQL temp views (SELECT *) capture column names. - // Removing/renaming/retyping a captured column fails on re-analysis. - // Adding columns or data is fine. 
- // In Connect, DataFrames re-analyze on every action. ALL mods succeed for DFs. - case class Mod(name: String, fn: String => Unit, sqlViewOk: Boolean, dfOk: Boolean) - - protected val mods: Seq[Mod] = Seq( - Mod( - "data write", - t => spark.sql(s"INSERT INTO $t VALUES (2, 200)"), - sqlViewOk = true, - dfOk = true), - Mod( - "column addition", - t => spark.sql(s"ALTER TABLE $t ADD COLUMN new_col INT"), - sqlViewOk = true, - dfOk = true), - Mod( - "column removal", - t => spark.sql(s"ALTER TABLE $t DROP COLUMN salary"), - sqlViewOk = false, - dfOk = true), - Mod( - "column rename", - t => spark.sql(s"ALTER TABLE $t RENAME COLUMN salary TO pay"), - sqlViewOk = false, - dfOk = true), - Mod( - "type widening INT to BIGINT", - t => spark.sql(s"ALTER TABLE $t ALTER COLUMN salary TYPE BIGINT"), - sqlViewOk = false, - dfOk = true), - Mod( - "drop+add column same type", - t => { - spark.sql(s"ALTER TABLE $t DROP COLUMN salary") - spark.sql(s"ALTER TABLE $t ADD COLUMN salary INT") - }, - sqlViewOk = true, - dfOk = true), - Mod( - "drop+add column different type", - t => { - spark.sql(s"ALTER TABLE $t DROP COLUMN salary") - spark.sql(s"ALTER TABLE $t ADD COLUMN salary STRING") - }, - sqlViewOk = false, - dfOk = true), - Mod( - "drop/recreate table", - t => { - spark.sql(s"DROP TABLE $t") - spark.sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") - }, - sqlViewOk = true, - dfOk = true)) -} diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala new file mode 100644 index 000000000000..f24bd8ea7433 --- /dev/null +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala @@ -0,0 +1,347 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect + +import java.util + +import org.apache.spark.SparkConf +import org.apache.spark.sql.{AnalysisException, Row} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.classic +import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, Identifier, InMemoryBaseTable, InMemoryTableCatalog, TableChange, TableInfo, TableWritePrivilege} +import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructType} + +/** + * DSv2 temp view with stored plan tests for Spark Connect, mirroring the classic + * DataSourceV2DataFrameSuite temp view scenarios. + * + * Uses an in-process Connect server ([[SparkConnectServerTest]]) so that the test can + * access the server's catalog directly. 
A Connect client creates temp views and performs + * SQL operations; external changes go through the catalog API, which bypasses the + * Connect session's analysis. + */ +class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { + + override def sparkConf: SparkConf = super.sparkConf + .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName) + .set("spark.sql.catalog.testcat.copyOnLoad", "true") + + private val T = "testcat.ns1.ns2.tbl" + private val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + + /** Get the catalog from the server-side session. */ + private def serverCatalog( + serverSession: classic.SparkSession): InMemoryTableCatalog = + serverSession.sessionState.catalogManager + .catalog("testcat").asInstanceOf[InMemoryTableCatalog] + + test("[connect] temp view with stored plan reflects session write") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + s.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + + checkAnswer(serverSession.table("v"), Seq(Row(1, 100), Row(2, 200))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan reflects external write") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // external writer adds (2, 200) via direct catalog API + val schema = StructType.fromDDL("id INT, salary INT") + val cat = serverCatalog(serverSession) + val extTable = cat + .loadTable(ident, util.Set.of(TableWritePrivilege.INSERT)) + .asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) + + checkAnswer(serverSession.table("v"), Seq(Row(1, 100), Row(2, 200))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan preserves schema after session ADD COLUMN") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + s.sql(s"ALTER TABLE $T ADD COLUMN new_column INT").collect() + s.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect() + + // view preserves original 2-column schema, filter still applied + checkAnswer(serverSession.table("v"), Seq(Row(1, 100), Row(2, 200))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan preserves schema after external ADD COLUMN") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // external 
schema change via catalog API + val cat = serverCatalog(serverSession) + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + cat.alterTable(ident, addCol) + + // external writer adds data with new schema + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = cat + .loadTable(ident, util.Set.of(TableWritePrivilege.INSERT)) + .asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // view preserves original 2-column schema, filter still applied + checkAnswer(serverSession.table("v"), Seq(Row(1, 100), Row(2, 200))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan detects external column removal") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // external schema change via catalog API + val cat = serverCatalog(serverSession) + val dropCol = TableChange.deleteColumn(Array("salary"), false) + cat.alterTable(ident, dropCol) + + checkError( + exception = intercept[AnalysisException] { + serverSession.table("v").collect() + }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan resolves to externally recreated table") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + val cat = serverCatalog(serverSession) + val originalTableId = cat.loadTable(ident).id + + // external drop and recreate via catalog API + cat.dropTable(ident) + cat.createTable( + ident, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + val newTableId = cat.loadTable(ident).id + assert(originalTableId != newTableId) + + // view resolves to the new empty table + checkAnswer(serverSession.table("v"), Seq.empty) + + // insert new data and verify the view picks it up + s.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkAnswer(serverSession.table("v"), Seq(Row(2, 200))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan after session drop and re-add column same type") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // drop and re-add column with same name and type + s.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + s.sql(s"ALTER TABLE $T ADD COLUMN salary 
INT").collect() + + // schema validation passes (same column names and types) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan after external drop and re-add column same type") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // external drop and re-add column via catalog API + val cat = serverCatalog(serverSession) + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) + cat.alterTable(ident, dropCol, addCol) + + // schema validation passes (same column names and types) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan detects session column type change") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // drop and re-add column with same name but different type + s.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + s.sql(s"ALTER TABLE $T ADD COLUMN salary STRING").collect() + + checkError( + exception = intercept[AnalysisException] { + serverSession.table("v").collect() + }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan detects external column type change") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // external drop and re-add column with different type via catalog API + val cat = serverCatalog(serverSession) + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), StringType, true) + cat.alterTable(ident, dropCol, addCol) + + checkError( + exception = intercept[AnalysisException] { + serverSession.table("v").collect() + }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + + test("[connect] temp view with stored plan detects type widening") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 
100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + // widen salary type from INT to BIGINT via catalog API + val cat = serverCatalog(serverSession) + val updateType = + TableChange.updateColumnType(Array("salary"), LongType) + cat.alterTable(ident, updateType) + + checkError( + exception = intercept[AnalysisException] { + serverSession.table("v").collect() + }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala index 9720ac8a6b98..f0359b33f431 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriter.scala @@ -43,7 +43,6 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode import org.apache.spark.sql.types.StructType import org.apache.spark.sql.util.CaseInsensitiveStringMap -import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES import org.apache.spark.util.ArrayImplicits._ /** @@ -588,12 +587,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) extends sql.DataFram * user-registered callback functions. */ private def runCommand(session: SparkSession)(command: LogicalPlan): Unit = { - // Validate source DF's captured table versions before the - // new QE's analyzer re-resolves them. Defense-in-depth for - // cases where name-based resolution could silently adapt. - V2TableRefreshUtil.refresh( - session, df.queryExecution.analyzed, - versionedOnly = true, PROHIBIT_CHANGES) val qe = new QueryExecution(session, command, df.queryExecution.tracker, shuffleCleanupModeOpt = Some(QueryExecution.determineShuffleCleanupMode(session.sessionState.conf))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala index 0743ed96bd10..169822db96c2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/classic/DataFrameWriterV2.scala @@ -33,10 +33,8 @@ import org.apache.spark.sql.connector.catalog.TableWritePrivilege._ import org.apache.spark.sql.connector.expressions._ import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.QueryExecution -import org.apache.spark.sql.execution.datasources.v2.V2TableRefreshUtil import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.IntegerType -import org.apache.spark.sql.util.SchemaValidationMode.PROHIBIT_CHANGES /** * Interface used to write a [[org.apache.spark.sql.classic.Dataset]] to external storage using @@ -229,12 +227,6 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) * callback functions. */ private def runCommand(command: LogicalPlan): Unit = { - // Validate the source DF's captured table versions BEFORE - // the new QE's analyzer re-resolves them. 
Use the analyzed - // plan (not logical) because logical is still unresolved. - V2TableRefreshUtil.refresh( - sparkSession, df.queryExecution.analyzed, - versionedOnly = true, PROHIBIT_CHANGES) val qe = new QueryExecution(sparkSession, command, df.queryExecution.tracker, shuffleCleanupModeOpt = Some(QueryExecution.determineShuffleCleanupMode(sparkSession.sessionState.conf))) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala deleted file mode 100644 index a43fda2cd275..000000000000 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2TableRefreshTestBase.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connector - -import org.apache.spark.SparkConf -import org.apache.spark.sql._ -import org.apache.spark.sql.connector.catalog._ -import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SharedSparkSession - -/** - * Shared infrastructure for DSv2 table refresh and pinning test suites. - * - * Provides catalog setup (sharedcat, cachingcat, nullidcat), cleanup, - * and helper methods for table creation and external session simulation. 
- */ -trait DataSourceV2TableRefreshTestBase - extends QueryTest with SharedSparkSession { - - // Error condition constants - protected val COL_MISMATCH = - "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.COLUMNS_MISMATCH" - protected val ID_MISMATCH = - "INCOMPATIBLE_TABLE_CHANGE_AFTER_ANALYSIS.TABLE_ID_MISMATCH" - protected val VIEW_PLAN_CHANGED = - "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION" - - override protected def checkErrorIgnorableParameters - : Map[String, Set[String]] = - super.checkErrorIgnorableParameters ++ Map( - COL_MISMATCH -> Set("tableName", "errors"), - ID_MISMATCH -> - Set("tableName", "capturedTableId", "currentTableId"), - VIEW_PLAN_CHANGED -> - Set("viewName", "tableName", "colType", "errors")) - - override protected def sparkConf: SparkConf = super.sparkConf - .set(SQLConf.ANSI_ENABLED, true) - .set("spark.sql.catalog.sharedcat", - classOf[SharedInMemoryTableCatalog].getName) - .set("spark.sql.catalog.sharedcat.copyOnLoad", "true") - .set("spark.sql.catalog.cachingcat", - classOf[CachingInMemoryTableCatalog].getName) - .set("spark.sql.catalog.cachingcat.copyOnLoad", "true") - .set("spark.sql.catalog.nullidcat", - classOf[NullIdInMemoryTableCatalog].getName) - .set("spark.sql.catalog.nullidcat.copyOnLoad", "true") - - override def afterEach(): Unit = { - SharedInMemoryTableCatalog.reset() - CachingInMemoryTableCatalog.clearCache() - try { - spark.sessionState.catalogManager.reset() - } finally { - super.afterEach() - } - } - - protected val T = "sharedcat.ns.tbl" - protected val T2 = "sharedcat.ns.tbl2" - - protected def setupTable(): Unit = { - sql(s"CREATE TABLE $T (id INT, salary INT) USING foo") - sql(s"INSERT INTO $T VALUES (1, 100)") - } - - protected val CT = "cachingcat.ns.tbl" - - protected def setupCachingTable(): Unit = { - sql(s"CREATE TABLE $CT (id INT, salary INT) USING foo") - sql(s"INSERT INTO $CT VALUES (1, 100)") - } - - /** - * Creates a SparkSession with a SEPARATE CacheManager (separate SharedState) - * but the same SparkContext and catalog configs. SharedInMemoryTableCatalog - * tables are shared via the companion object, so the external session sees - * the same table data. This simulates a truly external writer (different JVM - * in production) whose writes do NOT invalidate Session 1's CacheManager. - */ - protected def extSession: SparkSession = { - val savedActive = SparkSession.getActiveSession - val savedDefault = SparkSession.getDefaultSession - try { - SparkSession.clearActiveSession() - SparkSession.clearDefaultSession() - SparkSession.builder() - .sparkContext(spark.sparkContext) - .create() - } finally { - savedDefault.foreach(s => SparkSession.setDefaultSession(s)) - savedActive.foreach(s => SparkSession.setActiveSession(s)) - } - } -} From 66e95448a524308caa5814c9637798f53b6743a9 Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Mon, 27 Apr 2026 21:16:06 +0000 Subject: [PATCH 4/5] Match PR #55540 structure: 13 tests, section comments, two views in S5 Add session variants for scenarios 3 (column removal) and 4 (table recreate). Add v_unfiltered view to scenario 5. Include InMemoryBaseTable fix for column drop data migration. Add section comments for all 7 scenarios. Use separate alterTable calls for external variants. 
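With the InMemoryBaseTable fix, dropped column values are physically
removed from the stored rows, so dropping and re-adding a same-named
column reads back as null instead of resurrecting the old data. As a
rough sketch of the behavior the new scenario 5 assertions expect
(view and session names as in the suite):

  s.sql(s"ALTER TABLE $T DROP COLUMN salary").collect()
  s.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect()
  // null < 999 is NULL under three-valued logic, so the filtered
  // view goes empty while the unfiltered view surfaces the nulls.
  checkAnswer(serverSession.table("v"), Seq.empty)
  checkAnswer(serverSession.table("v_unfiltered"),
    Seq(Row(1, null), Row(10, null)))
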
Co-authored-by: Isaac --- .../connector/catalog/InMemoryBaseTable.scala | 40 ++++++-- .../DataSourceV2TempViewConnectSuite.scala | 93 +++++++++++++++++-- 2 files changed, 120 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index fd2c0f6e9c2e..8b702ee39916 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -380,24 +380,52 @@ abstract class InMemoryBaseTable( def alterTableWithData( data: Array[BufferedRows], newSchema: StructType): InMemoryBaseTable = { + val newFieldNames = newSchema.fieldNames.toSet data.foreach { bufferedRow => val oldSchema = bufferedRow.schema + + // Identify which columns from the old schema still exist in the new schema. + // Each entry is (StructField, original index in old row) so we can extract values later. + val fieldsRetainedInOldSchema = oldSchema.fields.zipWithIndex.filter { + case (oldField, _) => newFieldNames.contains(oldField.name) + } + val areColumnsDropped = fieldsRetainedInOldSchema.length < oldSchema.length + + // Build a schema that only contains the retained columns. + // This becomes the write schema for the migrated rows. + val retainedSchemaAfterDroppedColumns = if (areColumnsDropped) { + StructType(fieldsRetainedInOldSchema.map(_._1)) + } else { + oldSchema + } + bufferedRow.rows.foreach { row => - // handle partition evolution by re-keying all data - val key = getKey(row, newSchema) + // Physically remove dropped column values from the row so they do not + // survive through ALTER chains (e.g. DROP COLUMN then ADD COLUMN same name). + val retainedRowAfterDroppedColumns = if (areColumnsDropped) { + new GenericInternalRow(fieldsRetainedInOldSchema.map { + case (retainedField, idx) => row.get(idx, retainedField.dataType) + }) + } else { + row + } + + // Re-key and store the migrated row under the new partition layout. + val key = getKey(retainedRowAfterDroppedColumns, newSchema) dataMap += dataMap.get(key) .map { splits => val newSplits = if ((splits.last.rows.size >= numRowsPerSplit) || - (splits.last.schema != oldSchema)) { - splits :+ new BufferedRows(key, oldSchema) + (splits.last.schema != retainedSchemaAfterDroppedColumns)) { + splits :+ new BufferedRows(key, retainedSchemaAfterDroppedColumns) } else { splits } - newSplits.last.withRow(row) + newSplits.last.withRow(retainedRowAfterDroppedColumns) key -> newSplits } .getOrElse(key -> Seq( - new BufferedRows(key, oldSchema).withRow(row))) + new BufferedRows(key, retainedSchemaAfterDroppedColumns) + .withRow(retainedRowAfterDroppedColumns))) addPartitionKey(key) } } diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala index f24bd8ea7433..7788973a798f 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala @@ -50,6 +50,12 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { serverSession.sessionState.catalogManager .catalog("testcat").asInstanceOf[InMemoryTableCatalog] + // Temp views with stored plans. 
+ // Each test creates a DSv2 table with initial data, builds a temp view with a filter + // (to demonstrate that the stored plan is non-trivial), and then verifies the view + // behavior after various table modifications (session or external). + + // Scenario 1: session and external writes. test("[connect] temp view with stored plan reflects session write") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -93,6 +99,7 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } + // Scenario 2: adding new columns and data. test("[connect] temp view with stored plan preserves schema after session ADD COLUMN") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -143,6 +150,34 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } + // Scenario 3: removing existing columns. + test("[connect] temp view with stored plan detects session column removal") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + s.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() + + checkError( + exception = intercept[AnalysisException] { + serverSession.table("v").collect() + }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + test("[connect] temp view with stored plan detects external column removal") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -173,6 +208,31 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } + // Scenario 4: drop and re-create table. + test("[connect] temp view with stored plan resolves to session recreated table") { + withSession { s => + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() + + s.table(T).filter("salary < 999").createOrReplaceTempView("v") + val serverSession = getServerSession(s) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + + s.sql(s"DROP TABLE $T").collect() + s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() + + // view resolves to the new empty table + checkAnswer(serverSession.table("v"), Seq.empty) + + // insert new data and verify the view picks it up + s.sql(s"INSERT INTO $T VALUES (2, 200)").collect() + checkAnswer(serverSession.table("v"), Seq(Row(2, 200))) + + s.sql("DROP VIEW IF EXISTS v").collect() + s.sql(s"DROP TABLE IF EXISTS $T").collect() + } + } + test("[connect] temp view with stored plan resolves to externally recreated table") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -210,12 +270,14 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } + // Scenario 5: drop and re-add column with the same name and type. 
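+  // After the drop, stored values for salary are physically removed, so the
+  // re-added column reads back as null. A comparison against null yields null
+  // under SQL three-valued logic, and WHERE treats null as false, which is why
+  // the filtered view goes empty while v_unfiltered surfaces the null rows.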
test("[connect] temp view with stored plan after session drop and re-add column same type") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() s.table(T).filter("salary < 999").createOrReplaceTempView("v") + s.table(T).createOrReplaceTempView("v_unfiltered") val serverSession = getServerSession(s) checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) @@ -223,10 +285,16 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { s.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() s.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect() - // schema validation passes (same column names and types) - checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + // salary data is no longer preserved after drop and re-add + // null < 999 evaluates to null (falsy), so no rows pass the filter + checkAnswer(serverSession.table("v"), Seq.empty) + // unfiltered view shows all rows with null salary + checkAnswer( + serverSession.table("v_unfiltered"), + Seq(Row(1, null), Row(10, null))) s.sql("DROP VIEW IF EXISTS v").collect() + s.sql("DROP VIEW IF EXISTS v_unfiltered").collect() s.sql(s"DROP TABLE IF EXISTS $T").collect() } } @@ -237,23 +305,32 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() s.table(T).filter("salary < 999").createOrReplaceTempView("v") + s.table(T).createOrReplaceTempView("v_unfiltered") val serverSession = getServerSession(s) checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) - // external drop and re-add column via catalog API + // external drop and re-add column via catalog API (two separate calls, + // matching how separate ALTER TABLE statements work in practice) val cat = serverCatalog(serverSession) val dropCol = TableChange.deleteColumn(Array("salary"), false) + cat.alterTable(ident, dropCol) val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) - cat.alterTable(ident, dropCol, addCol) + cat.alterTable(ident, addCol) - // schema validation passes (same column names and types) - checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) + // salary data is no longer preserved after drop and re-add + checkAnswer(serverSession.table("v"), Seq.empty) + // unfiltered view shows all rows with null salary + checkAnswer( + serverSession.table("v_unfiltered"), + Seq(Row(1, null), Row(10, null))) s.sql("DROP VIEW IF EXISTS v").collect() + s.sql("DROP VIEW IF EXISTS v_unfiltered").collect() s.sql(s"DROP TABLE IF EXISTS $T").collect() } } + // Scenario 6: drop and re-add column with the same name but different type. test("[connect] temp view with stored plan detects session column type change") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -295,8 +372,9 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { // external drop and re-add column with different type via catalog API val cat = serverCatalog(serverSession) val dropCol = TableChange.deleteColumn(Array("salary"), false) + cat.alterTable(ident, dropCol) val addCol = TableChange.addColumn(Array("salary"), StringType, true) - cat.alterTable(ident, dropCol, addCol) + cat.alterTable(ident, addCol) checkError( exception = intercept[AnalysisException] { @@ -314,6 +392,7 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } + // Scenario 7: type widening from INT to BIGINT. 
test("[connect] temp view with stored plan detects type widening") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() From f2983239f11353a15120045b34631cabc1551a0c Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Mon, 27 Apr 2026 21:19:27 +0000 Subject: [PATCH 5/5] Revert to 11 tests matching PR #55540 current state Remove InMemoryBaseTable change, extra session tests, v_unfiltered, and section comments to match the current PR #55540 structure exactly. Co-authored-by: Isaac --- .../connector/catalog/InMemoryBaseTable.scala | 40 ++------ .../DataSourceV2TempViewConnectSuite.scala | 91 ++----------------- 2 files changed, 15 insertions(+), 116 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala index 8b702ee39916..fd2c0f6e9c2e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala @@ -380,52 +380,24 @@ abstract class InMemoryBaseTable( def alterTableWithData( data: Array[BufferedRows], newSchema: StructType): InMemoryBaseTable = { - val newFieldNames = newSchema.fieldNames.toSet data.foreach { bufferedRow => val oldSchema = bufferedRow.schema - - // Identify which columns from the old schema still exist in the new schema. - // Each entry is (StructField, original index in old row) so we can extract values later. - val fieldsRetainedInOldSchema = oldSchema.fields.zipWithIndex.filter { - case (oldField, _) => newFieldNames.contains(oldField.name) - } - val areColumnsDropped = fieldsRetainedInOldSchema.length < oldSchema.length - - // Build a schema that only contains the retained columns. - // This becomes the write schema for the migrated rows. - val retainedSchemaAfterDroppedColumns = if (areColumnsDropped) { - StructType(fieldsRetainedInOldSchema.map(_._1)) - } else { - oldSchema - } - bufferedRow.rows.foreach { row => - // Physically remove dropped column values from the row so they do not - // survive through ALTER chains (e.g. DROP COLUMN then ADD COLUMN same name). - val retainedRowAfterDroppedColumns = if (areColumnsDropped) { - new GenericInternalRow(fieldsRetainedInOldSchema.map { - case (retainedField, idx) => row.get(idx, retainedField.dataType) - }) - } else { - row - } - - // Re-key and store the migrated row under the new partition layout. 
- val key = getKey(retainedRowAfterDroppedColumns, newSchema) + // handle partition evolution by re-keying all data + val key = getKey(row, newSchema) dataMap += dataMap.get(key) .map { splits => val newSplits = if ((splits.last.rows.size >= numRowsPerSplit) || - (splits.last.schema != retainedSchemaAfterDroppedColumns)) { - splits :+ new BufferedRows(key, retainedSchemaAfterDroppedColumns) + (splits.last.schema != oldSchema)) { + splits :+ new BufferedRows(key, oldSchema) } else { splits } - newSplits.last.withRow(retainedRowAfterDroppedColumns) + newSplits.last.withRow(row) key -> newSplits } .getOrElse(key -> Seq( - new BufferedRows(key, retainedSchemaAfterDroppedColumns) - .withRow(retainedRowAfterDroppedColumns))) + new BufferedRows(key, oldSchema).withRow(row))) addPartitionKey(key) } } diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala index 7788973a798f..453e8258b141 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/DataSourceV2TempViewConnectSuite.scala @@ -50,12 +50,11 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { serverSession.sessionState.catalogManager .catalog("testcat").asInstanceOf[InMemoryTableCatalog] - // Temp views with stored plans. + // Temp views with stored plans: scenarios from the DSv2 table refresh design doc. // Each test creates a DSv2 table with initial data, builds a temp view with a filter // (to demonstrate that the stored plan is non-trivial), and then verifies the view // behavior after various table modifications (session or external). - // Scenario 1: session and external writes. test("[connect] temp view with stored plan reflects session write") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -99,7 +98,6 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } - // Scenario 2: adding new columns and data. test("[connect] temp view with stored plan preserves schema after session ADD COLUMN") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -150,34 +148,6 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } - // Scenario 3: removing existing columns. 
- test("[connect] temp view with stored plan detects session column removal") { - withSession { s => - s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() - - s.table(T).filter("salary < 999").createOrReplaceTempView("v") - val serverSession = getServerSession(s) - checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) - - s.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() - - checkError( - exception = intercept[AnalysisException] { - serverSession.table("v").collect() - }, - condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", - parameters = Map( - "viewName" -> "`v`", - "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", - "colType" -> "data", - "errors" -> "- `salary` INT has been removed")) - - s.sql("DROP VIEW IF EXISTS v").collect() - s.sql(s"DROP TABLE IF EXISTS $T").collect() - } - } - test("[connect] temp view with stored plan detects external column removal") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -208,31 +178,6 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } - // Scenario 4: drop and re-create table. - test("[connect] temp view with stored plan resolves to session recreated table") { - withSession { s => - s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() - - s.table(T).filter("salary < 999").createOrReplaceTempView("v") - val serverSession = getServerSession(s) - checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) - - s.sql(s"DROP TABLE $T").collect() - s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() - - // view resolves to the new empty table - checkAnswer(serverSession.table("v"), Seq.empty) - - // insert new data and verify the view picks it up - s.sql(s"INSERT INTO $T VALUES (2, 200)").collect() - checkAnswer(serverSession.table("v"), Seq(Row(2, 200))) - - s.sql("DROP VIEW IF EXISTS v").collect() - s.sql(s"DROP TABLE IF EXISTS $T").collect() - } - } - test("[connect] temp view with stored plan resolves to externally recreated table") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -270,14 +215,12 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } - // Scenario 5: drop and re-add column with the same name and type. 
test("[connect] temp view with stored plan after session drop and re-add column same type") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() s.table(T).filter("salary < 999").createOrReplaceTempView("v") - s.table(T).createOrReplaceTempView("v_unfiltered") val serverSession = getServerSession(s) checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) @@ -285,16 +228,11 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { s.sql(s"ALTER TABLE $T DROP COLUMN salary").collect() s.sql(s"ALTER TABLE $T ADD COLUMN salary INT").collect() - // salary data is no longer preserved after drop and re-add - // null < 999 evaluates to null (falsy), so no rows pass the filter - checkAnswer(serverSession.table("v"), Seq.empty) - // unfiltered view shows all rows with null salary - checkAnswer( - serverSession.table("v_unfiltered"), - Seq(Row(1, null), Row(10, null))) + // schema validation passes (same column names and types) + // InMemoryTable preserves row data through ALTER chain + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) s.sql("DROP VIEW IF EXISTS v").collect() - s.sql("DROP VIEW IF EXISTS v_unfiltered").collect() s.sql(s"DROP TABLE IF EXISTS $T").collect() } } @@ -305,32 +243,23 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { s.sql(s"INSERT INTO $T VALUES (1, 100), (10, 1000)").collect() s.table(T).filter("salary < 999").createOrReplaceTempView("v") - s.table(T).createOrReplaceTempView("v_unfiltered") val serverSession = getServerSession(s) checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) - // external drop and re-add column via catalog API (two separate calls, - // matching how separate ALTER TABLE statements work in practice) + // external drop and re-add column via catalog API val cat = serverCatalog(serverSession) val dropCol = TableChange.deleteColumn(Array("salary"), false) - cat.alterTable(ident, dropCol) val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) - cat.alterTable(ident, addCol) + cat.alterTable(ident, dropCol, addCol) - // salary data is no longer preserved after drop and re-add - checkAnswer(serverSession.table("v"), Seq.empty) - // unfiltered view shows all rows with null salary - checkAnswer( - serverSession.table("v_unfiltered"), - Seq(Row(1, null), Row(10, null))) + // schema validation passes (same column names and types) + checkAnswer(serverSession.table("v"), Seq(Row(1, 100))) s.sql("DROP VIEW IF EXISTS v").collect() - s.sql("DROP VIEW IF EXISTS v_unfiltered").collect() s.sql(s"DROP TABLE IF EXISTS $T").collect() } } - // Scenario 6: drop and re-add column with the same name but different type. test("[connect] temp view with stored plan detects session column type change") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect() @@ -372,9 +301,8 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { // external drop and re-add column with different type via catalog API val cat = serverCatalog(serverSession) val dropCol = TableChange.deleteColumn(Array("salary"), false) - cat.alterTable(ident, dropCol) val addCol = TableChange.addColumn(Array("salary"), StringType, true) - cat.alterTable(ident, addCol) + cat.alterTable(ident, dropCol, addCol) checkError( exception = intercept[AnalysisException] { @@ -392,7 +320,6 @@ class DataSourceV2TempViewConnectSuite extends SparkConnectServerTest { } } - // Scenario 7: type widening from INT to BIGINT. 
test("[connect] temp view with stored plan detects type widening") { withSession { s => s.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()