[DO NOT MERGE][SPARK-55952][SPARK-55953][SQL] Add ResolveChangelogTable analyzer rule for batch CDC post-processing#55568
Open
gengliangwang wants to merge 5 commits intoapache:masterfrom
Open
[DO NOT MERGE][SPARK-55952][SPARK-55953][SQL] Add ResolveChangelogTable analyzer rule for batch CDC post-processing#55568gengliangwang wants to merge 5 commits intoapache:masterfrom
gengliangwang wants to merge 5 commits intoapache:masterfrom
Conversation
…CDC post-processing
Post-process a resolved `DataSourceV2Relation(ChangelogTable)` to inject
carry-over removal and/or update detection, fused into a single pass over a
`(rowId, _commit_version)`-partitioned Window. Also reject streaming CDC
reads that would require post-processing, to prevent silent wrong results.
- `ResolveChangelogTable` analyzer rule:
- Batch: carry-over removal is a `Filter` on the Window (drops CoW pairs
where `min(rowVersion) == max(rowVersion)`); update detection is a
`CASE WHEN` over delete/insert counts (relabels pairs as
`update_preimage` / `update_postimage`). Fused into a single Window.
- Streaming: throws `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED`
when the requested options would need post-processing. Streams that
don't need post-processing pass through unchanged.
- Net changes: throws `INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED`
for both batch and streaming.
- Option validation: throws
`INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL` when
`computeUpdates = true` is combined with a carry-over-surfacing connector
and `deduplicationMode = none`, which would silently misclassify
carry-overs as updates.
- Runtime guard: the generated plan raises
`INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION` when the
connector emits more than one delete or insert for the same
`(rowId, _commit_version)` partition.
- `Analyzer`: register the rule after `ResolveRelations`.
- `InMemoryChangelogCatalog`: `ChangelogProperties` extension so tests can
configure post-processing scenarios.
Actual streaming post-processing implementation and net change computation
are scoped to follow-up PRs.
Member
Author
|
This PR is to help run the CI for #55508 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This is PR 2 of a split of #55426 (see the split suggestion for the full plan). Its independent of PR 1, so we can merge in any order. For more context see discussion posted to dev@spark.apache.org and linked SPIP.
Introduce the analyzer rule that post-processes a resolved
DataSourceV2Relation(ChangelogTable)to inject carry-over removal and/or update detection, fused into a single pass over a(rowId, _commit_version)-partitioned Window. To prevent silent wrong results, it also includes an explicit rejection path for streaming CDC reads that would require post-processing.Included Changes:
ResolveChangelogTableanalyzer rule:Filteron the Window (drops CoW pairs wheremin(rowVersion) == max(rowVersion)). Update detection is aCASE WHENover delete/insert counts (relabels pairs asupdate_preimage/update_postimage). The two passes are fused into a single Window.INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTEDwhen the requested options would need post-processing. Streams that don't need post-processing pass through unchanged. Actual streaming support is scoped to a follow-up PR.INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTEDfor both batch and streaming. Actual implementation is scoped to a follow-up PR.INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVALwhencomputeUpdates = trueis combined with a carry-over-surfacing connector anddeduplicationMode = none, which would silently misclassify carry-overs as updates.INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSIONwhen the connector emits morethan one delete or insert for the same
(rowId, _commit_version)partition, violating theChangelogcontract.Analyzer: register the rule afterResolveRelations.InMemoryChangelogCatalog:ChangelogPropertiesextension so tests can configure post-processing scenarios without a real connector.Why are the changes needed?
Currently,
CHANGES FROM VERSION ... WITH (deduplicationMode = ..., computeUpdates = ...)parses the options, but they are silently ignored — connector output is returned raw. This PR wires the options to their actual semantics for batch reads, and prevents silent wrong results for streaming reads.Does this PR introduce any user-facing change?
Yes, for CDC queries against a
Changelogconnector.Before/after example (click to expand)
Given a
Changelogconnector that advertises bothcontainsCarryoverRows = trueandrepresentsUpdateAsDeleteAndInsert = true, with rowIdidand arowVersioncolumn, for versions 1–2:Raw rows emitted by the connector:
Before this PR:
WITH (computeUpdates = 'true')is silently ignored, carry-overs leak through:After this PR:
WITH (computeUpdates = 'true'):How was this patch tested?
ResolveChangelogTablePostProcessingSuiteexercises the batch rule end-to-end via SQL againstInMemoryChangelogCatalog(carry-over removal, update detection, their interaction across the option and connector-flag matrix, data-column handling with mixed types, and plan-shape invariants).ChangelogResolutionSuiteadds streaming-rejection cases for the two capability flags that would require post-processing.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Opus 4.7