
[SPARK-55949][SQL][CONNECT] Add DataFrame API and Spark Connect support for CDC queries #54739

Closed
gengliangwang wants to merge 12 commits into apache:master from gengliangwang:cdc-dataframe

Conversation


@gengliangwang gengliangwang commented Mar 10, 2026

### What changes were proposed in this pull request?

This is PR 2/2 of the CDC framework. It adds the DataFrame API (changes()) and Spark Connect support. Depends on #54738.

**DataFrame API — new methods on `DataFrameReader` and `DataStreamReader`:**

```scala
// Batch
spark.read
  .option("startingVersion", "1")
  .option("endingVersion", "5")
  .option("deduplicationMode", "netChanges")
  .changes("catalog.table")

// Streaming
spark.readStream
  .option("startingVersion", "1")
  .changes("catalog.table")
```

- API definitions in `sql/api` (`DataFrameReader.changes()`, `DataStreamReader.changes()`). Currently only supported for DSv2 tables whose catalog implements `TableCatalog.loadChangelog()`.
- Classic implementation in `sql/core`: parses options via `ChangelogInfoUtils.fromOptions()` using the session timezone, creates `UnresolvedRelation` + `RelationChanges`.

**Spark Connect support:**

- Proto (`relations.proto`): new `RelationChanges` message (field 46) with `unparsed_identifier`, `options` map, and `is_streaming` flag.
- Client (`DataFrameReader.scala`, `DataStreamReader.scala`): serializes `changes()` calls into the `RelationChanges` proto.
- Server (`SparkConnectPlanner.scala`): `transformRelationChanges()` deserializes the proto, calls `ChangelogInfoUtils.fromOptions()` with the session timezone, and constructs `UnresolvedRelation` + `RelationChanges`.
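As context for how the reader's option map becomes a changelog spec, here is a minimal, self-contained sketch of the parsing step. `ChangelogInfoSketch` and `fromOptionsSketch` are invented stand-ins for `ChangelogInfoUtils.fromOptions()`, which also handles timestamps, bound inclusivity, and the session timezone; the default deduplication mode (`dropCarryovers`) is taken from this PR's test descriptions.

```scala
// Illustrative stand-in for ChangelogInfoUtils.fromOptions(); not the actual
// Spark implementation. Names and defaults are assumptions for the sketch.
final case class ChangelogInfoSketch(
    startingVersion: Option[Long],
    endingVersion: Option[Long],
    deduplicationMode: String)

def fromOptionsSketch(options: Map[String, String]): ChangelogInfoSketch = {
  // Reader options are matched case-insensitively, so normalize keys first.
  val ci = options.map { case (k, v) => k.toLowerCase -> v }
  ChangelogInfoSketch(
    startingVersion = ci.get("startingversion").map(_.toLong),
    endingVersion = ci.get("endingversion").map(_.toLong),
    deduplicationMode = ci.getOrElse("deduplicationmode", "dropCarryovers"))
}
```

The same map is shipped verbatim over Spark Connect, which is why the server can reuse the identical parsing path as the classic implementation.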

### Why are the changes needed?

Users need a programmatic API to query CDC data, complementing the SQL CHANGES clause added in #54738. The DataFrame API provides type-safe, composable access to change data with the standard option-based configuration pattern. Spark Connect support ensures feature parity for remote Spark sessions.

### Does this PR introduce any user-facing change?

Yes. New DataFrame API methods:

```scala
// Before: spark.read.changes("table") throws SparkUnsupportedOperationException
// After:
val changes = spark.read
  .option("startingVersion", "1")
  .changes("my_catalog.my_table")

changes.filter("_change_type = 'insert'").show()
```

Both DataFrameReader.changes() and DataStreamReader.changes() are now functional in classic and Spark Connect sessions.

### How was this patch tested?

- **`ChangelogResolutionSuite`** — 5 tests: `changes()` resolution, catalog without CDC support, schema rejection on `readStream`, streaming resolution to `StreamingRelationV2`, streaming catalog error.
- **`ChangelogEndToEndSuite`** — 19 end-to-end tests using `InMemoryChangelogCatalog`, each pairing the DataFrame API with the equivalent SQL syntax:
  - Basic retrieval (3 tests): data retrieval, open-ended version range, empty results with schema validation.
  - Projection/filter/aggregation (4 tests): CDC metadata column selection, projection with filter, aggregation on change types, schema verification.
  - Version range filtering (1 test): inclusive range with boundary rows.
  - Bound inclusivity (4 tests): default inclusive bounds (with explicit `INCLUSIVE` keyword), `startingBoundInclusive=false` (SQL: `EXCLUSIVE`), `endingBoundInclusive=false`, both bounds exclusive.
  - CDC options (3 tests): default `dropCarryovers` mode, `deduplicationMode=none`, `netChanges` with `computeUpdates`.
  - Timestamp range (1 test): `startingTimestamp`/`endingTimestamp` (SQL: `FROM TIMESTAMP ... TO TIMESTAMP ...`).
  - Error cases (1 test): user-specified schema rejection.
  - Streaming (4 tests): basic retrieval, `startingVersion` filtering, projection with filter, CDC options pass-through — all with both DataFrame API and SQL (`SELECT * FROM STREAM table CHANGES ...`) variants.
  - Streaming error cases (1 test): schema rejection.
- **`PlanGenerationTestSuite`** — 3 golden files: `read_changes`, `read_changes_with_options`, `streaming_changes_API_with_options`.

### Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Claude Opus 4.6)

@gengliangwang gengliangwang marked this pull request as draft March 10, 2026 20:12
@gengliangwang gengliangwang changed the title [SPARK-XXXXX][SQL] Add DataFrame API and Spark Connect support for CDC queries [SPARK-55949][SQL] Add DataFrame API and Spark Connect support for CDC queries Mar 10, 2026

```scala
/** @inheritdoc */
def changes(tableName: String): DataFrame = {
  assertNoSpecifiedSchema("changes")
```
Contributor

Should we check if source is null?

Member Author

Updated
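The validation order settled in this thread can be pictured with a small standalone sketch. `ChangesReaderSketch`, its fields, and the error messages are hypothetical: the real `DataFrameReader` keeps `source` and the user-specified schema as private state, and this is just one plausible reading of the resolved check.

```scala
// Hypothetical model of the guards discussed here: changes() reads a table
// directly, so both a previously set format() source and a user-specified
// schema are rejected up front rather than silently ignored.
final class ChangesReaderSketch(
    source: Option[String],
    userSpecifiedSchema: Option[String]) {

  def changes(tableName: String): String = {
    require(userSpecifiedSchema.isEmpty,
      "changes() does not accept a user-specified schema")
    require(source.isEmpty,
      "changes() cannot be combined with format()")
    s"RelationChanges($tableName)"
  }
}
```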

@aokolnychyi
Contributor

aokolnychyi commented Mar 17, 2026

Let's get #54738 in first.

…C queries

Add DataFrameReader.changes() and DataStreamReader.changes() APIs:

- API definitions in sql/api (DataFrameReader, DataStreamReader)
- Classic implementation using ChangelogInfoUtils + RelationChanges
- Spark Connect: ReadChanges proto message, client serialization,
  server-side planner deserialization
- Remove ConnectClientUnsupportedErrors.tableCDC() (no longer needed)
- DataFrame API tests in ChangelogResolutionSuite and ChangelogEndToEndSuite
- Spark Connect PlanGenerationTestSuite golden files
@gengliangwang gengliangwang marked this pull request as ready for review March 19, 2026 03:33
@gengliangwang
Member Author

cc @johanl-db as well

```scala
 * }}}
 *
 * @param tableName
 *   is either a qualified or unqualified name that designates a table.
```
Contributor

Minor: can we shorten it to fit on one line?

Member Author

@gengliangwang gengliangwang Mar 19, 2026

Every @param in these files (and the project) uses the multi-line format. This is enforced by scalafmt 3.8.6 with docstrings.style = Asterisk, which automatically breaks @param tags so the description starts on the next line with a 2-space indent, regardless of line length.
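For reference, the relevant scalafmt settings look roughly like this (a sketch of a `.scalafmt.conf` fragment, not copied from the repository):

```
version = 3.8.6
docstrings.style = Asterisk
```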

```scala
 * }}}
 *
 * @param tableName
 *   is either a qualified or unqualified name that designates a table.
```
Contributor

Same.

Member Author

@gengliangwang gengliangwang Mar 19, 2026

Same as above

```json
      "endingversion": "5"
    }
  }
}
```
(no newline at end of file)
Contributor

Do we need an empty line at the end?

Member Author

There's no trailing empty line — the file ends with } (no trailing newline), which is consistent with all other JSON files in the query-tests/queries/ directory.

```json
      "computeupdates": "true"
    }
  }
}
```
(no newline at end of file)
Contributor

Same?

Member Author

Same as above — no trailing empty line, consistent with the other JSON files.

```scala
sparkSession.newDataFrame { builder =>
  val changesBuilder = builder.getRelationChangesBuilder
    .setUnparsedIdentifier(tableName)
  extraOptions.foreach { case (k, v) =>
```
Contributor

Should we use putAllOptions here to be consistent with table method above?

Member Author

Good catch. Updated to use putAllOptions to be consistent with the table method.
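The equivalence the review relies on can be shown with a plain `java.util.Map`; the builder below is an invented stand-in for the generated `RelationChanges` protobuf builder, which exposes the analogous `putOptions`/`putAllOptions` entry points for map fields.

```scala
import scala.jdk.CollectionConverters._

// Stand-in for the generated protobuf builder; only the two option-setting
// entry points matter for this comparison.
final class OptionsBuilderSketch {
  private val options = new java.util.LinkedHashMap[String, String]()
  def putOptions(k: String, v: String): OptionsBuilderSketch = {
    options.put(k, v); this
  }
  def putAllOptions(m: java.util.Map[String, String]): OptionsBuilderSketch = {
    options.putAll(m); this
  }
  def getOptionsMap: java.util.Map[String, String] = options
}

val opts = Map("startingVersion" -> "1", "endingVersion" -> "5").asJava

// Per-entry loop, as the client originally wrote it:
val looped = new OptionsBuilderSketch
opts.forEach((k, v) => looped.putOptions(k, v))

// Single bulk call, matching the existing table() method:
val bulk = (new OptionsBuilderSketch).putAllOptions(opts)
```

Both forms produce the same options map; the bulk call is just shorter and consistent with the surrounding code.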

```scala
val changesBuilder = builder.getRelationChangesBuilder
  .setUnparsedIdentifier(tableName)
  .setIsStreaming(true)
sourceBuilder.getOptionsMap.forEach { (k, v) =>
```
Contributor

Same question about putAllOptions?

Member Author

Done.

```scala
val tableName = rel.getUnparsedIdentifier
val options = new CaseInsensitiveStringMap(rel.getOptionsMap)
val changelogInfo = ChangelogInfoUtils.fromOptions(
  options, session.sessionState.conf.sessionLocalTimeZone)
```
Contributor

Optional: I would define a temp var for time zone and would move UnresolvedRelation on one line.

```scala
val timeZone = session.sessionState.conf.sessionLocalTimeZone
val changelogInfo = ChangelogInfoUtils.fromOptions(options, timeZone)
val ident = parser.parseMultipartIdentifier(tableName)
val relation = UnresolvedRelation(ident, options, isStreaming = rel.getIsStreaming)
```

Member Author

Done. Extracted timeZone var and put UnresolvedRelation on one line.

@zhengruifeng zhengruifeng changed the title [SPARK-55949][SQL] Add DataFrame API and Spark Connect support for CDC queries [SPARK-55949][SQL][CONNECT] Add DataFrame API and Spark Connect support for CDC queries Mar 20, 2026
@gengliangwang
Member Author

@aokolnychyi @HyukjinKwon @viirya @zhengruifeng Thanks for the review. Merging to master.

terana pushed a commit to terana/spark that referenced this pull request Mar 23, 2026
…rt for CDC queries

Closes apache#54739 from gengliangwang/cdc-dataframe.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
zhengruifeng pushed a commit that referenced this pull request Mar 26, 2026
### What changes were proposed in this pull request?

Add `changes()` method to PySpark `DataFrameReader` and `DataStreamReader` for both classic and Spark Connect modes.

**Classic PySpark:**
- `DataFrameReader.changes(tableName)` — delegates to `self._jreader.changes(tableName)`
- `DataStreamReader.changes(tableName)` — delegates to `self._jreader.changes(tableName)` with type checking

**Spark Connect PySpark:**
- New `RelationChanges` plan class in `plan.py` that serializes to the `RelationChanges` protobuf message
- `DataFrameReader.changes(tableName)` — creates `RelationChanges` plan (batch)
- `DataStreamReader.changes(tableName)` — creates `RelationChanges` plan with `is_streaming=True`

### Why are the changes needed?

To expose the CDC `changes()` API added in #54739 to Python users.

### Does this PR introduce _any_ user-facing change?

Yes. PySpark users can now use:
```python
# Batch
df = spark.read.option("startingVersion", "1").changes("my_table")

# Streaming
df = spark.readStream.option("startingVersion", "1").changes("my_table")
```

### How was this patch tested?

7 plan generation tests in `test_connect_plan.py` covering:
- Batch read with version/timestamp options
- No-options and multi-part table names
- Proto `oneof` discriminator verification
- Streaming via direct plan and via `DataStreamReader`
- `print()` debug output

### Was this patch authored or co-authored using generative AI tooling?

Yes.

Closes #54746 from gengliangwang/cdc-pyspark.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>