[SPARK-55948][SQL] Add DSv2 CDC connector API, analyzer resolution, and SQL CHANGES clause #54738

Closed
gengliangwang wants to merge 8 commits into apache:master from gengliangwang:cdc-sql
Conversation


@gengliangwang (Member) commented Mar 10, 2026

What changes were proposed in this pull request?

This PR introduces the DSv2 connector API, analyzer resolution, and SQL CHANGES clause.

Connector API (new Java interfaces)

  • `Changelog`: The central connector interface for CDC. Connectors implement this to expose row-level change data with metadata columns (`_change_type`, `_commit_version`, `_commit_timestamp`). Connectors declare properties (`containsCarryoverRows`, `containsIntermediateChanges`, `representsUpdateAsDeleteAndInsert`) that tell Spark what post-processing is needed.
  • `ChangelogInfo`: Encapsulates CDC query parameters — range, deduplication mode (`NONE`, `DROP_CARRYOVERS`, `NET_CHANGES`), and `computeUpdates` flag.
  • `ChangelogRange`: Sealed interface with three record types — `VersionRange`, `TimestampRange`, and `Unbounded`.
  • `TableCatalog.loadChangelog(Identifier, ChangelogInfo)`: Entry point for catalogs to provide a `Changelog`.
  • `TableCatalogCapability.SUPPORT_CHANGELOG`: Catalog capability flag that gates CDC support.
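
As a rough illustration of these shapes, the following is a hypothetical Java mirror of the API described above (field names follow this description; it is not Spark's actual source):

```java
import java.util.OptionalLong;

// Hypothetical mirror of the connector API shapes described above; illustrative only.
public class ChangelogApiSketch {
    // "Sealed interface with three record types", per the PR description.
    sealed interface ChangelogRange permits VersionRange, TimestampRange, Unbounded {}

    record VersionRange(long startingVersion, OptionalLong endingVersion,
                        boolean startingBoundInclusive, boolean endingBoundInclusive)
            implements ChangelogRange {}

    record TimestampRange(long startingTimestampMicros, OptionalLong endingTimestampMicros,
                          boolean startingBoundInclusive, boolean endingBoundInclusive)
            implements ChangelogRange {}

    // Streaming with no explicit starting point and no ending bound.
    record Unbounded() implements ChangelogRange {}

    enum DeduplicationMode { NONE, DROP_CARRYOVERS, NET_CHANGES }

    record ChangelogInfo(ChangelogRange range,
                         DeduplicationMode deduplicationMode,
                         boolean computeUpdates) {}
}
```

For example, `CHANGES FROM VERSION 1 TO VERSION 5` would correspond to a `ChangelogInfo` holding a version range from 1 to 5 with both bounds inclusive.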

Analyzer resolution

  • `RelationChanges`: Unresolved logical node representing a CDC query.
  • `ChangelogTable`: DSv2 `Table` wrapper around a `Changelog`, declaring `BATCH_READ` and `MICRO_BATCH_READ` capabilities.
  • Resolution rule in `Analyzer`: resolves `RelationChanges` -> looks up catalog -> calls `loadChangelog()` -> wraps in `ChangelogTable` -> creates `DataSourceV2Relation` (batch) or `StreamingRelationV2` (streaming).
  • `CheckAnalysis`: Reports `TABLE_OR_VIEW_NOT_FOUND` for unresolved `RelationChanges`.
  • `CTESubstitution`: Blocks CDC queries on CTE relations (mirrors time travel restriction).
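
The resolution flow above can be modeled with a toy sketch — every type here is a stand-in, and a plain map plays the role of `TableCatalog.loadChangelog`:

```java
import java.util.List;
import java.util.Map;

// Toy model of the resolution rule's flow; all types are stand-ins, not Spark's classes.
public class ResolutionSketch {
    sealed interface LogicalPlan
        permits RelationChanges, DataSourceV2Relation, StreamingRelationV2, Unresolved {}
    record RelationChanges(List<String> ident, boolean isStreaming) implements LogicalPlan {}
    record DataSourceV2Relation(String table) implements LogicalPlan {}
    record StreamingRelationV2(String table) implements LogicalPlan {}
    record Unresolved(List<String> ident) implements LogicalPlan {}

    // `catalog` stands in for the catalog lookup: identifier -> changelog name.
    static LogicalPlan resolve(LogicalPlan plan, Map<List<String>, String> catalog) {
        if (plan instanceof RelationChanges rc) {
            String changelog = catalog.get(rc.ident());
            if (changelog == null) {
                // Left unresolved; CheckAnalysis would report TABLE_OR_VIEW_NOT_FOUND.
                return new Unresolved(rc.ident());
            }
            String table = "ChangelogTable(" + changelog + ")";  // wrap in a DSv2 Table
            return rc.isStreaming() ? new StreamingRelationV2(table)
                                    : new DataSourceV2Relation(table);
        }
        return plan;
    }
}
```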

Utilities

  • `ChangelogInfoUtils`: Parses DataFrame API options into a `ChangelogInfo`.
  • `TimeTravelSpec.resolveTimestampExpression()`: Extracted shared method for timestamp evaluation, deduplicating logic between time travel and CDC.
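
A hedged sketch of the kind of parsing `ChangelogInfoUtils` performs — option names (`startingVersion`, `endingTimestamp`, …) follow the DataFrame API in the follow-up PR; the real code raises `AnalysisException` with dedicated error conditions rather than `IllegalArgumentException`:

```java
import java.util.Map;
import java.util.Optional;

// Simplified sketch of DataFrame-option parsing; illustrative only.
public class ChangelogOptionsSketch {
    sealed interface Range permits VersionRange, TimestampRange {}
    record VersionRange(long start, Optional<Long> end) implements Range {}
    record TimestampRange(String start, Optional<String> end) implements Range {}

    static Range parseRange(Map<String, String> options) {
        boolean hasVersion = options.containsKey("startingVersion")
            || options.containsKey("endingVersion");
        boolean hasTimestamp = options.containsKey("startingTimestamp")
            || options.containsKey("endingTimestamp");
        if (hasVersion && hasTimestamp) {
            // Mirrors the CONFLICTING_RANGE_TYPES error condition: the two
            // range kinds cannot be mixed in one query.
            throw new IllegalArgumentException("CONFLICTING_RANGE_TYPES");
        }
        if (hasVersion) {
            String start = options.get("startingVersion");
            if (start == null) throw new IllegalArgumentException("missing startingVersion");
            return new VersionRange(Long.parseLong(start),
                Optional.ofNullable(options.get("endingVersion")).map(Long::parseLong));
        }
        if (hasTimestamp) {
            String start = options.get("startingTimestamp");
            if (start == null) throw new IllegalArgumentException("missing startingTimestamp");
            return new TimestampRange(start, Optional.ofNullable(options.get("endingTimestamp")));
        }
        throw new IllegalArgumentException("no range options given");
    }
}
```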

SQL CHANGES clause

Batch CDC queries:

```sql
SELECT * FROM catalog.table CHANGES FROM VERSION 1 TO VERSION 5
SELECT * FROM catalog.table CHANGES FROM VERSION 1
SELECT * FROM catalog.table CHANGES FROM TIMESTAMP '2026-01-01' TO TIMESTAMP '2026-06-01'
```

Streaming CDC queries:

```sql
STREAM catalog.table CHANGES
STREAM catalog.table CHANGES IDENTIFIED BY source_name
```

  • ANTLR grammar (`SqlBaseLexer.g4`, `SqlBaseParser.g4`): Adds `CHANGES`, `FROM VERSION`, `FROM TIMESTAMP`, `TO VERSION`, `TO TIMESTAMP` syntax to `relationPrimary`. Adds `STREAM ... CHANGES` variant to `streamRelation`.
  • `AstBuilder`: Parses the `CHANGES` clause into a `RelationChanges` node with a `ChangelogInfo`. Reuses `TimeTravelSpec.resolveTimestampExpression()` for timestamp evaluation.

Test infrastructure

  • `InMemoryChangelogCatalog`: Test catalog with pre-populated change rows and a working scan pipeline. Reports `containsCarryoverRows = false`.

Why are the changes needed?

Spark currently has no standardized framework for Change Data Capture (CDC) queries via DSv2 connectors. This PR establishes the connector API contract, analyzer resolution, and SQL syntax so that connectors can expose row-level change data in a uniform way.

Does this PR introduce any user-facing change?

Yes. New SQL syntax for querying change data:

```sql
SELECT * FROM my_catalog.my_table CHANGES FROM VERSION 1 TO VERSION 5
SELECT id, _change_type FROM my_catalog.my_table CHANGES FROM TIMESTAMP '2026-01-01'
```

The query returns the table's data columns plus CDC metadata columns: `_change_type` (STRING), `_commit_version` (LONG), `_commit_timestamp` (TIMESTAMP).

How was this patch tested?

  • `ChangelogInfoUtilsSuite` — 18 unit tests covering version/timestamp range parsing, deduplication modes, bound inclusivity, timezone handling, and error cases.
  • `PlanParserSuite` — Parser tests covering version ranges, timestamp ranges, streaming syntax, error cases, and `CHANGES` as an identifier.
  • `ChangelogResolutionSuite` — 5 SQL-path resolution tests: successful resolution, catalog without CDC capability, table not found, schema verification, CTE restriction.
  • `ChangelogEndToEndSuite` — 5 SQL-path end-to-end tests using `InMemoryChangelogCatalog`: full data retrieval, column projection, filtering, schema verification, empty results.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

gengliangwang and others added 2 commits March 10, 2026 11:18
Introduce the foundational Change Data Capture (CDC) framework for DSv2:

- Changelog interface: connector contract for exposing change data
- ChangelogInfo/ChangelogRange: CDC query parameters (version/timestamp range,
  deduplication mode, compute updates)
- TableCatalog.loadChangelog() and SUPPORT_CHANGELOG capability
- ChangelogTable: DSv2 Table wrapper for Changelog
- RelationChanges: unresolved logical node for CDC queries
- Analyzer resolution: RelationChanges -> ChangelogTable -> DataSourceV2Relation
- ChangelogInfoUtils: utility for parsing CDC options
- TimeTravelSpec: extract shared resolveTimestampExpression method
- Error conditions for CDC validation
- InMemoryChangelogCatalog: test-only implementation with scan support
- ChangelogInfoUtilsSuite: unit tests

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Add SQL syntax for querying Change Data Capture (CDC) data:

  SELECT * FROM table CHANGES FROM VERSION 1 TO VERSION 5
  SELECT * FROM table CHANGES FROM TIMESTAMP '2026-01-01'
  STREAM table CHANGES [IDENTIFIED BY name]

- ANTLR grammar: CHANGES, FROM VERSION/TIMESTAMP, TO VERSION/TIMESTAMP
- AstBuilder: parse CHANGES clause into RelationChanges node
- Reuse TimeTravelSpec.resolveTimestampExpression for timestamp evaluation
- Parser tests (PlanParserSuite): 77 tests
- Resolution tests (ChangelogResolutionSuite): SQL path tests
- End-to-end tests (ChangelogEndToEndSuite): SQL path with data

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
@gengliangwang gengliangwang changed the title [SPARK-XXXXX][SQL] Add SQL CHANGES clause for CDC queries [SPARK-XXXXX][SQL] Add DSv2 CDC connector API, analyzer resolution, and SQL CHANGES clause Mar 10, 2026
@gengliangwang gengliangwang changed the title [SPARK-XXXXX][SQL] Add DSv2 CDC connector API, analyzer resolution, and SQL CHANGES clause [SPARK-55948][SQL] Add DSv2 CDC connector API, analyzer resolution, and SQL CHANGES clause Mar 10, 2026
@viirya (Member) left a comment:
Some initial comments. I will take another look.

* If {@code false}, the connector guarantees at most one change per row identity per
* commit version, and Spark will skip net change computation.
*/
boolean containsIntermediateChanges();
Member:

The PR description says "Spark handles post-processing based on the properties declared by the connector." But no code in this PR ever reads these flags or performs any post-processing. They're part of a public API contract that Spark currently ignores entirely. This risks connectors shipping against this API expecting Spark to do the post-processing, and silently getting wrong results.
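
For context on what such post-processing could involve, a toy "net changes" pass might collapse each row identity to its last change in the requested range (illustrative only; not code from this PR):

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

// Toy sketch of NET_CHANGES-style post-processing; not Spark's implementation.
public class NetChangesSketch {
    record Change(String rowId, String changeType, long commitVersion) {}

    // Keep only the latest change per row identity within the requested range,
    // collapsing intermediate states across commit versions.
    static List<Change> netChanges(List<Change> raw) {
        return raw.stream()
            .collect(Collectors.groupingBy(Change::rowId,
                Collectors.maxBy(Comparator.comparingLong(Change::commitVersion))))
            .values().stream()
            .flatMap(Optional::stream)
            .sorted(Comparator.comparing(Change::rowId))
            .collect(Collectors.toList());
    }
}
```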

Contributor:

+1, we should throw an exception for now for unsupported cases, including when user specifies options which are currently ignored as far as I can tell


if (value == null) {
throw new AnalysisException(
"INVALID_TIME_TRAVEL_TIMESTAMP_EXPR.OPTION",
Map("expr" -> s"'$timestampStr'"))
Member:

If a user passes a bad timestamp string to DataFrameReader.changes(), the error message will say "invalid time travel timestamp" — confusing for a CDC query. A CDC-specific error condition should be used.

Member Author:

Thanks, updated

Comment on lines +2445 to +2450
// Default deduplication mode and computeUpdates. These can be overridden via WITH options
// at a later stage, but the grammar's optionsClause is handled separately.
new ChangelogInfo(
range,
ChangelogInfo.DeduplicationMode.DROP_CARRYOVERS,
false)
Member:

`deduplicationMode` and `computeUpdates` are hardcoded in the SQL path?

Member Author:

Thanks, updated

*/
record Unbounded() implements ChangelogRange {
@Override public boolean startingBoundInclusive() { return true; }
@Override public boolean endingBoundInclusive() { return true; }
Member:

Unbounded has no bounds at all — returning true for these is arbitrary and misleading. A connector receiving Unbounded would call startingBoundInclusive() and get true, which implies "the non-existent starting bound is inclusive." These methods could be removed from the sealed interface (since Unbounded doesn't need them) or the Unbounded implementations could throw UnsupportedOperationException.

Member Author:

Thanks, updated

(INCLUSIVE | endExclusive=EXCLUSIVE)?)?
;

streamChangesClause
Contributor:

nit: add a comment to highlight that the difference with changesClause is that startingVersion/startingTimestamp is optional

Member Author:

Thanks, updated

Comment on lines 434 to 438
: STREAM multipartIdentifier streamChangesClause
optionsClause? identifiedByClause?
watermarkClause? tableAlias #streamChangelogTableName
| STREAM multipartIdentifier optionsClause?
identifiedByClause? watermarkClause? tableAlias #streamTableName
Contributor:

Can you instead do:

Suggested change
: STREAM multipartIdentifier streamChangesClause
optionsClause? identifiedByClause?
watermarkClause? tableAlias #streamChangelogTableName
| STREAM multipartIdentifier optionsClause?
identifiedByClause? watermarkClause? tableAlias #streamTableName
: STREAM multipartIdentifier streamChangesClause?
optionsClause? identifiedByClause?
watermarkClause? tableAlias #streamTableName

to avoid duplicating the syntax and logic in AstBuilder for change log vs. regular relations?

Member Author:

Thanks, updated


@gengliangwang (Member Author):

@viirya @johanl-db Thanks for the reviews. I have addressed all of them. PTAL.

@aokolnychyi (Contributor) left a comment:

Looking mostly good to me.

},
"CHANGE_DATA_CAPTURE_ON_RELATION" : {
"message" : [
"Change Data Capture (CDC) on the relation: <relationId>."
Contributor:

Hm? Should this be `Relation <relationId> does not support Change Data Capture (CDC).`?

Member Author:

Done.

"subClass" : {
"CONFLICTING_RANGE_TYPES" : {
"message" : [
"Cannot specify both version and timestamp ranges for CDC queries. Use either startingVersion/endingVersion or startingTimestamp/endingTimestamp."
Contributor:

Do we use backticks or quotes around option names? Applies to all relevant places in this PR.

Member Author:

Done. Added backticks around all option names in error messages.


changesClause
: CHANGES FROM (SYSTEM_VERSION | VERSION) startingVersion=version
(INCLUSIVE | startExclusive=EXCLUSIVE)?
Contributor:

Minor: Does it make sense to put this on a separate line? What about the formatting below?

changesClause
    : CHANGES FROM (SYSTEM_VERSION | VERSION) startingVersion=version (INCLUSIVE | startExclusive=EXCLUSIVE)?
        (TO (SYSTEM_VERSION | VERSION) endingVersion=version (INCLUSIVE | endExclusive=EXCLUSIVE)?)?
    | CHANGES FROM (SYSTEM_TIME | TIMESTAMP) startingTimestamp=valueExpression (INCLUSIVE | startExclusive=EXCLUSIVE)?
        (TO (SYSTEM_TIME | TIMESTAMP) endingTimestamp=valueExpression (INCLUSIVE | endExclusive=EXCLUSIVE)?)?
    ;

Member Author:

Done.

// without an explicit starting point) and there is no ending bound (streaming is open-ended).
streamChangesClause
: CHANGES (FROM (SYSTEM_VERSION | VERSION) startingVersion=version
(INCLUSIVE | startExclusive=EXCLUSIVE)?)?
Contributor:

Minor: Same comment about having bounds on the same line.

streamChangesClause
    : CHANGES (FROM (SYSTEM_VERSION | VERSION) startingVersion=version (INCLUSIVE | startExclusive=EXCLUSIVE)?)?
    | CHANGES (FROM (SYSTEM_TIME | TIMESTAMP) startingTimestamp=valueExpression (INCLUSIVE | startExclusive=EXCLUSIVE)?)?
    ;

Member Author:

Done.

boolean containsCarryoverRows();

/**
* Whether the raw change data may contain multiple intermediate states per row identity
Contributor:

Is it per commit version or per changelog range? Say if a changelog covers 3 commits and within each commit we have unique changes but the same rows got modified across 3 commits?

Member Author:

Good point. It's per changelog range. Clarified the javadoc to say "within the requested changelog range (across all commit versions in the range)".

changelog.newScanBuilder(options)
}

override def capabilities(): JSet[TableCapability] = {
Contributor:

What about Java EnumSet?

override def name: String = changelog.name
override def columns: Array[Column] = changelog.columns
override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, MICRO_BATCH_READ)

Member Author:

Done.

Some(catalog),
Some(ident),
None
)
Contributor:

Why dangling ) on a separate line? Seems inconsistent with the rest of the code?

Member Author:

Done.

)
} else {
DataSourceV2Relation.create(
changelogTable, Some(catalog), Some(ident), u.options)
Contributor:

Why on a separate line?

Member Author:

Done.

// Version-based range
val startVersion = visitVersion(ctx.startingVersion).get
val endVersion = visitVersion(ctx.endingVersion)
new ChangelogRange.VersionRange(
Contributor:

Direct imports?

Member Author:

Done.

new ChangelogRange.TimestampRange(
startTsValue,
endTsValue.map(java.lang.Long.valueOf)
.map(java.util.Optional.of[java.lang.Long])
Contributor:

Some places in this PR use qualified imports, some use named aliases like `JXXX`. Be consistent.

Member Author:

Done. Now using direct imports consistently.

expandIdentifier(u.multipartIdentifier) match {
case CatalogAndIdentifier(catalog, ident) =>
val tableCatalog = catalog.asTableCatalog
if (!tableCatalog.capabilities().contains(TableCatalogCapability.SUPPORT_CHANGELOG)) {
Contributor:

What's the value of the capability if we check it right before calling loadChangelog? We could have just defaulted loadChangelog to throw an exception?

Member Author:

Agreed. Removed the capability. Now loadChangelog defaults to throwing UnsupportedOperationException, and RelationResolution catches it to produce a proper AnalysisException.

*
* @since 4.2.0
*/
SUPPORT_CHANGELOG
Contributor:

The value of this capability is not clear as it is checked right before calling loadChangelog.

Member Author:

Agreed. Removed SUPPORT_CHANGELOG entirely.

@aokolnychyi (Contributor):

@gengliangwang, there are a few places that use fully qualified imports for Java objects in `ChangelogInfoUtils`. You can apply the same named imports (`JLong`, `JOptional`) as you do in other places in this PR, but it is up to you.

@gengliangwang (Member Author):

> there are a few places that use fully qualified imports for Java objects in `ChangelogInfoUtils`. You can apply the same named imports (`JLong`, `JOptional`) as you do in other places in this PR, but it is up to you.

Thanks, updated

tableCatalog.loadChangelog(ident, changelogInfo)
} catch {
case _: UnsupportedOperationException =>
throw QueryCompilationErrors.cdcNotSupportedError(tableCatalog.name())
Member:

Do we need to catch NoSuchTableException here?

Member Author:

For `NoSuchTableException`, let's simply throw it.

* Shared by time travel and CDC timestamp resolution.
*/
def resolveTimestampExpression(ts: Expression, sessionLocalTimeZone: String): Long = {
assert(ts.resolved && ts.references.isEmpty)
Member:

In TimeTravelSpec.scala, the original TimeTravelSpec.create had:

assert(ts.resolved && ts.references.isEmpty && !SubqueryExpression.hasSubquery(ts))

After the refactoring, the extracted shared helper resolveTimestampExpression has:

assert(ts.resolved && ts.references.isEmpty)

The subquery check was moved back into the TimeTravelSpec.create call site, but the CDC path in buildChangelogInfo calls resolveTimestampForChanges which delegates to resolveTimestampExpression — without the subquery check. This means:

SELECT * FROM t CHANGES FROM TIMESTAMP (SELECT MAX(ts) FROM other_table)

would not be rejected at parse time for CDC, even though it would be for time travel?

@gengliangwang (Member Author) commented Mar 18, 2026:

Good catch!

changelog.newScanBuilder(options)
}

override def capabilities: JSet[TableCapability] = JEnumSet.of(BATCH_READ, MICRO_BATCH_READ)
Member:

Every ChangelogTable unconditionally declares MICRO_BATCH_READ, regardless of whether the underlying Changelog.newScanBuilder actually returns a MicroBatchStream-capable scan.

The Changelog interface has no capabilities() method for the connector to declare what it supports. A batch-only connector would cause a runtime failure in a streaming plan rather than a clean error at analysis time.

Member Author:

This is by design and part of the SPIP. The default streaming implementation in Scan.java intentionally throws an exception:

  default MicroBatchStream toMicroBatchStream(String checkpointLocation) {
    throw new SparkUnsupportedOperationException(
      "_LEGACY_ERROR_TEMP_3148", Map.of("description", description()));
  }

  default ContinuousStream toContinuousStream(String checkpointLocation) {
    throw new SparkUnsupportedOperationException(
      "_LEGACY_ERROR_TEMP_3149", Map.of("description", description()));
  }

The upside of this approach is that it avoids adding extra APIs to the Changelog. I'd prefer to keep it simple for now. If you have strong opinions on this, feel free to create a follow-up PR where we can discuss it further, as this current PR is already quite large.


override def loadChangelog(
ident: Identifier,
changelogInfo: ChangelogInfo): Changelog = {
Member:

changelogInfo is ignored entirely?

Member Author:

updated

@viirya (Member) left a comment:

It seems there is no streaming CDC end-to-end test, and no test for `STREAM t CHANGES` (bare, with no FROM)?

@gengliangwang (Member Author):

> It seems there is no streaming CDC end-to-end test, and no test for `STREAM t CHANGES` (bare, with no FROM)?

Thanks, added tests. PTAL

@gengliangwang (Member Author):

@viirya @johanl-db @aokolnychyi thanks for the reviews. I am merging to master.

gengliangwang added a commit that referenced this pull request Mar 20, 2026
…rt for CDC queries

### What changes were proposed in this pull request?

This is PR 2/2 of the CDC framework. It adds the DataFrame API (`changes()`) and Spark Connect support. Depends on #54738.

**DataFrame API — new methods on `DataFrameReader` and `DataStreamReader`:**
```scala
// Batch
spark.read
  .option("startingVersion", "1")
  .option("endingVersion", "5")
  .option("deduplicationMode", "netChanges")
  .changes("catalog.table")

// Streaming
spark.readStream
  .option("startingVersion", "1")
  .changes("catalog.table")
```

- API definitions in `sql/api` (`DataFrameReader.changes()`, `DataStreamReader.changes()`). Currently only supported for DSv2 tables whose catalog implements `TableCatalog.loadChangelog()`.
- Classic implementation in `sql/core`: Parses options via `ChangelogInfoUtils.fromOptions()` using the session timezone, creates `UnresolvedRelation` + `RelationChanges`.

**Spark Connect support:**
- Proto (`relations.proto`): New `RelationChanges` message (field 46) with `unparsed_identifier`, `options` map, and `is_streaming` flag.
- Client (`DataFrameReader.scala`, `DataStreamReader.scala`): Serializes `changes()` calls into `RelationChanges` proto.
- Server (`SparkConnectPlanner.scala`): `transformRelationChanges()` deserializes the proto, calls `ChangelogInfoUtils.fromOptions()` with the session timezone, and constructs `UnresolvedRelation` + `RelationChanges`.

### Why are the changes needed?

Users need a programmatic API to query CDC data, complementing the SQL `CHANGES` clause added in #54738. The DataFrame API provides type-safe, composable access to change data with the standard option-based configuration pattern. Spark Connect support ensures feature parity for remote Spark sessions.

### Does this PR introduce _any_ user-facing change?

Yes. New DataFrame API methods:
```scala
// Before: spark.read.changes("table") throws SparkUnsupportedOperationException
// After:
val changes = spark.read
  .option("startingVersion", "1")
  .changes("my_catalog.my_table")

changes.filter("_change_type = 'insert'").show()
```

Both `DataFrameReader.changes()` and `DataStreamReader.changes()` are now functional in classic and Spark Connect sessions.

### How was this patch tested?

- **`ChangelogResolutionSuite`** — 5 tests: `changes()` resolution, catalog without CDC support, schema rejection on `readStream`, streaming resolution to `StreamingRelationV2`, streaming catalog error.
- **`ChangelogEndToEndSuite`** — 19 end-to-end tests using `InMemoryChangelogCatalog`, each pairing the DataFrame API with the equivalent SQL syntax:
  - Basic retrieval (3 tests): data retrieval, open-ended version range, empty results with schema validation.
  - Projection/filter/aggregation (4 tests): CDC metadata column selection, projection with filter, aggregation on change types, schema verification.
  - Version range filtering (1 test): inclusive range with boundary rows.
  - Bound inclusivity (4 tests): default inclusive bounds (with explicit `INCLUSIVE` keyword), `startingBoundInclusive=false` (SQL: `EXCLUSIVE`), `endingBoundInclusive=false`, both bounds exclusive.
  - CDC options (3 tests): default `dropCarryovers` mode, `deduplicationMode=none`, `netChanges` with `computeUpdates`.
  - Timestamp range (1 test): `startingTimestamp`/`endingTimestamp` (SQL: `FROM TIMESTAMP ... TO TIMESTAMP ...`).
  - Error cases (1 test): user-specified schema rejection.
  - Streaming (4 tests): basic retrieval, `startingVersion` filtering, projection with filter, CDC options pass-through — all with both DataFrame API and SQL (`SELECT * FROM STREAM table CHANGES ...`) variants.
  - Streaming error cases (1 test): schema rejection.
- **`PlanGenerationTestSuite`** — 3 golden files: `read_changes`, `read_changes_with_options`, `streaming_changes_API_with_options`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

Closes #54739 from gengliangwang/cdc-dataframe.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
terana pushed a commit to terana/spark that referenced this pull request Mar 23, 2026
…nd SQL CHANGES clause

Closes apache#54738 from gengliangwang/cdc-sql.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
terana pushed a commit to terana/spark that referenced this pull request Mar 23, 2026
…rt for CDC queries

### What changes were proposed in this pull request?

This is PR 2/2 of the CDC framework. It adds the DataFrame API (`changes()`) and Spark Connect support. Depends on apache#54738.

**DataFrame API — new methods on `DataFrameReader` and `DataStreamReader`:**
```scala
// Batch
spark.read
  .option("startingVersion", "1")
  .option("endingVersion", "5")
  .option("deduplicationMode", "netChanges")
  .changes("catalog.table")

// Streaming
spark.readStream
  .option("startingVersion", "1")
  .changes("catalog.table")
```

- API definitions in `sql/api` (`DataFrameReader.changes()`, `DataStreamReader.changes()`). Currently only supported for DSv2 tables whose catalog implements `TableCatalog.loadChangelog()`.
- Classic implementation in `sql/core`: Parses options via `ChangelogInfoUtils.fromOptions()` using the session timezone, creates `UnresolvedRelation` + `RelationChanges`.

**Spark Connect support:**
- Proto (`relations.proto`): New `RelationChanges` message (field 46) with `unparsed_identifier`, `options` map, and `is_streaming` flag.
- Client (`DataFrameReader.scala`, `DataStreamReader.scala`): Serializes `changes()` calls into `RelationChanges` proto.
- Server (`SparkConnectPlanner.scala`): `transformRelationChanges()` deserializes the proto, calls `ChangelogInfoUtils.fromOptions()` with the session timezone, and constructs `UnresolvedRelation` + `RelationChanges`.
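The new proto message can be sketched from the description above (field names and the options map come from this PR; the field numbers inside the message body are illustrative, and only the message itself is stated to be field 46 of `relations.proto`):

```protobuf
// Illustrative sketch of the RelationChanges message described above.
message RelationChanges {
  // Table identifier as the user typed it, e.g. "catalog.table".
  string unparsed_identifier = 1;
  // CDC options such as startingVersion / deduplicationMode.
  map<string, string> options = 2;
  // Whether this is a streaming (readStream) CDC query.
  bool is_streaming = 3;
}
```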

### Why are the changes needed?

Users need a programmatic API to query CDC data, complementing the SQL `CHANGES` clause added in apache#54738. The DataFrame API provides type-safe, composable access to change data with the standard option-based configuration pattern. Spark Connect support ensures feature parity for remote Spark sessions.

### Does this PR introduce _any_ user-facing change?

Yes. New DataFrame API methods:
```scala
// Before: spark.read.changes("table") throws SparkUnsupportedOperationException
// After:
val changes = spark.read
  .option("startingVersion", "1")
  .changes("my_catalog.my_table")

changes.filter("_change_type = 'insert'").show()
```

Both `DataFrameReader.changes()` and `DataStreamReader.changes()` are now functional in classic and Spark Connect sessions.

### How was this patch tested?

- **`ChangelogResolutionSuite`** — 5 tests: `changes()` resolution, catalog without CDC support, schema rejection on `readStream`, streaming resolution to `StreamingRelationV2`, streaming catalog error.
- **`ChangelogEndToEndSuite`** — 19 end-to-end tests using `InMemoryChangelogCatalog`, each pairing the DataFrame API with the equivalent SQL syntax:
  - Basic retrieval (3 tests): data retrieval, open-ended version range, empty results with schema validation.
  - Projection/filter/aggregation (4 tests): CDC metadata column selection, projection with filter, aggregation on change types, schema verification.
  - Version range filtering (1 test): inclusive range with boundary rows.
  - Bound inclusivity (4 tests): default inclusive bounds (with explicit `INCLUSIVE` keyword), `startingBoundInclusive=false` (SQL: `EXCLUSIVE`), `endingBoundInclusive=false`, both bounds exclusive.
  - CDC options (3 tests): default `dropCarryovers` mode, `deduplicationMode=none`, `netChanges` with `computeUpdates`.
  - Timestamp range (1 test): `startingTimestamp`/`endingTimestamp` (SQL: `FROM TIMESTAMP ... TO TIMESTAMP ...`).
  - Error cases (1 test): user-specified schema rejection.
  - Streaming (4 tests): basic retrieval, `startingVersion` filtering, projection with filter, CDC options pass-through — all with both DataFrame API and SQL (`SELECT * FROM STREAM table CHANGES ...`) variants.
  - Streaming error cases (1 test): schema rejection.
- **`PlanGenerationTestSuite`** — 3 golden files: `read_changes`, `read_changes_with_options`, `streaming_changes_API_with_options`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

Closes apache#54739 from gengliangwang/cdc-dataframe.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>