Skip to content

[SPARK-55952][SPARK-55953][SQL] Add ResolveChangelogTable analyzer rule for batch CDC post-processing#55588

Closed
gengliangwang wants to merge 2 commits into
apache:masterfrom
gengliangwang:SPARK-55952-resolve-changelog-table-reapply
Closed

[SPARK-55952][SPARK-55953][SQL] Add ResolveChangelogTable analyzer rule for batch CDC post-processing#55588
gengliangwang wants to merge 2 commits into
apache:masterfrom
gengliangwang:SPARK-55952-resolve-changelog-table-reapply

Conversation

@gengliangwang
Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

Re-apply of #55508 (commit 881957a, which was reverted in fe6051a) plus a fix for the ProtoToParsedPlanTestSuite failures that triggered the revert.

The original commit introduces the ResolveChangelogTable analyzer rule that post-processes a resolved DataSourceV2Relation(ChangelogTable) to inject carry-over removal and/or update detection, fused into a single pass over a (rowId, _commit_version)-partitioned Window. To prevent silent wrong results, it also includes an explicit rejection path for streaming CDC reads that would require post-processing.

Included changes (re-applied from #55508):

  • ResolveChangelogTable analyzer rule:
    • Batch: applies the requested post-processing transformations. Carry-over removal is a Filter on the Window (drops CoW pairs where min(rowVersion) == max(rowVersion)). Update detection is a CASE WHEN over delete/insert counts (relabels pairs as update_preimage / update_postimage). The two passes are fused into a single Window.
    • Streaming: throws INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED when the requested options would need post-processing. Streams that don't need post-processing pass through unchanged.
    • Net changes: throws INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED for both batch and streaming.
  • Option validation: throws INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL when computeUpdates = true is combined with a carry-over-surfacing connector and deduplicationMode = none.
  • Runtime guard: INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION when the connector emits more than one delete or insert for the same (rowId, _commit_version) partition.
  • Analyzer: register the rule after ResolveRelations.
  • InMemoryChangelogCatalog: ChangelogProperties extension so tests can configure post-processing scenarios without a real connector.

Additional changes in this PR that were missing from the original commit (the cause of the revert):

  • PlanGenerationTestSuite: switch the read changes with options test from deduplicationMode = netChanges to dropCarryovers since netChanges is now rejected up-front by the new rule (NET_CHANGES_NOT_YET_SUPPORTED).
  • Regenerate the corresponding read_changes_with_options.{json,proto.bin} query inputs.
  • Regenerate streaming_changes_API_with_options.explain golden file to include the new resolved: Boolean field added to ChangelogTable.

Why are the changes needed?

Currently, CHANGES FROM VERSION ... WITH (deduplicationMode = ..., computeUpdates = ...) parses the options, but they are silently ignored — connector output is returned raw. This PR wires the options to their actual semantics for batch reads, and prevents silent wrong results for streaming reads.

Does this PR introduce any user-facing change?

Yes, for CDC queries against a Changelog connector. See the original PR #55508 description for a before/after example.

How was this patch tested?

  • ResolveChangelogTablePostProcessingSuite exercises the batch rule end-to-end via SQL against InMemoryChangelogCatalog (carry-over removal, update detection, their interaction across the option and connector-flag matrix, data-column handling with mixed types, and plan-shape invariants).
  • ChangelogResolutionSuite adds streaming-rejection cases for the two capability flags that would require post-processing.
  • ProtoToParsedPlanTestSuite (724/724) — the suite that previously failed and led to the revert.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Opus 4.7

SanJSp and others added 2 commits April 28, 2026 11:59
…le for batch CDC post-processing

This is **PR 2 of a split** of apache#55426 (see the [split suggestion](apache#55426 (comment)) for the full plan). Its independent of PR 1, so we can merge in any order. For more context see [discussion](https://lists.apache.org/thread/dhxx6pohs7fvqc3knzhtoj4tbcgrwxts) posted to [devspark.apache.org](https://lists.apache.org/list.html?devspark.apache.org) and linked [SPIP](https://docs.google.com/document/d/1-4rCS3vsGIyhwnkAwPsEaqyUDg-AuVkdrYLotFPw0U0/edit?tab=t.0#heading=h.m1700lw4wsoj).

Introduce the analyzer rule that post-processes a resolved `DataSourceV2Relation(ChangelogTable)` to inject carry-over removal and/or update detection, fused into a single pass over a `(rowId, _commit_version)`-partitioned Window. To prevent silent wrong results, it also includes an explicit rejection path for streaming CDC reads that would require post-processing.

Included Changes:
- `ResolveChangelogTable` analyzer rule:
    - **Batch**: applies the requested post-processing transformations. Carry-over removal is a `Filter` on the Window (drops CoW pairs where `min(rowVersion) == max(rowVersion)`). Update detection is a `CASE WHEN` over delete/insert counts (relabels pairs as `update_preimage` /
  `update_postimage`). The two passes are fused into a single Window.
    - **Streaming**: throws `INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED` when the requested options would need post-processing. Streams that don't need post-processing pass through unchanged. Actual streaming support is scoped to a follow-up PR.
    - **Net changes**: throws `INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED` for both batch and streaming. Actual implementation is scoped to a follow-up PR.
- Option validation: throws `INVALID_CDC_OPTION.UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL` when `computeUpdates = true` is combined with a carry-over-surfacing connector and `deduplicationMode = none`, which would silently misclassify carry-overs as updates.
- Runtime guard: the generated plan raises `INVALID_CDC_OPTION.UNEXPECTED_MULTIPLE_CHANGES_PER_ROW_VERSION` when the connector emits more
  than one delete or insert for the same `(rowId, _commit_version)` partition, violating the `Changelog` contract.
- `Analyzer`: register the rule after `ResolveRelations`.
- `InMemoryChangelogCatalog`: `ChangelogProperties` extension so tests can configure post-processing scenarios without a real connector.

### Why are the changes needed?

Currently, `CHANGES FROM VERSION ... WITH (deduplicationMode = ..., computeUpdates = ...)` parses the options, but they are silently ignored — connector output is returned raw. This PR wires the options to their actual semantics for batch reads, and prevents silent wrong results for streaming reads.

### Does this PR introduce _any_ user-facing change?

Yes, for CDC queries against a `Changelog` connector.

  <details>
  <summary> Before/after example (click to expand)</summary>

  Given a `Changelog` connector that advertises both `containsCarryoverRows = true` and `representsUpdateAsDeleteAndInsert = true`, with rowId
  `id` and a `rowVersion` column, for versions 1–2:

  **Raw rows emitted by the connector:**
  ```
   1 | Alice  | insert | 1
   2 | Bob    | insert | 1
   3 | Carol  | insert | 1
   1 | Alice  | delete | 2    -- part of rename Alice -> Alicia
   1 | Alicia | insert | 2    -- part of rename Alice -> Alicia
   2 | Bob    | delete | 2    -- carry-over (CoW, row unchanged)
   2 | Bob    | insert | 2    -- carry-over (CoW, row unchanged)
   3 | Carol  | delete | 2    -- real delete
  ```

  **Before this PR:** `WITH (computeUpdates = 'true')` is silently ignored, carry-overs leak through:
  ```
   1 | Alice  | insert | 1
   2 | Bob    | insert | 1
   3 | Carol  | insert | 1
   1 | Alice  | delete | 2
   1 | Alicia | insert | 2
   2 | Bob    | delete | 2
   2 | Bob    | insert | 2
   3 | Carol  | delete | 2
  ```

  **After this PR:** `WITH (computeUpdates = 'true')`:
  ```
   1 | Alice  | insert           | 1
   2 | Bob    | insert           | 1
   3 | Carol  | insert           | 1
   1 | Alice  | update_preimage  | 2
   1 | Alicia | update_postimage | 2
   3 | Carol  | delete           | 2
  ```

  </details>

  ### How was this patch tested?

  `ResolveChangelogTablePostProcessingSuite` exercises the batch rule end-to-end via SQL against `InMemoryChangelogCatalog` (carry-over removal, update detection, their interaction across the option and connector-flag matrix, data-column handling with mixed types, and plan-shape invariants). `ChangelogResolutionSuite` adds streaming-rejection cases for the two capability flags that would require post-processing.

  ### Was this patch authored or co-authored using generative AI tooling?

  Generated-by: Claude Opus 4.7

Closes apache#55508 from SanJSp/SPARK-55668-PR2-resolve-changelog-table.

Lead-authored-by: Sandro Sp <sandrospeh@gmail.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
- Switch `read changes with options` from `deduplicationMode=netChanges` to
  `dropCarryovers`. `ResolveChangelogTable` now rejects `netChanges` with
  INVALID_CDC_OPTION.NET_CHANGES_NOT_YET_SUPPORTED.
- Regenerate the corresponding proto.bin / json query inputs.
- Regenerate `streaming_changes_API_with_options.explain` to include the new
  `resolved: Boolean` field on `ChangelogTable` introduced in the previous
  commit.
Copy link
Copy Markdown
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ResolveChangelogTable.scala:211-247: The runtime contract guard is currently only applied inside addUpdateRelabelProjection, so it only runs when computeUpdates=true. However, carry-over removal also depends on the same assumption that each (rowId, _commit_version) partition has at most one delete and one insert. With computeUpdates=false and the default deduplicationMode=dropCarryovers, a connector that emits multiple deletes or inserts for the same partition will silently bypass the CHANGELOG_CONTRACT_VIOLATION guard and may produce incorrect results by retaining rows that should be treated as a contract violation.

Copy link
Copy Markdown
Member

@viirya viirya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found one possible issue. Otherwise, looks good.

@gengliangwang
Copy link
Copy Markdown
Member Author

@viirya thanks for the review. Since this is re-picking, I will just merge it.
@SanJSp please follow-up #55588 (review)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants