9 changes: 7 additions & 2 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -666,6 +666,11 @@
"The Change Data Capture (CDC) connector violated the `Changelog` contract at runtime."
],
"subClass" : {
"NULL_COMMIT_TIMESTAMP" : {
"message" : [
"Connector emitted a row with a NULL `_commit_timestamp` on a streaming read engaging post-processing. The `Changelog` contract requires `_commit_timestamp` to be non-NULL for streaming reads, since post-processing uses it as event time to advance the watermark."
]
},
"UNEXPECTED_CHANGE_TYPE" : {
"message" : [
"Connector emitted a row with a `_change_type` value that is not one of the four supported types (`insert`, `delete`, `update_preimage`, `update_postimage`). The `Changelog` contract requires every emitted row to carry one of these four values."
@@ -3297,9 +3302,9 @@
"`startingVersion` is required when `endingVersion` is specified for CDC queries."
]
},
"STREAMING_POST_PROCESSING_NOT_SUPPORTED" : {
"STREAMING_NET_CHANGES_NOT_SUPPORTED" : {
"message" : [
"Change Data Capture (CDC) streaming reads on connector `<changelogName>` do not yet support post-processing (carry-over removal, update detection, or net change computation). The requested combination of options would require post-processing, which is currently only available for batch reads. Use a batch read, or set `deduplicationMode = none` and `computeUpdates = false` to receive raw change rows in streaming."
"Change Data Capture (CDC) streaming reads on connector `<changelogName>` do not yet support net change computation (`deduplicationMode = netChanges`). Net change computation reasons over the entire requested version range and is currently only available for batch reads. Use a batch read, or set `deduplicationMode` to `none` or `dropCarryovers` for streaming."
]
},
"UPDATE_DETECTION_REQUIRES_CARRY_OVER_REMOVAL" : {
@@ -131,6 +131,26 @@ abstract class DataStreamReader {
* .changes("my_table")
* }}}
*
* Streaming reads support the same `computeUpdates` and `deduplicationMode = dropCarryovers`
* post-processing as batch reads. `deduplicationMode = netChanges` is currently batch-only --
* it requires reasoning over the entire requested range, which is not incrementalized yet.
* Requesting it on a streaming read raises an explicit
* `INVALID_CDC_OPTION.STREAMING_NET_CHANGES_NOT_SUPPORTED` error.
*
* When the requested options engage row-level post-processing (carry-over removal or update
* detection), the rewrite injects an internal `EventTimeWatermark` on `_commit_timestamp` and a
* stateful streaming aggregate. Two implications follow:
* - A commit's events are emitted in the next micro-batch after the commit is read
* (append-mode aggregate eviction is `eventTime <= watermark`, and the watermark advances
* to the max `_commit_timestamp` observed in the previous batch). A stream that reads its
* last commit and stops will keep that commit's events in state until a subsequent
* (no-data) micro-batch fires.
* - The query is constrained to `Append` output mode; `Update` and `Complete` are rejected at
* writer-start time with `STREAMING_OUTPUT_MODE.UNSUPPORTED_OPERATION`. The internal
* watermark metadata is stripped from the user-visible `_commit_timestamp` output, so
* downstream user-supplied watermarks on other columns do not interact with it via the
* global multi-watermark policy.
*
* @param tableName
* a qualified or unqualified name that designates a table.
* @since 4.2.0
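The one-micro-batch emission delay described in the first bullet can be sketched outside Spark. The following is a minimal simulation of the append-mode semantics described above (illustrative class and method names, not Spark APIs): state is evicted when `eventTime <= watermark`, and the watermark only advances to the previous batch's max event time after the batch completes.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class AppendModeDelayDemo {
    /**
     * Minimal model of append-mode eviction under a zero-delay watermark.
     * Each inner list is one micro-batch of _commit_timestamp values.
     * Returns the timestamps emitted downstream, in emission order.
     */
    public static List<Long> run(List<List<Long>> batches) {
        List<Long> emitted = new ArrayList<>();
        List<Long> state = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (List<Long> batch : batches) {
            state.addAll(batch);
            // Eviction predicate: eventTime <= watermark, where the watermark
            // still reflects the previous batch's max event time.
            for (Iterator<Long> it = state.iterator(); it.hasNext(); ) {
                long t = it.next();
                if (t <= watermark) { emitted.add(t); it.remove(); }
            }
            // The watermark advances only after the batch completes.
            for (long t : batch) watermark = Math.max(watermark, t);
        }
        return emitted;
    }
}
```

A stream that reads its last commit (say timestamp 20) and stops keeps it in state: `run(List.of(List.of(20L)))` emits nothing, while a trailing no-data batch, `run(List.of(List.of(20L), List.of()))`, releases it.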
@@ -35,8 +35,34 @@
* {@code update_preimage}, or {@code update_postimage}</li>
* <li>{@code _commit_version} (connector-defined type, e.g. LONG) — the version containing
* this change</li>
* <li>{@code _commit_timestamp} (TIMESTAMP) — the timestamp of the commit</li>
* <li>{@code _commit_timestamp} (TIMESTAMP) -- the timestamp of the commit. All rows
* belonging to a single {@code _commit_version} must share the same
* {@code _commit_timestamp}. For streaming reads with post-processing enabled,
* two additional requirements apply:
* <ol>
* <li>All rows of a single commit must appear in the same micro-batch (i.e.
Review comment (Member):
The new requirements fix the “same commit split across batches” case and the “same timestamp in later batch” case only if timestamps also arrive in increasing event-time order. But the doc no longer explicitly requires that every later micro-batch has _commit_timestamp greater than the previous watermark/max.

Example:

batch 1: commit v2, ts = 20
batch 2: commit v3, ts = 10

Timestamps are distinct, and each commit is atomic, but batch 2 is late after watermark 20. So the real required invariant is closer to: no later micro-batch may contain rows with _commit_timestamp <= previous max event time. Also, “distinct commit versions must have distinct timestamps” is stronger than necessary and may be unrealistic for ms-resolution commit timestamps; equal timestamps are safe if all such commits are emitted before the watermark advances.

* micro-batch boundaries align with commit boundaries).</li>
* <li>Distinct {@code _commit_version} values must have distinct
* {@code _commit_timestamp} values.</li>
* </ol>
* Streaming post-processing uses {@code _commit_timestamp} as event time with a
* zero-delay watermark, so once a micro-batch observes max event time T the
* global watermark advances to T. Both Spark's late-event filter and its
* state-eviction predicate then use {@code eventTime <= T} -- so any later row
* at exactly {@code _commit_timestamp = T} (whether from the same commit split
* across batches, or from a different commit that happens to share T) is
* silently dropped as late. Requirement 1 rules out the same-commit case;
* requirement 2 rules out the different-commit case. Atomic-commit CDC connectors
* (e.g. Delta versions, Iceberg snapshots) that derive {@code _commit_timestamp}
* from wall-clock time at commit time naturally satisfy both requirements.
 *                   {@code _commit_timestamp} must be non-{@code NULL} on every
 *                   row of a streaming read engaging post-processing; a
 *                   {@code NULL} value raises
 *                   {@code CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP},
 *                   since a NULL group key never advances the watermark and
 *                   could otherwise stall emission of that group indefinitely</li>

Review comment (Member):

It says NULL behavior is “undefined,” but the latest code now intentionally raises CHANGELOG_CONTRACT_VIOLATION.NULL_COMMIT_TIMESTAMP. That doc should be updated to match the new fail-fast behavior.
* </ul>
* <p>
* Streaming reads support carry-over removal and update detection but not net change
* computation. The latter requires reasoning over the entire requested range and is
* batch-only.
*
* @since 4.2.0
*/
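The late-drop hazard that requirements 1 and 2 guard against can be shown by extending the same kind of simulation with Spark's late-event filter (a sketch of the semantics described in the doc above, not Spark code): with a zero-delay watermark, any row arriving in a later batch with `_commit_timestamp <=` the previous max event time is silently discarded.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class LateDropDemo {
    /**
     * Model of the zero-delay watermark with both predicates the doc names:
     * the late filter drops arriving rows with eventTime <= watermark, and
     * eviction emits state rows with eventTime <= watermark.
     */
    public static List<Long> run(List<List<Long>> batches) {
        List<Long> emitted = new ArrayList<>();
        List<Long> state = new ArrayList<>();
        long watermark = Long.MIN_VALUE;
        for (List<Long> batch : batches) {
            for (long t : batch) {
                if (t > watermark) state.add(t); // else: silently dropped as late
            }
            for (Iterator<Long> it = state.iterator(); it.hasNext(); ) {
                long t = it.next();
                if (t <= watermark) { emitted.add(t); it.remove(); }
            }
            for (long t : batch) watermark = Math.max(watermark, t);
        }
        return emitted;
    }
}
```

If commit v2 (timestamp 20) arrives in batch 1 and a different commit v3 reuses timestamp 20 in batch 2, the v3 row is filtered out: `run(List.of(List.of(20L), List.of(20L), List.of()))` emits a single `20`. Requirement 2 exists precisely to exclude this case.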
@@ -81,6 +107,12 @@ public interface Changelog {
* Spark will collapse multiple changes per row identity into the net effect.
* If {@code false}, the connector guarantees at most one change per row identity across
* the entire changelog range, and Spark will skip net change computation.
* <p>
* Note this flag is range-scoped (across all commits in the request), not
* micro-batch-scoped. Streaming CDC reads currently reject
* {@code deduplicationMode = netChanges} because the per-row-identity collapse cannot
* be incrementalized: a row's full history may span an unbounded number of
* micro-batches.
*/
boolean containsIntermediateChanges();
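To make the range-scoped nature concrete, here is a hedged sketch of what net-change collapse computes over a closed version range (illustrative names and simplified rules, not the Spark implementation): the net effect per row identity depends only on whether the row existed before the range and whether it exists after it.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class NetChangesSketch {
    // Illustrative change record: key identifies the row identity, type is one
    // of the four _change_type values, value is the row payload.
    public record Change(String key, String type, String value) {}

    /** Collapse an ordered change history into at most one net change per key. */
    public static Map<String, Change> collapse(List<Change> ordered) {
        Map<String, Change> first = new LinkedHashMap<>();
        Map<String, Change> last = new LinkedHashMap<>();
        for (Change c : ordered) {
            if (c.type().equals("update_preimage")) continue; // carries no new state
            first.putIfAbsent(c.key(), c);
            last.put(c.key(), c);
        }
        Map<String, Change> net = new LinkedHashMap<>();
        for (String k : first.keySet()) {
            boolean existedBefore = !first.get(k).type().equals("insert");
            Change end = last.get(k);
            boolean existsAfter = !end.type().equals("delete");
            if (existedBefore && existsAfter) {
                net.put(k, new Change(k, "update_postimage", end.value()));
            } else if (!existedBefore && existsAfter) {
                net.put(k, new Change(k, "insert", end.value()));
            } else if (existedBefore) {
                net.put(k, new Change(k, "delete", null));
            } // inserted and deleted inside the range: no net change
        }
        return net;
    }
}
```

An insert followed by a delete inside the range collapses to nothing, and that is exactly the decision that cannot be made while the range is still open: a future micro-batch could always extend a row's history. This is why the flag is range-scoped and `netChanges` is batch-only.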
