@@ -0,0 +1,60 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.connector.catalog

import java.util.concurrent.ConcurrentHashMap

/**
 * An InMemoryTableCatalog that simulates a caching connector like
 * Iceberg's CachingCatalog. The first loadTable call returns a fresh
 * copy; subsequent loads return the CACHED (stale) copy, so external
 * changes stay invisible.
 *
 * Session writes go through the SQL path, which modifies the
 * original table and invalidates the cache, but direct catalog API
 * modifications are not visible until the cache is cleared.
 *
 * Call [[CachingInMemoryTableCatalog.clearCache()]] to simulate
 * cache expiration (like Iceberg's 30-second TTL).
 */
class CachingInMemoryTableCatalog extends InMemoryTableCatalog {
  import CachingInMemoryTableCatalog._

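  // The first load for a given (catalog, identifier) key populates the shared
  // cache; every later load returns that cached Table instance, even if the
  // underlying table has since been modified, replaced, or dropped externally.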
  override def loadTable(ident: Identifier): Table = {
    cachedTables.computeIfAbsent(cacheKey(name, ident), _ => {
      super.loadTable(ident)
    })
  }

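  // REFRESH TABLE ends up calling invalidateTable on the catalog, so a refresh
  // drops the stale entry and the next loadTable re-reads the table from the
  // delegate InMemoryTableCatalog.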
  override def invalidateTable(ident: Identifier): Unit = {
    super.invalidateTable(ident)
    cachedTables.remove(cacheKey(name, ident))
  }

  private def cacheKey(
      catalog: String, ident: Identifier): String = {
    s"$catalog.${ident.toString}"
  }
}

object CachingInMemoryTableCatalog {
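  // The cache lives on the companion object so it is shared by every instance
  // of this catalog and survives catalog re-instantiation; the suite calls
  // clearCache() in after() to avoid leaking entries across tests.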
  private val cachedTables =
    new ConcurrentHashMap[String, Table]()

  def clearCache(): Unit = cachedTables.clear()
}
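For context, a rough usage sketch of the behavior this catalog simulates (the catalog name democat and the table below are illustrative only, not part of this change; the suite registers it as cachingcat):

// Illustrative sketch, not part of this diff: register the caching catalog,
// warm its cache with one read, then use REFRESH TABLE to drop the stale entry.
spark.conf.set("spark.sql.catalog.democat", classOf[CachingInMemoryTableCatalog].getName)
spark.conf.set("spark.sql.catalog.democat.copyOnLoad", "true")
sql("CREATE TABLE democat.ns.t (id INT) USING foo")
sql("INSERT INTO democat.ns.t VALUES (1)")
sql("SELECT * FROM democat.ns.t")   // first load, warms the connector cache
// ... an external writer modifies democat.ns.t through the catalog API ...
sql("SELECT * FROM democat.ns.t")   // still the cached, stale snapshot
sql("REFRESH TABLE democat.ns.t")   // invalidateTable drops the cached entry
sql("SELECT * FROM democat.ns.t")   // fresh load, the external change is visible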
@@ -25,9 +25,10 @@ import scala.jdk.CollectionConverters._
import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.catalog.{Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
@@ -54,6 +55,9 @@ class DataSourceV2DataFrameSuite
    .set("spark.sql.catalog.testcat", classOf[InMemoryTableCatalog].getName)
    .set("spark.sql.catalog.testcat.copyOnLoad", "true")
    .set("spark.sql.catalog.testcat2", classOf[InMemoryTableCatalog].getName)
    .set("spark.sql.catalog.cachingcat",
      classOf[CachingInMemoryTableCatalog].getName)
    .set("spark.sql.catalog.cachingcat.copyOnLoad", "true")
    .set("spark.sql.catalog.nullidcat",
      classOf[NullTableIdInMemoryTableCatalog].getName)
    .set("spark.sql.catalog.nullidcat.copyOnLoad", "true")
@@ -71,6 +75,7 @@ class DataSourceV2DataFrameSuite
    .set("spark.sql.catalog.composedidcat.copyOnLoad", "true")

  after {
    CachingInMemoryTableCatalog.clearCache()
    spark.sessionState.catalogManager.reset()
  }

@@ -3535,4 +3540,208 @@ class DataSourceV2DataFrameSuite
parameters = Map.empty)
}
}

  // Repeated table access with external changes (no CACHE TABLE).
  // Each sql() call creates a fresh QueryExecution, so it always sees
  // the latest data, schema, and table identity.
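  // The testcat tables below use copyOnLoad=true, so every loadTable returns a
  // fresh snapshot of the current table; the cachingcat variants layer the
  // stale CachingInMemoryTableCatalog on top to show the contrasting behavior.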

  // Scenario 1: external writes

  test("repeated sql() reflects session write") {
    val t = "testcat.ns1.ns2.tbl"
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      sql(s"INSERT INTO $t VALUES (2, 200)")
      checkAnswer(
        sql(s"SELECT * FROM $t"),
        Seq(Row(1, 100), Row(2, 200)))
    }
  }

  test("repeated sql() reflects external write") {
    val t = "testcat.ns1.ns2.tbl"
    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // external writer adds (2, 200)
      val schema2 = StructType.fromDDL("id INT, salary INT")
      val extTable = catalog("testcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema2).withRow(InternalRow(2, 200))))

      checkAnswer(
        sql(s"SELECT * FROM $t"),
        Seq(Row(1, 100), Row(2, 200)))
    }
  }

  // Scenario 1 connector w/ cache (external write, caching connector)
  test("connector w/ cache: repeated sql() stale after external write") {
    val t = "cachingcat.ns1.ns2.tbl"
    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // external writer adds (2, 200) via the catalog API, which bypasses the cache
      val schema = StructType.fromDDL("id INT, salary INT")
      val extTable = catalog("cachingcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema).withRow(InternalRow(2, 200))))

      // Caching connector returns the stale table: the external write is invisible
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // REFRESH TABLE invalidates the connector cache, so the external write becomes visible
      sql(s"REFRESH TABLE $t")
      checkAnswer(
        sql(s"SELECT * FROM $t"),
        Seq(Row(1, 100), Row(2, 200)))
    }
  }

  // Scenario 2: external schema changes

  test("repeated sql() reflects session schema change") {
    val t = "testcat.ns1.ns2.tbl"
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      sql(s"ALTER TABLE $t ADD COLUMN new_col INT")
      sql(s"INSERT INTO $t VALUES (2, 200, -1)")
      checkAnswer(
        sql(s"SELECT * FROM $t"),
        Seq(Row(1, 100, null), Row(2, 200, -1)))
    }
  }

  test("repeated sql() reflects external schema change") {
    val t = "testcat.ns1.ns2.tbl"
    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // external schema change + data write via catalog API
      val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
      catalog("testcat").alterTable(ident, addCol)

      val schema3 = StructType.fromDDL("id INT, salary INT, new_col INT")
      val extTable = catalog("testcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))

      checkAnswer(
        sql(s"SELECT * FROM $t"),
        Seq(Row(1, 100, null), Row(2, 200, -1)))
    }
  }

  // Scenario 2 connector w/ cache (external schema change, caching connector)
  test("connector w/ cache: repeated sql() stale after external schema change") {
    val t = "cachingcat.ns1.ns2.tbl"
    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // external schema change + data via catalog API
      val addCol = TableChange.addColumn(Array("new_col"), IntegerType, true)
      catalog("cachingcat").alterTable(ident, addCol)

      val schema3 = StructType.fromDDL("id INT, salary INT, new_col INT")
      val extTable = catalog("cachingcat").loadTable(ident,
        util.Set.of(TableWritePrivilege.INSERT)).asInstanceOf[InMemoryBaseTable]
      extTable.withData(Array(
        new BufferedRows(Seq.empty, schema3).withRow(InternalRow(2, 200, -1))))

      // Caching connector returns the stale table: the external changes are invisible
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // REFRESH TABLE invalidates the connector cache, so the schema change and data are visible
      sql(s"REFRESH TABLE $t")
      checkAnswer(
        sql(s"SELECT * FROM $t"),
        Seq(Row(1, 100, null), Row(2, 200, -1)))
    }
  }

  // Scenario 3: drop and recreate table

  test("repeated sql() reflects session drop/recreate") {
    val t = "testcat.ns1.ns2.tbl"
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      sql(s"DROP TABLE $t")
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq.empty)
    }
  }

  test("repeated sql() reflects external drop/recreate") {
    val t = "testcat.ns1.ns2.tbl"
    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // external drop and recreate via catalog API
      catalog("testcat").dropTable(ident)
      catalog("testcat").createTable(
        ident = ident,
        columns = Array(
          Column.create("id", IntegerType),
          Column.create("salary", IntegerType)),
        partitions = Array.empty,
        properties = Collections.emptyMap[String, String])

      checkAnswer(sql(s"SELECT * FROM $t"), Seq.empty)
    }
  }

  // Scenario 3 connector w/ cache (external drop/recreate, caching connector)
  test("connector w/ cache: repeated sql() stale after external drop/recreate") {
    val t = "cachingcat.ns1.ns2.tbl"
    val ident = Identifier.of(Array("ns1", "ns2"), "tbl")
    withTable(t) {
      sql(s"CREATE TABLE $t (id INT, salary INT) USING foo")
      sql(s"INSERT INTO $t VALUES (1, 100)")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // external drop and recreate via catalog API
      catalog("cachingcat").dropTable(ident)
      catalog("cachingcat").createTable(
        ident = ident,
        columns = Array(
          Column.create("id", IntegerType),
          Column.create("salary", IntegerType)),
        partitions = Array.empty,
        properties = Collections.emptyMap[String, String])

      // Caching connector returns the stale table: the drop/recreate is invisible
      checkAnswer(sql(s"SELECT * FROM $t"), Seq(Row(1, 100)))

      // REFRESH TABLE invalidates the connector cache, so the new empty table is visible
      sql(s"REFRESH TABLE $t")
      checkAnswer(sql(s"SELECT * FROM $t"), Seq.empty)
    }
  }
}