[Improve](Streamingjob) support exclude_columns for Postgres streaming job #61267
JNSimba wants to merge 9 commits into apache:master
/review
Pull request overview
Adds column-level filtering for PostgreSQL CDC streaming jobs via per-table property table.<tableName>.exclude_columns, ensuring excluded columns are omitted during Doris table creation, DML ingestion, and Postgres schema-change handling.
Changes:
- FE: validate/parse table.<tableName>.exclude_columns, reject non-existent columns and PK columns, and omit excluded columns from the generated CREATE TABLE.
- cdc_client: parse exclude_columns and filter excluded fields from Debezium DML rows; skip ADD/DROP DDL for excluded columns during PG schema change detection.
- Regression: add a Postgres streaming job suite covering validation, snapshot/incremental DML filtering, and DROP/ADD excluded-column scenarios.
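The parsing and DML filtering steps described above can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the class `ExcludeColumnsSketch` and its methods are hypothetical names, and a row is modeled as a plain `Map` rather than a Debezium record.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not the PR's ConfigUtil implementation.
final class ExcludeColumnsSketch {

    /** Builds the per-table key, e.g. table.orders.exclude_columns. */
    static String excludeKey(String tableName) {
        return "table." + tableName + ".exclude_columns";
    }

    /** Parses the comma-separated property value into a set of trimmed column names. */
    static Set<String> parseExcludeColumns(Map<String, String> config, String tableName) {
        String raw = config.get(excludeKey(tableName));
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    /** Returns a copy of the row without the excluded columns, preserving field order. */
    static Map<String, Object> filterRow(Map<String, Object> row, Set<String> excluded) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : row.entrySet()) {
            if (!excluded.contains(e.getKey())) {
                out.put(e.getKey(), e.getValue());
            }
        }
        return out;
    }
}
```

Note that this sketch keeps column names case-sensitive; whether the real implementation normalizes case is a separate decision.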
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.groovy | New regression suite validating exclude-column behavior end-to-end |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_col_filter.out | Expected outputs for the new regression suite |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/ConfigUtil.java | Adds parsing utility for table.<table>.exclude_columns |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java | Filters excluded columns out of DML JSON rows |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java | Skips schema-change DDL for excluded columns |
| fe/fe-core/src/main/java/org/apache/doris/job/util/StreamingJobUtils.java | Applies exclude-column validation and omission during Doris table creation |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java | Allows per-table config keys by suffix for source validation |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/DataSourceConfigKeys.java | Introduces constants for per-table config key construction |
Code Review Summary
This PR adds column-level filtering support (exclude_columns) for PostgreSQL CDC streaming jobs. The feature is well-structured across FE validation, CDC client DML filtering, and schema change handling. The regression test is comprehensive, covering snapshot, incremental DML, DROP and re-ADD of excluded columns.
Critical Checkpoint Conclusions
Goal & Correctness: The PR accomplishes its stated goal. FE validates excluded columns exist and are not PK columns, the Doris CREATE TABLE omits them, the CDC client skips excluded columns during DML deserialization, and schema change DDLs are suppressed for excluded columns. Tests prove the key scenarios.
Modification focus: The change is focused and well-scoped to the exclude_columns feature.
Concurrency: No new concurrency concerns; the deserializer operates single-threaded per task, and the config map is read-only.
Lifecycle / Static init: No lifecycle issues or SIOF risks introduced.
Configuration items added: Two new constants (TABLE, TABLE_EXCLUDE_COLUMNS_SUFFIX) and the per-table config key pattern table.<name>.exclude_columns. See issues below regarding validation gaps.
Incompatible changes: None. This is purely additive.
Parallel code paths: The MySQL CDC deserializer (MySqlDebeziumJsonDeserializer) does not have DDL handling yet (it's a stub), so no parallel path needs updating now. The base class DebeziumJsonDeserializer already applies DML filtering for all connectors.
Test coverage: Good end-to-end coverage: validation errors, snapshot filtering, incremental DML (INSERT/UPDATE/DELETE), DROP excluded column, re-ADD excluded column. Uses Awaitility polling instead of fixed sleeps.
Observability: Good INFO-level logging for schema change skip decisions.
Performance: See inline comment about per-record re-parsing.
Issues Found
See inline comments for details:
- [Performance] ConfigUtil.parseExcludeColumns() is called on every DML record in the hot path, re-parsing the comma-separated string and creating a new HashSet each time. It should be cached per table name in the deserializer.
- [Validation Gap] DataSourceConfigValidator.validateSource() accepts malformed keys like table.exclude_columns (missing table name segment) because it only checks the suffix after the last dot. It should verify at least 3 dot-separated segments.
- [Incorrect Javadoc] The ConfigUtil.parseExcludeColumns() Javadoc says "@return lower-cased column name set", but the implementation does NOT lowercase column names.
- [Code Duplication] parseExcludeColumns is duplicated identically in StreamingJobUtils (FE) and ConfigUtil (CDC client). Both modules depend on fe-common, where DataSourceConfigKeys lives, so this method should be consolidated there.
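The first two issues could be addressed along these lines. This is only a sketch under stated assumptions: `PerTableConfigSketch`, `isValidExcludeKey`, `excludedColumns`, and the `parseCount` counter are hypothetical names for illustration, and the key check uses a prefix/suffix length test as one possible way to require a non-empty table name segment.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of two review suggestions; names are illustrative only.
final class PerTableConfigSketch {
    static final String PREFIX = "table.";
    static final String SUFFIX = ".exclude_columns";

    /** Accepts table.<name>.exclude_columns only when <name> is non-empty. */
    static boolean isValidExcludeKey(String key) {
        return key.startsWith(PREFIX)
                && key.endsWith(SUFFIX)
                && key.length() > PREFIX.length() + SUFFIX.length();
    }

    private final Map<String, String> config;
    // Plain HashMap: the review notes the deserializer runs single-threaded per task.
    private final Map<String, Set<String>> cache = new HashMap<>();
    int parseCount = 0; // illustration only: counts how often the string is parsed

    PerTableConfigSketch(Map<String, String> config) {
        this.config = config;
    }

    /** Parses the property once per table name and reuses the result afterwards. */
    Set<String> excludedColumns(String tableName) {
        return cache.computeIfAbsent(tableName, this::parse);
    }

    private Set<String> parse(String tableName) {
        parseCount++;
        String raw = config.get(PREFIX + tableName + SUFFIX);
        if (raw == null || raw.trim().isEmpty()) {
            return Collections.emptySet();
        }
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }
}
```

With this shape, the hot path pays for a map lookup per record instead of a split and a new set allocation.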
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 3 comments.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated no new comments.
run buildall
FE UT Coverage Report: Increment line coverage
run buildall
run buildall
TPC-H: Total hot run time: 27553 ms
TPC-DS: Total hot run time: 153522 ms
FE UT Coverage Report: Increment line coverage
run external
run buildall
TPC-H: Total hot run time: 26872 ms
TPC-DS: Total hot run time: 168537 ms
run p0
run cloud_p0
run cloud_p0
What problem does this PR solve?
Add column-level filtering support for PostgreSQL CDC streaming jobs via the
table.<tableName>.exclude_columns property. Users can specify a comma-separated list of columns to exclude from synchronization.
Syntax example:
Changes
FE (validation & table creation)
per-table config key (using suffix allowlist)
they exist in the upstream PG table and are not PK columns, then exclude them
from the Doris CREATE TABLE statement
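The FE-side rule above (excluded columns must exist upstream and must not be PK columns, and are then left out of the created table) might look roughly like this. It is a sketch only: `ExcludeValidationSketch`, `columnsToCreate`, and the error messages are hypothetical and are not taken from StreamingJobUtils.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; method name and error messages are illustrative only.
final class ExcludeValidationSketch {

    /**
     * Validates the excluded columns against the upstream table definition and
     * returns the columns that would remain in the Doris CREATE TABLE statement.
     */
    static List<String> columnsToCreate(
            List<String> upstreamColumns, Set<String> primaryKeys, Set<String> excluded) {
        for (String col : excluded) {
            if (!upstreamColumns.contains(col)) {
                throw new IllegalArgumentException("excluded column does not exist: " + col);
            }
            if (primaryKeys.contains(col)) {
                throw new IllegalArgumentException("primary key column cannot be excluded: " + col);
            }
        }
        List<String> kept = new ArrayList<>();
        for (String col : upstreamColumns) {
            if (!excluded.contains(col)) {
                kept.add(col);
            }
        }
        return kept;
    }
}
```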
cdc_client (DML filtering & schema change handling)
schema change detection, so the Doris table is never modified for columns it
was never meant to have
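One way to picture the schema-change skip is as a column-list diff that ignores excluded names. The sketch below assumes detection compares the column names before and after a change; `SchemaChangeSkipSketch` and its methods are hypothetical and do not reflect the actual PostgresDebeziumJsonDeserializer logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical sketch; assumes schema changes are detected by comparing column name lists.
final class SchemaChangeSkipSketch {

    /** Columns that need ADD COLUMN on the Doris side, ignoring excluded ones. */
    static List<String> columnsToAdd(List<String> before, List<String> after, Set<String> excluded) {
        return missingFrom(before, after, excluded);
    }

    /** Columns that need DROP COLUMN on the Doris side, ignoring excluded ones. */
    static List<String> columnsToDrop(List<String> before, List<String> after, Set<String> excluded) {
        return missingFrom(after, before, excluded);
    }

    /** Names present in candidates but absent from base, minus the excluded set. */
    private static List<String> missingFrom(
            List<String> base, List<String> candidates, Set<String> excluded) {
        List<String> result = new ArrayList<>();
        for (String col : candidates) {
            if (!base.contains(col) && !excluded.contains(col)) {
                result.add(col);
            }
        }
        return result;
    }
}
```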
Behavior
Tests
snapshot filtering, incremental DML filtering, DROP excluded column, re-ADD
excluded column; uses Awaitility polling instead of fixed sleeps
Release note
None