diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
new file mode 100644
index 000000000000..f19f81a50121
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CachingInMemoryTableCatalog.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * An InMemoryTableCatalog that simulates a caching connector like
+ * Iceberg's CachingCatalog. The first loadTable returns a fresh
+ * copy; subsequent loads return the cached (stale) copy, making
+ * external changes invisible.
+ *
+ * Session writes go through the SQL path, which modifies the
+ * original table and invalidates the cache entry, but direct
+ * catalog API modifications stay invisible until the cache is cleared.
+ *
+ * Call [[CachingInMemoryTableCatalog.clearCache()]] to simulate
+ * cache expiration (like Iceberg's 30-second TTL).
+ */
+class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
+  import CachingInMemoryTableCatalog._
+
+  override def loadTable(ident: Identifier): Table = {
+    cachedTables.computeIfAbsent(cacheKey(name, ident), _ => {
+      super.loadTable(ident)
+    })
+  }
+
+  override def invalidateTable(ident: Identifier): Unit = {
+    super.invalidateTable(ident)
+    cachedTables.remove(cacheKey(name, ident))
+  }
+
+  private def cacheKey(
+      catalog: String, ident: Identifier): String = {
+    s"$catalog.${ident.toString}"
+  }
+}
+
+object CachingInMemoryTableCatalog {
+  private val cachedTables =
+    new ConcurrentHashMap[String, Table]()
+
+  def clearCache(): Unit = cachedTables.clear()
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
index 5cda5169369e..67fa02e9c5de 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSuite.scala
@@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
+import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
-import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
+import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
 import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
 import org.apache.spark.sql.connector.catalog.TableChange
@@ -54,6 +55,9 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
     .set("spark.sql.catalog.testcat.copyOnLoad", "true")
     .set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat",
+      classOf[CachingInMemoryTableCatalog].getName)
+    .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
     .set("spark.sql.catalog.nullidcat",
       classOf[NullTableIdInMemoryTableCatalog].getName)
     .set("spark.sql.catalog.nullidcat.copyOnLoad", "true")
@@ -71,6 +75,7 @@ class DataSourceV2DataFrameSuite
     .set("spark.sql.catalog.composedidcat.copyOnLoad", "true")
 
   after {
+    CachingInMemoryTableCatalog.clearCache()
     spark.sessionState.catalogManager.reset()
   }
 
@@ -3535,4 +3540,208 @@ class DataSourceV2DataFrameSuite
         parameters = Map.empty)
     }
   }
+
+  // Repeated table access with external changes (no CACHE TABLE).
+  // Each sql() call creates a fresh QueryExecution, so it always sees
+  // the latest data, schema, and table identity.
+
+  // Scenario 1: external writes
+
+  test("repeated sql() reflects session write") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      sql(s"INSERT INTO $t VALUES (2, 200)")
+      checkAnswer(
+        sql(s"SELECT * FROM $t"),
+        Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  test("repeated sql() reflects external write") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200)
+      val schema2 = StructType.fromDDL("id INT, salary INT")
+      val extTable = catalog("testcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema2).withRow(InternalRow(2, 200))))
+
+      checkAnswer(
+        sql(s"SELECT * FROM $t"),
+        Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 1 connector w/ cache (external write, caching connector)
+  test("connector w/ cache: repeated sql() stale after external write") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // external writer adds (2, 200) via catalog API (bypasses cache)
+      val schema = StructType.fromDDL("id INT, salary INT")
+      val extTable = catalog("cachingcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))
+
+      // Caching connector returns stale table: external write invisible
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, external write becomes visible
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(
+        sql(s"SELECT * FROM $t"),
+        Seq(Row(1, 100), Row(2, 200)))
+    }
+  }
+
+  // Scenario 2: external schema changes
+
+  test("repeated sql() reflects session schema change") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      sql(s"ALTER TABLE $t ADD COLUMN new_col INT")
+      sql(s"INSERT INTO $t VALUES (2, 200, -1)")
+      checkAnswer(
+        sql(s"SELECT * FROM $t"),
+        Seq(Row(1, 100, null), Row(2, 200, -1)))
+    }
+  }
+
+  test("repeated sql() reflects external schema change") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // external schema change + data write via catalog API
+      val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+      catalog("testcat").alterTable(ident, addCol)
+
+      val schema3 = StructType.fromDDL("id INT, salary INT, new_col INT")
+      val extTable = catalog("testcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))
+
+      checkAnswer(
+        sql(s"SELECT * FROM $t"),
+        Seq(Row(1, 100, null), Row(2, 200, -1)))
+    }
+  }
+
+  // Scenario 2 connector w/ cache (external schema change, caching connector)
+  test("connector w/ cache: repeated sql() stale after external schema change") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // external schema change + data via catalog API
+      val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
+      catalog("cachingcat").alterTable(ident, addCol)
+
+      val schema3 = StructType.fromDDL("id INT, salary INT, new_col INT")
+      val extTable = catalog("cachingcat").loadTable(ident,
+        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
+      extTable.withData(Array(
+        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))
+
+      // Caching connector returns stale table: external changes invisible
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, schema change + data visible
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(
+        sql(s"SELECT * FROM $t"),
+        Seq(Row(1, 100, null), Row(2, 200, -1)))
+    }
+  }
+
+  // Scenario 3: drop and recreate table
+
+  test("repeated sql() reflects session drop/recreate") {
+    val t = "testcat.ns1.ns2.tbl"
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      sql(s"DROP TABLE $t")
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq.empty)
+    }
+  }
+
+  test("repeated sql() reflects external drop/recreate") {
+    val t = "testcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // external drop and recreate via catalog API
+      catalog("testcat").dropTable(ident)
+      catalog("testcat").createTable(
+        ident = ident,
+        columns = Array(
+          Column.create("id", IntegerType),
+          Column.create("salary", IntegerType)),
+        partitions = Array.empty,
+        properties = Collections.emptyMap[String, String])
+
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq.empty)
+    }
+  }
+
+  // Scenario 3 connector w/ cache (external drop/recreate, caching connector)
+  test("connector w/ cache: repeated sql() stale after external drop/recreate") {
+    val t = "cachingcat.ns1.ns2.tbl"
+    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
+      sql(s"INSERT INTO $t VALUES (1, 100)")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // external drop and recreate via catalog API
+      catalog("cachingcat").dropTable(ident)
+      catalog("cachingcat").createTable(
+        ident = ident,
+        columns = Array(
+          Column.create("id", IntegerType),
+          Column.create("salary", IntegerType)),
+        partitions = Array.empty,
+        properties = Collections.emptyMap[String, String])
+
+      // Caching connector returns stale table: drop/recreate invisible
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))
+
+      // REFRESH TABLE invalidates the connector cache, new empty table visible
+      sql(s"REFRESH TABLE $t")
+      checkAnswer(sql(s"SELECT * FROM $t"), Seq.empty)
+    }
+  }
 }