Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ default Map<String, String> properties() {
default Constraint[] constraints() { return new Constraint[0]; }

/**
* Returns the current table version if implementation supports versioning.
* If the table is not versioned, null can be returned here.
* Returns the version of this table if versioning is supported, null otherwise.
* <p>
* This method must not trigger a refresh of the table metadata. It should return
* the version that corresponds to the current state of this table instance.
*/
default String currentVersion() { return null; }
default String version() { return null; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ case class DataSourceV2Relation(
def autoSchemaEvolution(): Boolean =
table.capabilities().contains(TableCapability.AUTOMATIC_SCHEMA_EVOLUTION)

def isVersioned: Boolean = table.currentVersion != null
def isVersioned: Boolean = table.version != null
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ private[sql] object V2TableRefreshUtil extends SQLConfHelper with Logging {
case r @ ExtractV2CatalogAndIdentifier(catalog, ident)
if r.isVersioned && r.timeTravelSpec.isEmpty =>
val tableName = V2TableUtil.toQualifiedName(catalog, ident)
val version = r.table.currentVersion
val version = r.table.version
logDebug(s"Pinning table version for $tableName to $version")
r.copy(timeTravelSpec = Some(AsOfVersion(version)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1357,24 +1357,24 @@ class CatalogSuite extends SparkFunSuite {
intercept[NoSuchFunctionException](catalog.loadFunction(Identifier.of(Array("ns1"), "func")))
}

test("currentVersion") {
test("version") {
val catalog = newCatalog()

val table = catalog.createTable(testIdent, columns, emptyTrans, emptyProps)
.asInstanceOf[InMemoryTable]
assert(table.currentVersion() == "0")
assert(table.version() == "0")
table.withData(Array(
BufferedRows("3", table.columns()).withRow(InternalRow(0, "abc", "3")),
BufferedRows("4", table.columns()).withRow(InternalRow(1, "def", "4"))))
assert(table.currentVersion() == "1")
assert(table.version() == "1")

table.truncateTable()
assert(catalog.loadTable(testIdent).currentVersion() == "2")
assert(catalog.loadTable(testIdent).version() == "2")

catalog.alterTable(testIdent, TableChange.setProperty("prop-1", "1"))
assert(catalog.loadTable(testIdent).currentVersion() == "3")
assert(catalog.loadTable(testIdent).version() == "3")

catalog.alterTable(testIdent, TableChange.addConstraint(constraints.apply(0), "3"))
assert(catalog.loadTable(testIdent).currentVersion() == "4")
assert(catalog.loadTable(testIdent).version() == "4")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ abstract class InMemoryBaseTable(
extends Table with SupportsRead with SupportsWrite with SupportsMetadataColumns {

// Tracks the current version number of the table.
protected var currentTableVersion: Int = 0
protected var tableVersion: Int = 0

// Stores the table version validated during the last `ALTER TABLE ... ADD CONSTRAINT` operation.
private var validatedTableVersion: String = null
Expand All @@ -75,14 +75,14 @@ abstract class InMemoryBaseTable(

override def columns(): Array[Column] = tableColumns

override def currentVersion(): String = currentTableVersion.toString
override def version(): String = tableVersion.toString

def setCurrentVersion(version: String): Unit = {
currentTableVersion = version.toInt
def setVersion(version: String): Unit = {
tableVersion = version.toInt
}

def increaseCurrentVersion(): Unit = {
currentTableVersion += 1
def increaseVersion(): Unit = {
tableVersion += 1
}

def validatedVersion(): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ class InMemoryTable(
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
dataMap --= InMemoryTable
.filtersToKeys(dataMap.keys, partCols.map(_.toSeq.quoted).toImmutableArraySeq, filters)
increaseCurrentVersion()
increaseVersion()
}

override def withData(data: Array[BufferedRows]): InMemoryTable = {
Expand Down Expand Up @@ -109,7 +109,7 @@ class InMemoryTable(
row.getInt(0) == InMemoryTable.uncommittableValue()))) {
throw new IllegalArgumentException(s"Test only mock write failure")
}
increaseCurrentVersion()
increaseVersion()
this
}
}
Expand Down Expand Up @@ -155,7 +155,7 @@ class InMemoryTable(

copiedTable.commits ++= commits.map(_.copy())

copiedTable.setCurrentVersion(currentVersion())
copiedTable.setVersion(version())
if (validatedVersion() != null) {
copiedTable.setValidatedVersion(validatedVersion())
}
Expand All @@ -165,12 +165,12 @@ class InMemoryTable(

override def equals(other: Any): Boolean = other match {
case that: InMemoryTable =>
this.id == that.id && this.currentVersion() == that.currentVersion()
this.id == that.id && this.version() == that.version()
case _ => false
}

override def hashCode(): Int = {
Objects.hash(id, currentVersion())
Objects.hash(id, version())
}

class InMemoryWriterBuilderWithOverWrite(override val info: LogicalWriteInfo)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,8 +179,8 @@ class BasicInMemoryTableCatalog extends TableCatalog {
throw new IllegalArgumentException(s"Cannot drop all fields")
}

table.increaseCurrentVersion()
val currentVersion = table.currentVersion()
table.increaseVersion()
val currentVersion = table.version()
val newTable = new InMemoryTable(
name = table.name,
columns = CatalogV2Util.structTypeToV2Columns(schema),
Expand All @@ -189,7 +189,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {
constraints = constraints,
id = table.id)
.alterTableWithData(table.data, schema)
newTable.setCurrentVersion(currentVersion)
newTable.setVersion(currentVersion)
changes.foreach {
case a: TableChange.AddConstraint =>
newTable.setValidatedVersion(a.validatedTableVersion())
Expand All @@ -209,7 +209,7 @@ class BasicInMemoryTableCatalog extends TableCatalog {

Option(tables.remove(oldIdent)) match {
case Some(table: InMemoryBaseTable) =>
table.increaseCurrentVersion()
table.increaseVersion()
tables.put(newIdent, table)
case _ =>
throw new NoSuchTableException(oldIdent.asMultipartIdentifier)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
val condition = a.checkConstraint.condition
val change = TableChange.addConstraint(
check.toV2Constraint,
d.relation.table.currentVersion)
d.relation.table.version)
ResolveTableConstraints.validateCatalogForTableChange(Seq(change), catalog, ident)
AddCheckConstraintExec(catalog, ident, change, condition, planLater(a.child)) :: Nil

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
sql(s"INSERT INTO $t VALUES (1, 'a'), (null, 'b')")
sql(s"ALTER TABLE $t ADD CONSTRAINT c1 CHECK (id > 0) $characteristic")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "c1")
Expand Down Expand Up @@ -254,7 +254,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
// Add a valid check constraint
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_positive_num CHECK (s.num >= -1)")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "valid_positive_num")
Expand Down Expand Up @@ -284,7 +284,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
// Add a valid check constraint
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_map_val CHECK (m['a'] >= -1)")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "valid_map_val")
Expand Down Expand Up @@ -312,7 +312,7 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
// Add a valid check constraint
sql(s"ALTER TABLE $t ADD CONSTRAINT valid_array CHECK (a[1] >= -2)")
val table = loadTable(nonPartitionCatalog, "ns", "tbl")
assert(table.currentVersion() == "2")
assert(table.version() == "2")
assert(table.validatedVersion() == "1")
val constraint = getCheckConstraint(table)
assert(constraint.name() == "valid_array")
Expand Down