diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java index 6ebf75a99aaa90..f006d791528fdb 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReader.java @@ -20,6 +20,7 @@ import org.apache.doris.cdcclient.source.deserialize.DeserializeResult; import org.apache.doris.cdcclient.source.deserialize.SourceRecordDeserializer; import org.apache.doris.cdcclient.utils.SchemaChangeManager; +import org.apache.doris.job.cdc.request.JobBaseConfig; import org.apache.doris.job.cdc.request.JobBaseRecordRequest; import org.apache.flink.cdc.connectors.base.utils.SerializerUtils; @@ -37,6 +38,7 @@ import io.debezium.document.Document; import io.debezium.document.DocumentReader; import io.debezium.document.DocumentWriter; +import io.debezium.relational.Column; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; import lombok.Getter; @@ -63,6 +65,49 @@ public abstract class AbstractCdcSourceReader implements SourceReader { protected SourceRecordDeserializer serializer; protected Map tableSchemas; + private final Map> splitKeyClassCache = new ConcurrentHashMap<>(); + + protected abstract Class probeSplitKeyClass( + TableId tableId, Column splitColumn, JobBaseConfig jobConfig); + + protected Class resolveSplitKeyClass( + TableId tableId, Column splitColumn, JobBaseConfig jobConfig) { + String key = tableId.identifier() + "." + splitColumn.name(); + return splitKeyClassCache.computeIfAbsent( + key, k -> probeSplitKeyClass(tableId, splitColumn, jobConfig)); + } + + protected static Object[] convertBounds(Object[] raw, Class target, ObjectMapper mapper) { + if (raw == null) { + return null; + } + Object[] out = new Object[raw.length]; + for (int i = 0; i < raw.length; i++) { + out[i] = convertBound(raw[i], target, mapper); + } + return out; + } + + private static Object convertBound(Object v, Class target, ObjectMapper mapper) { + if (v == null) { + return null; + } + if (target.isInstance(v)) { + return v; + } + String s = v.toString(); + if (target == java.sql.Date.class) { + return java.sql.Date.valueOf(s); + } + if (target == java.sql.Timestamp.class) { + return java.sql.Timestamp.valueOf(s); + } + if (target == java.sql.Time.class) { + return java.sql.Time.valueOf(s); + } + return mapper.convertValue(v, target); + } + /** * Load tableSchemas from a JSON string (produced by {@link #serializeTableSchemas()}). Used * when a binlog/stream split is resumed from FE-persisted state. diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java index 10cb98e448abae..ae64b695e95075 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java @@ -614,8 +614,6 @@ protected abstract Fetcher getBinlogSplitReader( createSnapshotSplit(Map offset, JobBaseConfig jobConfig) { SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, SnapshotSplit.class); TableId tableId = TableId.parse(snapshotSplit.getTableId(), false); - Object[] splitStart = snapshotSplit.getSplitStart(); - Object[] splitEnd = snapshotSplit.getSplitEnd(); List splitKeys = snapshotSplit.getSplitKey(); Map tableSchemas = getTableSchemas(jobConfig); TableChanges.TableChange tableChange = tableSchemas.get(tableId); @@ -623,7 +621,18 @@ protected abstract Fetcher getBinlogSplitReader( tableChange, "Can not find table " + tableId + " in job " + jobConfig.getJobId()); // only support one split key String splitKey = splitKeys.get(0); - io.debezium.relational.Column splitColumn = tableChange.getTable().columnWithName(splitKey); + Column splitColumn = tableChange.getTable().columnWithName(splitKey); + Preconditions.checkNotNull( + splitColumn, + "Split key column " + + splitKey + + " not found in table " + + tableId + + " for job " + + jobConfig.getJobId()); + Class keyClass = resolveSplitKeyClass(tableId, splitColumn, jobConfig); + Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), keyClass, objectMapper); + Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), keyClass, objectMapper); RowType splitType = getSplitType(splitColumn); org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit split = new org.apache.flink.cdc.connectors.base.source.meta.split.SnapshotSplit( @@ -660,6 +669,7 @@ private Tuple2 createStreamSplit( .sorted(Comparator.comparing(AbstractSourceSplit::getSplitId)) .toList(); + Map tableSchemas = getTableSchemas(config); for (SnapshotSplit split : assignedSplitLists) { // find the min offset Map offsetMap = split.getHighWatermark(); @@ -670,12 +680,29 @@ private Tuple2 createStreamSplit( if (maxOffsetFinishSplits == null || sourceOffset.isAfter(maxOffsetFinishSplits)) { maxOffsetFinishSplits = sourceOffset; } + TableId tid = TableId.parse(split.getTableId()); + TableChanges.TableChange tableChange = tableSchemas.get(tid); + Preconditions.checkNotNull( + tableChange, "Can not find table " + tid + " in job " + config.getJobId()); + String splitKey = split.getSplitKey().get(0); + Column splitColumn = tableChange.getTable().columnWithName(splitKey); + Preconditions.checkNotNull( + splitColumn, + "Split key column " + + splitKey + + " not found in table " + + tid + + " for job " + + config.getJobId()); + Class keyClass = resolveSplitKeyClass(tid, splitColumn, config); + Object[] start = convertBounds(split.getSplitStart(), keyClass, objectMapper); + Object[] end = convertBounds(split.getSplitEnd(), keyClass, objectMapper); finishedSnapshotSplitInfos.add( new FinishedSnapshotSplitInfo( - TableId.parse(split.getTableId()), + tid, split.getSplitId(), - split.getSplitStart(), - split.getSplitEnd(), + start, + end, sourceOffset, getOffsetFactory())); } diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java index bf8ac56312bbeb..84340f5e13737c 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java @@ -61,6 +61,7 @@ import org.apache.flink.cdc.connectors.mysql.source.split.SourceRecords; import org.apache.flink.cdc.connectors.mysql.source.utils.ChunkUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.RecordUtils; +import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils; import org.apache.flink.cdc.connectors.mysql.source.utils.TableDiscoveryUtils; import org.apache.flink.cdc.connectors.mysql.table.StartupMode; import org.apache.flink.cdc.connectors.mysql.table.StartupOptions; @@ -69,7 +70,9 @@ import org.apache.kafka.connect.source.SourceRecord; import java.io.IOException; +import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.Statement; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -590,8 +593,6 @@ private MySqlSnapshotSplit createSnapshotSplit( Map offset, JobBaseConfig jobConfig) throws JsonProcessingException { SnapshotSplit snapshotSplit = objectMapper.convertValue(offset, SnapshotSplit.class); TableId tableId = TableId.parse(snapshotSplit.getTableId()); - Object[] splitStart = snapshotSplit.getSplitStart(); - Object[] splitEnd = snapshotSplit.getSplitEnd(); List splitKeys = snapshotSplit.getSplitKey(); Map tableSchemas = getTableSchemas(jobConfig); TableChanges.TableChange tableChange = tableSchemas.get(tableId); @@ -600,6 +601,17 @@ private MySqlSnapshotSplit createSnapshotSplit( // only support one split key String splitKey = splitKeys.get(0); Column splitColumn = tableChange.getTable().columnWithName(splitKey); + Preconditions.checkNotNull( + splitColumn, + "Split key column " + + splitKey + + " not found in table " + + tableId + + " for job " + + jobConfig.getJobId()); + Class keyClass = resolveSplitKeyClass(tableId, splitColumn, jobConfig); + Object[] splitStart = convertBounds(snapshotSplit.getSplitStart(), keyClass, objectMapper); + Object[] splitEnd = convertBounds(snapshotSplit.getSplitEnd(), keyClass, objectMapper); RowType splitType = ChunkUtils.getChunkKeyColumnType(splitColumn, false); MySqlSnapshotSplit split = new MySqlSnapshotSplit( @@ -631,6 +643,7 @@ private Tuple2 createBinlogSplit( .sorted(Comparator.comparing(AbstractSourceSplit::getSplitId)) .toList(); + Map tableSchemas = getTableSchemas(config); for (SnapshotSplit split : assignedSplitLists) { // find the min binlog offset Map offsetMap = split.getHighWatermark(); @@ -641,13 +654,26 @@ private Tuple2 createBinlogSplit( if (maxOffsetFinishSplits == null || binlogOffset.isAfter(maxOffsetFinishSplits)) { maxOffsetFinishSplits = binlogOffset; } + TableId tid = TableId.parse(split.getTableId()); + TableChanges.TableChange tableChange = tableSchemas.get(tid); + Preconditions.checkNotNull( + tableChange, "Can not find table " + tid + " in job " + config.getJobId()); + String splitKey = split.getSplitKey().get(0); + Column splitColumn = tableChange.getTable().columnWithName(splitKey); + Preconditions.checkNotNull( + splitColumn, + "Split key column " + + splitKey + + " not found in table " + + tid + + " for job " + + config.getJobId()); + Class keyClass = resolveSplitKeyClass(tid, splitColumn, config); + Object[] start = convertBounds(split.getSplitStart(), keyClass, objectMapper); + Object[] end = convertBounds(split.getSplitEnd(), keyClass, objectMapper); finishedSnapshotSplitInfos.add( new FinishedSnapshotSplitInfo( - TableId.parse(split.getTableId()), - split.getSplitId(), - split.getSplitStart(), - split.getSplitEnd(), - binlogOffset)); + tid, split.getSplitId(), start, end, binlogOffset)); } } @@ -1050,6 +1076,24 @@ private Map getTableSchemas(JobBaseConfig con return schemas; } + @Override + protected Class probeSplitKeyClass( + TableId tableId, Column splitColumn, JobBaseConfig jobConfig) { + MySqlSourceConfig sourceConfig = getSourceConfig(jobConfig); + String sql = + String.format( + "SELECT %s FROM %s WHERE 1=0", + StatementUtils.quote(splitColumn.name()), StatementUtils.quote(tableId)); + try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig); + Statement st = jdbc.connection().createStatement(); + ResultSet rs = st.executeQuery(sql)) { + return Class.forName(rs.getMetaData().getColumnClassName(1)); + } catch (Exception e) { + throw new RuntimeException( + "Probe split key class failed for " + tableId + "." + splitColumn.name(), e); + } + } + private Map discoverTableSchemas(JobBaseConfig config) { MySqlSourceConfig sourceConfig = getSourceConfig(config); try (MySqlConnection jdbc = DebeziumUtils.createMySqlConnection(sourceConfig)) { diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java index 6638787ea48155..8bf53a1eb97ea1 100644 --- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java +++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java @@ -49,10 +49,13 @@ import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffset; import org.apache.flink.cdc.connectors.postgres.source.offset.PostgresOffsetFactory; import org.apache.flink.cdc.connectors.postgres.source.utils.CustomPostgresSchema; +import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresQueryUtils; import org.apache.flink.cdc.connectors.postgres.source.utils.PostgresTypeUtils; import org.apache.flink.cdc.connectors.postgres.source.utils.TableDiscoveryUtils; import org.apache.flink.table.types.DataType; +import java.sql.ResultSet; +import java.sql.Statement; import java.time.Duration; import java.time.Instant; import java.util.Collections; @@ -366,6 +369,26 @@ protected DataType fromDbzColumn(Column splitColumn) { return PostgresTypeUtils.fromDbzColumn(splitColumn); } + @Override + protected Class probeSplitKeyClass( + TableId tableId, Column splitColumn, JobBaseConfig jobConfig) { + PostgresSourceConfig sourceConfig = getSourceConfig(jobConfig); + String sql = + String.format( + "SELECT %s FROM %s WHERE 1=0", + PostgresQueryUtils.quote(splitColumn.name()), + PostgresQueryUtils.quote(tableId)); + try (JdbcConnection jdbc = + new PostgresDialect(sourceConfig).openJdbcConnection(sourceConfig); + Statement st = jdbc.connection().createStatement(); + ResultSet rs = st.executeQuery(sql)) { + return Class.forName(rs.getMetaData().getColumnClassName(1)); + } catch (Exception e) { + throw new RuntimeException( + "Probe split key class failed for " + tableId + "." + splitColumn.name(), e); + } + } + /** * Why not call dialect.displayCurrentOffset(sourceConfig) ? The underlying system calls * `txid_current()` to advance the WAL log. Here, it's just a query; retrieving the LSN is diff --git a/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java new file mode 100644 index 00000000000000..fbca1ad41e641f --- /dev/null +++ b/fs_brokers/cdc_client/src/test/java/org/apache/doris/cdcclient/source/reader/AbstractCdcSourceReaderTest.java @@ -0,0 +1,165 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.cdcclient.source.reader; + +import org.apache.doris.job.cdc.split.SnapshotSplit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.Timestamp; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import com.fasterxml.jackson.databind.ObjectMapper; + +class AbstractCdcSourceReaderTest { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + @Test + void convertBoundsRestoresDateFromString() { + Object[] raw = new Object[] {"2025-01-06"}; + Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, MAPPER); + assertTrue(out[0] instanceof Date); + assertEquals(Date.valueOf("2025-01-06"), out[0]); + } + + @Test + void convertBoundsRestoresTimestampFromString() { + Object[] raw = new Object[] {"2025-01-06 12:34:56"}; + Object[] out = AbstractCdcSourceReader.convertBounds(raw, Timestamp.class, MAPPER); + assertTrue(out[0] instanceof Timestamp); + assertEquals(Timestamp.valueOf("2025-01-06 12:34:56"), out[0]); + } + + @Test + void convertBoundsRestoresLongFromJsonInteger() { + // Jackson typically deserializes JSON integer to Integer; restore as Long for int8 cols. + Object[] raw = new Object[] {123456}; + Object[] out = AbstractCdcSourceReader.convertBounds(raw, Long.class, MAPPER); + assertTrue(out[0] instanceof Long); + assertEquals(123456L, out[0]); + } + + @Test + void convertBoundsRestoresBigDecimalFromString() { + Object[] raw = new Object[] {"12.34"}; + Object[] out = AbstractCdcSourceReader.convertBounds(raw, BigDecimal.class, MAPPER); + assertTrue(out[0] instanceof BigDecimal); + assertEquals(new BigDecimal("12.34"), out[0]); + } + + @Test + void convertBoundsPreservesStringForVarcharColumns() { + Object[] raw = new Object[] {"event_id_42"}; + Object[] out = AbstractCdcSourceReader.convertBounds(raw, String.class, MAPPER); + assertEquals("event_id_42", out[0]); + } + + @Test + void convertBoundsReturnsNullForNullInput() { + assertNull(AbstractCdcSourceReader.convertBounds(null, Date.class, MAPPER)); + } + + @Test + void convertBoundsKeepsNullElement() { + Object[] raw = new Object[] {null}; + Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, MAPPER); + assertEquals(1, out.length); + assertNull(out[0]); + } + + @Test + void convertBoundsHandlesMultiElementArray() { + Object[] raw = new Object[] {"2025-01-06", null}; + Object[] out = AbstractCdcSourceReader.convertBounds(raw, Date.class, MAPPER); + assertEquals(2, out.length); + assertEquals(Date.valueOf("2025-01-06"), out[0]); + assertNull(out[1]); + } + + @Test + void restoresDateChunkKeyAfterFeRoundTrip() { + Map feOffset = new HashMap<>(); + feOffset.put("splitId", "public.events:0"); + feOffset.put("tableId", "public.events"); + feOffset.put("splitKey", Arrays.asList("event_date")); + feOffset.put("splitStart", null); + feOffset.put("splitEnd", Arrays.asList("2025-01-06")); + + SnapshotSplit deserialized = MAPPER.convertValue(feOffset, SnapshotSplit.class); + assertEquals(String.class, deserialized.getSplitEnd()[0].getClass()); + + Object[] restored = + AbstractCdcSourceReader.convertBounds( + deserialized.getSplitEnd(), Date.class, MAPPER); + assertEquals(Date.class, restored[0].getClass()); + assertEquals(Date.valueOf("2025-01-06"), restored[0]); + } + + @Test + void restoresTimestampChunkKeyAfterFeRoundTrip() { + Map feOffset = new HashMap<>(); + feOffset.put("splitId", "public.orders:7"); + feOffset.put("tableId", "public.orders"); + feOffset.put("splitKey", Arrays.asList("created_at")); + feOffset.put("splitStart", Arrays.asList("2025-01-06 00:00:00")); + feOffset.put("splitEnd", Arrays.asList("2025-01-07 00:00:00")); + + SnapshotSplit deserialized = MAPPER.convertValue(feOffset, SnapshotSplit.class); + + Object[] restoredStart = + AbstractCdcSourceReader.convertBounds( + deserialized.getSplitStart(), Timestamp.class, MAPPER); + Object[] restoredEnd = + AbstractCdcSourceReader.convertBounds( + deserialized.getSplitEnd(), Timestamp.class, MAPPER); + assertEquals(Timestamp.class, restoredStart[0].getClass()); + assertEquals(Timestamp.valueOf("2025-01-06 00:00:00"), restoredStart[0]); + assertEquals(Timestamp.valueOf("2025-01-07 00:00:00"), restoredEnd[0]); + } + + @Test + void restoresBigintChunkKeyAfterFeRoundTrip() { + Map feOffset = new HashMap<>(); + feOffset.put("splitId", "public.orders:0"); + feOffset.put("tableId", "public.orders"); + feOffset.put("splitKey", Arrays.asList("id")); + feOffset.put("splitStart", Arrays.asList(100)); + feOffset.put("splitEnd", Arrays.asList(200)); + + SnapshotSplit deserialized = MAPPER.convertValue(feOffset, SnapshotSplit.class); + + Object[] restoredStart = + AbstractCdcSourceReader.convertBounds( + deserialized.getSplitStart(), Long.class, MAPPER); + Object[] restoredEnd = + AbstractCdcSourceReader.convertBounds( + deserialized.getSplitEnd(), Long.class, MAPPER); + assertEquals(Long.class, restoredStart[0].getClass()); + assertEquals(100L, restoredStart[0]); + assertEquals(200L, restoredEnd[0]); + } +} diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out new file mode 100644 index 00000000000000..d6105d7a899596 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_snapshot_date_pk -- +2025-01-01 A1 +2025-01-02 B1 +2025-01-03 C1 +2025-01-04 D1 +2025-01-05 E1 + +-- !select_snapshot_composite_pk -- +2025-02-01 1 A2 +2025-02-02 2 B2 +2025-02-03 3 C2 +2025-02-04 4 D2 +2025-02-05 5 E2 + +-- !select_binlog_date_pk -- +2025-01-02 B1_upd +2025-01-03 C1 +2025-01-04 D1 +2025-01-05 E1 +2025-01-06 F1 + +-- !select_binlog_composite_pk -- +2025-02-02 2 B2_upd +2025-02-03 3 C2 +2025-02-04 4 D2 +2025-02-05 5 E2 +2025-02-06 6 F2 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out new file mode 100644 index 00000000000000..d6105d7a899596 --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.out @@ -0,0 +1,29 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_snapshot_date_pk -- +2025-01-01 A1 +2025-01-02 B1 +2025-01-03 C1 +2025-01-04 D1 +2025-01-05 E1 + +-- !select_snapshot_composite_pk -- +2025-02-01 1 A2 +2025-02-02 2 B2 +2025-02-03 3 C2 +2025-02-04 4 D2 +2025-02-05 5 E2 + +-- !select_binlog_date_pk -- +2025-01-02 B1_upd +2025-01-03 C1 +2025-01-04 D1 +2025-01-05 E1 +2025-01-06 F1 + +-- !select_binlog_composite_pk -- +2025-02-02 2 B2_upd +2025-02-03 3 C2 +2025-02-04 4 D2 +2025-02-05 5 E2 +2025-02-06 6 F2 + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy new file mode 100644 index 00000000000000..85570960de5dca --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_date_pk.groovy @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_date_pk", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_date_pk_name" + def currentDb = (sql "select database()")[0][0] + def tableDate = "events_mysql_date_pk" + def tableComposite = "events_mysql_date_id_pk" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${tableDate} force""" + sql """drop table if exists ${currentDb}.${tableComposite} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableDate}""" + sql """CREATE TABLE ${mysqlDb}.${tableDate} ( + `event_date` date NOT NULL, + `payload` varchar(200) DEFAULT NULL, + PRIMARY KEY (`event_date`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) VALUES ('2025-01-01', 'A1')""" + sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) VALUES ('2025-01-02', 'B1')""" + sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) VALUES ('2025-01-03', 'C1')""" + sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) VALUES ('2025-01-04', 'D1')""" + sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) VALUES ('2025-01-05', 'E1')""" + + sql """DROP TABLE IF EXISTS ${mysqlDb}.${tableComposite}""" + sql """CREATE TABLE ${mysqlDb}.${tableComposite} ( + `event_date` date NOT NULL, + `id` int NOT NULL, + `payload` varchar(200) DEFAULT NULL, + PRIMARY KEY (`event_date`, `id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-01', 1, 'A2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-02', 2, 'B2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-03', 3, 'C2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-04', 4, 'D2')""" + sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-05', 5, 'E2')""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}/${mysqlDb}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${tableDate},${tableComposite}", + "offset" = "initial", + "snapshot_split_size" = "1", + "snapshot_parallelism" = "2" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '5' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """ + qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """ + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${tableDate} (event_date, payload) VALUES ('2025-01-06', 'F1')""" + sql """UPDATE ${mysqlDb}.${tableDate} SET payload = 'B1_upd' WHERE event_date = '2025-01-02'""" + sql """DELETE FROM ${mysqlDb}.${tableDate} WHERE event_date = '2025-01-01'""" + + sql """INSERT INTO ${mysqlDb}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-06', 6, 'F2')""" + sql """UPDATE ${mysqlDb}.${tableComposite} SET payload = 'B2_upd' WHERE event_date = '2025-02-02' AND id = 2""" + sql """DELETE FROM ${mysqlDb}.${tableComposite} WHERE event_date = '2025-02-01' AND id = 1""" + } + + sleep(60000) + + qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """ + qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """ + + def jobInfo = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + assert jobInfo.get(0).get(0) == "RUNNING" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy new file mode 100644 index 00000000000000..ec43d22f9884e1 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_date_pk.groovy @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_postgres_job_date_pk", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_postgres_job_date_pk_name" + def currentDb = (sql "select database()")[0][0] + def tableDate = "events_pg_date_pk" + def tableComposite = "events_pg_date_id_pk" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${tableDate} force""" + sql """drop table if exists ${currentDb}.${tableComposite} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableDate}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableDate} ( + "event_date" date PRIMARY KEY, + "payload" varchar(200) + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-01', 'A1');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-02', 'B1');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-03', 'C1');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-04', 'D1');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-05', 'E1');""" + + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${tableComposite}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${tableComposite} ( + "event_date" date NOT NULL, + "id" int4 NOT NULL, + "payload" varchar(200), + PRIMARY KEY ("event_date", "id") + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-01', 1, 'A2');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-02', 2, 'B2');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-03', 3, 'C2');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-04', 4, 'D2');""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-05', 5, 'E2');""" + } + + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${tableDate},${tableComposite}", + "offset" = "initial", + "snapshot_split_size" = "1", + "snapshot_parallelism" = "2" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '5' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex + } + + qt_select_snapshot_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """ + qt_select_snapshot_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """ + + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableDate} (event_date, payload) VALUES ('2025-01-06', 'F1');""" + sql """UPDATE ${pgDB}.${pgSchema}.${tableDate} SET payload = 'B1_upd' WHERE event_date = '2025-01-02';""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${tableDate} WHERE event_date = '2025-01-01';""" + + sql """INSERT INTO ${pgDB}.${pgSchema}.${tableComposite} (event_date, id, payload) VALUES ('2025-02-06', 6, 'F2');""" + sql """UPDATE ${pgDB}.${pgSchema}.${tableComposite} SET payload = 'B2_upd' WHERE event_date = '2025-02-02' AND id = 2;""" + sql """DELETE FROM ${pgDB}.${pgSchema}.${tableComposite} WHERE event_date = '2025-02-01' AND id = 1;""" + } + + sleep(60000) + + qt_select_binlog_date_pk """ SELECT * FROM ${tableDate} order by event_date asc """ + qt_select_binlog_composite_pk """ SELECT * FROM ${tableComposite} order by event_date asc, id asc """ + + def jobInfo = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + assert jobInfo.get(0).get(0) == "RUNNING" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + } +}