From a5864b30d258edad8a92e86a17370a8c22b3acce Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 15:50:28 +0000 Subject: [PATCH 1/2] [SPARK-XXXXX][SQL][TESTS] Add DSv2 temp view with stored plan tests Add 11 new tests to DataSourceV2DataFrameSuite that verify temp view behavior with stored plans when the underlying DSv2 table changes. Tests cover all 7 scenarios from the DSv2 table refresh design doc, including both session and external write variants. Co-authored-by: Isaac --- .../DataSourceV2DataFrameSuite.scala | 262 +++++++++++++++++- 1 file changed, 260 insertions(+), 2 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index eadaafea81a53..3691f0726778e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._ import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured +import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect} -import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo} +import org.apache.spark.sql.connector.catalog.{BufferedRows, Column, ColumnDefaultValue, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo} import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue} import org.apache.spark.sql.connector.catalog.TableChange @@ -40,7 +41,7 @@ import org.apache.spark.sql.execution.ExplainUtils.stripAQEPlan import org.apache.spark.sql.execution.datasources.v2.{AlterTableExec, CreateTableExec, DataSourceV2Relation, ReplaceTableExec} import org.apache.spark.sql.functions.lit import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, StringType, TimestampType} +import org.apache.spark.sql.types.{BooleanType, CalendarIntervalType, DoubleType, IntegerType, LongType, StringType, StructType, TimestampType} import org.apache.spark.sql.util.QueryExecutionListener import org.apache.spark.unsafe.types.UTF8String @@ -1759,6 +1760,263 @@ class DataSourceV2DataFrameSuite } } + // Temp views with stored plans: scenarios from the DSv2 table refresh design doc. + // Each test creates a DSv2 table with initial data, builds a temp view with a filter + // (to demonstrate that the stored plan is non-trivial), and then verifies the view + // behavior after various table modifications (session or external). + + test("SPARK-53924: temp view with stored plan reflects session write") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + sql(s"INSERT INTO $t VALUES (2, 200)") + + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("SPARK-53924: temp view with stored plan reflects external write") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external writer adds (2, 200) via direct catalog API + val schema = StructType.fromDDL("id INT, salary INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200)))) + + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("SPARK-53924: temp view with stored plan preserves schema after session ADD COLUMN") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + sql(s"ALTER TABLE $t ADD COLUMN new_column INT") + sql(s"INSERT INTO $t VALUES (2, 200, -1)") + + // view preserves original 2-column schema, filter still applied + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("SPARK-53924: temp view with stored plan preserves schema after external ADD COLUMN") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external schema change via catalog API + val addCol = TableChange.addColumn(Array("new_column"), IntegerType, true) + catalog("testcat").alterTable(ident, addCol) + + // external writer adds data with new schema + val schema3 = StructType.fromDDL("id INT, salary INT, new_column INT") + val extTable = catalog("testcat").loadTable(ident, + util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable] + extTable.withData(Array( + new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1)))) + + // view preserves original 2-column schema, filter still applied + checkAnswer(spark.table("v"), Seq(Row(1, 100), Row(2, 200))) + } + } + + test("SPARK-53924: temp view with stored plan detects external column removal") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external schema change via catalog API + val dropCol = TableChange.deleteColumn(Array("salary"), false) + catalog("testcat").alterTable(ident, dropCol) + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` INT has been removed")) + } + } + + test("SPARK-53924: temp view with stored plan resolves to externally recreated table") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + val originalTableId = catalog("testcat").loadTable(ident).id + + // external drop and recreate via catalog API + catalog("testcat").dropTable(ident) + catalog("testcat").createTable( + ident, + new TableInfo.Builder() + .withColumns(Array( + Column.create("id", IntegerType), + Column.create("salary", IntegerType))) + .build()) + + val newTableId = catalog("testcat").loadTable(ident).id + assert(originalTableId != newTableId) + + // view resolves to the new empty table + checkAnswer(spark.table("v"), Seq.empty) + + // insert new data and verify the view picks it up + sql(s"INSERT INTO $t VALUES (2, 200)") + checkAnswer(spark.table("v"), Seq(Row(2, 200))) + } + } + + test("SPARK-53924: temp view with stored plan after session drop and re-add column same type") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // drop and re-add column with same name and type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary INT") + + // schema validation passes (same column names and types) + // InMemoryTable preserves row data through ALTER chain + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + } + } + + test("SPARK-53924: temp view with stored plan after external drop and re-add column same type") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external drop and re-add column via catalog API + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), IntegerType, true) + catalog("testcat").alterTable(ident, dropCol, addCol) + + // schema validation passes (same column names and types) + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + } + } + + test("SPARK-53924: temp view with stored plan detects session column type change") { + val t = "testcat.ns1.ns2.tbl" + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // drop and re-add column with same name but different type + sql(s"ALTER TABLE $t DROP COLUMN salary") + sql(s"ALTER TABLE $t ADD COLUMN salary STRING") + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + + test("SPARK-53924: temp view with stored plan detects external column type change") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // external drop and re-add column with different type via catalog API + val dropCol = TableChange.deleteColumn(Array("salary"), false) + val addCol = TableChange.addColumn(Array("salary"), StringType, true) + catalog("testcat").alterTable(ident, dropCol, addCol) + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to STRING")) + } + } + + test("SPARK-53924: temp view with stored plan detects type widening") { + val t = "testcat.ns1.ns2.tbl" + val ident = Identifier.of(Array("ns1", "ns2"), "tbl") + withTable(t) { + sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") + sql(s"INSERT INTO $t VALUES (1, 100), (10, 1000)") + + spark.table(t).filter("salary < 999").createOrReplaceTempView("v") + checkAnswer(spark.table("v"), Seq(Row(1, 100))) + + // widen salary type from INT to BIGINT via catalog API + val updateType = TableChange.updateColumnType(Array("salary"), LongType) + catalog("testcat").alterTable(ident, updateType) + + checkError( + exception = intercept[AnalysisException] { spark.table("v").collect() }, + condition = "INCOMPATIBLE_COLUMN_CHANGES_AFTER_VIEW_WITH_PLAN_CREATION", + parameters = Map( + "viewName" -> "`v`", + "tableName" -> "`testcat`.`ns1`.`ns2`.`tbl`", + "colType" -> "data", + "errors" -> "- `salary` type has changed from INT to BIGINT")) + } + } + test("cached DSv2 table DataFrame is refreshed and reused after insert") { val t = "testcat.ns1.ns2.tbl" withTable(t) { From 0e3f40fa15c3baca36439a9d5051cb8ce3efad6e Mon Sep 17 00:00:00 2001 From: Thang Long VU Date: Fri, 24 Apr 2026 15:52:08 +0000 Subject: [PATCH 2/2] Remove SPARK-53924 prefix from new test names Co-authored-by: Isaac --- .../DataSourceV2DataFrameSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala index 3691f0726778e..088353ab42c8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala @@ -1765,7 +1765,7 @@ class DataSourceV2DataFrameSuite // (to demonstrate that the stored plan is non-trivial), and then verifies the view // behavior after various table modifications (session or external). - test("SPARK-53924: temp view with stored plan reflects session write") { + test("temp view with stored plan reflects session write") { val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") @@ -1780,7 +1780,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan reflects external write") { + test("temp view with stored plan reflects external write") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { @@ -1801,7 +1801,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan preserves schema after session ADD COLUMN") { + test("temp view with stored plan preserves schema after session ADD COLUMN") { val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") @@ -1818,7 +1818,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan preserves schema after external ADD COLUMN") { + test("temp view with stored plan preserves schema after external ADD COLUMN") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { @@ -1844,7 +1844,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan detects external column removal") { + test("temp view with stored plan detects external column removal") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { @@ -1869,7 +1869,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan resolves to externally recreated table") { + test("temp view with stored plan resolves to externally recreated table") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { @@ -1903,7 +1903,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan after session drop and re-add column same type") { + test("temp view with stored plan after session drop and re-add column same type") { val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") @@ -1922,7 +1922,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan after external drop and re-add column same type") { + test("temp view with stored plan after external drop and re-add column same type") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { @@ -1942,7 +1942,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan detects session column type change") { + test("temp view with stored plan detects session column type change") { val t = "testcat.ns1.ns2.tbl" withTable(t) { sql(s"CREATE TABLE $t (id INT, salary INT) USING foo") @@ -1966,7 +1966,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan detects external column type change") { + test("temp view with stored plan detects external column type change") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) { @@ -1992,7 +1992,7 @@ class DataSourceV2DataFrameSuite } } - test("SPARK-53924: temp view with stored plan detects type widening") { + test("temp view with stored plan detects type widening") { val t = "testcat.ns1.ns2.tbl" val ident = Identifier.of(Array("ns1", "ns2"), "tbl") withTable(t) {