Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,12 @@
* <ul>
* <li>{@code _change_type} (STRING) — the kind of change: {@code insert}, {@code delete},
* {@code update_preimage}, or {@code update_postimage}</li>
* <li>{@code _commit_version} — the commit version containing this change. Must be of
* an atomic orderable type (e.g. {@code LongType}, {@code StringType},
* {@code IntegerType}, {@code TimestampType}); complex types
* ({@code ArrayType}, {@code MapType}, {@code StructType}) are rejected.
* Spark post-processing sorts rows of a given row identity by this column to
* determine the first and last events</li>
* <li>{@code _commit_version} — the commit version containing this change. Must be
* either {@code LongType} or {@code StringType}; all other types are rejected.
* The column's natural ordering (numeric for {@code LongType}, lexicographic for
* {@code StringType}) must match commit order, because the netChanges
* post-processing path sorts rows of a given row identity by this column to
* determine the first and last events.</li>
* <li>{@code _commit_timestamp} (TIMESTAMP) -- the timestamp of the commit. All rows
* belonging to a single {@code _commit_version} must share the same
* {@code _commit_timestamp}. For streaming reads with post-processing enabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,13 +80,10 @@ private[analysis] class CdcNetChangesStatefulProcessor(
private val changeTypeIdx: Int = inputSchema.fieldIndex("_change_type")
private val commitVersionIdx: Int = inputSchema.fieldIndex("_commit_version")

// `_commit_version` is connector-defined and may be any atomic orderable type
// (LongType, StringType, IntegerType, TimestampType, BinaryType, ...). To order
// rows generically -- without assuming the boxed external Java value is
// `Comparable` (e.g. BinaryType -> Array[Byte], which is not) -- we route the
// value through Catalyst: convert the external Row value to its Catalyst-internal
// form and compare with the type-aware interpreted ordering. This mirrors the
// batch path which uses Catalyst's `SortOrder` on the same attribute.
// `_commit_version` is connector-defined and is restricted to LongType or StringType
// (validated in `ChangelogTable.validateSchema`). We still route through Catalyst's
// type-aware interpreted ordering for symmetry with the batch path's `SortOrder` on
// the same attribute.
private val versionDataType = inputSchema(commitVersionIdx).dataType
private val versionToCatalyst: Any => Any =
CatalystTypeConverters.createToCatalystConverter(versionDataType)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.apache.spark.sql.connector.catalog.{Changelog, ChangelogInfo, Column,
import org.apache.spark.sql.connector.catalog.TableCapability.{BATCH_READ, MICRO_BATCH_READ}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.types.{AtomicType, DataType, StringType, TimestampType}
import org.apache.spark.sql.types.{DataType, LongType, StringType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap

/**
Expand Down Expand Up @@ -66,14 +66,19 @@ object ChangelogTable {
}
}
check("_change_type", StringType)
// `_commit_version` is connector-defined but must be an atomic orderable type. Both
// the batch (Catalyst `SortOrder`) and the streaming netChanges (typed Comparable
// ordering inside the stateful processor) paths require an orderable scalar.
// `_commit_version` must be either `LongType` or `StringType`. Connectors must
// additionally guarantee that the column's natural ordering (numeric /
// lexicographic) matches commit order, because the netChanges post-processing path
// sorts rows by this column. These two types cover every realistic CDC source;
// broader atomic types like `IntegerType` are strict subsets of `LongType`, and
// `TimestampType` duplicates the role of `_commit_timestamp`. The narrower
// contract can always be relaxed later (relaxing is non-breaking; restricting is
// not).
val versionCol = byName.getOrElse("_commit_version",
throw QueryCompilationErrors.changelogMissingColumnError(cl.name, "_commit_version"))
if (!versionCol.dataType.isInstanceOf[AtomicType]) {
if (versionCol.dataType != LongType && versionCol.dataType != StringType) {
throw QueryCompilationErrors.changelogInvalidColumnTypeError(
cl.name, "_commit_version", "an atomic orderable type", versionCol.dataType.sql)
cl.name, "_commit_version", "BIGINT or STRING", versionCol.dataType.sql)
}
check("_commit_timestamp", TimestampType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,17 +380,21 @@ class ChangelogResolutionSuite extends SharedSparkSession {
parameters = wrongType("_commit_timestamp", "TIMESTAMP", "BIGINT"))
}

test("ChangelogTable - _commit_version accepts any atomic orderable type") {
Seq(LongType, IntegerType, StringType, TimestampType).foreach { versionType =>
test("ChangelogTable - _commit_version accepts LongType and StringType") {
Seq(LongType, StringType).foreach { versionType =>
ChangelogTable(
cl("any_cl", validChangeType, "_commit_version" -> versionType, validTimestamp),
stubInfo())
}
}

test("ChangelogTable - non-atomic _commit_version data type throws") {
test("ChangelogTable - _commit_version rejects all other data types") {
val structVersion = StructType(Seq(StructField("v", LongType)))
Seq[(org.apache.spark.sql.types.DataType, String)](
// Other atomic types previously allowed under the AtomicType contract.
IntegerType -> "INT",
TimestampType -> "TIMESTAMP",
// Complex types (always rejected).
ArrayType(LongType) -> "ARRAY<BIGINT>",
MapType(StringType, LongType) -> "MAP<STRING, BIGINT>",
structVersion -> structVersion.sql).foreach { case (versionType, sql) =>
Expand All @@ -401,7 +405,7 @@ class ChangelogResolutionSuite extends SharedSparkSession {
stubInfo())
},
condition = "INVALID_CHANGELOG_SCHEMA.INVALID_COLUMN_TYPE",
parameters = wrongType("_commit_version", "an atomic orderable type", sql))
parameters = wrongType("_commit_version", "BIGINT or STRING", sql))
}
}

Expand Down