
[SPARK-56687][SQL] Support netChanges for DSv2 CDC streaming reads #55637

Open

gengliangwang wants to merge 10 commits into apache:master from gengliangwang:streamingCDC-netChanges

Conversation

@gengliangwang
Member

What changes were proposed in this pull request?

This PR completes the DSv2 CDC streaming post-processing surface by implementing deduplicationMode = netChanges for streaming reads. The previous PR (#55636 / SPARK-56686) added carry-over removal and update detection for streaming but left netChanges batch-only.

The batch path (ResolveChangelogTable.injectNetChangeComputation) uses a Catalyst Window partitioned by rowId and ordered by (_commit_version, change_type_rank) to find the first and last events per row identity, then applies the SPIP collapse matrix on (existedBefore, existsAfter). Window is rejected on streaming children (NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING), and unlike the row-level passes, the netChanges aggregate is keyed by rowId only -- there is no commit-version + commit-timestamp grouping that would let us reuse the streaming Aggregate pattern.
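
A rough DataFrame-level sketch of that batch shape (illustrative only -- the rule works on Catalyst nodes directly; the changes DataFrame, the single id rowId column, and the change-type ranking are assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Rank change types so deletes order before inserts within the same commit (assumed ranking).
val changeTypeRank = when(col("_change_type") === "delete", 0).otherwise(1)

val w = Window
  .partitionBy(col("id"))  // rowId; may be a composite key
  .orderBy(col("_commit_version"), changeTypeRank)
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val withBounds = changes
  .withColumn("__first_type", first(col("_change_type")).over(w))
  .withColumn("__last_type", last(col("_change_type")).over(w))
  // existedBefore: the first event was not an insert; existsAfter: the last event was not a delete.
  .withColumn("__existed_before", col("__first_type") =!= "insert")
  .withColumn("__exists_after", col("__last_type") =!= "delete")
// The collapse matrix then emits 0, 1, or 2 rows per row identity based on
// (__existed_before, __exists_after).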

This PR adds a streaming-friendly equivalent by delegating per-row-identity state management to a new CdcNetChangesStatefulProcessor driven by TransformWithState:

  • The processor stores the first event ever observed and the most-recent event observed for each row identity in ValueState[Row].
  • An event-time timer is armed on each batch to the latest _commit_timestamp observed for the key. When the global watermark advances past the timer, handleExpiredTimer evaluates the SPIP matrix and emits 0, 1, or 2 output rows -- identical semantics to the batch path.
  • Existing per-key timers are deleted before re-arming so that out-of-order events for an earlier commit can't fire a stale timer between batches and produce duplicate output for the same row identity.

The analyzer rule constructs TransformWithState directly (there is no precedent in Catalyst for this; the typed-Dataset DSL is the usual entry point). Encoders for the input/output Row and the rowId tuple are built via ExpressionEncoder(StructType). Nested rowId paths (e.g. payload.id) are handled by aliasing each rowId expression to a top-level __spark_cdc_rowid_<i> helper column before the TransformWithState, then dropping the helpers in a final Project so the user-visible schema matches the connector's declared changelog schema.
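
A minimal sketch of the processor shape described above, assuming the Spark 4.x StatefulProcessor API (state names, field accessors, and the collapse handling are illustrative; the PR's actual processor also covers computeUpdates relabeling and pre-/post-image emission):

import org.apache.spark.sql.{Encoders, Row}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.types.StructType

class NetChangesSketchProcessor(schema: StructType) extends StatefulProcessor[Row, Row, Row] {
  @transient private var firstEvent: ValueState[Row] = _
  @transient private var latestEvent: ValueState[Row] = _

  override def init(outputMode: OutputMode, timeMode: TimeMode): Unit = {
    firstEvent = getHandle.getValueState[Row]("first", Encoders.row(schema), TTLConfig.NONE)
    latestEvent = getHandle.getValueState[Row]("latest", Encoders.row(schema), TTLConfig.NONE)
  }

  override def handleInputRows(key: Row, rows: Iterator[Row], timerValues: TimerValues): Iterator[Row] = {
    rows.foreach { r =>
      if (!firstEvent.exists()) firstEvent.update(r)
      latestEvent.update(r)
    }
    // Re-arm a single event-time timer at the latest _commit_timestamp seen for this key;
    // delete any previously registered timer so a stale one cannot fire between batches.
    getHandle.listTimers().foreach(getHandle.deleteTimer)
    val latestTs = latestEvent.get().getAs[java.sql.Timestamp]("_commit_timestamp")
    getHandle.registerTimer(latestTs.getTime)
    Iterator.empty
  }

  override def handleExpiredTimer(
      key: Row,
      timerValues: TimerValues,
      expiredTimerInfo: ExpiredTimerInfo): Iterator[Row] = {
    val existedBefore = firstEvent.get().getAs[String]("_change_type") != "insert"
    val existsAfter = latestEvent.get().getAs[String]("_change_type") != "delete"
    val out = (existedBefore, existsAfter) match {
      case (false, false) => Iterator.empty              // inserted then deleted inside the range: cancels
      case _              => Iterator(latestEvent.get()) // the net change for this row identity survives
    }
    firstEvent.clear()
    latestEvent.clear()
    out
  }
}

In the typed DSL this would be wired via groupByKey on the rowId helper columns followed by transformWithState(processor, TimeMode.EventTime(), OutputMode.Append()); the analyzer rule builds the equivalent TransformWithState node directly.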

Plan shape:

EventTimeWatermark(_commit_timestamp, 0s)
  -> Project (alias rowId expressions to flat helper columns)
  -> TransformWithState (grouping = rowId helpers, EventTime mode, Append)
  -> SerializeFromObject
  -> Project (drop rowId helper columns)

When carry-over removal / update detection are also requested, the row-level rewrite is applied first; the netChanges TransformWithState then sits on top of it and the rule reuses the existing EventTimeWatermark rather than stacking another (multi-watermark stacking is rejected unless STATEFUL_OPERATOR_ALLOW_MULTIPLE is set).

Documented limitation

Row identities only touched in the latest observed commit are held back until a later commit (with strictly greater _commit_timestamp) advances the watermark past them, or the source terminates. End-of-input flushes all timers, so bounded streams produce output equivalent to the corresponding batch read. This matches the steady-state behavior of the row-level streaming rewrite.

Also removes the now-obsolete error class INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED introduced in SPARK-56686.

Why are the changes needed?

Without this PR, deduplicationMode = netChanges is unavailable on streaming CDC reads, forcing users with intermediate-state connectors (containsIntermediateChanges = true) to fall back to batch reads when they want a deduplicated change feed. With SPARK-56686 already shipping carry-over removal and update detection for streaming, netChanges was the only post-processing pass still gated to batch -- this completes the surface.

Does this PR introduce any user-facing change?

Yes.

  • Streaming spark.readStream.changes(...) now supports deduplicationMode = netChanges. Previously this threw INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED.
  • That error class is removed; the wording in DataStreamReader.changes() and Changelog.java Scaladoc has been updated to describe the supported behavior and the latest-commit limitation.

Note: the netChanges streaming path uses TransformWithState, which requires the RocksDB state store backend (spark.sql.streaming.stateStore.providerClass = ...RocksDBStateStoreProvider). Spark surfaces UNSUPPORTED_FEATURE.STORE_BACKEND_NOT_SUPPORTED_FOR_TWS if the default HDFS-backed provider is left in place, so this is discoverable.
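
For example (a hedged sketch -- the reader option names are taken from this PR's description and may not match the final API; the config key and provider class are standard Spark):

// Required for the analyzer-injected TransformWithState operator:
spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")

// Streaming CDC read with net-changes deduplication:
val netChanges = spark.readStream
  .option("deduplicationMode", "netChanges")
  .option("computeUpdates", "true")
  .changes("my_table")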

How was this patch tested?

89 tests pass across 4 CDC suites (all green):

  • ResolveChangelogTableStreamingPostProcessingSuite -- two new plan-shape tests: netChanges alone injects watermark + TransformWithState and netChanges + carry-over removal share a single watermark (verifies that the netChanges TransformWithState reuses the row-level rewrite's EventTimeWatermark rather than stacking another).
  • ChangelogResolutionSuite -- the netChanges throws test from SPARK-56686 is flipped to assert that exactly one TransformWithState appears in the analyzed plan.
  • ResolveChangelogTablePostProcessingSuite -- the corresponding netChanges throw test is similarly flipped.
  • ChangelogEndToEndSuite -- two new end-to-end tests that drive a streaming query against InMemoryChangelogCatalog with the RocksDB state store: streaming netChanges collapses INSERT then DELETE to no output (confirms the (false, false) cancel case and that end-of-input flushes the latest commit's group) and streaming netChanges with computeUpdates labels persisting rows as updates (confirms the (false, true) case relabels correctly).

Also confirmed UnsupportedOperationsSuite (216 tests) still passes.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (claude-opus-4-7)

Implements carry-over removal and update detection for DSv2 CDC streaming
reads, which previously rejected any post-processing with a blanket error.

The batch path uses a Catalyst Window keyed by (rowId, _commit_version),
which UnsupportedOperationChecker rejects on streaming queries
(NON_TIME_WINDOW_NOT_SUPPORTED_IN_STREAMING). The streaming rewrite
expresses the same logic with streaming-allowed primitives:
EventTimeWatermark on _commit_timestamp -> Aggregate keyed by
(rowId..., _commit_version, _commit_timestamp) buffering events into a
collect_list of structs -> [Filter on the carry-over predicate] ->
Generate(Inline(events)) to re-emit rows -> [Project relabeling
_change_type for delete+insert pairs] -> drop helper columns.
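
At DataFrame level the same shape is roughly (a hedged sketch; the changes DataFrame and the single id rowId column are placeholders, and the optional steps are shown as comments):

import org.apache.spark.sql.functions._

val buffered = changes
  .withWatermark("_commit_timestamp", "0 seconds")
  .groupBy(col("id"), col("_commit_version"), col("_commit_timestamp"))
  .agg(collect_list(struct(col("*"))).as("__spark_cdc_events"))

val reEmitted = buffered
  // .filter(<carry-over predicate over __spark_cdc_events>)   -- only with carry-over removal
  .select(inline(col("__spark_cdc_events")))
  // .select(<relabel _change_type for delete+insert pairs>)   -- only with update detection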

deduplicationMode=netChanges remains batch-only; it requires partitioning
by rowId across the entire requested range and is fundamentally
cross-batch. The existing
INVALID_CDC_OPTION.STREAMING_POST_PROCESSING_NOT_SUPPORTED error is
replaced with the more specific
INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED, which now also
points users at the supported streaming alternatives.

Also clarifies the Changelog.java contract that all rows of a single
_commit_version must share _commit_timestamp and that streaming reads
expect non-decreasing _commit_timestamp across micro-batches, plus a
note in DataStreamReader.changes() Scaladoc about the netChanges
streaming limitation.
@gengliangwang
Member Author

@huaxingao FYI this is part of the SPIP for CDC support (SPARK-55668), targeting the Spark 4.2 release. We're aiming to get it ready and merged ASAP.

@gengliangwang force-pushed the streamingCDC-netChanges branch from c5bc9bf to 4d48424 on May 1, 2026 00:19
…l rewrite

Three fixes from viirya's review on apache#55636:

1. Strip the auto-injected EventTimeWatermark metadata from the user-visible
   `_commit_timestamp` output. The metadata flowed through `Generate(Inline)`
   onto the public output, where it would have interacted with downstream
   user-supplied watermarks via the global multi-watermark policy. A final
   Project at the boundary of the rewrite now removes
   `EventTimeWatermark.delayKey` so the watermark stays internal-only (a sketch follows at the end of this commit message).

2. Reject non-Append output modes for streaming CDC reads with post-processing.
   The injected streaming Aggregate's append-mode emission (one group per
   `_commit_timestamp` once the watermark advances past it) is the only
   semantically valid mode -- Update would re-emit per-batch state changes,
   Complete would re-emit the full result table per batch, neither matching
   batch CDC semantics. UnsupportedOperationChecker now detects the rewrite
   by the `__spark_cdc_events` helper aggregate expression and throws
   STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION for non-Append modes.

3. Tighten the `_commit_timestamp` streaming contract in `Changelog.java`.
   The previous "non-decreasing across micro-batches" wording was too weak:
   Spark's stateful aggregate evicts groups with `eventTime <= watermark`
   (statefulOperators.scala:643-650), so equal-timestamp rows in a later
   micro-batch would be dropped as late. The contract now requires that all
   rows of a single commit appear in the same micro-batch -- the natural
   atomic-commit emission pattern of real CDC connectors (Delta versions,
   Iceberg snapshots) -- which makes the zero-delay watermark sound.

Adds a plan-shape test asserting no watermark metadata leaks to user-visible
output, and two end-to-end negative tests covering Update / Complete output
mode rejection.
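
A hedged Catalyst-level sketch of the metadata strip described in item 1 above (not the PR's code; it assumes the standard Alias/MetadataBuilder APIs, with EventTimeWatermark.delayKey as the watermark marker key):

import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeWatermark, LogicalPlan, Project}
import org.apache.spark.sql.types.MetadataBuilder

// Re-alias any output attribute carrying the event-time watermark marker with the
// marker removed, so the watermark stays internal to the rewrite.
def stripWatermarkMetadata(plan: LogicalPlan): Project = {
  val cleaned = plan.output.map { attr =>
    if (attr.metadata.contains(EventTimeWatermark.delayKey)) {
      val newMeta = new MetadataBuilder()
        .withMetadata(attr.metadata)
        .remove(EventTimeWatermark.delayKey)
        .build()
      Alias(attr, attr.name)(explicitMetadata = Some(newMeta))
    } else {
      attr
    }
  }
  Project(cleaned, plan)
}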
…treaming CDC

Address the second sub-case from viirya's review on apache#55636. The previous
contract change covered the same-commit-split-across-micro-batches case via
"all rows of a single commit must appear in the same micro-batch", but missed
the case where two DIFFERENT commits with the same `_commit_timestamp` arrive
in different micro-batches.

Spark's late-event filter and state-eviction predicate both use
`LessThanOrEqual` (`statefulOperators.scala:633-651`), so once a micro-batch
observes max event time T and advances the watermark to T, any later row at
exactly `_commit_timestamp = T` is silently dropped as late. The
atomic-microbatch contract alone doesn't rule this out for distinct commits.

Adds a second contract requirement: distinct `_commit_version` values must
have distinct `_commit_timestamp` values when streaming post-processing is
enabled. Atomic-commit CDC connectors that derive `_commit_timestamp` from
wall-clock time at commit time (Delta, Iceberg) naturally satisfy this.

Doc-only change; no code modifications. The existing tests already exercise
the supported cases; the unsupported case 2 is by definition a connector
contract violation, so we don't add a test for it.
* 6. Final [[Project]] (via [[removeHelperColumns]]) drops `__spark_cdc_*` helpers so
* the output schema matches the connector's declared schema.
*/
private def addStreamingRowLevelPostProcessing(

Should we enforce non-null commit timestamp similar to CdcNetChangesStatefulProcessor.scala?

Member Author

Good call -- added a matching analyzer-level guard for the row-level path. The streaming row-level rewrite now starts with a Filter that raises CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP (via RaiseError) on any row with a NULL _commit_timestamp, mirroring the runtime guard in CdcNetChangesStatefulProcessor.

A NULL _commit_timestamp would silently stall the row-level path (the downstream Aggregate uses _commit_timestamp as both the watermark column and a grouping key, and the eventTime <= watermark eviction predicate is never satisfied for a NULL key, so the group sits in state forever producing no output and no error). Failing fast at the analyzer level surfaces the contract violation immediately.

Lives on the row-level PR (#55636) since the method being reviewed is in that PR; this branch picks it up via rebase. Plan-shape and end-to-end tests added on #55636 too. Pushed in 791d5ce3246 (row-level PR), now visible on this PR after the rebase.
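
For reference, a DataFrame-level analogue of that guard (a hedged sketch -- the real rule builds a Catalyst Filter with RaiseError and the structured error class; the message string and the changes DataFrame here are illustrative):

import org.apache.spark.sql.functions.{col, lit, raise_error, when}

// Fail fast on a NULL _commit_timestamp instead of letting the group sit in state forever.
val guarded = changes.filter(
  when(col("_commit_timestamp").isNull,
    raise_error(lit("CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP")))
    .otherwise(lit(true)))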

…ermark-strip

Two follow-ups on the streaming CDC row-level rewrite:

1. `dev/lint-scala` runs scalafmt on `sql/api`; my prior edit to
   `DataStreamReader.changes()` left the Scaladoc lines wrapped at the
   wrong column. Re-flowed via
   `./build/mvn scalafmt:format -pl sql/api`.

2. Updated the user-visible Scaladoc on `DataStreamReader.changes()` to
   reflect the watermark-metadata strip from dee5e84. The previous wording
   said "the watermark metadata is preserved on the user-visible
   `_commit_timestamp` output ... global watermark becomes the min of the
   two" -- that was accurate before the strip, but is now stale. The new
   wording says the metadata is stripped (so downstream user-supplied
   watermarks do not interact with it via the global multi-watermark
   policy) and explicitly notes that streaming row-level post-processing
   constrains the query to Append output mode.

Note: the Java unidoc CI step is failing on an unrelated pre-existing
name-clash error in `core/target/java/.../JavaSparkContext.java:415`
(`<K,V>union(Seq<JavaPairRDD<K,V>>)` vs `<T>union(Seq<JavaRDD<T>>)` --
same erasure). Verified identical to upstream master, so it's not from
this PR.
…treaming row-level rewrite

Address @zikangh's review on apache#55637 -- the streaming row-level rewrite should
enforce non-NULL _commit_timestamp, mirroring the runtime guard in
CdcNetChangesStatefulProcessor.

A NULL _commit_timestamp on a streaming read is a connector contract
violation that would silently stall the row's group: the downstream
streaming Aggregate uses _commit_timestamp as an event-time watermark
column AND a grouping key, and Spark's eviction predicate is
LessThanOrEqual(eventTime, watermark) -- a NULL group key never
satisfies that, so the group sits in state until end of stream
producing no output and no error.

Add a Filter at the top of the streaming row-level rewrite that raises
CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP via the same
RaiseError pattern used for the multiple-changes-per-row-version guard
in the batch path. Also adds the new error class to
error-conditions.json.

Tests:
 - Plan-shape tests: assert the guard Filter is present and sits
   directly above the streaming relation (so it runs before any
   downstream operator sees the NULL).
 - End-to-end test: feeding a row with a NULL _commit_timestamp
   surfaces CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP at the
   streaming query level rather than producing no output.
 - Existing carry-over / update-detection plan-shape tests updated for
   the extra guard Filter (was 1 -> now 2 Filters in carry-over and
   combined paths; was 0 -> now 1 in update-detection-only).

Also refreshed the addStreamingRowLevelPostProcessing Scaladoc to add a
step 0 (the guard) and step 7 (the watermark-metadata strip), keeping
the per-operator detail aligned with the rewrite's actual shape.

Doc-only side effect: scalafmt reflowed the watermark-metadata bullet
in DataStreamReader.changes() Scaladoc (no semantic change).
Implements `deduplicationMode = netChanges` for DSv2 CDC streaming reads,
completing the streaming post-processing surface. The previous PR
(SPARK-56686) added carry-over removal and update detection for streaming
but kept netChanges batch-only.

The batch path uses a Catalyst Window partitioned by `rowId` and ordered
by `(_commit_version, change_type_rank)` to find the first/last events
per row identity, then applies the SPIP collapse matrix on
`(existedBefore, existsAfter)`. Window is rejected on streaming children.

This PR adds a streaming-friendly equivalent by delegating per-row-identity
state management to a new CdcNetChangesStatefulProcessor driven by
TransformWithState. The processor stores the first event ever observed and
the most-recent event observed for each rowId, and arms an event-time timer
on the latest `_commit_timestamp` for the key. When the global watermark
advances past the timer, handleExpiredTimer evaluates the SPIP matrix and
emits 0, 1, or 2 output rows -- identical semantics to the batch path.

Plan shape (streaming netChanges):
  EventTimeWatermark(_commit_timestamp, 0s)
    -> Project (alias rowId expressions to flat helper columns)
    -> TransformWithState (grouping=rowId helpers, EventTime mode, Append)
    -> SerializeFromObject
    -> Project (drop rowId helper columns)

When carry-over removal / update detection are also requested, the
row-level rewrite is applied first; the netChanges TransformWithState then
sits on top, sharing the same EventTimeWatermark (it's reused rather than
stacked, which would be rejected by the multi-watermark check).

Documented limitation: row identities only touched in the latest observed
commit are held back until a later commit (with strictly greater
`_commit_timestamp`) advances the watermark past them, or the source
terminates. End-of-input flushes all timers, so bounded streams produce
output equivalent to the corresponding batch read.

Removes the now-obsolete error class
INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED that was introduced
by the previous PR; updates the DataStreamReader.changes() and
Changelog.java Scaladoc to describe the new behavior and the latest-commit
limitation. Tests in ChangelogResolutionSuite,
ResolveChangelogTablePostProcessingSuite, and the new plan-shape suite
flip from "expect throw" to "expect TransformWithState in plan", and
ChangelogEndToEndSuite gains two streaming netChanges end-to-end tests
covering the SPIP matrix's cancel and persist-via-update cases.
…d test fixes

- Tighten Changelog.java to require LONG for _commit_version (post-processing
  compares versions as primitive longs); update the NULL _commit_timestamp
  clause to point to the new error class.
- CdcNetChangesStatefulProcessor: replace the internalError-with-string-prefix
  pattern with structured CHANGELOG_CONTRACT_VIOLATION.UNEXPECTED_CHANGE_TYPE
  (matches batch path) and add a NULL _commit_timestamp guard raising
  CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP rather than NPE'ing on
  getTime().
- Hoist `inputSchema.fieldIndex("_change_type")` out of `relabel` to avoid a
  linear scan per emitted row.
- Expand `addStreamingNetChangeComputation` Scaladoc Step 1 to describe the
  watermark reuse for combined row-level + netChanges paths.
- ResolveChangelogTableStreamingPostProcessingSuite: assert the
  TransformWithState grouping attributes (so a regression that grouped by the
  wrong attributes can't pass the size check alone).
- ChangelogEndToEndSuite "labels persisting rows as updates": rewrite the
  fixture so the row identity actually exercises (existedBefore=true,
  existsAfter=true) and the (true,true) cell of the SPIP matrix is asserted to
  emit update_preimage + update_postimage; the previous fixture started with
  an INSERT so it fell into the (false,true) cell and emitted INSERT.
- DataStreamReader.changes(): broaden the post-processing "Two implications
  follow" intro from row-level-only to all post-processing modes so the
  netChanges path is covered.

Co-authored-by: Isaac
…rowId

- ChangelogEndToEndSuite: 3 new e2e tests covering missing SPIP matrix cells
  ((true, false) DELETE, (true, true) without computeUpdates), and combined
  netChanges + carry-over removal post-processing path.
- ResolveChangelogTableStreamingPostProcessingSuite: assert grouping
  attribute on netChanges-alone test; add composite rowId plan-shape test.
- Remove stale "deduplicationMode=netChanges is rejected on streaming" test
  (netChanges is now supported on streaming) and the now-unused
  AnalysisException import.

Co-authored-by: Isaac
@gengliangwang force-pushed the streamingCDC-netChanges branch from e2f65a2 to e31cd2b on May 1, 2026 04:50
Iterator.empty
}

override def handleExpiredTimer(
Member

handleExpiredTimer emits the current net result and then clears both first/last state. In a long-running stream, the same rowId can receive later commits after an unrelated commit advances the watermark. Example: id=1 insert at v1, id=2 change at v2 advances watermark and emits id=1 insert, then id=1 delete at v3 emits a later delete. Batch netChanges over v1..v3 would cancel to no row, so this is not equivalent to the documented range-scoped netChanges semantics. The tests add all rows before starting the query, so they do not cover a later event arriving after an earlier timer fired.

timerValues: TimerValues): Iterator[Row] = {
val handle = getHandle
val sorted = inputRows.toSeq.sortBy { row =>
val v = row.getAs[Long]("_commit_version")
Member

The changelog contract says _commit_version is connector-defined, while the batch path sorts the attribute generically through Catalyst. This processor hard-casts _commit_version with getAs[Long], so any connector using another orderable version type will fail at runtime or be unsupported only on the streaming path. The plan should either preserve Catalyst ordering/ranking before entering the processor, or explicitly validate/restrict the version type.

* .changes("my_table")
* }}}
*
* Streaming reads support all of the same post-processing as batch reads -- `computeUpdates`,
Member

The PR body notes that TransformWithState requires RocksDB, but the changes() Scaladoc says streaming supports all post-processing without mentioning that the default state store will fail at query start. Since this is an analyzer-injected operator rather than user-written transformWithState, users have little reason to know they must set the RocksDB provider.
