From a3f9fb6ae6cc0bf4d47ddc6ad36ca23b1f621048 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Fri, 1 May 2026 09:11:23 +0000
Subject: [PATCH 1/4] [SPARK-56643][SQL][TESTS] Add DSv2 temp view with stored plan tests

Rebased on latest master.
---
 .../catalog/CachingInMemoryTableCatalog.scala |  60 +++
 .../catalog/InMemoryTableCatalog.scala        |  18 +-
 .../DataSourceV2DataFrameSuite.scala          | 495 +++++++++++++++++-
 3 files changed, 570 insertions(+), 3 deletions(-)
 create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala

diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
new file mode 100644
index 000000000000..f19f81a50121
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. On first loadTable, returns a fresh
+ * copy. On subsequent loads, returns the CACHED (stale) copy,
+ * making external changes invisible.
+ *
+ * Session writes go through the SQL path which modifies the
+ * original table and invalidates, but direct catalog API
+ * modifications are not visible until the cache is cleared.
+ *
+ * Call [[CachingInMemoryTableCatalog.clearCache()]] to simulate
+ * cache expiration (like Iceberg's 30-second TTL).
+ */
+class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
+  import CachingInMemoryTableCatalog._
+
+  override def loadTable(ident: Identifier): Table = {
+    cachedTables.computeIfAbsent(cacheKey(name, ident), _ => {
+      super.loadTable(ident)
+    })
+  }
+
+  override def invalidateTable(ident: Identifier): Unit = {
+    super.invalidateTable(ident)
+    cachedTables.remove(cacheKey(name, ident))
+  }
+
+  private def cacheKey(
+      catalog: String, ident: Identifier): String = {
+    s"$catalog.${ident.toString}"
+  }
+}
+
+object CachingInMemoryTableCatalog {
+  private val cachedTables =
+    new ConcurrentHashMap[String, Table]()
+
+  def clearCache(): Unit = cachedTables.clear()
+}
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 1654e9e9a66d..2bb13bd3cb63 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -179,6 +179,20 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       throw new IllegalArgumentException(s"Cannot drop all fields")
     }
 
+    // Compute the intermediate schema with only column deletions applied.
+    // This is used for data migration so that dropped column values are physically removed,
+    // even when a column with the same name is re-added in the same ALTER call.
+    val deleteOnlyChanges = changes.filter(_.isInstanceOf[TableChange.DeleteColumn])
+    val schemaAfterDrops = if (deleteOnlyChanges.nonEmpty) {
+      CatalogV2Util.applySchemaChanges(
+        table.schema,
+        deleteOnlyChanges,
+        tableProvider = Some("in-memory"),
+        statementType = "ALTER TABLE")
+    } else {
+      schema
+    }
+
     table.increaseVersion()
     val currentVersion = table.version()
     val columnsWithIds = InMemoryBaseTable.assignMissingIds(
@@ -193,14 +207,14 @@ class BasicInMemoryTableCatalog extends TableCatalog {
           properties = properties,
           constraints = constraints,
           id = table.id)
-          .alterTableWithData(table.data, schema)
+          .alterTableWithData(table.data, schemaAfterDrops)
       case _: InMemoryTableWithV2Filter =>
         new InMemoryTableWithV2Filter(
           name = table.name,
          columns = columnsWithIds,
           partitioning = finalPartitioning,
           properties = properties)
-          .alterTableWithData(table.data, schema)
+          .alterTableWithData(table.data, schemaAfterDrops)
       case other =>
         throw new UnsupportedOperationException(
           s"Unsupported InMemoryBaseTable subclass: ${other.getClass.getName}")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 5cda5169369e..be201f36ea38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableChange
@@ -54,6 +55,9 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
     .set("spark.sql.catalog.testcat.copyOnLoad", "true")
     .set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat",
+      classOf[CachingInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
     .set("spark.sql.catalog.nullidcat",
       classOf[NullTableIdInMemoryTableCatalog].getName)
     .set("spark.sql.catalog.nullidcat.copyOnLoad", "true")
@@ -71,6 +75,7 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.composedidcat.copyOnLoad", "true")
 
   after {
+    CachingInMemoryTableCatalog.clearCache()
     spark.sessionState.catalogManager.reset()
   }
 
@@ -2976,6 +2981,494 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  // Temp views with stored plans: scenarios from the DSv2 table refresh tests.
+  // Each test creates a DSv2 table with initial data, builds a temp view with a filter
+  // (to demonstrate that the stored plan is non-trivial), and then verifies the view
+  // behavior after various table modifications (session or external).
+
+  // Scenario 1.1 (session write)
+  test("temp view with stored plan reflects session write") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 (external write)
+  test("temp view with stored plan reflects external write") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via direct catalog API
+      val schema = StructType.fromDDL("id INT, salary INT")
+      val extTable = catalog("testcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))
+
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1.2 connector w/ cache (external write, caching connector)
+  test("connector w/ cache: temp view stale after external write") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via catalog API (bypasses cache)
+      val schema = StructType.fromDDL("id INT, salary INT")
+      val extTable = catalog("cachingcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))
+
+      // Caching connector returns stale table: external write invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, external write becomes visible
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.1 (session ADD COLUMN)
+  test("temp view with stored plan preserves schema after session ADD COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      sql(s"ALTER TABLE $t ADD COLUMN new_column INT")
+      sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 (external ADD COLUMN)
+  test("temp view with stored plan preserves schema after external ADD COLUMN") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+      catalog("testcat").alterTable(ident, addCol)
+
+      // external writer adds data with new schema
+      val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
+      val extTable = catalog("testcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))
+
+      // view preserves original 2-column schema, filter still applied
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2.2 connector w/ cache (external ADD COLUMN, caching connector)
+  test("connector w/ cache: temp view stale after external ADD COLUMN") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change + data via catalog API
+      val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true)
+      catalog("cachingcat").alterTable(ident, addCol)
+
+      val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT")
+      val extTable = catalog("cachingcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))
+
+      // Caching connector returns stale table: external changes invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view preserves original 2-column schema
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 3.2 (external column removal)
+  test("temp view with stored plan detects external column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external schema change via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("testcat").alterTable(ident, dropCol)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 3.2 connector w/ cache (external column removal, caching connector)
+  test("connector w/ cache: temp view stale after external column removal") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external column removal via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      catalog("cachingcat").alterTable(ident, dropCol)
+
+      // Caching connector returns stale table: column removal invisible, no error
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, column removal detected
+      sql(s"REFRESH TABLE $t")
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
+  // Scenario 4.1 (session drop and recreate table)
+  test("temp view with stored plan resolves to session-recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // session drop and recreate via SQL
+      sql(s"DROP TABLE $t")
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 (external drop and recreate table)
+  test("temp view with stored plan resolves to externally recreated table") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      val originalTableId = catalog("testcat").loadTable(ident).id
+
+      // external drop and recreate via catalog API
+      catalog("testcat").dropTable(ident)
+      catalog("testcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      val newTableId = catalog("testcat").loadTable(ident).id
+      assert(originalTableId != newTableId)
+
+      // view resolves to the new empty table
+      checkAnswer(spark.table("v"), Seq.empty)
+
+      // insert new data and verify the view picks it up
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(spark.table("v"), Seq(Row(2, 200)))
+    }
+  }
+
+  // Scenario 4.2 connector w/ cache (external drop/recreate, caching connector)
+  test("connector w/ cache: temp view stale after external drop/recreate") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and recreate via catalog API
+      catalog("cachingcat").dropTable(ident)
+      catalog("cachingcat").createTable(
+        ident,
+        new TableInfo.Builder()
+          .withColumns(Array(
+            Column.create("id", IntegerType),
+            Column.create("salary", IntegerType)))
+          .build())
+
+      // Caching connector returns stale table: drop/recreate invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, view resolves to new empty table
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq.empty)
+    }
+  }
+
+  // Scenario 5.1 (session drop and re-add column with same type, multiple views)
+  test("temp view with stored plan after session drop and re-add column same type" +
+    " with unfiltered view") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      spark.table(t).createOrReplaceTempView("v_no_filter")
+      spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+      checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+      // drop and re-add column with same name and type
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+      sql(s"ALTER TABLE $t ADD COLUMN salary INT")
+
+      // salary values are now null, so the filtered view returns nothing
+      checkAnswer(spark.table("v"), Seq.empty)
+      // unfiltered view returns rows with null salary
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+      // IS NULL filter now matches all rows
+      checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+    }
+  }
+
+  // Scenario 5.2 (external drop and re-add column with same type)
+  test("temp view with stored plan after external drop and re-add column same type") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      spark.table(t).createOrReplaceTempView("v_no_filter")
+      spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_filter_is_null")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, 100), Row(10, 1000)))
+      checkAnswer(spark.table("v_filter_is_null"), Seq.empty)
+
+      // external drop and re-add column via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+      catalog("testcat").alterTable(ident, dropCol, addCol)
+
+      // salary values are now null, so the filtered view returns nothing
+      checkAnswer(spark.table("v"), Seq.empty)
+      // unfiltered view returns rows with null salary
+      checkAnswer(spark.table("v_no_filter"), Seq(Row(1, null), Row(10, null)))
+      // IS NULL filter now matches all rows
+      checkAnswer(spark.table("v_filter_is_null"), Seq(Row(1, null), Row(10, null)))
+    }
+  }
+
+  // Scenario 5.2 connector w/ cache (external drop/re-add column, caching connector)
+  test("connector w/ cache: temp view stale after external drop/re-add column same type") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and re-add column with same type via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), IntegerType, true)
+      catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+      // Caching connector returns stale table: column drop/re-add invisible
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, salary values are null
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(spark.table("v"), Seq.empty)
+    }
+  }
+
+  // Scenario 6.1 (session drop and re-add column with different type)
+  test("temp view with stored plan detects session column type change") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // drop and re-add column with same name but different type
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+      sql(s"ALTER TABLE $t ADD COLUMN salary STRING")
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to STRING"))
+    }
+  }
+
+  // Scenario 6.2 (external drop and re-add column with different type)
+  test("temp view with stored plan detects external column type change") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and re-add column with different type via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+      catalog("testcat").alterTable(ident, dropCol, addCol)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to STRING"))
+    }
+  }
+
+  // Scenario 6.2 connector w/ cache (external column type change, caching connector)
+  test("connector w/ cache: temp view stale after external column type change") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // external drop and re-add column with different type via catalog API
+      val dropCol = TableChange.deleteColumn(Array("salary"), false)
+      val addCol = TableChange.addColumn(Array("salary"), StringType, true)
+      catalog("cachingcat").alterTable(ident, dropCol, addCol)
+
+      // Caching connector returns stale table: type change invisible, no error
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, type change detected
+      sql(s"REFRESH TABLE $t")
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`cachingcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to STRING"))
+    }
+  }
+
+  // Scenario 7 (type widening from INT to BIGINT)
+  test("temp view with stored plan detects type widening") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // widen salary type from INT to BIGINT via catalog API
+      val updateType = TableChange.updateColumnType(Array("salary"), LongType)
+      catalog("testcat").alterTable(ident, updateType)
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` type has changed from INT to BIGINT"))
+    }
+  }
+
   test("cached DSv2 table DataFrame is refreshed and reused after insert") {
     val t = "testcat.ns1.ns2.tbl"
     withTable(t) {

From 3c85016bf1202f925aef0b508c1026fc721d95ca Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Fri, 1 May 2026 09:18:25 +0000
Subject: [PATCH 2/4] Add schema verification assertions for empty table results

Assert schema field names before every checkAnswer with empty results to
verify schema is preserved correctly after table modifications.
---
 .../connector/DataSourceV2DataFrameSuite.scala | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index be201f36ea38..b568d97a07c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -2518,6 +2518,7 @@ class DataSourceV2DataFrameSuite
       spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
       spark.table(t).filter("salary IS NULL").createOrReplaceTempView("v_null")
       checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+      assert(spark.table("v_null").schema.fieldNames.toSeq == Seq("id", "salary"))
       checkAnswer(spark.table("v_null"), Seq.empty)
 
       // drop and re-add column with same name and type
@@ -2525,6 +2526,7 @@ class DataSourceV2DataFrameSuite
       sql(s"ALTER TABLE $t ADD COLUMN salary INT")
 
       // salary values are now null, so the salary < 999 filter returns nothing
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "salary"))
       checkAnswer(spark.table("v"), Seq.empty)
       // IS NULL filter now matches all rows
       checkAnswer(spark.table("v_null"), Seq(Row(1, null), Row(10, null)))
@@ -2711,6 +2713,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view using DataFrame API
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // add top-level column to underlying table
@@ -2718,6 +2721,7 @@ class DataSourceV2DataFrameSuite
 
       // accessing temp view should succeed as top-level column additions are allowed
       // view captures original columns
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // insert data to verify view still works correctly
@@ -2733,6 +2737,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view using DataFrame API
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "address"))
      checkAnswer(spark.table("v"), Seq.empty)
 
       // add nested column to underlying table
@@ -2757,6 +2762,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data", "age"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // drop column from underlying table
@@ -2781,6 +2787,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view using DataFrame API
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "address"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // drop nested column from underlying table
@@ -2805,6 +2812,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // change nullability constraint using ALTER TABLE
@@ -2846,6 +2854,7 @@ class DataSourceV2DataFrameSuite
       assert(originalTableId != newTableId)
 
       // accessing temp view should work despite table ID change (returns empty data)
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // insert new data and verify view reflects it
@@ -2861,6 +2870,7 @@ class DataSourceV2DataFrameSuite
       sql(s"CREATE TABLE $t (id bigint, data STRING, extra INT) USING foo")
 
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data", "extra"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // alter table
@@ -2871,6 +2881,7 @@ class DataSourceV2DataFrameSuite
 
       // recreate view with updated schema
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // now it should work with new schema
@@ -2901,6 +2912,7 @@ class DataSourceV2DataFrameSuite
 
       // accessing temp view should succeed as top-level column additions are allowed
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
     }
   }
 
@@ -2913,6 +2925,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view using SQL that should capture plan
       sql(s"CREATE OR REPLACE TEMPORARY VIEW v AS SELECT * FROM $t")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // verify that view stores analyzed plan
@@ -2923,6 +2936,7 @@ class DataSourceV2DataFrameSuite
       sql(s"ALTER TABLE $t ADD COLUMN age int")
 
       // accessing temp view should succeed as top-level column additions are allowed
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // insert data to verify view still works correctly
@@ -2939,6 +2953,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "name"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // change VARCHAR(10) to VARCHAR(20)
@@ -2963,6 +2978,7 @@ class DataSourceV2DataFrameSuite
 
       // create temp view
       spark.table(t).createOrReplaceTempView("v")
+      assert(spark.table("v").schema.fieldNames.toSeq == Seq("id", "data"))
       checkAnswer(spark.table("v"), Seq.empty)
 
       // insert data into underlying table (no schema change)
@@ -3553,6 +3569,7 @@ class DataSourceV2DataFrameSuite
 
       // verify external changes are reflected correctly when table is queried
       assertNotCached(spark.table(t))
+      assert(spark.table(t).schema.fieldNames.toSeq == Seq("id", "value", "category"))
       checkAnswer(spark.table(t), Seq.empty)
     }
   }

From 16656d4703cc9beeac7a54be1424cdb3ac9fd789 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Fri, 1 May 2026 14:59:56 +0000
Subject: [PATCH 3/4] Add Scenario 3.1 session column removal test

Add missing test for Section 1 Scenario 3.1 where a session ALTER TABLE
DROP COLUMN should trigger an analysis exception when querying a temp view
that references the removed column.
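
The shape of the added scenario, roughly (an abbreviated sketch; the full test
body is in the diff below):

    spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
    sql(s"ALTER TABLE $t DROP COLUMN salary")
    // The stored plan still references `salary`, so resolving the view fails.
    intercept[AnalysisException] { spark.table("v").collect() }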
---
 .../DataSourceV2DataFrameSuite.scala | 24 ++++++++++++++++++++++++
 1 file changed, 24 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index b568d97a07c1..9154b6e83dab 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -3142,6 +3142,30 @@ class DataSourceV2DataFrameSuite
     }
   }
 
+  // Scenario 3.1 (session column removal)
+  test("temp view with stored plan detects session column removal") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)")
+
+      spark.table(t).filter("salary < 999").createOrReplaceTempView("v")
+      checkAnswer(spark.table("v"), Seq(Row(1, 100)))
+
+      // session schema change via SQL
+      sql(s"ALTER TABLE $t DROP COLUMN salary")
+
+      checkError(
+        exception = intercept[AnalysisException] { spark.table("v").collect() },
+        condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION",
+        parameters = Map(
+          "viewName" -> "`v`",
+          "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`",
+          "colType" -> "data",
+          "errors" -> "- `salary` INT has been removed"))
+    }
+  }
+
   // Scenario 3.2 (external column removal)
   test("temp view with stored plan detects external column removal") {
     val t = "testcat.ns1.ns2.tbl"

From c37fde18ee45f842e93daa34ca4a212b801344f9 Mon Sep 17 00:00:00 2001
From: Thang Long VU
Date: Fri, 1 May 2026 19:02:15 +0000
Subject: [PATCH 4/4] Empty commit to retrigger CI

Co-authored-by: Isaac