[Improve](StreamingJob) Support schema change for PostgreSQL streaming job#61182
JNSimba merged 7 commits into apache:master
TPC-H: Total hot run time: 27724 ms
TPC-DS: Total hot run time: 153550 ms
Pull request overview
Adds PostgreSQL CDC streaming-job schema change (ADD/DROP) support by detecting schema diffs from DML records, executing corresponding Doris DDLs, and persisting schema state for restart/retry safety.
Changes:
- Introduced schema diffing, Doris DDL generation/execution, and persisted tableSchemas across FE ↔ cdc_client.
- Added PostgreSQL-specific deserializer logic that refreshes PG schema via JDBC upon detecting schema changes.
- Added regression tests (basic + advanced) and unit tests for PG type mapping.
Reviewed changes
Copilot reviewed 28 out of 28 changed files in this pull request and generated 10 comments.
| File | Description |
|---|---|
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy | Adds advanced PG schema-change regression scenarios (offset=latest, double-ADD, rename guard, defaults, NOT NULL). |
| regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy | Adds baseline PG schema-change regression coverage (ADD/DROP/RENAME/MODIFY). |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out | Golden output for advanced regression assertions. |
| regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out | Golden output for baseline regression assertions. |
| fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java | Unit tests for PG type-name → Doris-type mapping used during DDL generation. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java | Executes schema-change DDLs on Doris FE and swallows idempotent errors for retries. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java | Builds ALTER TABLE SQL and maps PG column metadata to Doris column types. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java | Centralizes Basic auth header used by FE HTTP calls. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java | Injects PG schema refresher into deserializer and adds JDBC per-table schema refresh helper. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java | Switches to new deserializer result type and enables MySQL schema-change emission. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java | Changes deserialize contract to return DeserializeResult and adds schema persistence hooks. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java | Loads/persists table schemas for stream split startup and restart correctness. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java | Implements schema JSON serialize/deserialize and in-memory schema application logic. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java | Adds setTableSchemas hook for schema-aware deserializers. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java | Detects PG schema changes via after-schema diff + JDBC refresh, generates Doris DDL, returns SCHEMA_CHANGE results. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java | Introduces MySQL-specific deserializer stub for schema-change events. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java | New unified deserialize result type carrying DML, DDLs, and updated schemas. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java | Updates base deserializer to return DeserializeResult and hold tableSchemas. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java | Reuses centralized Basic auth header helper. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java | Extends commitOffset to include persisted tableSchemas and parses FE JSON response code. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java | Executes DDLs before DML on SCHEMA_CHANGE results and persists updated schemas back to FE. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java | Adds Doris type constants used by schema-change type mapping. |
| fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java | Adds DORIS_TARGET_DB key for deserializer context. |
| fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java | Persists tableSchemas alongside offsets for restart/retry correctness. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java | Passes persisted tableSchemas into cdc_client read/write requests. |
| fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java | Stores tableSchemas from commitOffset requests into the offset provider. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java | Adds tableSchemas field to requests sent to cdc_client. |
| fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java | Adds tableSchemas field to FE commitOffset API payload. |
Code Review Summary — PR #61182: PostgreSQL Schema Change Support for CDC Streaming Jobs
Overall this is a well-structured and carefully designed feature. The two-pass diff approach (fast in-memory name comparison → JDBC refresh → exact diff) is clever, the rename guard is a good safety measure, and the idempotent DDL execution handles retries gracefully. Tests are comprehensive.
I found 3 substantive issues (2 quoting/injection bugs, 1 potential NPE) and 2 minor nits. Details below.
Critical Checkpoint Conclusions (per code-review skill §1.3)
| Checkpoint | Conclusion |
|---|---|
| Goal | ✅ Code accomplishes stated goal. PG ADD/DROP COLUMN detected and applied; RENAME guarded; tests prove all scenarios. |
| Modification size | ✅ Reasonably scoped. New files are well-separated by concern. |
| Concurrency | ✅ tableSchemas uses ConcurrentHashMap. Processing is single-threaded per task (writeRecords sequential). No lock concerns. |
| Lifecycle | ✅ No circular references. JDBC connections use try-with-resources. |
| Config items | ✅ None added. |
| Incompatible changes | ✅ commitOffset signature changed but is internal-only API. |
| Parallel code paths | ✅ MySQL path correctly mirrors PG structure. includeSchemaChanges=true intentional for future MySQL support. |
| Persistence / EditLog | ✅ tableSchemas persisted via existing JdbcSourceOffsetProvider Gson path. Replay logic updated in replayIfNeed. |
| Test coverage | quoteDefaultValue, identifier, diffSchemaByName, buildAddColumnSql. |
| Observability | ✅ Good INFO-level logging at detection, DDL execution, and idempotent skip. |
Substantive Issues
1. Potential NPE in PostgresDebeziumJsonDeserializer: refreshSingleTableSchema() can return null if the table was dropped between detection and JDBC refresh. The caller dereferences without a null check → NPE. See inline comment.
2. quoteDefaultValue, unescaped single quotes: if a PG default value contains ' (e.g. it's), the generated SQL will be malformed: DEFAULT 'it's'. Need to escape: defaultValue.replace("'", "\\'"). See inline comment.
3. identifier, unescaped backticks: a column named a`b produces `a`b`, which is malformed Doris SQL. Need: name.replace("`", "``"). See inline comment.
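The two quoting fixes suggested above could look roughly like the following. This is a minimal sketch, not the code in SchemaChangeHelper: the method names come from the review, but the class name and signatures are assumed. Escaping backslashes before single quotes goes beyond the reviewer's suggestion and assumes MySQL-style string literals.

```java
// Hypothetical helper class; only the method names appear in the review.
final class SqlQuotingSketch {
    private SqlQuotingSketch() {}

    /** Wraps a column name in backticks, doubling any embedded backtick. */
    static String identifier(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    /**
     * Wraps a default value in single quotes.
     * Backslashes are escaped first (an assumption about the literal syntax),
     * then single quotes, as the review suggests.
     */
    static String quoteDefaultValue(String defaultValue) {
        String escaped = defaultValue.replace("\\", "\\\\").replace("'", "\\'");
        return "'" + escaped + "'";
    }
}
```

With this sketch, a column named a`b is emitted as `a``b`, and the default it's is emitted as 'it\'s'.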
Minor Nits
- CommitOffsetRequest.java: the new tableSchemas field is private while all 7 existing fields are public. Functionally fine (Lombok), but visually inconsistent.
- JobBaseRecordRequest.java: same pattern, the new field is private vs. the existing protected fields.
Things Done Well
- Two-pass diff design avoids unnecessary JDBC round-trips for non-schema-change records
- Rename guard (simultaneous ADD+DROP → skip + warn) is a safe default
- DorisBatchStreamLoad.commitOffset improvement: checking the JSON code field instead of just HTTP 200
- tryLoadTableSchemasFromRequest() documentation explaining MySQL schema-mismatch is excellent
- DeserializeResult with clear type enum and factory methods is clean design
- Regression tests follow conventions (qt_ prefix, DROP before CREATE, ordered results)
TPC-H: Total hot run time: 27731 ms
TPC-DS: Total hot run time: 153622 ms
TPC-H: Total hot run time: 27793 ms
TPC-DS: Total hot run time: 152518 ms
TPC-H: Total hot run time: 27504 ms
TPC-DS: Total hot run time: 153571 ms
Pull request overview
Copilot reviewed 28 out of 28 changed files in this pull request and generated 3 comments.
Code Review: PostgreSQL CDC Schema Change Support
Reviewed all 28 changed files (2120+ additions). The overall design is solid — the three-stage detection (name diff → JDBC refresh → filtered diff), rename guard, and idempotent DDL execution are well thought out. Below are findings organized by the critical checkpoints from our review guidelines.
Critical Checkpoint Conclusions
1. Goal & correctness: The PR accomplishes its goal of enabling ADD/DROP COLUMN detection for PG CDC. The rename guard (simultaneous ADD+DROP → skip DDL) is a reasonable safety measure. The limitation of not supporting MODIFY COLUMN type is explicitly documented and acceptable given the Kafka Connect type ambiguity.
2. Modification scope: The change is focused and well-structured. DeserializeResult cleanly replaces the previous List<String> return type. The AbstractCdcSourceReader base class properly abstracts common schema tracking logic.
3. Concurrency: tableSchemas uses ConcurrentHashMap ✅. applySchemaChange() uses putAll (non-atomic) but is only called from the single-threaded writeRecords() path in PipelineCoordinator ✅. The writeRecords → forceFlush → DDL → apply → continue-DML ordering is correct and ensures data consistency.
4. Lifecycle management: Schema lifecycle is clean: FE persists via @SerializedName("ts") in JdbcSourceOffsetProvider, replayed on restart via replayIfNeed(), loaded into CDC client via tryLoadTableSchemasFromRequest(). No circular references detected.
5. Configuration items: No new user-facing configuration items added (schema change detection is automatic). ✅
6. Incompatible changes: The commitOffset signature changed to accept tableSchemas parameter. The DeserializeResult replaces List<String>. Both are internal CDC client APIs, not public interfaces. The @SerializedName("ts") field in JdbcSourceOffsetProvider is additive and backward-compatible (Gson ignores unknown fields on deserialization). ✅
7. Parallel code paths: MySQL path (MySqlDebeziumJsonDeserializer) has a TODO for handleSchemaChangeEvent that returns empty(). This is pre-existing behavior (previously returned Collections.emptyList()). Not a regression.
8. Transaction & persistence: tableSchemas is persisted atomically with offset in commitOffset() under writeLock() in StreamingInsertJob.java:1148-1173. FE restart correctly restores via replayIfNeed(). ✅
9. Test coverage: Two comprehensive regression tests cover: snapshot+binlog schema change, ADD/DROP/rename-guard scenarios, offset=latest (no snapshot) path, double ADD, DROP+ADD rename guard, UPDATE after rename guard, ADD with DEFAULT, ADD NOT NULL with DEFAULT. Unit tests cover PG type mapping. ✅ The .out files have the auto-generated header comment.
10. Observability: Good INFO-level logging at key decision points (DDL detection, execution, schema serialization). WARN-level for rename guard skip and idempotent DDL skip. ✅
11. Performance: refreshSingleTableSchema opens a new JDBC connection per schema change detection event. This is acceptable since schema changes are rare. No hot-path performance concerns.
Issues Found
See inline comments for specific issues. Summary:
1. [Medium] Hardcoded auth header in HttpUtil.getAuthHeader(): "Basic YWRtaW46" (base64 of admin:). Pre-existing pattern, but now centralized. Should be parameterized or at minimum documented.
2. [Low] refreshSingleTableSchema swallows exception context: it wraps in a bare RuntimeException instead of IOException, which means the IOException catch in PostgresDebeziumJsonDeserializer.deserialize() won't catch JDBC failures from the refresher. The RuntimeException will propagate uncaught up to PipelineCoordinator.writeRecords(), where it's caught by the generic Exception handler, so it's functionally safe, but the error message could be improved.
3. [Suggestion] bpchar UTF-8 multiplier: multiplying length by 3 is reasonable for UTF-8 but could exceed the Doris CHAR(255) limit. The code handles this by switching to VARCHAR when len > 255, which is correct.
4. [Info] commitOffset response parsing: now requires code == 0 in the response body (previously just HTTP 200). This is stricter but correct, since it properly validates the FE response.
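For the [Medium] item, one way to parameterize the header is sketched below. The class and method names are hypothetical and this is not the existing HttpUtil API; it only shows that the hardcoded value is what Basic auth produces for user admin with an empty password.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical replacement for a hardcoded "Basic YWRtaW46" constant.
final class AuthHeaderSketch {
    private AuthHeaderSketch() {}

    /** Builds an HTTP Basic Authorization header value from user and password. */
    static String basicAuthHeader(String user, String password) {
        // A null password is treated as empty, matching the "admin:" case.
        String credentials = user + ":" + (password == null ? "" : password);
        byte[] raw = credentials.getBytes(StandardCharsets.UTF_8);
        return "Basic " + Base64.getEncoder().encodeToString(raw);
    }
}
```

Calling basicAuthHeader("admin", "") yields the same "Basic YWRtaW46" string that is hardcoded today, so such a change would be behavior-preserving for the default credentials.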
TPC-H: Total hot run time: 27731 ms
TPC-DS: Total hot run time: 153314 ms
PR approved by at least one committer and no changes requested.
PR approved by anyone and no changes requested.
What problem does this PR solve?
Summary
Added Schema Change support to the CDC pipeline of PostgreSQL Streaming Jobs, enabling Doris target tables to automatically follow DDL changes (ADD COLUMN / DROP COLUMN) from upstream PostgreSQL without manual intervention.
Background
Unlike MySQL Binlog, PostgreSQL WAL does not contain explicit DDL events. Schema Changes can only be detected by diffing the afterSchema field in the DML record with the locally cached schema.
Implementation
Detection process (three stages):
1. First diff (in memory, name comparison): compares the field names in the afterSchema of the current DML record with the cached tableSchemas. If a difference is found, proceeds to the next step.
2. JDBC refresh: fetches the current live schema via PostgreSQL JDBC (fresh).
3. Second diff (exact comparison): based on the afterSchema (not fresh), it processes only the column changes already visible in the current DML record, avoiding premature execution of later DDL changes for which no DML record has been generated yet.

Outcome of the second diff:
- ADD only → generates ALTER TABLE … ADD COLUMN
- DROP only → generates ALTER TABLE … DROP COLUMN
- ADD + DROP simultaneously → Rename Guard: if it's determined to be a potential column renaming, no DDL is executed; only the cache is updated, and a WARN log is printed prompting the user to manually execute RENAME in Doris.
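The name diff and the rename guard can be illustrated with a small sketch. It is not the PR's implementation: the class, the method names, and the placeholder STRING column type are assumptions, and the JDBC refresh step (which supplies the real column types) is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Illustrative only: names and the placeholder column type are assumptions.
final class SchemaDiffSketch {
    private SchemaDiffSketch() {}

    private static String quote(String name) {
        return "`" + name.replace("`", "``") + "`";
    }

    /** Plans Doris DDL from cached column names vs. the record's afterSchema names. */
    static List<String> planDdl(String table, Set<String> cachedColumns, Set<String> afterColumns) {
        Set<String> added = new LinkedHashSet<>(afterColumns);
        added.removeAll(cachedColumns);
        Set<String> dropped = new LinkedHashSet<>(cachedColumns);
        dropped.removeAll(afterColumns);

        List<String> ddls = new ArrayList<>();
        if (!added.isEmpty() && !dropped.isEmpty()) {
            // Rename guard: a simultaneous ADD and DROP may be a rename, so emit no DDL.
            return ddls;
        }
        for (String column : added) {
            // STRING is a placeholder; the real type would come from the JDBC refresh.
            ddls.add("ALTER TABLE " + quote(table) + " ADD COLUMN " + quote(column) + " STRING");
        }
        for (String column : dropped) {
            ddls.add("ALTER TABLE " + quote(table) + " DROP COLUMN " + quote(column));
        }
        return ddls;
    }
}
```

An empty result covers both the no-change case and the rename-guard case; a fuller version would distinguish them so the guard can log its warning and still update the cached schema.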
Idempotency: SchemaChangeManager silently handles "Can not add column which already exists" / "Column does not exist" errors, ensuring retry safety.
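A rough sketch of the idempotent handling, assuming the check is a substring match on the error message. The two message fragments are the ones quoted above; the DdlExecutor interface, the class, and the method names are hypothetical and do not reflect how SchemaChangeManager actually talks to the FE.

```java
// Illustrative only: the interface and method names are assumptions.
final class IdempotentDdlSketch {
    private IdempotentDdlSketch() {}

    /** Hypothetical hook that runs one DDL statement against Doris. */
    interface DdlExecutor {
        void execute(String ddl) throws Exception;
    }

    /** True if the error means the DDL was already applied by an earlier attempt. */
    static boolean isIdempotentError(String message) {
        if (message == null) {
            return false;
        }
        return message.contains("Can not add column which already exists")
                || message.contains("Column does not exist");
    }

    /** Returns true if the DDL ran, false if it was skipped as already applied. */
    static boolean applyDdl(DdlExecutor executor, String ddl) throws Exception {
        try {
            executor.execute(ddl);
            return true;
        } catch (Exception e) {
            if (isIdempotentError(e.getMessage())) {
                return false; // safe to continue: a retry hit an already-applied change
            }
            throw e;
        }
    }
}
```

Any other failure is rethrown, so only the two known "already applied" cases are swallowed on retry.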
Limitations
Release note
None