-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[Improve](StreamingJob) add max_filter_ratio and strict mode for mysql/pg streaming job #60473
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Thank you for your contribution to Apache Doris. Please clearly describe your PR:
|
|
run buildall |
FE UT Coverage ReportIncrement line coverage |
TPC-H: Total hot run time: 32066 ms |
ClickBench: Total hot run time: 28.69 s |
|
run buildall |
TPC-H: Total hot run time: 31554 ms |
ClickBench: Total hot run time: 28.33 s |
FE UT Coverage ReportIncrement line coverage |
FE Regression Coverage ReportIncrement line coverage |
|
run buildall |
|
run buildall |
TPC-H: Total hot run time: 32090 ms |
ClickBench: Total hot run time: 28.38 s |
|
run p0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR adds support for load.max_filter_ratio and load.strict_mode properties to MySQL/PostgreSQL streaming jobs, enabling error tolerance configuration for data quality monitoring.
Changes:
- Added data quality monitoring with configurable filter ratio thresholds using a sliding window approach
- Introduced
LoadStatisticclass to track filtered rows, loaded rows, and load bytes - Modified target properties validation to support load properties prefix
- Refactored statistics tracking from
scannedBytestoloadBytesand addedfilteredRowstracking
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| LoadStatistic.java | New class to track load statistics (filtered/loaded rows, bytes) |
| HttpPutBuilder.java | Changed properties parameter type from Properties to Map<String, String> |
| DorisBatchStreamLoad.java | Added load statistics tracking and stream load properties support |
| PipelineCoordinator.java | Updated to pass LoadStatistic object in commitOffset |
| StreamingJobUtils.java | Moved TABLE_PROPS_PREFIX constant to DataSourceConfigKeys |
| StreamingMultiTblTask.java | Generate stream load properties based on max_filter_ratio and strict_mode |
| StreamingJobStatistic.java | Added filteredRows field |
| StreamingJobSchedulerTask.java | Initialize sampleStartTime when job transitions to RUNNING |
| StreamingInsertJob.java | Implemented checkDataQuality method with sliding window monitoring |
| DataSourceConfigValidator.java | Updated to allow load properties prefix in target validation |
| StreamingJobAction.java | Removed CommitOffsetRequest inner class (moved to separate file) |
| DorisParser.g4 | Made sourceProperties optional in jobFromToClause grammar |
| CommitOffsetRequest.java | New file with fields for filtered/loaded rows and load bytes |
| DataSourceConfigKeys.java | Added TABLE_PROPS_PREFIX and LOAD_PROPERTIES constants |
| WriteRecordRequest.java | Added streamLoadProps field, removed unused abstract methods |
| JobBaseRecordRequest.java | Removed unused abstract methods |
| FetchRecordRequest.java | Removed unused method implementations |
| Test files | Updated to parse JSON statistics and verify new fields; adjusted expected byte counts |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
...-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
Show resolved
Hide resolved
...-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
Show resolved
Hide resolved
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_split.groovy
Show resolved
Hide resolved
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
Show resolved
Hide resolved
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job.groovy
Show resolved
Hide resolved
...rc/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
Show resolved
Hide resolved
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job.groovy
Show resolved
Hide resolved
regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_restart_fe.groovy
Show resolved
Hide resolved
...re/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java
Outdated
Show resolved
Hide resolved
fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/LoadStatistic.java
Show resolved
Hide resolved
|
run buildall |
TPC-H: Total hot run time: 32501 ms |
ClickBench: Total hot run time: 28.32 s |
FE Regression Coverage ReportIncrement line coverage |
|
run cloud_p0 |
FE Regression Coverage ReportIncrement line coverage |
liaoxin01
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
PR approved by at least one committer and no changes requested. |
|
PR approved by anyone and no changes requested. |
sollhui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
…l/pg streaming job (#60473) ### What problem does this PR solve? Related PR: #58898 #59461 In some scenarios, it is necessary to tolerate a certain amount of erroneous data. Supported parameters: `load.strict_mode`: Whether to enable strict mode, defaults to false. `load.max_filter_ratio`: The maximum allowed filtering rate within the sampling window, defaults to zero tolerance. The sampling window is `max_interval * 10`. That is, if the number of erroneous rows/total rows exceeds `max_filter_ratio` within the sampling window, the job will be paused, requiring manual intervention to check data quality issues. eg: ``` CREATE JOB test_streaming_mysql_job_errormsg ON STREAMING FROM MYSQL ( "jdbc_url" = "jdbc:mysql://127.0.0.1:3308", ...... ) TO DATABASE database ( "table.create.properties.replication_num" = "1" ... "load.max_filter_ratio" = "1" ) ```
What problem does this PR solve?
Related PR: #58898 #59461
In some scenarios, it is necessary to tolerate a certain amount of erroneous data.
Supported parameters:
load.strict_mode: Whether to enable strict mode, defaults to false.load.max_filter_ratio: The maximum allowed filtering rate within the sampling window, defaults to zero tolerance. The sampling window ismax_interval * 10. That is, if the number of erroneous rows/total rows exceedsmax_filter_ratiowithin the sampling window, the job will be paused, requiring manual intervention to check data quality issues.eg:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)