[Feature](tvf) Support cdc stream tvf for mysql and pg #60116
JNSimba merged 10 commits into apache:master
Pull request overview
This PR introduces a cdc_stream table-valued function (TVF) that enables consuming Change Data Capture (CDC) data from MySQL and PostgreSQL databases directly through SQL queries. The feature builds upon existing CDC streaming job capabilities (#58898, #59461) by exposing them through a TVF interface.
Changes:
- Added cdc_stream TVF support for MySQL and PostgreSQL CDC streaming
- Refactored CDC client to use String-based job IDs instead of Long to support both jobs and TVF queries
- Implemented HTTP streaming response support in the backend for efficient CDC data delivery
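To illustrate what consuming a streamed HTTP response involves, here is a minimal Python sketch. It assumes that the "chunk response" support mentioned for http_file_reader refers to HTTP/1.1 chunked transfer encoding, which the PR page does not state explicitly. The function `decode_chunked` is a hypothetical helper for illustration, not code from this PR or from Doris.

```python
def decode_chunked(body: bytes) -> bytes:
    """Decode an HTTP/1.1 chunked transfer-encoded body into its payload.

    Assumption: this models the generic wire format only; it is not the
    Doris backend implementation.
    """
    out = bytearray()
    pos = 0
    while True:
        # Each chunk starts with "<hex size>[;extensions]\r\n".
        line_end = body.index(b"\r\n", pos)
        size_field = body[pos:line_end].split(b";", 1)[0].strip()
        size = int(size_field.decode("ascii"), 16)
        pos = line_end + 2
        if size == 0:
            # A zero-sized chunk marks the end; trailers are ignored here.
            break
        if pos + size > len(body):
            raise ValueError("truncated chunk data")
        out += body[pos:pos + size]
        # Skip the chunk data and the CRLF that terminates it.
        pos += size + 2
    return bytes(out)
```

Because no Content-Length is known up front, a reader of such a response has to keep consuming chunks until the zero-sized terminator arrives, which is why a reader needs a separate mode for this kind of response.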
Reviewed changes
Copilot reviewed 31 out of 31 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| test_cdc_stream_tvf_mysql.groovy | Regression test for MySQL CDC stream TVF functionality |
| test_cdc_stream_tvf_postgres.groovy | Regression test for PostgreSQL CDC stream TVF functionality |
| test_cdc_stream_tvf_mysql.out | Expected output for MySQL CDC stream TVF test |
| ConfigUtil.java | Added utility method for table list extraction and changed serverId parameter type from long to String |
| PostgresSourceReader.java | Updated to support String job IDs and simplified offset handling for TVF |
| MySqlSourceReader.java | Refactored to always create new split readers and support String job IDs |
| SourceReader.java | Changed interface to use String job IDs |
| JdbcIncrementalSourceReader.java | Simplified split reader logic to always create new readers |
| DorisBatchStreamLoad.java | Updated to use String job IDs |
| PipelineCoordinator.java | Added streaming response support and TVF-specific record fetching |
| StreamException.java | New exception for streaming response errors |
| CommonException.java | New generic exception for common errors |
| ClientController.java | Added REST endpoints for streaming responses and task offset retrieval |
| GlobalExceptionHandler.java | Added handlers for new exception types |
| Env.java | Updated to use String-based job context keys |
| CdcStreamTableValuedFunction.java | New TVF implementation for CDC streaming |
| TableValuedFunctionVisitor.java | Added visitor method for CDC stream TVF |
| CdcStream.java | Nereids expression class for CDC stream TVF |
| StreamingJobUtils.java | Made utility methods public for TVF usage |
| JdbcSourceOffsetProvider.java | Updated to convert job IDs to String for CDC client |
| StreamingMultiTblTask.java | Updated to pass job ID as String |
| BuiltinTableValuedFunctions.java | Registered cdc_stream TVF |
| WriteRecordRequest.java | Removed unused abstract method implementations |
| JobBaseRecordRequest.java | Removed unused abstract method declarations |
| JobBaseConfig.java | Changed jobId type from Long to String |
| FetchTableSplitsRequest.java | Updated to convert Long job ID to String |
| FetchRecordRequest.java | Simplified by removing unused fields |
| CompareOffsetRequest.java | Updated to convert Long job ID to String |
| DataSourceConfigKeys.java | Added TYPE and TABLE configuration keys |
| http_file_reader.h | Added chunk response support flag |
| http_file_reader.cpp | Implemented CDC client integration, chunk response support, and HTTP error handling |
run buildall
TPC-H: Total hot run time: 32113 ms
TPC-DS: Total hot run time: 174132 ms
ClickBench: Total hot run time: 26.75 s
Pull request overview
Copilot reviewed 31 out of 31 changed files in this pull request and generated 25 comments.
run buildall
TPC-H: Total hot run time: 31405 ms
TPC-DS: Total hot run time: 173013 ms
ClickBench: Total hot run time: 26.8 s
run buildall
TPC-H: Total hot run time: 30904 ms
TPC-DS: Total hot run time: 172737 ms
ClickBench: Total hot run time: 26.76 s
Code review
No issues found. Checked for bugs and CLAUDE.md compliance. 🤖 Generated with Claude Code
run buildall
run buildall
TPC-H: Total hot run time: 26636 ms
TPC-DS: Total hot run time: 169257 ms
run buildall
TPC-H: Total hot run time: 26241 ms
TPC-DS: Total hot run time: 167899 ms
run beut
run cloud_p0
run p0
run cloud_p0
run p0
run p0
PR approved by at least one committer and no changes requested.
What problem does this PR solve?
#58898 #59461 These two features support consuming data from MySQL and PostgreSQL.
Based on this, this PR introduces the cdc_stream TVF, which allows consuming data from MySQL and PostgreSQL in TVF format.
Example
```
select * from cdc_stream(
    "type" = "mysql",
    "jdbc_url" = "jdbc:mysql://127.0.0.1:3306",
    "driver_url" = "mysql-connector-j-8.0.31.jar",
    "driver_class" = "com.mysql.cj.jdbc.Driver",
    "user" = "root",
    "password" = "123456",
    "database" = "test_cdc_db",
    "table" = "user_info",
    "offset" = '{"file":"binlog.000003","pos":"496"}'
)
```
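The example query in this PR description passes the starting position as a JSON string in the "offset" property. As a rough illustration of how a client script might assemble such a query, here is a minimal Python sketch. The helper `build_cdc_stream_query` is hypothetical and not part of Doris; it only formats text in the shape of the example, uses the property names shown there, and assumes that property values contain no quote characters.

```python
import json
from typing import Dict, Optional


def build_cdc_stream_query(props: Dict[str, str],
                           offset: Optional[Dict[str, str]] = None) -> str:
    """Format a cdc_stream query string from a property mapping.

    Hypothetical helper: it performs no escaping, so it rejects values
    containing quotes instead of guessing at an escaping rule.
    """
    lines = []
    for key, value in props.items():
        if '"' in key or '"' in value or "'" in value:
            raise ValueError(f"unsupported quote character in property {key!r}")
        lines.append(f'    "{key}" = "{value}"')
    if offset is not None:
        # Compact JSON, single-quoted, as in the PR example.
        offset_json = json.dumps(offset, separators=(",", ":"))
        lines.append(f"    \"offset\" = '{offset_json}'")
    return "select * from cdc_stream(\n" + ",\n".join(lines) + "\n)"


if __name__ == "__main__":
    query = build_cdc_stream_query(
        {"type": "mysql", "database": "test_cdc_db", "table": "user_info"},
        offset={"file": "binlog.000003", "pos": "496"},
    )
    print(query)
```

Keeping the offset as a dictionary until the last step avoids hand-writing nested quotes, which is the part of the example that is easiest to get wrong.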
Release note
None