From d40576bcfeeb9b49cced5ad4b6af4584c56fe51d Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Tue, 18 Nov 2025 09:30:12 -0800
Subject: [PATCH 1/2] [SPARK-51771][SQL][FOLLOWUP] Rename currentVersion to version in DSv2 Table

---
 .../apache/spark/sql/connector/catalog/Table.java    |  4 ++--
 .../datasources/v2/DataSourceV2Relation.scala        |  2 +-
 .../datasources/v2/V2TableRefreshUtil.scala          |  2 +-
 .../spark/sql/connector/catalog/CatalogSuite.scala   | 12 ++++++------
 .../sql/connector/catalog/InMemoryBaseTable.scala    | 12 ++++++------
 .../spark/sql/connector/catalog/InMemoryTable.scala  | 10 +++++-----
 .../sql/connector/catalog/InMemoryTableCatalog.scala |  8 ++++----
 .../datasources/v2/DataSourceV2Strategy.scala        |  2 +-
 .../execution/command/v2/CheckConstraintSuite.scala  |  8 ++++----
 9 files changed, 30 insertions(+), 30 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
index f85b5e356929..1c9081fa1283 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
@@ -104,8 +104,8 @@ default Map<String, String> properties() {
   default Constraint[] constraints() { return new Constraint[0]; }
 
   /**
-   * Returns the current table version if implementation supports versioning.
+   * Returns this table version without refreshing state if implementation supports versioning.
    * If the table is not versioned, null can be returned here.
    */
-  default String currentVersion() { return null; }
+  default String version() { return null; }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index b0fb414fce97..a093b499b1e8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -134,7 +134,7 @@ case class DataSourceV2Relation(
   def autoSchemaEvolution(): Boolean =
     table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)
 
-  def isVersioned: Boolean = table.currentVersion != null
+  def isVersioned: Boolean = table.version != null
 }
 
 /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
index e98b80b6a5a0..0852043cb822 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2TableRefreshUtil.scala
@@ -42,7 +42,7 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
       case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
           if r.isVersioned && r.timeTravelSpec.isEmpty =>
         val tableName = V2TableUtil.toQualifiedName(catalog, ident)
-        val version = r.table.currentVersion
+        val version = r.table.version
         logDebug(s"Pinning table version for $tableName to $version")
         r.copy(timeTravelSpec = Some(AsOfVersion(version)))
     }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
index 4798623417b1..b4a4c6f46cda 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogSuite.scala
@@ -1357,24 +1357,24 @@ class CatalogSuite extends SparkFunSuite {
     intercept[NoSuchFunctionException](catalog.loadFunction(Identifier.of(Array("ns1"), "func")))
   }
 
-  test("currentVersion") {
+  test("version") {
     val catalog = newCatalog()
     val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
       .asInstanceOf[InMemoryTable]
-    assert(table.currentVersion() == "0")
+    assert(table.version() == "0")
 
     table.withData(Array(
       BufferedRows("3", table.columns()).withRow(InternalRow(0, "abc", "3")),
       BufferedRows("4", table.columns()).withRow(InternalRow(1, "def", "4"))))
-    assert(table.currentVersion() == "1")
+    assert(table.version() == "1")
 
     table.truncateTable()
-    assert(catalog.loadTable(testIdent).currentVersion() == "2")
+    assert(catalog.loadTable(testIdent).version() == "2")
 
     catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
-    assert(catalog.loadTable(testIdent).currentVersion() == "3")
+    assert(catalog.loadTable(testIdent).version() == "3")
 
     catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3"))
-    assert(catalog.loadTable(testIdent).currentVersion() == "4")
+    assert(catalog.loadTable(testIdent).version() == "4")
   }
 }
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
index d66ba5a23cc8..18fe80c2e924 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
@@ -66,7 +66,7 @@ abstract class InMemoryBaseTable(
   extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {
 
   // Tracks the current version number of the table.
-  protected var currentTableVersion: Int = 0
+  protected var tableVersion: Int = 0
 
   // Stores the table version validated during the last `ALTER TABLE ... ADD CONSTRAINT` operation.
   private var validatedTableVersion: String = null
@@ -75,14 +75,14 @@ abstract class InMemoryBaseTable(
 
   override def columns(): Array[Column] = tableColumns
 
-  override def currentVersion(): String = currentTableVersion.toString
+  override def version(): String = tableVersion.toString
 
-  def setCurrentVersion(version: String): Unit = {
-    currentTableVersion = version.toInt
+  def setVersion(version: String): Unit = {
+    tableVersion = version.toInt
   }
 
-  def increaseCurrentVersion(): Unit = {
-    currentTableVersion += 1
+  def increaseVersion(): Unit = {
+    tableVersion += 1
   }
 
   def validatedVersion(): String = {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
index beac7751bf77..3bea136b34d4 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTable.scala
@@ -69,7 +69,7 @@ class InMemoryTable(
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
     dataMap --= InMemoryTable
       .filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters)
-    increaseCurrentVersion()
+    increaseVersion()
   }
 
   override def withData(data: Array[BufferedRows]): InMemoryTable = {
@@ -109,7 +109,7 @@ class InMemoryTable(
           row.getInt(0) == InMemoryTable.uncommittableValue()))) {
         throw new IllegalArgumentException(s"Test only mock write failure")
       }
-      increaseCurrentVersion()
+      increaseVersion()
       this
     }
   }
@@ -155,7 +155,7 @@ class InMemoryTable(
 
     copiedTable.commits ++= commits.map(_.copy())
 
-    copiedTable.setCurrentVersion(currentVersion())
+    copiedTable.setVersion(version())
     if (validatedVersion() != null) {
       copiedTable.setValidatedVersion(validatedVersion())
     }
@@ -165,12 +165,12 @@ class InMemoryTable(
 
   override def equals(other: Any): Boolean = other match {
     case that: InMemoryTable =>
-      this.id == that.id && this.currentVersion() == that.currentVersion()
+      this.id == that.id && this.version() == that.version()
     case _ => false
   }
 
   override def hashCode(): Int = {
-    Objects.hash(id, currentVersion())
+    Objects.hash(id, version())
   }
 
   class InMemoryWriterBuilderWithOverWrite(override val info: LogicalWriteInfo)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
index 2156dba619ec..5ea33a1764aa 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryTableCatalog.scala
@@ -179,8 +179,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       throw new IllegalArgumentException(s"Cannot drop all fields")
     }
 
-    table.increaseCurrentVersion()
-    val currentVersion = table.currentVersion()
+    table.increaseVersion()
+    val currentVersion = table.version()
     val newTable = new InMemoryTable(
       name = table.name,
       columns = CatalogV2Util.structTypeToV2Columns(schema),
@@ -189,7 +189,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
       constraints = constraints,
       id = table.id)
       .alterTableWithData(table.data, schema)
-    newTable.setCurrentVersion(currentVersion)
+    newTable.setVersion(currentVersion)
     changes.foreach {
       case a: TableChange.AddConstraint =>
         newTable.setValidatedVersion(a.validatedTableVersion())
@@ -209,7 +209,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
 
     Option(tables.remove(oldIdent)) match {
       case Some(table: InMemoryBaseTable) =>
-        table.increaseCurrentVersion()
+        table.increaseVersion()
         tables.put(newIdent, table)
       case _ =>
         throw new NoSuchTableException(oldIdent.asMultipartIdentifier)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 9c624d951a76..81bc1990404a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -571,7 +571,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val condition = a.checkConstraint.condition
       val change = TableChange.addConstraint(
         check.toV2Constraint,
-        d.relation.table.currentVersion)
+        d.relation.table.version)
       ResolveTableConstraints.validateCatalogForTableChange(Seq(change), catalog, ident)
       AddCheckConstraintExec(catalog, ident, change, condition, planLater(a.child)) :: Nil
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
index 1d14d710744a..44c5b1ad2874 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala
@@ -207,7 +207,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       sql(s"INSERT INTO $t VALUES (1, 'a'), (null, 'b')")
       sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0) $characteristic")
       val table = loadTable(nonPartitionCatalog, "ns", "tbl")
-      assert(table.currentVersion() == "2")
+      assert(table.version() == "2")
       assert(table.validatedVersion() == "1")
       val constraint = getCheckConstraint(table)
       assert(constraint.name() == "c1")
@@ -254,7 +254,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       // Add a valid check constraint
       sql(s"ALTER TABLE $t ADD CONSTRAINT valid_positive_num CHECK (s.num >= -1)")
       val table = loadTable(nonPartitionCatalog, "ns", "tbl")
-      assert(table.currentVersion() == "2")
+      assert(table.version() == "2")
       assert(table.validatedVersion() == "1")
       val constraint = getCheckConstraint(table)
      assert(constraint.name() == "valid_positive_num")
@@ -284,7 +284,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       // Add a valid check constraint
       sql(s"ALTER TABLE $t ADD CONSTRAINT valid_map_val CHECK (m['a'] >= -1)")
       val table = loadTable(nonPartitionCatalog, "ns", "tbl")
-      assert(table.currentVersion() == "2")
+      assert(table.version() == "2")
       assert(table.validatedVersion() == "1")
       val constraint = getCheckConstraint(table)
       assert(constraint.name() == "valid_map_val")
@@ -312,7 +312,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
       // Add a valid check constraint
       sql(s"ALTER TABLE $t ADD CONSTRAINT valid_array CHECK (a[1] >= -2)")
       val table = loadTable(nonPartitionCatalog, "ns", "tbl")
-      assert(table.currentVersion() == "2")
+      assert(table.version() == "2")
       assert(table.validatedVersion() == "1")
       val constraint = getCheckConstraint(table)
       assert(constraint.name() == "valid_array")
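For readers tracking the API change in PATCH 1/2: a DSv2 connector now reports its version through Table#version() rather than Table#currentVersion(), and DataSourceV2Relation.isVersioned treats any non-null result as a versioned table that V2TableRefreshUtil can pin to an AsOfVersion time-travel spec. A minimal connector-side sketch against the renamed API follows; the class name, snapshot field, schema, and capabilities are illustrative assumptions, not part of the patch.

import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableCapability}
import org.apache.spark.sql.types.StructType

// Hypothetical connector table (illustrative only): it captures the snapshot
// id when the catalog loads the table and reports it through the renamed API.
class ExampleVersionedTable(snapshotId: Long) extends Table {
  override def name(): String = "example.versioned_table"

  override def schema(): StructType = new StructType().add("id", "int")

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.BATCH_READ)

  // Formerly currentVersion(): report the version backing this instance.
  // Unversioned tables keep the inherited default, which returns null.
  override def version(): String = snapshotId.toString
}

With that in place, DataSourceV2Relation.isVersioned holds for such a table, so plan refreshes can pin scans to the loaded snapshot instead of silently drifting to a newer one.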
From 8eb9edd2c145522b5c695ab7540c73d092b0540a Mon Sep 17 00:00:00 2001
From: Anton Okolnychyi
Date: Thu, 20 Nov 2025 11:00:52 -0800
Subject: [PATCH 2/2] Another attempt

---
 .../java/org/apache/spark/sql/connector/catalog/Table.java | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
index 1c9081fa1283..a298520760bc 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/Table.java
@@ -104,8 +104,10 @@ default Map<String, String> properties() {
   default Constraint[] constraints() { return new Constraint[0]; }
 
   /**
-   * Returns this table version without refreshing state if implementation supports versioning.
-   * If the table is not versioned, null can be returned here.
+   * Returns the version of this table if versioning is supported, null otherwise.
+   * <p>
+   * This method must not trigger a refresh of the table metadata. It should return
+   * the version that corresponds to the current state of this table instance.
    */
   default String version() { return null; }
 }
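The reworded javadoc in PATCH 2/2 is the substance of the "Another attempt": version() must behave as a cheap accessor over state this Table instance already holds, never as a trigger for a metadata reload. A small caller-side sketch of that contract, assuming the patched API; the helper name and its arguments are placeholders for illustration.

import org.apache.spark.sql.connector.catalog.{Identifier, Table, TableCatalog}

// Illustrative helper (not part of the patch): contrasts the version held by
// one loaded Table instance with the version seen by a fresh catalog load.
def observedVersions(catalog: TableCatalog, ident: Identifier): (String, String) = {
  val loaded: Table = catalog.loadTable(ident)    // metadata is read once, here
  val pinned = loaded.version()                   // pure accessor, no refresh
  assert(pinned == loaded.version())              // stable for this instance
  val latest = catalog.loadTable(ident).version() // a new load may observe more commits
  (pinned, latest)
}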
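That instance-scoped reading is what lets V2TableRefreshUtil in PATCH 1/2 freeze a relation's version into AsOfVersion, so a cached plan keeps scanning the version it was planned against. The same version string works in SQL time travel; a rough illustration, assuming an active SparkSession named spark and a configured DSv2 catalog named testcat whose tables bump their version on every commit, as the InMemoryTable test harness above does.

// Versions here follow the InMemoryTable convention of an integer counter
// starting at 0; real connectors may use opaque snapshot ids instead.
spark.sql("INSERT INTO testcat.ns.tbl VALUES (1, 'a')") // e.g. version 0 -> 1
spark.sql("INSERT INTO testcat.ns.tbl VALUES (2, 'b')") // e.g. version 1 -> 2

// Explicit time travel back to the older version; internally, pinning wraps a
// versioned relation in the equivalent of this VERSION AS OF clause.
spark.sql("SELECT * FROM testcat.ns.tbl VERSION AS OF '1'").show()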