From 450010975cbf3b0281d9c14c45c58bde3d9c8d6f Mon Sep 17 00:00:00 2001 From: wudi Date: Thu, 12 Mar 2026 15:08:29 +0800 Subject: [PATCH] [Improve](StreamingJob) Support schema change for PostgreSQL streaming job (#61182) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What problem does this PR solve? #### Summary Added Schema Change support to the CDC pipeline of PostgreSQL Streaming Jobs, enabling Doris target tables to automatically follow DDL changes (ADD COLUMN / DROP COLUMN) from upstream PostgreSQL without manual intervention. #### Background Unlike MySQL Binlog, PostgreSQL WAL does not contain explicit DDL events. Schema Changes can only be detected by diffing the afterSchema field in the DML record with the locally cached schema. #### Implementation Detection process (three stages): 1. First diff (memory, name comparison): Compares the afterSchema field name of the current DML record with the cached tableSchemas. If a difference is found, proceeds to the next step. 2. JDBC refresh: Fetches the current real-time schema via PostgreSQL JDBC (fresh). 3. Second diff (exact comparison): Based on the afterSchema (not fresh), it only processes column changes already perceived in the current DML record, avoiding premature execution of subsequent DDL changes for which no DML record has yet been generated. - ADD only → generates ALTER TABLE … ADD COLUMN - DROP only → generates ALTER TABLE … DROP COLUMN - ADD + DROP simultaneously → Rename Guard: If it's determined to be a potential column renaming, no DDL is executed; only the cache is updated, and a WARN log is printed prompting the user to manually execute RENAME in Doris. Idempotency: SchemaChangeManager silently handles "Can not add column which already exists" / "Column does not exist" errors, ensuring retry safety. 
#### Limitations - RENAME COLUMN not supported: If ADD + DROP simultaneously triggers Rename Guard, the DDL is skipped, requiring the user to manually execute ALTER TABLE … RENAME COLUMN in Doris. Data flow then automatically resumes. - MODIFY COLUMN not supported: Column type changes are ignored by design. Since type modifications do not change column names, they cannot be detected during the name diff stage; therefore no DDL is generated and the Doris column type remains unchanged. --- .../job/cdc/request/CommitOffsetRequest.java | 17 +- .../job/cdc/request/JobBaseRecordRequest.java | 1 + .../insert/streaming/StreamingInsertJob.java | 4 + .../streaming/StreamingMultiTblTask.java | 6 + .../offset/jdbc/JdbcSourceOffsetProvider.java | 4 + .../doris/cdcclient/common/Constants.java | 2 + .../doris/cdcclient/common/DorisType.java | 47 +++ .../service/PipelineCoordinator.java | 61 +++- .../cdcclient/sink/DorisBatchStreamLoad.java | 25 +- .../doris/cdcclient/sink/HttpPutBuilder.java | 3 +- .../deserialize/DebeziumJsonDeserializer.java | 18 +- .../source/deserialize/DeserializeResult.java | 92 +++++ .../MySqlDebeziumJsonDeserializer.java | 66 ++++ .../PostgresDebeziumJsonDeserializer.java | 248 +++++++++++++ .../deserialize/SourceRecordDeserializer.java | 5 + .../reader/AbstractCdcSourceReader.java | 171 +++++++++ .../reader/JdbcIncrementalSourceReader.java | 24 +- .../cdcclient/source/reader/SourceReader.java | 18 +- .../reader/mysql/MySqlSourceReader.java | 21 +- .../reader/postgres/PostgresSourceReader.java | 32 ++ .../doris/cdcclient/utils/HttpUtil.java | 4 + .../cdcclient/utils/SchemaChangeHelper.java | 291 +++++++++++++++ .../cdcclient/utils/SchemaChangeManager.java | 149 ++++++++ .../utils/SchemaChangeHelperTest.java | 194 ++++++++++ .../cdc/test_streaming_postgres_job_sc.out | 32 ++ ...est_streaming_postgres_job_sc_advanced.out | 31 ++ 
.../cdc/test_streaming_postgres_job_sc.groovy | 269 ++++++++++++++ ..._streaming_postgres_job_sc_advanced.groovy | 344 ++++++++++++++++++ 28 files changed, 2132 insertions(+), 47 deletions(-) create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java create mode 100644 fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java create mode 100644 fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java index 3d2d221ea49e7c..747e82b4feebc1 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/CommitOffsetRequest.java 
@@ -22,12 +22,10 @@ import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; -import lombok.ToString; @Getter @Setter @NoArgsConstructor -@ToString @AllArgsConstructor @Builder public class CommitOffsetRequest { @@ -38,4 +36,19 @@ public class CommitOffsetRequest { public long filteredRows; public long loadedRows; public long loadBytes; + public String tableSchemas; + + @Override + public String toString() { + return "CommitOffsetRequest{" + + "jobId=" + jobId + + ", taskId=" + taskId + + ", offset='" + offset + "'" + + ", scannedRows=" + scannedRows + + ", filteredRows=" + filteredRows + + ", loadedRows=" + loadedRows + + ", loadBytes=" + loadBytes + + ", tableSchemasSize=" + (tableSchemas != null ? tableSchemas.length() : 0) + + "}"; + } } diff --git a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java index 282913e2dd2d0e..27784b1701be02 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java +++ b/fe/fe-common/src/main/java/org/apache/doris/job/cdc/request/JobBaseRecordRequest.java @@ -28,4 +28,5 @@ @EqualsAndHashCode(callSuper = true) public abstract class JobBaseRecordRequest extends JobBaseConfig { protected Map meta; + protected String tableSchemas; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java index 0b0faa0f64ff70..de274012278e80 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java @@ -1171,6 +1171,10 @@ public void commitOffset(CommitOffsetRequest offsetRequest) throws JobException JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) 
offsetProvider; op.setHasMoreData(false); } + if (offsetRequest.getTableSchemas() != null) { + JdbcSourceOffsetProvider op = (JdbcSourceOffsetProvider) offsetProvider; + op.setTableSchemas(offsetRequest.getTableSchemas()); + } persistOffsetProviderIfNeed(); log.info("Streaming multi table job {} task {} commit offset successfully, offset: {}", getJobId(), offsetRequest.getTaskId(), offsetRequest.getOffset()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java index b6a4e8c939bb32..e22f5821618c01 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingMultiTblTask.java @@ -195,6 +195,12 @@ private WriteRecordRequest buildRequestParams() throws JobException { String feAddr = Env.getCurrentEnv().getMasterHost() + ":" + Env.getCurrentEnv().getMasterHttpPort(); request.setFrontendAddress(feAddr); request.setMaxInterval(jobProperties.getMaxIntervalSecond()); + if (offsetProvider instanceof JdbcSourceOffsetProvider) { + String schemas = ((JdbcSourceOffsetProvider) offsetProvider).getTableSchemas(); + if (schemas != null) { + request.setTableSchemas(schemas); + } + } return request; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java index b2f0d02e538611..e0227c821497a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java +++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java @@ -86,6 +86,9 @@ public class JdbcSourceOffsetProvider implements SourceOffsetProvider { @SerializedName("bop") Map binlogOffsetPersist; + @SerializedName("ts") + 
String tableSchemas; + volatile boolean hasMoreData = true; public JdbcSourceOffsetProvider(Long jobId, DataSourceType sourceType, Map sourceProperties) { @@ -352,6 +355,7 @@ public void replayIfNeed(StreamingInsertJob job) throws JobException { JdbcSourceOffsetProvider.class); this.binlogOffsetPersist = replayFromPersist.getBinlogOffsetPersist(); this.chunkHighWatermarkMap = replayFromPersist.getChunkHighWatermarkMap(); + this.tableSchemas = replayFromPersist.getTableSchemas(); log.info("Replaying offset provider for job {}, binlogOffset size {}, chunkHighWatermark size {}", getJobId(), binlogOffsetPersist == null ? 0 : binlogOffsetPersist.size(), diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java index 04ec118a6c2d4e..953903a80323a6 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/Constants.java @@ -23,4 +23,6 @@ public class Constants { // Debezium default properties public static final long DEBEZIUM_HEARTBEAT_INTERVAL_MS = 3000L; + + public static final String DORIS_TARGET_DB = "doris_target_db"; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java new file mode 100644 index 00000000000000..3aad97bb0cd391 --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/common/DorisType.java @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.common; + +public class DorisType { + public static final String BOOLEAN = "BOOLEAN"; + public static final String TINYINT = "TINYINT"; + public static final String SMALLINT = "SMALLINT"; + public static final String INT = "INT"; + public static final String BIGINT = "BIGINT"; + public static final String LARGEINT = "LARGEINT"; + // largeint is bigint unsigned in information_schema.COLUMNS + public static final String BIGINT_UNSIGNED = "BIGINT UNSIGNED"; + public static final String FLOAT = "FLOAT"; + public static final String DOUBLE = "DOUBLE"; + public static final String DECIMAL = "DECIMAL"; + public static final String DATE = "DATE"; + public static final String DATETIME = "DATETIME"; + public static final String CHAR = "CHAR"; + public static final String VARCHAR = "VARCHAR"; + public static final String STRING = "STRING"; + public static final String HLL = "HLL"; + public static final String BITMAP = "BITMAP"; + public static final String ARRAY = "ARRAY"; + public static final String JSONB = "JSONB"; + public static final String JSON = "JSON"; + public static final String MAP = "MAP"; + public static final String STRUCT = "STRUCT"; + public static final String VARIANT = "VARIANT"; + public static final String IPV4 = "IPV4"; + public static final String IPV6 = "IPV6"; +} diff --git 
a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java index 614c506619fa8d..97aa4b7f5f27ac 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/service/PipelineCoordinator.java @@ -21,8 +21,10 @@ import org.apache.doris.cdcclient.common.Env; import org.apache.doris.cdcclient.model.response.RecordWithMeta; import org.apache.doris.cdcclient.sink.DorisBatchStreamLoad; +import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; import org.apache.doris.cdcclient.source.reader.SourceReader; import org.apache.doris.cdcclient.source.reader.SplitReadResult; +import org.apache.doris.cdcclient.utils.SchemaChangeManager; import org.apache.doris.job.cdc.request.FetchRecordRequest; import org.apache.doris.job.cdc.request.WriteRecordRequest; import org.apache.doris.job.cdc.split.BinlogSplit; @@ -166,11 +168,12 @@ private RecordWithMeta buildRecordResponse( } // Process data messages - List serializedRecords = + DeserializeResult result = sourceReader.deserialize(fetchRecord.getConfig(), element); - if (!CollectionUtils.isEmpty(serializedRecords)) { + if (result.getType() == DeserializeResult.Type.DML + && !CollectionUtils.isEmpty(result.getRecords())) { recordCount++; - recordResponse.getRecords().addAll(serializedRecords); + recordResponse.getRecords().addAll(result.getRecords()); hasReceivedData = true; lastMessageIsHeartbeat = false; } @@ -236,21 +239,34 @@ public CompletableFuture writeRecordsAsync(WriteRecordRequest writeRecordR *

Heartbeat events will carry the latest offset. */ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception { + // Extract connection parameters up front for use throughout this method + String feAddr = writeRecordRequest.getFrontendAddress(); + String targetDb = writeRecordRequest.getTargetDb(); + String token = writeRecordRequest.getToken(); + + // Enrich the source config with the Doris target DB so the deserializer can build + // DDL referencing the correct Doris database, not the upstream source database. + Map deserializeContext = new HashMap<>(writeRecordRequest.getConfig()); + deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb); + SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest); DorisBatchStreamLoad batchStreamLoad = null; long scannedRows = 0L; int heartbeatCount = 0; SplitReadResult readResult = null; + boolean hasExecuteDDL = false; + boolean isSnapshotSplit = false; try { // 1. submit split async readResult = sourceReader.prepareAndSubmitSplit(writeRecordRequest); batchStreamLoad = getOrCreateBatchStreamLoad(writeRecordRequest); - boolean isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit()); + isSnapshotSplit = sourceReader.isSnapshotSplit(readResult.getSplit()); long startTime = System.currentTimeMillis(); long maxIntervalMillis = writeRecordRequest.getMaxInterval() * 1000; boolean shouldStop = false; boolean lastMessageIsHeartbeat = false; + LOG.info( "Start polling records for jobId={} taskId={}, isSnapshotSplit={}", writeRecordRequest.getJobId(), @@ -309,15 +325,22 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception } // Process data messages - List serializedRecords = - sourceReader.deserialize(writeRecordRequest.getConfig(), element); - - if (!CollectionUtils.isEmpty(serializedRecords)) { - String database = writeRecordRequest.getTargetDb(); + DeserializeResult result = + sourceReader.deserialize(deserializeContext, element); + + if 
(result.getType() == DeserializeResult.Type.SCHEMA_CHANGE) { + // Flush pending data before DDL + batchStreamLoad.forceFlush(); + SchemaChangeManager.executeDdls(feAddr, targetDb, token, result.getDdls()); + hasExecuteDDL = true; + sourceReader.applySchemaChange(result.getUpdatedSchemas()); + lastMessageIsHeartbeat = false; + } + if (!CollectionUtils.isEmpty(result.getRecords())) { String table = extractTable(element); - for (String record : serializedRecords) { + for (String record : result.getRecords()) { scannedRows++; - batchStreamLoad.writeRecord(database, table, record.getBytes()); + batchStreamLoad.writeRecord(targetDb, table, record.getBytes()); } // Mark last message as data (not heartbeat) lastMessageIsHeartbeat = false; @@ -346,8 +369,22 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception // The offset must be reset before commitOffset to prevent the next taskId from being create // by the fe. batchStreamLoad.resetTaskId(); + + // Serialize tableSchemas back to FE when: + // 1. A DDL was executed (in-memory schema was updated), OR + // 2. It's a binlog split AND FE had no schema (FE tableSchemas was null) — this covers + // incremental-only startup and the first binlog round after snapshot completes. 
+ String tableSchemas = null; + boolean feHadNoSchema = writeRecordRequest.getTableSchemas() == null; + if (hasExecuteDDL || (!isSnapshotSplit && feHadNoSchema)) { + tableSchemas = sourceReader.serializeTableSchemas(); + } batchStreamLoad.commitOffset( - currentTaskId, metaResponse, scannedRows, batchStreamLoad.getLoadStatistic()); + currentTaskId, + metaResponse, + scannedRows, + batchStreamLoad.getLoadStatistic(), + tableSchemas); } public static boolean isHeartbeatEvent(SourceRecord record) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java index 92a2f9db2b6ed7..72e84c4413c1c6 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/DorisBatchStreamLoad.java @@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.Getter; import lombok.Setter; @@ -503,7 +504,8 @@ public void commitOffset( String taskId, List> meta, long scannedRows, - LoadStatistic loadStatistic) { + LoadStatistic loadStatistic, + String tableSchemas) { try { String url = String.format(COMMIT_URL_PATTERN, frontendAddress, targetDb); CommitOffsetRequest commitRequest = @@ -515,6 +517,7 @@ public void commitOffset( .filteredRows(loadStatistic.getFilteredRows()) .loadedRows(loadStatistic.getLoadedRows()) .loadBytes(loadStatistic.getLoadBytes()) + .tableSchemas(tableSchemas) .build(); String param = OBJECT_MAPPER.writeValueAsString(commitRequest); @@ -527,7 +530,11 @@ public void commitOffset( .commit() .setEntity(new StringEntity(param)); - LOG.info("commit offset for jobId {} taskId {}, params {}", jobId, taskId, param); + LOG.info( + "commit 
offset for jobId {} taskId {}, commitRequest {}", + jobId, + taskId, + commitRequest.toString()); Throwable resEx = null; int retry = 0; while (retry <= RETRY) { @@ -541,11 +548,15 @@ public void commitOffset( : ""; LOG.info("commit result {}", responseBody); if (statusCode == 200) { - LOG.info("commit offset for jobId {} taskId {}", jobId, taskId); - // A 200 response indicates that the request was successful, and - // information such as offset and statistics may have already been - // updated. Retrying may result in repeated updates. - return; + JsonNode root = OBJECT_MAPPER.readTree(responseBody); + JsonNode code = root.get("code"); + if (code != null && code.asInt() == 0) { + LOG.info( + "commit offset for jobId {} taskId {} successfully", + jobId, + taskId); + return; + } } LOG.error( "commit offset failed with {}, reason {}, to retry", diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java index 3abd9eaabc285e..d24f61397a2977 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/sink/HttpPutBuilder.java @@ -18,6 +18,7 @@ package org.apache.doris.cdcclient.sink; import org.apache.doris.cdcclient.common.Constants; +import org.apache.doris.cdcclient.utils.HttpUtil; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.MapUtils; @@ -69,7 +70,7 @@ public HttpPutBuilder enable2PC() { } public HttpPutBuilder addTokenAuth(String token) { - header.put(HttpHeaders.AUTHORIZATION, "Basic YWRtaW46"); + header.put(HttpHeaders.AUTHORIZATION, HttpUtil.getAuthHeader()); header.put("token", token); return this; } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java 
b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java index 556c186b5d4a55..065a3da2c09b16 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DebeziumJsonDeserializer.java @@ -38,7 +38,6 @@ import java.time.LocalTime; import java.time.ZoneId; import java.util.ArrayList; -import java.util.Collections; import java.util.Date; import java.util.HashMap; import java.util.List; @@ -58,6 +57,8 @@ import io.debezium.data.geometry.Geography; import io.debezium.data.geometry.Geometry; import io.debezium.data.geometry.Point; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; import io.debezium.time.MicroTime; import io.debezium.time.MicroTimestamp; import io.debezium.time.NanoTime; @@ -65,17 +66,19 @@ import io.debezium.time.Time; import io.debezium.time.Timestamp; import io.debezium.time.ZonedTimestamp; +import lombok.Getter; import lombok.Setter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -/** SourceRecord ==> [{},{}] */ +/** SourceRecord ==> DeserializeResult */ public class DebeziumJsonDeserializer - implements SourceRecordDeserializer> { + implements SourceRecordDeserializer { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(DebeziumJsonDeserializer.class); private static ObjectMapper objectMapper = new ObjectMapper(); @Setter private ZoneId serverTimeZone = ZoneId.systemDefault(); + @Getter @Setter protected Map tableSchemas; public DebeziumJsonDeserializer() {} @@ -86,15 +89,14 @@ public void init(Map props) { } @Override - public List deserialize(Map context, SourceRecord record) + public DeserializeResult deserialize(Map context, SourceRecord record) throws IOException { if (RecordUtils.isDataChangeRecord(record)) { LOG.trace("Process 
data change record: {}", record); - return deserializeDataChangeRecord(record); - } else if (RecordUtils.isSchemaChangeEvent(record)) { - return Collections.emptyList(); + List rows = deserializeDataChangeRecord(record); + return DeserializeResult.dml(rows); } else { - return Collections.emptyList(); + return DeserializeResult.empty(); } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java new file mode 100644 index 00000000000000..c1e69c77e5aa5b --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/DeserializeResult.java @@ -0,0 +1,92 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.source.deserialize; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; + +/** Result of deserializing a SourceRecord. 
*/ +public class DeserializeResult { + + public enum Type { + DML, + SCHEMA_CHANGE, + EMPTY + } + + private final Type type; + private final List records; + private final List ddls; + private final Map updatedSchemas; + + private DeserializeResult( + Type type, + List records, + List ddls, + Map updatedSchemas) { + this.type = type; + this.records = records; + this.ddls = ddls; + this.updatedSchemas = updatedSchemas; + } + + public static DeserializeResult dml(List records) { + return new DeserializeResult(Type.DML, records, null, null); + } + + public static DeserializeResult schemaChange( + List ddls, Map updatedSchemas) { + return new DeserializeResult( + Type.SCHEMA_CHANGE, Collections.emptyList(), ddls, updatedSchemas); + } + + /** + * Schema change result that also carries DML records from the triggering record. The + * coordinator should execute DDLs first, then write the records. + */ + public static DeserializeResult schemaChange( + List ddls, + Map updatedSchemas, + List records) { + return new DeserializeResult(Type.SCHEMA_CHANGE, records, ddls, updatedSchemas); + } + + public static DeserializeResult empty() { + return new DeserializeResult(Type.EMPTY, Collections.emptyList(), null, null); + } + + public Type getType() { + return type; + } + + public List getRecords() { + return records; + } + + public List getDdls() { + return ddls; + } + + public Map getUpdatedSchemas() { + return updatedSchemas; + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java new file mode 100644 index 00000000000000..b64c7186983784 --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/MySqlDebeziumJsonDeserializer.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.source.deserialize; + +import org.apache.doris.job.cdc.DataSourceConfigKeys; + +import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; +import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.IOException; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * MySQL-specific deserializer that handles DDL schema change events. + * + *

When a schema change event is detected, it parses the HistoryRecord, computes the diff against + * stored tableSchemas, generates Doris ALTER TABLE SQL, and returns a SCHEMA_CHANGE result. + */ +public class MySqlDebeziumJsonDeserializer extends DebeziumJsonDeserializer { + private static final long serialVersionUID = 1L; + private static final Logger LOG = LoggerFactory.getLogger(MySqlDebeziumJsonDeserializer.class); + private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = + new FlinkJsonTableChangeSerializer(); + + private String targetDb; + + @Override + public void init(Map props) { + super.init(props); + this.targetDb = props.get(DataSourceConfigKeys.DATABASE); + } + + @Override + public DeserializeResult deserialize(Map context, SourceRecord record) + throws IOException { + if (RecordUtils.isSchemaChangeEvent(record)) { + return handleSchemaChangeEvent(record, context); + } + return super.deserialize(context, record); + } + + private DeserializeResult handleSchemaChangeEvent( + SourceRecord record, Map context) { + // todo: record has mysql ddl, need to convert doris ddl + return DeserializeResult.empty(); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java new file mode 100644 index 00000000000000..2dc2310054b70e --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/PostgresDebeziumJsonDeserializer.java @@ -0,0 +1,248 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.source.deserialize; + +import org.apache.doris.cdcclient.common.Constants; +import org.apache.doris.cdcclient.utils.SchemaChangeHelper; + +import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; +import org.apache.flink.util.Preconditions; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static io.debezium.connector.AbstractSourceInfo.SCHEMA_NAME_KEY; +import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY; + +import io.debezium.data.Envelope; +import io.debezium.relational.Column; +import io.debezium.relational.TableEditor; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * PostgreSQL-specific deserializer that detects schema changes (ADD/DROP column only) by comparing + * the record's Kafka Connect schema field names with stored tableSchemas. + * + *

Because PostgreSQL does not emit DDL events in the WAL stream, schema detection is done by + * comparing the "after" struct field names in each DML record against the known column set. + * + *

Type comparison is intentionally skipped to avoid false positives caused by Kafka Connect type + * ambiguity (e.g. text/varchar/json/uuid all appear as STRING). When a column add or drop is + * detected, the accurate column types are fetched directly from PostgreSQL via the injected {@link + * #pgSchemaRefresher} callback. + * + *

MODIFY column type is not supported — users must manually execute ALTER TABLE ... MODIFY + * COLUMN in Doris when a PG column type changes. + */ +public class PostgresDebeziumJsonDeserializer extends DebeziumJsonDeserializer { + private static final long serialVersionUID = 1L; + private static final Logger LOG = + LoggerFactory.getLogger(PostgresDebeziumJsonDeserializer.class); + + /** + * Callback to fetch the current PG table schema for a single table via JDBC. Injected by {@link + * org.apache.doris.cdcclient.source.reader.postgres.PostgresSourceReader} after initialization. + */ + private transient Function pgSchemaRefresher; + + public void setPgSchemaRefresher(Function refresher) { + this.pgSchemaRefresher = refresher; + } + + @Override + public DeserializeResult deserialize(Map context, SourceRecord record) + throws IOException { + if (!RecordUtils.isDataChangeRecord(record)) { + return DeserializeResult.empty(); + } + + Schema valueSchema = record.valueSchema(); + if (valueSchema == null) { + return super.deserialize(context, record); + } + + Field afterField = valueSchema.field(Envelope.FieldName.AFTER); + if (afterField == null) { + return super.deserialize(context, record); + } + + Schema afterSchema = afterField.schema(); + TableId tableId = extractTableId(record); + TableChanges.TableChange stored = tableSchemas != null ? 
tableSchemas.get(tableId) : null; + + // No baseline schema available — cannot detect changes, fall through to normal + // deserialization + if (stored == null || stored.getTable() == null) { + LOG.debug( + "No stored schema for table {}, skipping schema change detection.", + tableId.identifier()); + return super.deserialize(context, record); + } + + // First pass: name-only diff — fast, in-memory, no type comparison, no false positives + SchemaChangeHelper.SchemaDiff nameDiff = + SchemaChangeHelper.diffSchemaByName(afterSchema, stored); + if (nameDiff.isEmpty()) { + return super.deserialize(context, record); + } + + Preconditions.checkNotNull( + pgSchemaRefresher, + "pgSchemaRefresher callback is not set. Cannot fetch fresh PG schema for change detection."); + + // the last fresh schema + TableChanges.TableChange fresh = pgSchemaRefresher.apply(tableId); + if (fresh == null || fresh.getTable() == null) { + // Cannot proceed: DDL must be executed before the triggering DML record is written, + // otherwise new column data in this record would be silently dropped. + // Throwing here causes the batch to be retried from the same offset. + throw new IOException( + "Failed to fetch fresh schema for table " + + tableId.identifier() + + "; cannot apply schema change safely. Will retry."); + } + + // Second diff: use afterSchema as the source of truth for which columns the current WAL + // record is aware of. Only process additions/drops visible in afterSchema — columns that + // exist in fresh (JDBC) but are absent from afterSchema belong to a later DDL that has not + // yet produced a DML record, and will be processed when that DML record arrives. + // + // pgAdded: present in afterSchema but absent from stored → look up Column in fresh for + // accurate PG type metadata. If fresh doesn't have the column yet (shouldn't + // happen normally), skip it. + // pgDropped: present in stored but absent from afterSchema. 
+ List pgAdded = new ArrayList<>(); + List pgDropped = new ArrayList<>(); + + for (Field field : afterSchema.fields()) { + if (stored.getTable().columnWithName(field.name()) == null) { + Column freshCol = fresh.getTable().columnWithName(field.name()); + if (freshCol != null) { + pgAdded.add(freshCol); + } + } + } + + for (Column col : stored.getTable().columns()) { + if (afterSchema.field(col.name()) == null) { + pgDropped.add(col.name()); + } + } + + // Second diff is empty: nameDiff was a false positive (PG schema unchanged vs stored). + // This happens when pgSchemaRefresher returns a schema ahead of the current WAL position + // (e.g. a later DDL was already applied in PG while we're still consuming older DML + // records). + // No DDL needed, no tableSchema update, no extra stream load — just process the DML + // normally. + if (pgAdded.isEmpty() && pgDropped.isEmpty()) { + return super.deserialize(context, record); + } + + // Build updatedSchemas from fresh filtered to afterSchema columns only, so that the stored + // cache does not jump ahead to include columns not yet seen by any DML record. Those + // unseen columns will trigger their own schema change when their first DML record arrives. + TableEditor editor = fresh.getTable().edit(); + for (Column col : fresh.getTable().columns()) { + if (afterSchema.field(col.name()) == null) { + editor.removeColumn(col.name()); + } + } + TableChanges.TableChange filteredChange = + new TableChanges.TableChange(TableChanges.TableChangeType.ALTER, editor.create()); + Map updatedSchemas = new HashMap<>(); + updatedSchemas.put(tableId, filteredChange); + + // Rename guard: simultaneous ADD+DROP may be a column RENAME — skip DDL to avoid data loss. + // Users must manually RENAME the column in Doris. 
+ if (!pgAdded.isEmpty() && !pgDropped.isEmpty()) { + LOG.warn( + "[SCHEMA-CHANGE-SKIPPED] Table: {}\n" + + "Potential RENAME detected (simultaneous DROP+ADD).\n" + + "Dropped columns: {}\n" + + "Added columns: {}\n" + + "No DDL emitted to prevent data loss.\n" + + "Action required: manually RENAME column(s) in Doris," + + " then data will resume.", + tableId.identifier(), + pgDropped, + pgAdded.stream().map(Column::name).collect(Collectors.toList())); + List dmlRecords = super.deserialize(context, record).getRecords(); + return DeserializeResult.schemaChange( + Collections.emptyList(), updatedSchemas, dmlRecords); + } + + // Generate DDLs using accurate PG column types + String db = context.get(Constants.DORIS_TARGET_DB); + List ddls = new ArrayList<>(); + + for (String colName : pgDropped) { + ddls.add(SchemaChangeHelper.buildDropColumnSql(db, tableId.table(), colName)); + } + + for (Column col : pgAdded) { + String colType = SchemaChangeHelper.columnToDorisType(col); + String nullable = col.isOptional() ? "" : " NOT NULL"; + // pgAdded only contains columns present in afterSchema, so field lookup is safe. + // afterSchema.defaultValue() returns an already-deserialized Java object + // (e.g. String "hello", Integer 42) — no PG SQL cast suffix to strip. + // PG WAL DML records do not carry column comment metadata. + Object defaultObj = afterSchema.field(col.name()).schema().defaultValue(); + ddls.add( + SchemaChangeHelper.buildAddColumnSql( + db, + tableId.table(), + col.name(), + colType + nullable, + defaultObj != null ? String.valueOf(defaultObj) : null, + null)); + } + + List dmlRecords = super.deserialize(context, record).getRecords(); + + LOG.info( + "Postgres schema change detected for table {}: added={}, dropped={}. 
DDLs: {}", + tableId.identifier(), + pgAdded.stream().map(Column::name).collect(Collectors.toList()), + pgDropped, + ddls); + + return DeserializeResult.schemaChange(ddls, updatedSchemas, dmlRecords); + } + + private TableId extractTableId(SourceRecord record) { + Struct value = (Struct) record.value(); + Struct source = value.getStruct(Envelope.FieldName.SOURCE); + String schemaName = source.getString(SCHEMA_NAME_KEY); + String tableName = source.getString(TABLE_NAME_KEY); + return new TableId(null, schemaName, tableName); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java index f93567a230abea..cc0a519da0716e 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/deserialize/SourceRecordDeserializer.java @@ -21,8 +21,13 @@ import java.io.Serializable; import java.util.Map; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; + public interface SourceRecordDeserializer extends Serializable { void init(Map props); C deserialize(Map context, T record) throws IOException; + + default void setTableSchemas(Map tableSchemas) {} } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java new file mode 100644 index 00000000000000..6ebf75a99aaa90 --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java @@ -0,0 +1,171 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.source.reader; + +import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; +import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer; +import org.apache.doris.cdcclient.utils.SchemaChangeManager; +import org.apache.doris.job.cdc.request.JobBaseRecordRequest; + +import org.apache.flink.cdc.connectors.base.utils.SerializerUtils; +import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; +import org.apache.kafka.connect.source.SourceRecord; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.debezium.document.Document; +import io.debezium.document.DocumentReader; +import io.debezium.document.DocumentWriter; +import io.debezium.relational.TableId; +import io.debezium.relational.history.TableChanges; +import lombok.Getter; +import lombok.Setter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Abstract base class providing common schema-tracking functionality for CDC source readers. + * + *

Handles serialization/deserialization of {@code tableSchemas} between FE and cdc_client, and + * provides a helper to load schemas from the incoming {@link JobBaseRecordRequest}. + */ +@Getter +@Setter +public abstract class AbstractCdcSourceReader implements SourceReader { + private static final Logger LOG = LoggerFactory.getLogger(AbstractCdcSourceReader.class); + + protected static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = + new FlinkJsonTableChangeSerializer(); + + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + protected SourceRecordDeserializer serializer; + protected Map tableSchemas; + + /** + * Load tableSchemas from a JSON string (produced by {@link #serializeTableSchemas()}). Used + * when a binlog/stream split is resumed from FE-persisted state. + * + *

Format: {@code [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}. + */ + public void loadTableSchemasFromJson(String json) throws IOException { + if (json == null || json.isEmpty()) { + return; + } + JsonNode root = OBJECT_MAPPER.readTree(json); + Map map = new ConcurrentHashMap<>(); + DocumentReader docReader = DocumentReader.defaultReader(); + for (JsonNode entry : root) { + boolean uc = entry.path("uc").asBoolean(false); + TableId tableId = TableId.parse(entry.get("i").asText(), uc); + Document doc = docReader.read(OBJECT_MAPPER.writeValueAsString(entry.get("c"))); + TableChanges.TableChange change = FlinkJsonTableChangeSerializer.fromDocument(doc, uc); + map.put(tableId, change); + } + this.tableSchemas = map; + this.serializer.setTableSchemas(map); + LOG.info("Loaded {} table schemas from JSON", map.size()); + } + + /** + * Serialize current tableSchemas to a compact JSON string for FE persistence. + * + *

Stores the Debezium document as a nested JSON object (not a string) to avoid redundant + * escaping. Format: {@code + * [{"i":"\"schema\".\"table\"","uc":false,"c":{...debeziumDoc...}},...]}. + */ + @Override + public String serializeTableSchemas() { + if (tableSchemas == null || tableSchemas.isEmpty()) { + return null; + } + try { + DocumentWriter docWriter = DocumentWriter.defaultWriter(); + ArrayNode result = OBJECT_MAPPER.createArrayNode(); + for (Map.Entry e : tableSchemas.entrySet()) { + TableId tableId = e.getKey(); + // useCatalogBeforeSchema: false when catalog is null but schema is set (e.g. PG) + boolean uc = SerializerUtils.shouldUseCatalogBeforeSchema(tableId); + ObjectNode entry = OBJECT_MAPPER.createObjectNode(); + entry.put("i", tableId.toDoubleQuotedString()); + entry.put("uc", uc); + // parse compact doc JSON into a JsonNode so "c" is a nested object, not a string + entry.set( + "c", + OBJECT_MAPPER.readTree( + docWriter.write(TABLE_CHANGE_SERIALIZER.toDocument(e.getValue())))); + result.add(entry); + } + return OBJECT_MAPPER.writeValueAsString(result); + } catch (Exception e) { + // Return null so the current batch is not failed — data keeps flowing and + // schema persistence will be retried on the next DDL or feHadNoSchema batch. + // For PostgreSQL this is safe: WAL records carry afterSchema so the next DML + // will re-trigger schema-change detection and self-heal. + // WARNING: for MySQL (schema change not yet implemented), returning null here + // is dangerous — MySQL binlog has no inline schema, so loading a stale + // pre-DDL schema from FE on the next task would cause column mismatches + // (flink-cdc#732). When MySQL schema change is implemented, this must throw + // instead of returning null to prevent committing the offset with a stale schema. + LOG.error( + "Failed to serialize tableSchemas, schema will not be persisted to FE" + + " in this cycle. 
Will retry on next DDL or batch.", + e); + return null; + } + } + + /** Apply schema changes to in-memory tableSchemas and notify the serializer. */ + @Override + public void applySchemaChange(Map updatedSchemas) { + if (updatedSchemas == null || updatedSchemas.isEmpty()) { + return; + } + if (tableSchemas == null) { + tableSchemas = new ConcurrentHashMap<>(updatedSchemas); + } else { + tableSchemas.putAll(updatedSchemas); + } + serializer.setTableSchemas(tableSchemas); + } + + /** + * Load FE-persisted tableSchemas into memory from the incoming request. + * + *

FE's schema and offset are always committed together, so FE's schema always corresponds to + * the starting offset of the current batch. Loading it unconditionally ensures the deserializer + * uses the correct baseline — particularly critical for MySQL: Flink CDC only retains the + * latest schema in memory, so if a previous batch executed a DDL but failed to commit the + * offset, retrying from the pre-DDL offset with a stale post-DDL cache would cause + * schema-mismatch errors on every retry (see flink-cdc#732). PostgreSQL is unaffected by this + * because WAL records carry the schema at the time they were written, but loading FE's schema + * unconditionally is still correct: any re-detected DDL will be handled idempotently by {@link + * SchemaChangeManager}. + * + *

Call this at the start of preparing a binlog/stream split. + */ + protected void tryLoadTableSchemasFromRequest(JobBaseRecordRequest baseReq) throws IOException { + loadTableSchemasFromJson(baseReq.getTableSchemas()); + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 36111d0fbf4c0c..5b8e343faae5cb 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -18,7 +18,7 @@ package org.apache.doris.cdcclient.source.reader; import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer; -import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer; +import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; import org.apache.doris.cdcclient.source.factory.DataSource; import org.apache.doris.job.cdc.DataSourceConfigKeys; import org.apache.doris.job.cdc.request.FetchTableSplitsRequest; @@ -83,11 +83,9 @@ import org.slf4j.LoggerFactory; @Data -public abstract class JdbcIncrementalSourceReader implements SourceReader { +public abstract class JdbcIncrementalSourceReader extends AbstractCdcSourceReader { private static final Logger LOG = LoggerFactory.getLogger(JdbcIncrementalSourceReader.class); private static ObjectMapper objectMapper = new ObjectMapper(); - private SourceRecordDeserializer> serializer; - private Map tableSchemas; // Support for multiple snapshot splits private List< @@ -334,6 +332,22 @@ private SplitReadResult prepareSnapshotSplits( /** Prepare stream split */ private SplitReadResult prepareStreamSplit( Map offsetMeta, JobBaseRecordRequest baseReq) throws Exception { + // Load tableSchemas from FE if available (avoids re-discover on 
restart) + tryLoadTableSchemasFromRequest(baseReq); + // If still null (incremental-only startup, or snapshot→binlog transition where FE never + // persisted schema), do a JDBC discover so the deserializer has a baseline to diff against. + if (this.tableSchemas == null) { + LOG.info( + "No tableSchemas available for stream split, discovering via JDBC for job {}", + baseReq.getJobId()); + Map discovered = getTableSchemas(baseReq); + this.tableSchemas = new java.util.concurrent.ConcurrentHashMap<>(discovered); + this.serializer.setTableSchemas(this.tableSchemas); + LOG.info( + "Discovered {} table schema(s) for job {}", + discovered.size(), + baseReq.getJobId()); + } Tuple2 splitFlag = createStreamSplit(offsetMeta, baseReq); this.streamSplit = splitFlag.f0.asStreamSplit(); this.streamReader = getBinlogSplitReader(baseReq); @@ -908,7 +922,7 @@ public void close(JobBaseConfig jobConfig) { } @Override - public List deserialize(Map config, SourceRecord element) + public DeserializeResult deserialize(Map config, SourceRecord element) throws IOException { return serializer.deserialize(config, element); } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java index 6c1a018dde32c0..fa4578d509b1c2 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/SourceReader.java @@ -17,6 +17,7 @@ package org.apache.doris.cdcclient.source.reader; +import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; import org.apache.doris.cdcclient.source.factory.DataSource; import org.apache.doris.job.cdc.request.CompareOffsetRequest; import org.apache.doris.job.cdc.request.FetchTableSplitsRequest; @@ -32,6 +33,9 @@ import java.util.List; import java.util.Map; +import io.debezium.relational.TableId; 
+import io.debezium.relational.history.TableChanges; + /** Source Reader Interface */ public interface SourceReader { String SPLIT_ID = "splitId"; @@ -75,7 +79,19 @@ public interface SourceReader { /** Called when closing */ void close(JobBaseConfig jobConfig); - List deserialize(Map config, SourceRecord element) throws IOException; + DeserializeResult deserialize(Map config, SourceRecord element) + throws IOException; + + /** + * Apply schema changes to the in-memory tableSchemas. Called after schema change is executed on + * Doris. + */ + default void applySchemaChange(Map updatedSchemas) {} + + /** Serialize current tableSchemas to JSON for persistence via commitOffset. */ + default String serializeTableSchemas() { + return null; + } /** * Commits the given offset with the source database. Used by some source like Postgres to diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index 83c9b349e4e5ad..11e5007894d293 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -17,11 +17,11 @@ package org.apache.doris.cdcclient.source.reader.mysql; -import org.apache.doris.cdcclient.source.deserialize.DebeziumJsonDeserializer; -import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer; +import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; +import org.apache.doris.cdcclient.source.deserialize.MySqlDebeziumJsonDeserializer; import org.apache.doris.cdcclient.source.factory.DataSource; +import org.apache.doris.cdcclient.source.reader.AbstractCdcSourceReader; import org.apache.doris.cdcclient.source.reader.SnapshotReaderContext; -import org.apache.doris.cdcclient.source.reader.SourceReader; 
import org.apache.doris.cdcclient.source.reader.SplitReadResult; import org.apache.doris.cdcclient.source.reader.SplitRecords; import org.apache.doris.cdcclient.utils.ConfigUtil; @@ -62,7 +62,6 @@ import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils; import org.apache.flink.cdc.connectors.mysql.table.StartupMode; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; -import org.apache.flink.cdc.debezium.history.FlinkJsonTableChangeSerializer; import org.apache.flink.table.types.logical.RowType; import org.apache.kafka.connect.source.SourceRecord; @@ -110,13 +109,9 @@ import org.slf4j.LoggerFactory; @Data -public class MySqlSourceReader implements SourceReader { +public class MySqlSourceReader extends AbstractCdcSourceReader { private static final Logger LOG = LoggerFactory.getLogger(MySqlSourceReader.class); private static ObjectMapper objectMapper = new ObjectMapper(); - private static final FlinkJsonTableChangeSerializer TABLE_CHANGE_SERIALIZER = - new FlinkJsonTableChangeSerializer(); - private SourceRecordDeserializer> serializer; - private Map tableSchemas; // Support for multiple snapshot splits with Round-Robin polling private List< @@ -135,7 +130,7 @@ public class MySqlSourceReader implements SourceReader { private MySqlBinlogSplitState binlogSplitState; public MySqlSourceReader() { - this.serializer = new DebeziumJsonDeserializer(); + this.serializer = new MySqlDebeziumJsonDeserializer(); this.snapshotReaderContexts = new ArrayList<>(); } @@ -339,6 +334,8 @@ private SplitReadResult prepareSnapshotSplits( /** Prepare binlog split */ private SplitReadResult prepareBinlogSplit( Map offsetMeta, JobBaseRecordRequest baseReq) throws Exception { + // Load tableSchemas from FE if available (avoids re-discover on restart) + tryLoadTableSchemasFromRequest(baseReq); Tuple2 splitFlag = createBinlogSplit(offsetMeta, baseReq); this.binlogSplit = (MySqlBinlogSplit) splitFlag.f0; this.binlogReader = getBinlogSplitReader(baseReq); @@ 
-778,6 +775,8 @@ private MySqlSourceConfig generateMySqlConfig(Map cdcConfig, Str configFactory.serverTimeZone( ConfigUtil.getTimeZoneFromProps(cu.getOriginalProperties()).toString()); + // Schema change handling for MySQL is not yet implemented; keep disabled to avoid + // unnecessary processing overhead until DDL support is added. configFactory.includeSchemaChanges(false); String includingTables = cdcConfig.get(DataSourceConfigKeys.INCLUDE_TABLES); @@ -992,7 +991,7 @@ public void close(JobBaseConfig jobConfig) { } @Override - public List deserialize(Map config, SourceRecord element) + public DeserializeResult deserialize(Map config, SourceRecord element) throws IOException { return serializer.deserialize(config, element); } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index 5a9fa095941ace..f41b89535bb3f5 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -19,6 +19,7 @@ import org.apache.doris.cdcclient.common.Constants; import org.apache.doris.cdcclient.exception.CdcClientException; +import org.apache.doris.cdcclient.source.deserialize.PostgresDebeziumJsonDeserializer; import org.apache.doris.cdcclient.source.factory.DataSource; import org.apache.doris.cdcclient.source.reader.JdbcIncrementalSourceReader; import org.apache.doris.cdcclient.utils.ConfigUtil; @@ -54,6 +55,7 @@ import java.time.Duration; import java.time.Instant; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -82,6 +84,7 @@ public class PostgresSourceReader extends JdbcIncrementalSourceReader { public PostgresSourceReader() { super(); + 
this.setSerializer(new PostgresDebeziumJsonDeserializer()); } @Override @@ -91,6 +94,12 @@ public void initialize(long jobId, DataSource dataSource, Map co LOG.info("Creating slot for job {}, user {}", jobId, sourceConfig.getUsername()); createSlotForGlobalStreamSplit(dialect); super.initialize(jobId, dataSource, config); + // Inject PG schema refresher so the deserializer can fetch accurate column types on DDL + if (serializer instanceof PostgresDebeziumJsonDeserializer) { + ((PostgresDebeziumJsonDeserializer) serializer) + .setPgSchemaRefresher( + tableId -> refreshSingleTableSchema(tableId, config, jobId)); + } } /** @@ -343,6 +352,29 @@ protected Map discoverTableSchemas(JobBaseCon } } + /** + * Fetch the current schema for a single table directly from PostgreSQL via JDBC. + * + *

Called by {@link PostgresDebeziumJsonDeserializer} when a schema change (ADD/DROP column) + * is detected, to obtain accurate PG column types for DDL generation. + * + * @return the fresh {@link TableChanges.TableChange} + */ + private TableChanges.TableChange refreshSingleTableSchema( + TableId tableId, Map config, long jobId) { + PostgresSourceConfig sourceConfig = generatePostgresConfig(config, jobId, 0); + PostgresDialect dialect = new PostgresDialect(sourceConfig); + try (JdbcConnection jdbcConnection = dialect.openJdbcConnection(sourceConfig)) { + CustomPostgresSchema customPostgresSchema = + new CustomPostgresSchema((PostgresConnection) jdbcConnection, sourceConfig); + Map schemas = + customPostgresSchema.getTableSchema(Collections.singletonList(tableId)); + return schemas.get(tableId); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + @Override protected FetchTask createFetchTaskFromSplit( JobBaseConfig jobConfig, SourceSplitBase split) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java index 4d1356003fba60..05407b2c89d039 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/HttpUtil.java @@ -52,4 +52,8 @@ protected boolean isRedirectable(String method) { .addInterceptorLast(new RequestContent(true)) .build(); } + + public static String getAuthHeader() { + return "Basic YWRtaW46"; + } } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java new file mode 100644 index 00000000000000..5eea4f1f16f61a --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeHelper.java @@ -0,0 +1,291 @@ +// Licensed to the 
Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.DorisType; + +import org.apache.flink.util.Preconditions; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +import io.debezium.relational.Column; +import io.debezium.relational.history.TableChanges; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Utility class for generating Doris ALTER TABLE SQL from schema diffs. */ +public class SchemaChangeHelper { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeHelper.class); + private static final String ADD_DDL = "ALTER TABLE %s ADD COLUMN %s %s"; + private static final String DROP_DDL = "ALTER TABLE %s DROP COLUMN %s"; + + private SchemaChangeHelper() {} + + // ─── Schema diff result ──────────────────────────────────────────────────── + + /** + * Holds the result of a full schema comparison between an after-schema and stored TableChange. + */ + public static class SchemaDiff { + /** Fields present in afterSchema but absent from stored. 
*/ + public final List added; + + /** Column names present in stored but absent from afterSchema. */ + public final List dropped; + + /** Same-named columns whose Doris type or default value differs. */ + public final Map modified; + + public SchemaDiff(List added, List dropped, Map modified) { + this.added = added; + this.dropped = dropped; + this.modified = modified; + } + + public boolean isEmpty() { + return added.isEmpty() && dropped.isEmpty() && modified.isEmpty(); + } + } + + // ─── Schema-diff helpers (Kafka Connect schema ↔ stored TableChange) ────── + + /** + * Name-only schema diff: compare field names in {@code afterSchema} against the stored {@link + * TableChanges.TableChange}, detecting added and dropped columns by name only. + * + *

Only ADD and DROP column changes are supported; MODIFY and RENAME are not + * + *

When {@code stored} is null or empty, both lists are empty (no baseline to diff against). + */ + public static SchemaDiff diffSchemaByName(Schema afterSchema, TableChanges.TableChange stored) { + List added = new ArrayList<>(); + List dropped = new ArrayList<>(); + + if (afterSchema == null || stored == null || stored.getTable() == null) { + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // Detect added: fields present in afterSchema but absent from stored + for (Field field : afterSchema.fields()) { + if (stored.getTable().columnWithName(field.name()) == null) { + added.add(field); + } + } + + // Detect dropped: columns present in stored but absent from afterSchema + for (Column col : stored.getTable().columns()) { + if (afterSchema.field(col.name()) == null) { + dropped.add(col.name()); + } + } + + return new SchemaDiff(added, dropped, new LinkedHashMap<>()); + } + + // ─── Quoting helpers ────────────────────────────────────────────────────── + + /** Wrap a name in backticks if not already quoted. */ + public static String identifier(String name) { + if (name.startsWith("`") && name.endsWith("`")) { + return name; + } + return "`" + name + "`"; + } + + /** Return a fully-qualified {@code `db`.`table`} identifier string. */ + public static String quoteTableIdentifier(String db, String table) { + return identifier(db) + "." + identifier(table); + } + + /** + * Format a default value (already a plain Java string, not a raw SQL expression) into a form + * suitable for a Doris {@code DEFAULT} clause. + * + *

The caller is expected to pass a deserialized value — e.g. obtained from the + * Kafka Connect schema via {@code field.schema().defaultValue().toString()} — rather than a raw + * PG SQL expression. This avoids having to strip PG-specific type casts ({@code ::text}, etc.). + * + *

    + *
  • SQL keywords ({@code NULL}, {@code CURRENT_TIMESTAMP}, {@code TRUE}, {@code FALSE}) are + * returned as-is. + *
  • Numeric literals are returned as-is (no quotes). + *
  • Everything else is wrapped in single quotes. + *
+ */ + public static String quoteDefaultValue(String defaultValue) { + if (defaultValue == null) { + return null; + } + if (defaultValue.equalsIgnoreCase("current_timestamp") + || defaultValue.equalsIgnoreCase("null") + || defaultValue.equalsIgnoreCase("true") + || defaultValue.equalsIgnoreCase("false")) { + return defaultValue; + } + try { + Double.parseDouble(defaultValue); + return defaultValue; + } catch (NumberFormatException ignored) { + // fall through + } + return "'" + defaultValue.replace("'", "''") + "'"; + } + + /** Escape single quotes inside a COMMENT string. */ + public static String quoteComment(String comment) { + if (comment == null) { + return ""; + } + return comment.replace("'", "''"); + } + + // ─── DDL builders ───────────────────────────────────────────────────────── + + /** + * Build {@code ALTER TABLE ... ADD COLUMN} SQL. + * + * @param db target database + * @param table target table + * @param colName column name + * @param colType Doris column type string (including optional NOT NULL) + * @param defaultValue optional DEFAULT value; {@code null} = omit DEFAULT clause + * @param comment optional COMMENT; {@code null}/empty = omit COMMENT clause + */ + public static String buildAddColumnSql( + String db, + String table, + String colName, + String colType, + String defaultValue, + String comment) { + StringBuilder sb = + new StringBuilder( + String.format( + ADD_DDL, + quoteTableIdentifier(db, table), + identifier(colName), + colType)); + if (defaultValue != null) { + sb.append(" DEFAULT ").append(quoteDefaultValue(defaultValue)); + } + appendComment(sb, comment); + return sb.toString(); + } + + /** Build {@code ALTER TABLE ... DROP COLUMN} SQL. 
*/ + public static String buildDropColumnSql(String db, String table, String colName) { + return String.format(DROP_DDL, quoteTableIdentifier(db, table), identifier(colName)); + } + + // ─── Type mapping ───────────────────────────────────────────────────────── + + /** Convert a Debezium Column to a Doris column type string (via PG type name). */ + public static String columnToDorisType(Column column) { + return pgTypeNameToDorisType(column.typeName(), column.length(), column.scale().orElse(-1)); + } + + /** Map a PostgreSQL native type name to a Doris type string. */ + static String pgTypeNameToDorisType(String pgTypeName, int length, int scale) { + Preconditions.checkNotNull(pgTypeName); + switch (pgTypeName.toLowerCase()) { + case "bool": + return DorisType.BOOLEAN; + case "bit": + return length == 1 ? DorisType.BOOLEAN : DorisType.STRING; + case "int2": + case "smallserial": + return DorisType.SMALLINT; + case "int4": + case "serial": + return DorisType.INT; + case "int8": + case "bigserial": + return DorisType.BIGINT; + case "float4": + return DorisType.FLOAT; + case "float8": + return DorisType.DOUBLE; + case "numeric": + { + int p = length > 0 ? Math.min(length, 38) : 38; + int s = scale >= 0 ? scale : 9; + return String.format("%s(%d, %d)", DorisType.DECIMAL, p, s); + } + case "bpchar": + { + if (length <= 0) { + return DorisType.STRING; + } + int len = length * 3; + if (len > 255) { + return String.format("%s(%s)", DorisType.VARCHAR, len); + } else { + return String.format("%s(%s)", DorisType.CHAR, len); + } + } + case "date": + return DorisType.DATE; + case "timestamp": + case "timestamptz": + { + int s = (scale >= 0 && scale <= 6) ? 
scale : 6; + return String.format("%s(%d)", DorisType.DATETIME, s); + } + // All remaining types map to STRING (aligned with JdbcPostgreSQLClient) + case "point": + case "line": + case "lseg": + case "box": + case "path": + case "polygon": + case "circle": + case "varchar": + case "text": + case "time": + case "timetz": + case "interval": + case "cidr": + case "inet": + case "macaddr": + case "varbit": + case "uuid": + case "bytea": + return DorisType.STRING; + case "json": + case "jsonb": + return DorisType.JSON; + default: + LOG.warn("Unrecognized PostgreSQL type '{}', defaulting to STRING", pgTypeName); + return DorisType.STRING; + } + } + + // ─── Internal helpers ───────────────────────────────────────────────────── + + private static void appendComment(StringBuilder sb, String comment) { + if (comment != null && !comment.isEmpty()) { + sb.append(" COMMENT '").append(quoteComment(comment)).append("'"); + } + } +} diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java new file mode 100644 index 00000000000000..b392df9cfcd6ae --- /dev/null +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/utils/SchemaChangeManager.java @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.utils; + +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.util.EntityUtils; + +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** Static utility class for executing DDL schema changes on the Doris FE via HTTP. */ +public class SchemaChangeManager { + + private static final Logger LOG = LoggerFactory.getLogger(SchemaChangeManager.class); + private static final String SCHEMA_CHANGE_API = "http://%s/api/query/default_cluster/%s"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final String COLUMN_EXISTS_MSG = "Can not add column which already exists"; + private static final String COLUMN_NOT_EXISTS_MSG = "Column does not exists"; + + private SchemaChangeManager() {} + + /** + * Execute a list of DDL statements on FE. Each statement is sent independently. + * + *

Idempotent errors (ADD COLUMN when column already exists, DROP COLUMN when column does not + * exist) are logged as warnings and silently skipped, so retries on a different BE after a + * failed commitOffset do not cause infinite failures. + * + * @param feAddr Doris FE address (host:port) + * @param db target database + * @param token FE auth token + * @param sqls DDL statements to execute + */ + public static void executeDdls(String feAddr, String db, String token, List sqls) + throws IOException { + if (sqls == null || sqls.isEmpty()) { + LOG.info("No DDL statements to execute"); + return; + } + for (String stmt : sqls) { + stmt = stmt.trim(); + if (stmt.isEmpty()) { + continue; + } + LOG.info("Executing DDL on FE {}: {}", feAddr, stmt); + execute(feAddr, db, token, stmt); + } + } + + /** + * Execute a single SQL statement via the FE query API. + * + *

Idempotent errors are swallowed with a warning; all other errors throw {@link + * IOException}. + */ + public static void execute(String feAddr, String db, String token, String sql) + throws IOException { + HttpPost post = buildHttpPost(feAddr, db, token, sql); + String responseBody = handleResponse(post); + LOG.info("Executed DDL {} with response: {}", sql, responseBody); + parseResponse(sql, responseBody); + } + + // ─── Internal helpers ───────────────────────────────────────────────────── + + private static HttpPost buildHttpPost(String feAddr, String db, String token, String sql) + throws IOException { + String url = String.format(SCHEMA_CHANGE_API, feAddr, db); + Map bodyMap = new HashMap<>(); + bodyMap.put("stmt", sql); + String body = OBJECT_MAPPER.writeValueAsString(bodyMap); + + HttpPost post = new HttpPost(url); + post.setHeader("Content-Type", "application/json;charset=UTF-8"); + post.setHeader("Authorization", HttpUtil.getAuthHeader()); + post.setHeader("token", token); + post.setEntity(new StringEntity(body, "UTF-8")); + return post; + } + + private static String handleResponse(HttpPost request) throws IOException { + try (CloseableHttpClient client = HttpUtil.getHttpClient(); + CloseableHttpResponse response = client.execute(request)) { + String responseBody = + response.getEntity() != null ? EntityUtils.toString(response.getEntity()) : ""; + LOG.debug("HTTP [{}]: {}", request.getURI(), responseBody); + return responseBody; + } + } + + /** + * Parse the FE response. Idempotent errors are logged as warnings and skipped; all other errors + * throw. + * + *

Idempotent conditions (can occur when a previous commitOffset failed and a fresh BE + * re-detects and re-executes the same DDL): + * + *

    + *
  • ADD COLUMN — "Can not add column which already exists": column was already added. + *
  • DROP COLUMN — "Column does not exists": column was already dropped. + *
+ */ + private static void parseResponse(String sql, String responseBody) throws IOException { + JsonNode root = OBJECT_MAPPER.readTree(responseBody); + JsonNode code = root.get("code"); + if (code != null && code.asInt() == 0) { + return; + } + + String msg = root.path("msg").asText(""); + + if (msg.contains(COLUMN_EXISTS_MSG)) { + LOG.warn("[DDL-IDEMPOTENT] Skipped ADD COLUMN (column already exists). SQL: {}", sql); + return; + } + if (msg.contains(COLUMN_NOT_EXISTS_MSG)) { + LOG.warn("[DDL-IDEMPOTENT] Skipped DROP COLUMN (column already absent). SQL: {}", sql); + return; + } + + LOG.warn("DDL execution failed. SQL: {}. Response: {}", sql, responseBody); + throw new IOException("Failed to execute schema change: " + responseBody); + } +} diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java new file mode 100644 index 00000000000000..b71fe609d4ace9 --- /dev/null +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/utils/SchemaChangeHelperTest.java @@ -0,0 +1,194 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.cdcclient.utils; + +import org.apache.doris.cdcclient.common.DorisType; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +/** Unit tests for {@link SchemaChangeHelper#pgTypeNameToDorisType}. */ +class SchemaChangeHelperTest { + + // ─── Integer types ──────────────────────────────────────────────────────── + + @Test + void integerTypes() { + assertEquals(DorisType.SMALLINT, map("int2", -1, -1)); + assertEquals(DorisType.SMALLINT, map("smallserial", -1, -1)); + assertEquals(DorisType.INT, map("int4", -1, -1)); + assertEquals(DorisType.INT, map("serial", -1, -1)); + assertEquals(DorisType.BIGINT, map("int8", -1, -1)); + assertEquals(DorisType.BIGINT, map("bigserial", -1, -1)); + } + + @Test + void floatTypes() { + assertEquals(DorisType.FLOAT, map("float4", -1, -1)); + assertEquals(DorisType.DOUBLE, map("float8", -1, -1)); + } + + // ─── Boolean / bit ─────────────────────────────────────────────────────── + + @Test + void boolType() { + assertEquals(DorisType.BOOLEAN, map("bool", -1, -1)); + } + + @Test + void bitType_singleBit_isBoolean() { + assertEquals(DorisType.BOOLEAN, map("bit", 1, -1)); + } + + @Test + void bitType_multiBit_isString() { + assertEquals(DorisType.STRING, map("bit", 8, -1)); + assertEquals(DorisType.STRING, map("bit", 64, -1)); + } + + // ─── Numeric / decimal ─────────────────────────────────────────────────── + + @Test + void numericType_defaultPrecisionScale() { + // length <= 0, scale < 0 → DECIMAL(38, 9) + assertEquals("DECIMAL(38, 9)", map("numeric", 0, -1)); + assertEquals("DECIMAL(38, 9)", map("numeric", -1, -1)); + } + + @Test + void numericType_explicitPrecisionScale() { + assertEquals("DECIMAL(10, 2)", map("numeric", 10, 2)); + assertEquals("DECIMAL(5, 0)", map("numeric", 5, 0)); + } + + @Test + void numericType_precisionCappedAt38() { + assertEquals("DECIMAL(38, 4)", map("numeric", 50, 4)); + assertEquals("DECIMAL(38, 9)", map("numeric", 100, -1)); + 
} + + // ─── Char types ────────────────────────────────────────────────────────── + + @Test + void bpchar_shortLength_isChar() { + // length=10 → 10*3=30 ≤ 255 → CHAR(30) + assertEquals("CHAR(30)", map("bpchar", 10, -1)); + assertEquals("CHAR(3)", map("bpchar", 1, -1)); + } + + @Test + void bpchar_longLength_isVarchar() { + // length=100 → 100*3=300 > 255 → VARCHAR(300) + assertEquals("VARCHAR(300)", map("bpchar", 100, -1)); + assertEquals("VARCHAR(768)", map("bpchar", 256, -1)); + } + + @Test + void varcharAndText_isString() { + assertEquals(DorisType.STRING, map("varchar", 50, -1)); + assertEquals(DorisType.STRING, map("varchar", -1, -1)); + assertEquals(DorisType.STRING, map("text", -1, -1)); + } + + // ─── Date / time ───────────────────────────────────────────────────────── + + @Test + void dateType() { + assertEquals(DorisType.DATE, map("date", -1, -1)); + } + + @Test + void timestampType_defaultScale_isDatetime6() { + // scale < 0 or > 6 → default to 6 + assertEquals("DATETIME(6)", map("timestamp", -1, -1)); + assertEquals("DATETIME(6)", map("timestamptz", -1, -1)); + assertEquals("DATETIME(6)", map("timestamp", -1, 7)); + } + + @Test + void timestampType_explicitScale() { + assertEquals("DATETIME(3)", map("timestamp", -1, 3)); + assertEquals("DATETIME(0)", map("timestamptz", -1, 0)); + assertEquals("DATETIME(6)", map("timestamp", -1, 6)); + } + + @Test + void timeTypes_isString() { + assertEquals(DorisType.STRING, map("time", -1, -1)); + assertEquals(DorisType.STRING, map("timetz", -1, -1)); + assertEquals(DorisType.STRING, map("interval", -1, -1)); + } + + // ─── JSON ──────────────────────────────────────────────────────────────── + + @Test + void jsonTypes() { + assertEquals(DorisType.JSON, map("json", -1, -1)); + assertEquals(DorisType.JSON, map("jsonb", -1, -1)); + } + + // ─── Geometric / network / misc types (all map to STRING) ──────────────── + + @Test + void networkAndMiscTypes_isString() { + assertEquals(DorisType.STRING, map("inet", -1, -1)); + 
assertEquals(DorisType.STRING, map("cidr", -1, -1)); + assertEquals(DorisType.STRING, map("macaddr", -1, -1)); + assertEquals(DorisType.STRING, map("uuid", -1, -1)); + assertEquals(DorisType.STRING, map("bytea", -1, -1)); + assertEquals(DorisType.STRING, map("varbit", -1, -1)); + } + + @Test + void geometricTypes_isString() { + assertEquals(DorisType.STRING, map("point", -1, -1)); + assertEquals(DorisType.STRING, map("line", -1, -1)); + assertEquals(DorisType.STRING, map("lseg", -1, -1)); + assertEquals(DorisType.STRING, map("box", -1, -1)); + assertEquals(DorisType.STRING, map("path", -1, -1)); + assertEquals(DorisType.STRING, map("polygon", -1, -1)); + assertEquals(DorisType.STRING, map("circle", -1, -1)); + } + + // ─── Unknown type fallback ─────────────────────────────────────────────── + + @Test + void unknownType_defaultsToString() { + assertEquals(DorisType.STRING, map("custom_type", -1, -1)); + assertEquals(DorisType.STRING, map("user_defined_enum", -1, -1)); + } + + // ─── Case-insensitive matching ──────────────────────────────────────────── + + @Test + void caseInsensitive() { + assertEquals(DorisType.INT, map("INT4", -1, -1)); + assertEquals(DorisType.BIGINT, map("INT8", -1, -1)); + assertEquals(DorisType.BOOLEAN, map("BOOL", -1, -1)); + assertEquals(DorisType.FLOAT, map("FLOAT4", -1, -1)); + assertEquals(DorisType.JSON, map("JSON", -1, -1)); + assertEquals(DorisType.STRING, map("TEXT", -1, -1)); + } + + // ─── helper ────────────────────────────────────────────────────────────── + + private static String map(String pgType, int length, int scale) { + return SchemaChangeHelper.pgTypeNameToDorisType(pgType, length, scale); + } +} diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out new file mode 100644 index 00000000000000..a0f35f029436f1 --- /dev/null +++ 
b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.out @@ -0,0 +1,32 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !snapshot -- +A1 1 +B1 2 + +-- !add_column -- +A1 1 \N +B1 2 \N +C1 10 hello + +-- !add_column_dml -- +B1 99 updated +C1 10 world + +-- !drop_column -- +B1 99 +C1 10 +D1 20 + +-- !rename -- +B1 99 +C1 10 +D1 20 +E1 \N + +-- !modify -- +B1 99 +C1 10 +D1 20 +E1 \N +F1 \N + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out new file mode 100644 index 00000000000000..24907705c464fb --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.out @@ -0,0 +1,31 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !baseline -- +C1 30 + +-- !double_add -- +C1 30 \N \N +D1 40 hello 42 + +-- !rename_guard -- +C1 30 \N \N +D1 40 hello 42 +E1 50 \N 10 + +-- !rename_guard_update -- +C1 30 \N \N +D1 99 \N 42 +E1 50 \N 10 + +-- !default_col -- +C1 30 default_val +D1 99 default_val +E1 50 default_val +F1 60 default_val + +-- !not_null_col -- +C1 30 default_val required +D1 99 default_val required +E1 50 default_val required +F1 60 default_val required +G1 70 g1c4 explicit + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy new file mode 100644 index 00000000000000..bd590e1d97fa6e --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc.groovy @@ -0,0 +1,269 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Schema-change regression for the PostgreSQL CDC streaming job. + * + * Covers four scenarios in sequence on a single table: + * 1. ADD COLUMN – column added in PG → DDL executed in Doris, new data lands correctly. + * Also verifies: pre-ADD rows get NULL for the new column (existing-data + * correctness), and UPDATE/DELETE right after ADD COLUMN are propagated. + * 2. DROP COLUMN – column dropped in PG → DDL executed in Doris, subsequent data lands correctly. + * 3. RENAME COLUMN – rename detected as simultaneous ADD+DROP (rename guard) → + * no DDL in Doris, 'age' column remains, new rows get age=NULL. + * 4. MODIFY COLUMN – type-only change is invisible to the name-based diff → + * no DDL in Doris, data continues to flow. 
+ */ +suite("test_streaming_postgres_job_sc", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_name_sc" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_normal1_sc" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // ── helpers ─────────────────────────────────────────────────────────── + + // Wait until a specific row appears in the Doris target table. + def waitForRow = { String rowName -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'" + )[0][0] as int > 0 + }) + } + + // Wait until a column either exists or no longer exists in the Doris table. + def waitForColumn = { String colName, boolean shouldExist -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def desc = sql "DESC ${table1}" + desc.any { it[0] == colName } == shouldExist + }) + } + + // Wait until a specific row disappears from the Doris target table. + def waitForRowGone = { String rowName -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'" + )[0][0] as int == 0 + }) + } + + // Wait until a specific column value matches the expected value for a row. 
+ // Comparison is done as strings to avoid JDBC numeric type mismatches (e.g. Short vs Integer). + def waitForValue = { String rowName, String colName, Object expected -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql "SELECT ${colName} FROM ${table1} WHERE name='${rowName}'" + rows.size() == 1 && String.valueOf(rows[0][0]) == String.valueOf(expected) + }) + } + + // Dump job/task state on assertion failures for easier debugging. + def dumpJobState = { + log.info("jobs : " + sql("""select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks : " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'""")) + } + + // ── 0. Create PG table and insert snapshot rows ─────────────────────── + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgSchema}.${table1} ( + name VARCHAR(200) PRIMARY KEY, + age INT2 + )""" + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 1)""" + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 2)""" + } + + // ── 1. Start streaming job ──────────────────────────────────────────── + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + + // Verify the table was auto-created with the expected initial schema. 
+ assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() == 1 + // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5) + def initDesc = sql "DESC ${currentDb}.${table1}" + assert initDesc.find { it[0] == 'name' }[1] == 'varchar(65533)' : "name must be varchar(65533)" + assert initDesc.find { it[0] == 'age' }[1] == 'smallint' : "age must be smallint" + assert initDesc.find { it[0] == 'name' }[3] == 'true' : "name must be primary key" + + // Wait for snapshot to finish (job completes ≥ 2 tasks). + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(1, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") + where Name='${jobName}' and ExecuteType='STREAMING'""" + cnt.size() == 1 && cnt[0][0] as int >= 2 + }) + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // Snapshot data: A1(1), B1(2) + qt_snapshot """ SELECT name, age FROM ${table1} ORDER BY name """ + + // ── Phase 1: ADD COLUMN c1 ──────────────────────────────────────────── + // PG adds VARCHAR column c1; CDC detects ADD via name diff and executes + // ALTER TABLE … ADD COLUMN c1 on Doris. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 VARCHAR(50)""" + sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1) VALUES ('C1', 10, 'hello')""" + } + + try { + waitForColumn('c1', true) + waitForRow('C1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // Verify c1 was added to Doris and the new row is present. + assert (sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must exist in Doris after ADD COLUMN" + + // Pre-ADD rows must have NULL for the new column (existing-data correctness). 
+ assert (sql "SELECT c1 FROM ${table1} WHERE name='A1'")[0][0] == null : "A1.c1 must be NULL (pre-ADD row)" + assert (sql "SELECT c1 FROM ${table1} WHERE name='B1'")[0][0] == null : "B1.c1 must be NULL (pre-ADD row)" + + // A1(1,null), B1(2,null), C1(10,'hello') + qt_add_column """ SELECT name, age, c1 FROM ${table1} ORDER BY name """ + + // ── Phase 1b: UPDATE / DELETE immediately after ADD COLUMN ─────────── + // Verifies that UPDATE (touching the new column) and DELETE on pre-existing rows + // are correctly propagated to Doris after the schema change. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + // Update the new column on the just-inserted row. + sql """UPDATE ${pgSchema}.${table1} SET c1='world' WHERE name='C1'""" + // Update both an old column and the new column on a pre-existing row. + sql """UPDATE ${pgSchema}.${table1} SET age=99, c1='updated' WHERE name='B1'""" + // Delete a pre-existing row. + sql """DELETE FROM ${pgSchema}.${table1} WHERE name='A1'""" + } + + try { + waitForRowGone('A1') + waitForValue('B1', 'age', 99) + waitForValue('C1', 'c1', 'world') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // A1 deleted; B1(99,'updated'); C1(10,'world') + qt_add_column_dml """ SELECT name, age, c1 FROM ${table1} ORDER BY name """ + + // ── Phase 2: DROP COLUMN c1 ─────────────────────────────────────────── + // PG drops c1; CDC detects DROP and executes ALTER TABLE … DROP COLUMN c1 on Doris. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1""" + sql """INSERT INTO ${pgSchema}.${table1} (name, age) VALUES ('D1', 20)""" + } + + try { + waitForColumn('c1', false) + waitForRow('D1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // Verify c1 was removed from Doris and data flows without it. 
+ assert !(sql "DESC ${table1}").any { it[0] == 'c1' } : "c1 column must be gone from Doris after DROP COLUMN" + // B1(99), C1(10), D1(20) [A1 was deleted in Phase 1b] + qt_drop_column """ SELECT name, age FROM ${table1} ORDER BY name """ + + // ── Phase 3: RENAME COLUMN age → age2 (rename guard) ───────────────── + // PG rename looks like a simultaneous ADD(age2) + DROP(age) to the name diff. + // The rename guard detects this and emits a WARN with no DDL, so Doris schema + // is unchanged. New PG rows carry 'age2' which has no matching column in Doris, + // so 'age' is NULL for those rows. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ALTER TABLE ${pgSchema}.${table1} RENAME COLUMN age TO age2""" + sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES ('E1', 30)""" + } + + try { + waitForRow('E1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // 'age' must still exist; 'age2' must NOT have been added. + def descAfterRename = sql "DESC ${table1}" + assert descAfterRename.any { it[0] == 'age' } : "'age' column must remain after rename guard" + assert !descAfterRename.any { it[0] == 'age2' } : "'age2' must NOT be added (rename guard, no DDL)" + // B1(99), C1(10), D1(20), E1(null) — age=NULL because PG sends age2 which Doris ignores + qt_rename """ SELECT name, age FROM ${table1} ORDER BY name """ + + // ── Phase 4: MODIFY COLUMN type (name-only diff, no DDL) ───────────── + // Type-only change is invisible to the name-based diff, so no DDL is emitted. + // Data continues to flow; age2 values still have no mapping in Doris → age=NULL. 
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ALTER TABLE ${pgSchema}.${table1} ALTER COLUMN age2 TYPE INT4""" + sql """INSERT INTO ${pgSchema}.${table1} (name, age2) VALUES ('F1', 50)""" + } + + try { + waitForRow('F1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // Doris 'age' column type must remain smallint (mapped from PG int2). + assert (sql "DESC ${table1}").find { it[0] == 'age' }[1] == 'smallint' \ + : "Doris 'age' type must remain smallint after type-only change in PG" + // B1(99), C1(10), D1(20), E1(null), F1(null) + qt_modify """ SELECT name, age FROM ${table1} ORDER BY name """ + + assert (sql """select * from jobs("type"="insert") where Name='${jobName}'""")[0][5] == "RUNNING" + + // ── Cleanup ─────────────────────────────────────────────────────────── + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + assert (sql """select count(1) from jobs("type"="insert") where Name='${jobName}'""")[0][0] == 0 + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy new file mode 100644 index 00000000000000..6593f1cf4f2ac4 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_sc_advanced.groovy @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Advanced schema-change regression for the PostgreSQL CDC streaming job. + * + * Key differences from the basic schema-change test: + * - Uses offset=latest (incremental-only, no snapshot) to cover the code path where + * tableSchemas are discovered from PG JDBC rather than derived from snapshot splits. + * This exercises the feHadNoSchema=true branch in PipelineCoordinator. + * + * Covers uncommon scenarios: + * 1. Simultaneous double ADD – two columns added in PG before any DML triggers detection; + * both ALTER TABLEs are generated and executed in a single detection event. + * 2. DROP + ADD simultaneously (rename guard) – dropping one column while adding another + * is treated as a potential rename; no DDL is emitted but the cached schema is updated. + * 3. UPDATE on existing rows after rename guard – verifies that a row whose old column (c1) + * was dropped in PG gets c1=NULL in Doris after the next UPDATE (stream load replaces the + * whole row without c1 since PG no longer has it). + * 4. ADD COLUMN with DEFAULT value – verifies that the DEFAULT clause is passed through to + * Doris and that pre-existing rows automatically receive the default value after the DDL. + * 5. ADD COLUMN NOT NULL with DEFAULT – verifies the NOT NULL path in SchemaChangeHelper + * (col.isOptional()=false → appends NOT NULL) and that Doris accepts the DDL when a + * DEFAULT is present (satisfying the NOT NULL constraint for existing rows). 
+ */ +suite("test_streaming_postgres_job_sc_advanced", + "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + + def jobName = "test_streaming_pg_sc_advanced" + def currentDb = (sql "select database()")[0][0] + def table1 = "user_info_pg_normal1_sc_adv" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // ── helpers ─────────────────────────────────────────────────────────── + + def waitForRow = { String rowName -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'" + )[0][0] as int > 0 + }) + } + + def waitForRowGone = { String rowName -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + (sql "SELECT COUNT(*) FROM ${table1} WHERE name='${rowName}'" + )[0][0] as int == 0 + }) + } + + def waitForColumn = { String colName, boolean shouldExist -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def desc = sql "DESC ${table1}" + desc.any { it[0] == colName } == shouldExist + }) + } + + // Comparison is done as strings to avoid JDBC numeric type mismatches. 
+ def waitForValue = { String rowName, String colName, Object expected -> + Awaitility.await().atMost(120, SECONDS).pollInterval(2, SECONDS).until({ + def rows = sql "SELECT ${colName} FROM ${table1} WHERE name='${rowName}'" + rows.size() == 1 && String.valueOf(rows[0][0]) == String.valueOf(expected) + }) + } + + def dumpJobState = { + log.info("jobs : " + sql("""select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks : " + sql("""select * from tasks("type"="insert") where JobName='${jobName}'""")) + } + + // ── 0. Pre-create PG table with existing rows ───────────────────────── + // A1, B1 are inserted BEFORE the job starts with offset=latest. + // They will NOT appear in Doris (no snapshot taken). + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgSchema}.${table1} ( + name VARCHAR(200) PRIMARY KEY, + age INT4 + )""" + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('A1', 10)""" + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('B1', 20)""" + } + + // ── 1. Start streaming job with offset=latest ───────────────────────── + // The Doris table is auto-created from the PG schema at job creation time. + // Streaming begins from the current WAL LSN — A1 and B1 are not captured. + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + )""" + + assert (sql "SHOW TABLES FROM ${currentDb} LIKE '${table1}'").size() == 1 + + // Wait for job to enter RUNNING state (streaming split established). 
+ try { + Awaitility.await().atMost(120, SECONDS).pollInterval(1, SECONDS).until({ + def rows = sql """select Status from jobs("type"="insert") + where Name='${jobName}' and ExecuteType='STREAMING'""" + rows.size() == 1 && rows[0][0] == "RUNNING" + }) + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // Baseline: insert C1 to verify streaming is active. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgSchema}.${table1} VALUES ('C1', 30)""" + } + + try { + waitForRow('C1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // A1, B1 must NOT be present (offset=latest, no snapshot). + assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='A1'")[0][0] as int == 0 \ + : "A1 must not be present (offset=latest)" + assert (sql "SELECT COUNT(*) FROM ${table1} WHERE name='B1'")[0][0] as int == 0 \ + : "B1 must not be present (offset=latest)" + + // Only C1(30) should be in Doris. + qt_baseline """ SELECT name, age FROM ${table1} ORDER BY name """ + + // ── Phase 1: Simultaneous double ADD (c1 TEXT, c2 INT4) ────────────── + // Both ALTER TABLEs happen in PG before any DML triggers CDC detection. + // The single INSERT D1 triggers the detection, which fetches the fresh PG schema + // (already containing both c1 and c2), and generates two ADD COLUMN DDLs in one shot. 
+ connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c1 TEXT""" + sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c2 INT4""" + sql """INSERT INTO ${pgSchema}.${table1} (name, age, c1, c2) VALUES ('D1', 40, 'hello', 42)""" + } + + try { + waitForColumn('c1', true) + waitForColumn('c2', true) + waitForRow('D1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5) + def descAfterDoubleAdd = sql "DESC ${table1}" + assert descAfterDoubleAdd.find { it[0] == 'c1' }[1] == 'text' : "c1 must be added as text" + assert descAfterDoubleAdd.find { it[0] == 'c2' }[1] == 'int' : "c2 must be added as int" + + // Pre-double-ADD row C1 must have NULL for both new columns. + assert (sql "SELECT c1 FROM ${table1} WHERE name='C1'")[0][0] == null : "C1.c1 must be NULL" + assert (sql "SELECT c2 FROM ${table1} WHERE name='C1'")[0][0] == null : "C1.c2 must be NULL" + + // C1(30,null,null), D1(40,'hello',42) + qt_double_add """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY name """ + + // ── Phase 2: DROP c1 + ADD c3 simultaneously (rename guard) ────────── + // Dropping c1 and adding c3 in the same batch looks like a rename to the CDC detector: + // simultaneous ADD+DROP triggers the guard → no DDL emitted, cached schema updated to + // reflect the fresh PG state (c1 gone, c3 present). + // Doris table is left with c1 still present; c3 is never added. 
+    connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { +        sql """ALTER TABLE ${pgSchema}.${table1} DROP COLUMN c1""" +        sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c3 INT4""" +        sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) VALUES ('E1', 50, 10, 99)""" +    } + +    try { +        waitForRow('E1') +    } catch (Exception ex) { +        dumpJobState() +        throw ex +    } + +    def descAfterRenameGuard = sql "DESC ${table1}" +    assert descAfterRenameGuard.any { it[0] == 'c1' } : "c1 must remain (rename guard prevented DROP)" +    assert !descAfterRenameGuard.any { it[0] == 'c3' } : "c3 must NOT be added (rename guard prevented ADD)" + +    // E1.c1=NULL: PG has c3 (not c1), Doris ignores c3 and writes NULL for c1. +    assert (sql "SELECT c1 FROM ${table1} WHERE name='E1'")[0][0] == null : "E1.c1 must be NULL" + +    // C1(30,null,null), D1(40,'hello',42), E1(50,null,10) +    qt_rename_guard """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY name """ + +    // ── Phase 3: UPDATE existing row after rename guard ─────────────────── +    // D1 had c1='hello' at insert time. After the rename guard fires, the cached schema +    // reflects PG reality (c1 gone, c3 present). When D1 is updated in PG, the DML +    // after-image carries age, c2 and c3 but no c1 field. Stream load replaces the +    // entire row → D1.c1 becomes NULL in Doris. +    connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { +        // PG now has columns: name, age, c2, c3 (c1 was dropped) +        sql """UPDATE ${pgSchema}.${table1} SET age=99, c3=88 WHERE name='D1'""" +    } + +    try { +        waitForValue('D1', 'age', 99) +    } catch (Exception ex) { +        dumpJobState() +        throw ex +    } + +    // D1.c1 was 'hello' but after UPDATE the stream load has no c1 field → +    // Doris replaces the row without c1 → c1=NULL. 
+    assert (sql "SELECT c1 FROM ${table1} WHERE name='D1'")[0][0] == null \ +        : "D1.c1 must be NULL after UPDATE (c1 dropped from PG, not in stream load record)" + +    // C1(30,null,null), D1(99,null,42), E1(50,null,10) — D1.c2 stays 42: the PG after-image still carries c2 +    qt_rename_guard_update """ SELECT name, age, c1, c2 FROM ${table1} ORDER BY name """ + +    // ── Phase 4: ADD COLUMN with DEFAULT value ──────────────────────────── +    // PG adds a nullable TEXT column with a DEFAULT value. +    // buildAddColumnSql picks up col.defaultValueExpression() and appends DEFAULT 'default_val' +    // to the Doris ALTER TABLE. After the DDL, Doris fills the default for all pre-existing +    // rows (metadata operation), so C1/D1/E1 all get c4='default_val' without any DML replay. +    // F1 is inserted without an explicit c4 value → PG fills in the default → WAL record +    // already carries c4='default_val', so Doris writes 'default_val' for F1 as well. +    connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { +        // PG current cols: name, age, c2, c3 +        sql """ALTER TABLE ${pgSchema}.${table1} ADD COLUMN c4 TEXT DEFAULT 'default_val'""" +        // Trigger schema-change detection; omit c4 → PG fills default in WAL record. +        sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3) VALUES ('F1', 60, 20, 77)""" +    } + +    try { +        waitForColumn('c4', true) +        waitForRow('F1') +    } catch (Exception ex) { +        dumpJobState() +        throw ex +    } + +    // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5) +    def descAfterDefaultAdd = sql "DESC ${table1}" +    def c4Row = descAfterDefaultAdd.find { it[0] == 'c4' } +    assert c4Row != null : "c4 must be added" +    assert c4Row[4] == 'default_val' : "c4 must carry DEFAULT 'default_val', got: ${c4Row[4]}" + +    // Pre-existing rows receive the default value from Doris's ALTER TABLE (not from DML replay). 
+ try { + waitForValue('C1', 'c4', 'default_val') + waitForValue('D1', 'c4', 'default_val') + waitForValue('E1', 'c4', 'default_val') + waitForValue('F1', 'c4', 'default_val') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // C1(30,_,default_val), D1(99,_,default_val), E1(50,_,default_val), F1(60,_,default_val) + qt_default_col """ SELECT name, age, c4 FROM ${table1} ORDER BY name """ + + // ── Phase 5: ADD COLUMN NOT NULL with DEFAULT ───────────────────────── + // In PG, adding a NOT NULL column to a non-empty table requires a DEFAULT so existing rows + // satisfy the constraint. Debezium captures col.isOptional()=false, so SchemaChangeHelper + // appends NOT NULL to the Doris column type, and the DEFAULT clause is also passed through. + // With both NOT NULL and DEFAULT, Doris can apply the DDL: existing rows get the default + // value (satisfying NOT NULL), and new rows must supply a value or receive the default. + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + // PG current cols: name, age, c2, c3, c4 + sql """ALTER TABLE ${pgSchema}.${table1} + ADD COLUMN c5 TEXT NOT NULL DEFAULT 'required'""" + sql """INSERT INTO ${pgSchema}.${table1} (name, age, c2, c3, c4, c5) + VALUES ('G1', 70, 30, 66, 'g1c4', 'explicit')""" + } + + try { + waitForColumn('c5', true) + waitForRow('G1') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // DESC columns: Field(0), Type(1), Null(2), Key(3), Default(4), Extra(5) + def descAfterNotNullAdd = sql "DESC ${table1}" + def c5Row = descAfterNotNullAdd.find { it[0] == 'c5' } + assert c5Row != null : "c5 must be added" + assert c5Row[4] == 'required' : "c5 must carry DEFAULT 'required', got: ${c5Row[4]}" + + // Pre-existing rows must have the default value (Doris ALTER TABLE fills it). + // G1 was inserted with an explicit 'explicit' value. 
+ try { + waitForValue('C1', 'c5', 'required') + waitForValue('D1', 'c5', 'required') + waitForValue('G1', 'c5', 'explicit') + } catch (Exception ex) { + dumpJobState() + throw ex + } + + // C1(_,default_val,required), D1(_,default_val,required), ...G1(_,g1c4,explicit) + qt_not_null_col """ SELECT name, age, c4, c5 FROM ${table1} ORDER BY name """ + + assert (sql """select * from jobs("type"="insert") where Name='${jobName}'""")[0][5] == "RUNNING" + + // ── Cleanup ─────────────────────────────────────────────────────────── + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + assert (sql """select count(1) from jobs("type"="insert") where Name='${jobName}'""")[0][0] == 0 + } +}