[WIP][SPARK-55951][SPARK-55952][SPARK-55953][SQL] Add post-processing for CDC changelog tables #55426

SanJSp wants to merge 5 commits into apache:master from
Conversation
Add analyzer rule and execution primitives for Spark-side CDC post-processing:

- `ResolveChangelogTable` analyzer rule: injects carry-over removal and/or update-detection plans above a resolved `DataSourceV2Relation(ChangelogTable)`.
- `CarryOverRemoval` logical node + `CarryOverRemovalExec` physical node + `CarryOverIterator`: sort-based removal of identical delete+insert CoW artifacts, keyed by rowId+rowVersion.
- Update detection: Window-based relabeling of delete+insert pairs within the same rowId+rowVersion partition to `update_preimage`/`update_postimage`.
- `ChangelogTable.validateSchema`: fail-fast validation of required CDC metadata columns, column types, and row-identity requirements.
- New error class `INVALID_CHANGELOG_SCHEMA` with four sub-classes (MISSING_COLUMN, INVALID_COLUMN_TYPE, MISSING_ROW_ID, NESTED_ROW_ID).
- `InMemoryChangelogCatalog` extended with `ChangelogProperties` to configure post-processing scenarios in tests.
- Tests: `ChangelogResolutionSuite` schema-validation cases and `ResolveChangelogTablePostProcessingSuite` end-to-end SQL coverage.
val rowIdColumnNames = cl.rowId().map(_.fieldNames()(0)).toSeq
val rowVersionColumnName = cl.rowVersion().fieldNames()(0) // e.g. "_commit_version"
We pass column names and resolve them to ordinals at execution time in CarryOverRemovalExec.doExecute(). Ordinals computed at analysis time were tried first but broke: ColumnPruning / projection rewrites between analysis and physical planning can reorder or re-number columns, so analysis-time ordinals pointed at the wrong fields by the time the exec node ran. Resolving names against child.output at execute time dodges that.
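Roughly what that execute-time resolution looks like, as a minimal sketch (the surrounding exec node and the exact `CarryOverIterator` argument list are assumptions, not the PR's literal code):

```scala
// Minimal sketch, not the PR's literal code: resolve names against child.output
// only once the physical plan is final, so optimizer-era column reshuffles are moot.
override protected def doExecute(): RDD[InternalRow] = {
  val rowIdOrdinals = rowIdColumnNames.map(n => child.output.indexWhere(_.name == n))
  val rowVersionOrdinal = child.output.indexWhere(_.name == rowVersionColumnName)
  child.execute().mapPartitions { iter =>
    // Argument list abbreviated; the ordinals are valid for this exact child output.
    new CarryOverIterator(iter, rowIdOrdinals, rowVersionOrdinal, dataOrdinals, schema)
  }
}
```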
  }
} else {
  // pendingDelete != null && input.hasNext
  val nextRow = input.next().copy()
Input rows may be mutable/reused by the upstream iterator. Any row buffered for comparison against the next row is defensively copied via .copy(). This is the standard DSv2 pattern, but the per-row copy cost is non-trivial on large scans. Confirm whether the upstream stage here guarantees non-reused rows (in which case we could drop the copy), or whether we should keep the defensive copy.
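A simplified sketch of the buffering pattern in question (illustrative, not the exact iterator code):

```scala
// Any row held across a next() call must be copied: upstream iterators may keep
// returning the same mutable InternalRow instance.
if (pendingDelete == null && input.hasNext) {
  pendingDelete = input.next().copy() // buffered across iterations -> copy
} else if (pendingDelete != null && input.hasNext) {
  val nextRow = input.next().copy() // held for comparison against pendingDelete -> copy
  // ... compare (rowId, rowVersion) of pendingDelete vs nextRow ...
}
```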
 * @param dataOrdinals column indices for data column comparison (field-by-field equality)
 * @param schema the output schema for generic data column comparison
 */
class CarryOverIterator(
I know this is what's described in the proposal https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0
but I'm not convinced this whole machinery is really what we want
At a high-level, the way carry-overs are removed is by identifying rows that are deleted+inserted with the same row ID and row version.
The issue is that the proposal conflates row version with commit version: the version at which the row is deleted/inserted. With that definition, we can't distinguish between updates and rows being copied: both delete/insert get the current commit version as row version.
Delta & Iceberg actually mean something different by row version: the version at which the row was last modified - e.g. deleted rows don't get a new row version
https://github.com/delta-io/delta/blob/master/PROTOCOL.md#row-commit-versions
https://iceberg.apache.org/spec/#row-lineage
This definition actually makes implementing carry-over removal straightforward: given a delete and insert for a given row ID, if the row versions are equal it's a copy; if they aren't, it's an update.
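A toy sketch of that classification (illustrative only, not code from the PR):

```scala
// Row version = version at which the row was last modified (Delta/Iceberg semantics).
def classifyPair(deleteRowVersion: Long, insertRowVersion: Long): String =
  if (deleteRowVersion == insertRowVersion) "carry-over: drop both rows"
  else "update: relabel as update_preimage / update_postimage"

classifyPair(5L, 5L) // row copied unchanged by CoW at commit 7 => carry-over
classifyPair(5L, 7L) // row actually modified at commit 7      => update
```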
We still need a concept of commit version though because:
- This is used to compute updates: inserts and deletes within the same commit would otherwise always show as delete+insert since they have different row versions (insert = current commit version, delete = row version of the previous commit where the row was last modified)
- This is the API users query against when using version ranges:
`CHANGES FROM VERSION 5 TO VERSION 10` means all rows modified in this commit range. Deleted rows don't get a new row version when they are deleted, so they wouldn't show up.
I think we should update ChangeLog to include both _commit_version and row_version.
Either explicitly:
/**
* Returns the column used for ordering changes within the same row identity, used for
* removing carry-overs.
* <p>
* The default implementation throws {@link UnsupportedOperationException}. Connectors
* that support update detection must override this method.
*/
default NamedReference rowVersion() {
throw new UnsupportedOperationException("rowVersion is not supported.");
}
/**
* Returns the column used for collecting rows within the same commit, used for
* update detection.
*/
default NamedReference commitVersion() {
return FieldReference.column("_commit_version");
}
or implicitly - we already require column _commit_version to be present in the output:
/**
* Returns the column used for ordering changes within the same row identity, used for
* removing carry-overs.
* <p>
* The default implementation throws {@link UnsupportedOperationException}. Connectors
* that support update detection must override this method.
*/
default NamedReference rowVersion() {
throw new UnsupportedOperationException("rowVersion is not supported.");
}
@johanl-db Thanks, agreed — this is the right call. We already expose rowVersion() on Changelog, but its current Javadoc describes it as an ordering column, which is what invites setting it equal to the commit version.
I have updated the SPIP (B.6). If you agree, we can update the javadoc in a follow-up PR (supporting `_commit_version` implicitly).
if (options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
    changelog.containsCarryoverRows()) {
  updatedRel = injectCarryoverRemoval(rel, changelog)
}
if (options.computeUpdates() && changelog.representsUpdateAsDeleteAndInsert()) {
  updatedRel = injectUpdateDetection(updatedRel, changelog)
}
if (options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NET_CHANGES &&
    changelog.containsIntermediateChanges()) {
  updatedRel = injectNetChangeComputation(updatedRel, changelog)
}
I believe it should be possible to perform all post-processing in at most two passes.
The proposal mentions computing updates during carry-over removal
https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0#heading=h.6ylqglw1kcc2
I'm ok leaving that for a follow-up though
gengliangwang left a comment:
A few inline comments. Nicely done on the refactor — the (rowId, _commit_version) partitioning with rowVersion as the discriminator, and fusing carry-over removal with update detection into a single window pass, both directly address the earlier review.
val isCarryoverPair = And(
  And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))),
  EqualTo(minRv, maxRv))
NULL-safety concern on _min_rv = _max_rv.
SQL equality with NULL is UNKNOWN, not TRUE:
- Both sides have NULL `rowVersion` ⇒ `Min = Max = NULL` ⇒ `NULL = NULL` is UNKNOWN ⇒ filter keeps the pair (treated as an update). Arguably the safe default.
- One side NULL, the other non-null ⇒ `Min` and `Max` both skip the null and return the non-null value ⇒ `_min_rv = _max_rv` is TRUE ⇒ filter drops the pair as a carry-over. This is a silent correctness hole — a connector emitting NULL on one side of a real update would lose both rows.
Options:

- Document in `Changelog.rowVersion()` that the column must be non-nullable, and enforce it in `ChangelogTable.validateSchema`.
- Add `count(rowVersion) = 2` to `isCarryoverPair`, or use `EqualNullSafe` plus a non-null guard.
Worth a test with NULL rowVersion on exactly one half of a pair.
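A sketch of the second option, assuming a hypothetical `_rv_cnt` helper column computed as `count(rowVersion)` over the same window (count skips NULLs):

```scala
// _rv_cnt and the surrounding names are assumptions, not code from this PR.
val rvCnt = getAttribute(input, "_rv_cnt")
val isCarryoverPair = And(
  And(EqualTo(delCnt, Literal(1L)), EqualTo(insCnt, Literal(1L))),
  // _rv_cnt = 2 guarantees both sides are non-null; EqualNullSafe keeps the
  // both-NULL case classified as an update instead of evaluating to UNKNOWN.
  And(EqualTo(rvCnt, Literal(2L)), EqualNullSafe(minRv, maxRv)))
```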
Implemented option 1, added test cases "ChangelogTable - nullable rowVersion column fails" and "ChangelogTable - non-nullable rowVersion column passes"
updated doc in #55460
We can add code and a test for the nullability check as a follow-up
  val planWithFilter = addCarryOverPairFilter(planWithWindow)
  removeHelperColumns(planWithFilter)
} else if (requiresUpdateDetection) {
  val planWithWindow = addWindowWithInsertAndDeleteCountsOnly(plan, cl)
Flag matrix gap: deduplicationMode = NONE + computeUpdates = true + connector with containsCarryoverRows = true.
With those flags, requiresCarryOverRemoval is false but requiresUpdateDetection is true, so this branch runs. A CoW carry-over pair then satisfies _del_cnt >= 1 AND _ins_cnt >= 1 and gets relabeled as update_preimage / update_postimage — the user asked for raw output with update detection and gets fabricated updates.
Two reasonable fixes:

- Reject this combination at analysis time (`NONE` shouldn't compose with `computeUpdates = true` when the connector advertises carry-overs), or
- Treat `computeUpdates = true` as implicitly requiring carry-over removal for any connector where `containsCarryoverRows = true`, and run the full window path.
Either way, worth an explicit decision and a test covering the combo.
Went with the first fix. Added:
if (requiresUpdateDetection &&
changelog.containsCarryoverRows() &&
options.deduplicationMode() == ChangelogInfo.DeduplicationMode.NONE) {
throw QueryCompilationErrors.cdcUpdateDetectionRequiresCarryOverRemoval(
changelog.name())
}
in ResolveChangelogTable.scala
val insCnt = getAttribute(input, "_ins_cnt")

val isUpdate = And(
  GreaterThanOrEqual(delCnt, Literal(1L)),
The >= 1 here correctly tolerates partitions with more than one delete or more than one insert (connector over-emitting), but the behavior is non-obvious: all delete rows become update_preimage and all insert rows become update_postimage, potentially producing multiple update-pre/post rows for one logical row.
Either:

- Tighten to `_del_cnt = 1 AND _ins_cnt = 1` and fail (or leave raw) for partitions that violate the 1:1 invariant,
- Or document explicitly that Spark relabels every delete/insert in such a partition, and add a test pinning the behavior.
Same thought for addCarryOverPairFilter (line 196) — it already has the = 1 check, which is stricter than this one, so the two aren't aligned today.
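A sketch of the tightened predicate from the first option; the failing variant the author chose would additionally raise when the counts exceed one:

```scala
// Strict 1:1 invariant, aligned with the `= 1` check in addCarryOverPairFilter.
val isUpdate = And(
  EqualTo(delCnt, Literal(1L)),
  EqualTo(insCnt, Literal(1L)))
```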
Going with option 1, the failing variant. Just want to confirm with you that this is fine, as it implicitly forces at most one insert and one delete per commit version.
Added "update detection raises on multiple inserts per (rowId, commitVersion)" as a test.
}
check("_change_type", StringType)
check("_commit_version") // connector-defined, any type accepted
check("_commit_timestamp", TimestampType)
This would be a natural place to also validate row-identity requirements, matching what the capability flags advertise:

- If `containsCarryoverRows() || representsUpdateAsDeleteAndInsert() || containsIntermediateChanges()` ⇒ `rowId()` must be non-empty.
- If `containsCarryoverRows() || representsUpdateAsDeleteAndInsert()` ⇒ `rowVersion()` must be provided (and the referenced column must exist in `columns()`).
Today ResolveChangelogTable guards on rowId().nonEmpty and silently no-ops when it's empty, and the helper-column lookups use IllegalStateException for missing columns — that's an assertion, not a user-facing error. Moving validation into validateSchema turns these into a proper AnalysisException with a clear error class at relation construction time.
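A sketch of what that relocation could look like; `changelogMissingRowId` is a hypothetical error helper, not an existing method:

```scala
// Hypothetical capability-driven validation inside ChangelogTable.validateSchema.
private def validateRowIdentity(cl: Changelog, tableName: String): Unit = {
  val needsRowId = cl.containsCarryoverRows() ||
    cl.representsUpdateAsDeleteAndInsert() || cl.containsIntermediateChanges()
  if (needsRowId && cl.rowId().isEmpty) {
    throw QueryCompilationErrors.changelogMissingRowId(tableName) // MISSING_ROW_ID
  }
  if (cl.containsCarryoverRows() || cl.representsUpdateAsDeleteAndInsert()) {
    cl.rowVersion() // must resolve; the referenced column is checked against columns()
  }
}
```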
 * pair, `_min_rv == _max_rv` signals that both sides share the same rowVersion and
 * the pair is a CoW carry-over.
 */
private def addWindowWithInsertAndDeleteCountsAndRowVersionBounds(
This builder shares ~80% of its body with addWindowWithInsertAndDeleteCountsOnly above. Suggest collapsing into one:
private def addPostProcessingWindow(
    plan: LogicalPlan, cl: Changelog, includeRowVersionBounds: Boolean): LogicalPlan

The name is also unusually long — addPostProcessingWindow with a boolean parameter reads more naturally and keeps the helper-column construction in a single place.
While here, a test for composite rowId (multi-column) would be good — the rowIdExprs ++ Seq(commitVersionAttr) path supports it but I don't see it exercised in the current suite.
Done; this also helped to significantly simplify addRowLevelPostProcessing:
var modifiedPlan = addPostProcessingWindow(plan, cl,
includeRowVersionBounds = requiresCarryOverRemoval)
if (requiresCarryOverRemoval) modifiedPlan = addCarryOverPairFilter(modifiedPlan)
if (requiresUpdateDetection) modifiedPlan = addUpdateRelabelProjection(modifiedPlan)
removeHelperColumns(modifiedPlan)
Added "update detection with composite rowId keeps different (id, name) tuples raw" test case
 * [[addRowLevelPostProcessing]].
 */
private def removeHelperColumns(input: LogicalPlan): LogicalPlan = {
  val helperNames = Set("_del_cnt", "_ins_cnt", "_min_rv", "_max_rv")
The helper column names are duplicated between the two window builders (implicitly via their Alias names) and this Set. Lifting them to a single private val HelperColumns = Set("_del_cnt", "_ins_cnt", "_min_rv", "_max_rv") near the top of the object — and using the same names to build the aliases — keeps producers and consumer in sync. Easy to drift otherwise.
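A sketch of the suggested single source of truth:

```scala
// One definition shared by the window builder and removeHelperColumns.
private val DelCntCol = "_del_cnt"
private val InsCntCol = "_ins_cnt"
private val MinRvCol = "_min_rv"
private val MaxRvCol = "_max_rv"
private val HelperColumns = Set(DelCntCol, InsCntCol, MinRvCol, MaxRvCol)
// e.g. Alias(deleteCountWindowExpr, DelCntCol)() in the builder, and
// output.filterNot(a => HelperColumns.contains(a.name)) in removeHelperColumns.
```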
private def injectNetChangeComputation(
    plan: LogicalPlan,
    cl: Changelog): LogicalPlan = {
  throw new UnsupportedOperationException(
Throwing at rule-apply time means a query with deduplicationMode = netChanges hits this UnsupportedOperationException deep in the analyzer. Since the WIP checkbox notes net-changes support is deferred, consider gating earlier with a proper AnalysisException / error class (e.g. in ChangelogTable.validateSchema or at the parser/builder level when netChanges is chosen) until the real implementation lands. That gives a useful error message to users and avoids a stack trace from an internal rule.
Done, added 2 tests to validate:
"deduplicationMode=netChanges is rejected when connector emits intermediate changes" and "deduplicationMode=netChanges is rejected even when connector has no intermediate changes"
gengliangwang left a comment:
Second pass on the current commit, now cross-referenced with the SPIP at SPARK-55668. Design is substantially the same as yesterday's review — the (rowId, _commit_version) partitioning + fused Window pass is intact, so I'm skipping the recap. Prior inline threads (validation relocation, NULL handling on _min_rv = _max_rv, >= 1 relabel looseness, netChanges error class, helper/window-builder consolidation, rowVersion Javadoc) are all still open.
The one design-contract divergence worth flagging: streaming post-processing is unimplemented — SPIP B.7 says update detection must be "Supported per micro-batch" but the rule matches only DataSourceV2Relation, not StreamingRelationV2. The rest of the new findings are smaller — a rowId()-default throw from an unconditional guard call, two dead error sub-classes, a vacuous plan assertion left over from the earlier exec-node design, and two Scaladoc bugs.
Also: the PR description is stale — it still references the CarryOverRemoval logical node, CarryOverRemovalExec, and CarryOverIterator, none of which exist after the simplification. Worth refreshing before merge.
// Guard: without row identity, carry-over removal and update detection cannot
// correctly group rows. A match-miss leaves the relation unchanged -- exactly what
// we want for connectors that surface an empty rowId().
case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _)
Streaming CDC bypasses this rule entirely. spark.readStream.changes(...) resolves to StreamingRelationV2(_, _, table: ChangelogTable, ...) (see the "DataStreamReader - changes() resolves to StreamingRelationV2 with ChangelogTable" test), not DataSourceV2Relation(ChangelogTable), so the case here never matches and computeUpdates / deduplicationMode are silently ignored on streaming.
SPIP B.7 explicitly says update detection must be "Supported (per micro-batch)" for streaming — this PR is tagged as the Phase 2 implementation but leaves the streaming half of that contract unimplemented. Either add a StreamingRelationV2 case (plus a streaming test mirroring update detection relabels delete+insert with different data as update), or narrow the SPIP's streaming scope explicitly.
Will be done in a later PR; for now, added an AnalysisException if streaming is requested with post-processing.
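Roughly the shape such a guard might take; the helper and error-method names here are assumptions, not the PR's code:

```scala
// Fail fast instead of silently ignoring computeUpdates / deduplicationMode.
case s: StreamingRelationV2 if s.table.isInstanceOf[ChangelogTable] &&
    requestsPostProcessing(s.extraOptions) => // hypothetical check over read options
  throw QueryCompilationErrors.cdcPostProcessingNotSupportedInStreaming(s.table.name())
```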
// correctly group rows. A match-miss leaves the relation unchanged -- exactly what
// we want for connectors that surface an empty rowId().
case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _)
  if table.changelog.rowId().nonEmpty =>
Changelog.rowId() is a default Java method that throws UnsupportedOperationException("rowId is not supported.") unless the connector overrides it. Its Javadoc says override is required only for connectors "that support update detection or net change computation" — so a connector with containsCarryoverRows = representsUpdateAsDeleteAndInsert = containsIntermediateChanges = false is allowed to leave the default in place.
But this case guard invokes rowId() for every DataSourceV2Relation(ChangelogTable). A minimal CDC connector that doesn't override it will fail analysis with the raw UnsupportedOperationException bubbling out of the guard — not an AnalysisException.
This is distinct from the validation concern on ChangelogTable.scala line 70: even once that validation lands and requires rowId() when any flag is set, the unconditional call here still throws for connectors where no flag is set. Suggest: compute the three requires* booleans using only capability flags + changelogInfo, then call rowId() only if at least one is true.
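A sketch of the suggested reordering (where `options` comes from the resolved relation; names abbreviated):

```scala
// Decide from capability flags + options before ever calling rowId().
case rel @ DataSourceV2Relation(table: ChangelogTable, _, _, _, _, _) =>
  val cl = table.changelog
  val requiresCarryOverRemoval =
    options.deduplicationMode() != ChangelogInfo.DeduplicationMode.NONE &&
      cl.containsCarryoverRows()
  val requiresUpdateDetection =
    options.computeUpdates() && cl.representsUpdateAsDeleteAndInsert()
  if (requiresCarryOverRemoval || requiresUpdateDetection) {
    // Only connectors that set these flags are required to override rowId().
    addRowLevelPostProcessing(rel, cl)
  } else {
    rel // minimal connectors never hit the rowId() default throw
  }
```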
| "Required column `<columnName>` is missing." | ||
| ] | ||
| }, | ||
| "MISSING_ROW_ID" : { |
INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID and INVALID_CHANGELOG_SCHEMA.NESTED_ROW_ID are declared here but not raised anywhere: no helper in QueryCompilationErrors, no call sites, no tests reference them. The PR description lists them as shipped but the code never uses them.
Either land the helpers + validation now (tying into the schema-validation relocation discussion on ChangelogTable.scala line 70), or remove these sub-classes until the follow-up that raises them. Dead error-class declarations surface in the generated docs and tend to drift once the real fix lands.
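For reference, the missing helper would be roughly the following (name and message parameters are assumptions):

```scala
// Hypothetical QueryCompilationErrors helper that would make MISSING_ROW_ID live.
def changelogMissingRowId(tableName: String): Throwable = {
  new AnalysisException(
    errorClass = "INVALID_CHANGELOG_SCHEMA.MISSING_ROW_ID",
    messageParameters = Map("tableName" -> tableName))
}
```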
Covered with the first batch of PR feedback fixes
| s"CHANGES FROM VERSION 1 TO VERSION 2 WITH (computeUpdates = 'true')") | ||
|
|
||
| val plan = df.queryExecution.analyzed.treeString | ||
| assert(!plan.contains("CarryOverRemoval"), |
plan.contains("CarryOverRemoval") is a leftover from the earlier exec-node design. The current implementation has no plan node named CarryOverRemoval — post-processing uses generic Window / Filter / Project. This assertion is vacuously true and will pass regardless of whether the rule ran; only the _ins_cnt check below actually tests anything. Drop it, or replace with something like !plan.contains("_del_cnt") for a second independent signal.
 * @param rowIdNames optional row identity columns as top-level names (e.g. Seq("id"))
 * @param rowIdPaths optional row identity paths for nested struct fields
 *                   (e.g. Seq(Seq("payload", "id"))); takes precedence over rowIdNames
 * @param rowVersionName optional row version column (e.g. Some("_commit_version"))
The example Some("_commit_version") would break the rule: every row in a given commit shares that value, so within any (rowId, _commit_version) partition _min_rv = _max_rv is trivially true and every delete+insert pair gets dropped as a "carry-over." Every test in the suite correctly passes "row_commit_version". Fix the doc example so readers don't copy the wrong column.
- * @param rowVersionName optional row version column (e.g. Some("_commit_version"))
+ * @param rowVersionName optional row version column (e.g. Some("row_commit_version"))
 * Collapses multiple changes per row identity into the net effect. Not in
 * scope for this implementation.
"Not in scope for this implementation" reads as if the method were dead/placeholder, but it's wired up from apply and throws UnsupportedOperationException when reached — same throw-at-rule-apply-time concern raised in the line 236 thread.
- * Collapses multiple changes per row identity into the net effect. Not in
- * scope for this implementation.
+ * Collapses multiple changes per row identity into the net effect. Not yet
+ * implemented: throws UnsupportedOperationException when invoked.
Reduced it to simply "Not yet implemented"; it will be implemented immediately afterwards, hence I left a small stub to continue from.
Suggestion: split this PR

The current PR conflates four logically independent changes whose review cadence and blast radius are very different. Splitting would let reviewers iterate on each piece without blocking the others.

PR 1 — Schema validation
Scope: `ChangelogTable.validateSchema` and the `INVALID_CHANGELOG_SCHEMA` error class.
Why first: pure validation, no plan rewriting, independent of post-processing design. Also unblocks the design discussion on the follow-ups — the "relocate row-identity validation into `validateSchema`" thread lands here.

PR 2 — Batch post-processing
Scope: the rule itself (fused Window pass, the two window builders, carry-over filtering, update relabeling).
This is where the load-bearing design debate lives. Keeping it isolated means reviewers don't context-switch between "is the fused Window correct?" and "is the error class structured right?"

PR 3 — Streaming post-processing
Scope: extend the rule's case pattern to cover `StreamingRelationV2`.
Why separate: streaming touches a different plan shape and has its own correctness concerns (per-micro-batch partitioning, offset semantics, idempotency under retry). Much easier to reason about once PR 2 has settled the batch semantics.

PR 4 — Net change computation
Scope: actual implementation of `injectNetChangeComputation`.

Optional PR 0 — PR description refresh + doc fixes
If you want to ship the two Scaladoc fixes (`rowVersionName` example, net-change wording) ahead of the rest.

Trade-offs
If the PR needs to stay as one ticket for SPIP tracking reasons, an alternative is to commit them as separable atomic commits on the same branch so the review can still proceed piece-by-piece.
Will split up and re-request reviews from the new branches. Thanks for the feedback so far.

@gengliangwang feel free to take another look at the two new PRs. Thank you for your feedback so far, it's greatly appreciated!
Ongoing changes still open:
What changes were proposed in this pull request?
Building atop #54739 and #54738, this PR introduces changes to support Change Data Capture Queries given an underlying connector provides the expected data.
Important discussion posted to dev@spark.apache.org.
Added changes:
- `ResolveChangelogTable` analyzer rule: injects carry-over removal and/or update-detection plans above a resolved `DataSourceV2Relation(ChangelogTable)`.
- `CarryOverRemoval` logical node + `CarryOverRemovalExec` physical node + `CarryOverIterator`: sort-based removal of identical delete+insert CoW artifacts, keyed by rowId+rowVersion.
- Update detection: Window-based relabeling of delete+insert pairs within the same rowId+rowVersion partition to `update_preimage`/`update_postimage`.
- `ChangelogTable.validateSchema`: fail-fast validation of required CDC metadata columns, column types, and row-identity requirements.
- New error class `INVALID_CHANGELOG_SCHEMA` with four sub-classes (MISSING_COLUMN, INVALID_COLUMN_TYPE, MISSING_ROW_ID, NESTED_ROW_ID).
- `InMemoryChangelogCatalog` extended with `ChangelogProperties` to configure post-processing scenarios in tests.

Why are the changes needed?
Spark currently has the DSv2 CDC API and the SQL CHANGES clause, but the `computeUpdates` and `deduplicationMode` options are not acted upon. This PR wires up the analyzer-side post-processing so connectors implementing Changelog can deliver real CDC feeds. See the discussion and linked SPIP.
Does this PR introduce any user-facing change?
For connectors implementing the Changelog API, the deduplicationMode and computeUpdates options on CHANGES FROM VERSION ... now take effect (carry-over removal by default, optional update detection). Previously these options were parsed but silently ignored.
Before / after example:
Given a `Changelog` connector that advertises `containsCarryoverRows = true`, `representsUpdateAsDeleteAndInsert = true`, and `containsIntermediateChanges = true`, with rowId `id` and rowVersion `_commit_version`:

Setup (raw changes emitted by the connector for versions 1–3):
Query:
Before this PR — no analyzer rule, all options are silently ignored; the user sees the raw emitted rows including carry-over noise and no update labeling regardless of `WITH (...)`:

After this PR, `WITH (computeUpdates = 'true')` (default `deduplicationMode = 'drop_carryovers'`) — carry-over removal and update detection apply:
After this PR, `WITH (deduplicationMode = 'netChanges')` — intermediate states are collapsed into the net effect per row over the whole range (`id=1` ends up as `Alex`; `id=3` was inserted and deleted within the range, so it disappears from the output):
How was this patch tested?
Tests: `ChangelogResolutionSuite` schema-validation cases and `ResolveChangelogTablePostProcessingSuite` end-to-end SQL coverage.
Generated-by: Claude Opus 4.6 / 4.7