From 20f4b73cb7ef0b707e2ffe27bb1c08a9cfd23686 Mon Sep 17 00:00:00 2001
From: wudi
Date: Fri, 24 Apr 2026 20:12:37 +0800
Subject: [PATCH] [Improve](streaming-job) support specifying offset for
 StreamingInsertJob create and alter (#62490)

## Summary
- Support specifying an offset (binlog position, LSN, or a named mode) when creating or altering a StreamingInsertJob via the `FROM MYSQL/POSTGRES` path, e.g. `'{"file":"binlog.000001","pos":"154"}'` for MySQL or `'{"lsn":"12345678"}'` for PostgreSQL.
- FE: extend `DataSourceConfigValidator` to validate offset formats (initial/snapshot/latest/earliest/JSON); `earliest` is MySQL-only and rejected for PostgreSQL. Implement `JdbcSourceOffsetProvider.deserializeOffsetProperty()` for named modes and JSON offset parsing. Remove the S3-only restriction in `StreamingInsertJob.initInsertJob()` and `modifyPropertiesInternal()` so CDC jobs can also use the offset property. On ALTER, sync the offset to `sourceProperties` for the FROM...TO path.
- BE: support the JSON LSN offset `{"lsn":"N"}` in `PostgresSourceReader.generatePostgresConfig()`, and handle `SPECIFIC_OFFSETS` mode in `JdbcIncrementalSourceReader.getStartOffsetFromConfig()`.
- Fix `JdbcOffset.isValidOffset()` and `toSerializedJson()` to return meaningful values instead of the hardcoded false/null.
---
 .../streaming/DataSourceConfigValidator.java  |  55 +++-
 .../insert/streaming/StreamingInsertJob.java  |  16 +-
 .../job/offset/SourceOffsetProvider.java      |   7 +
 .../doris/job/offset/jdbc/JdbcOffset.java     |  18 +-
 .../offset/jdbc/JdbcSourceOffsetProvider.java |  27 +-
 .../trees/plans/commands/AlterJobCommand.java |  16 +-
 .../plans/commands/CreateJobCommand.java      |   3 +-
 .../DataSourceConfigValidatorTest.java        |  13 +-
 .../reader/JdbcIncrementalSourceReader.java   |   4 +-
 .../reader/mysql/MySqlSourceReader.java       |  23 +-
 .../reader/postgres/PostgresSourceReader.java |  14 +-
 ...est_streaming_mysql_job_special_offset.out |  11 +
 ..._streaming_postgres_job_special_offset.out |  13 +
 ...st_streaming_mysql_job_create_alter.groovy |  14 -
 .../cdc/test_streaming_mysql_job_priv.groovy  |  10 +-
 ..._streaming_mysql_job_special_offset.groovy | 229 ++++++++++++++++
 ...mysql_job_special_offset_restart_fe.groovy | 140 ++++++++++
 ...reaming_postgres_job_special_offset.groovy | 249 ++++++++++++++++++
 ...g_job_cdc_stream_mysql_alter_offset.groovy | 164 ++++++++++++
 19 files changed, 973 insertions(+), 53 deletions(-)
 create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.out
 create mode 100644 regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.out
 create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy
 create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy
 create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy
 create mode 100644 regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy

diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
index 006c98a48d9171..0e87828f42fc80 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidator.java
@@ -18,8 +18,11 @@ package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.common.DataSourceType;
 import org.apache.doris.nereids.trees.plans.commands.LoadCommand;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Sets;
 
 import java.util.Map;
@@ -27,10 +30,12 @@
 import java.util.regex.Pattern;
 
 public class DataSourceConfigValidator {
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
     // PostgreSQL unquoted identifier: lowercase letters, digits, underscores, not starting with a digit.
     private static final Pattern PG_IDENTIFIER_PATTERN = Pattern.compile("^[a-z_][a-z0-9_]*$");
     private static final int PG_MAX_IDENTIFIER_LENGTH = 63;
+
     private static final Set<String> ALLOW_SOURCE_KEYS = Sets.newHashSet(
             DataSourceConfigKeys.JDBC_URL,
             DataSourceConfigKeys.USER,
@@ -58,7 +63,8 @@ public class DataSourceConfigValidator {
 
     private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";
 
-    public static void validateSource(Map<String, String> input) throws IllegalArgumentException {
+    public static void validateSource(Map<String, String> input,
+            String dataSourceType) throws IllegalArgumentException {
         for (Map.Entry<String, String> entry : input.entrySet()) {
             String key = entry.getKey();
             String value = entry.getValue();
@@ -86,7 +92,7 @@ public static void validateSource(Map<String, String> input) throws IllegalArgum
                 throw new IllegalArgumentException("Unexpected key: '" + key + "'");
             }
 
-            if (!isValidValue(key, value)) {
+            if (!isValidValue(key, value, dataSourceType)) {
                 throw new IllegalArgumentException("Invalid value for key '" + key + "': " + value);
             }
         }
@@ -110,16 +116,13 @@ public static void validateTarget(Map<String, String> input) throws IllegalArgum
         }
     }
 
-    private static boolean isValidValue(String key, String value) {
+    private static boolean isValidValue(String key, String value, String dataSourceType) {
         if (value == null || value.isEmpty()) {
             return false;
         }
 
-        if (key.equals(DataSourceConfigKeys.OFFSET)
-                && !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
-                || value.equals(DataSourceConfigKeys.OFFSET_LATEST)
-                || value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
-            return false;
+        if (key.equals(DataSourceConfigKeys.OFFSET)) {
+            return isValidOffset(value, dataSourceType);
         }
 
         // slot_name / publication_name are interpolated into PG DDL without quoting,
@@ -132,4 +135,40 @@ private static boolean isValidValue(String key, String value) {
 
         return true;
     }
+
+    /**
+     * Check if the offset value is valid for the given data source type.
+     * Supported: initial, snapshot, latest, JSON binlog/lsn position.
+     * earliest is only supported for MySQL.
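+     * Examples of JSON offsets: '{"file":"binlog.000001","pos":"154"}' (MySQL binlog position)
+     * and '{"lsn":"12345678"}' (PostgreSQL LSN).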
+     */
+    public static boolean isValidOffset(String offset, String dataSourceType) {
+        if (offset == null || offset.isEmpty()) {
+            return false;
+        }
+        if (DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(offset)
+                || DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)
+                || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset)) {
+            return true;
+        }
+        // earliest only for MySQL
+        if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) {
+            return DataSourceType.MYSQL.name().equalsIgnoreCase(dataSourceType);
+        }
+        if (isJsonOffset(offset)) {
+            return true;
+        }
+        return false;
+    }
+
+    public static boolean isJsonOffset(String offset) {
+        if (offset == null || offset.trim().isEmpty()) {
+            return false;
+        }
+        try {
+            JsonNode node = OBJECT_MAPPER.readTree(offset);
+            return node.isObject();
+        } catch (Exception e) {
+            return false;
+        }
+    }
+
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 76937531e8e052..d098ff5d83c25f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -369,6 +369,17 @@ public void initLogicalPlan(boolean regen) {
         }
     }
 
+    /**
+     * Validate the offset format for ALTER JOB, delegating to the provider.
+     */
+    public void validateAlterOffset(String offset) throws AnalysisException {
+        try {
+            offsetProvider.validateAlterOffset(offset);
+        } catch (Exception ex) {
+            throw new AnalysisException(ex.getMessage());
+        }
+    }
+
     /**
      * Check whether Offset can be serialized into the corresponding data source
      * */
@@ -793,11 +804,12 @@ public void replayOnUpdated(StreamingInsertJob replayJob) {
      */
     private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException, JobException {
         StreamingJobProperties inputStreamProps = new StreamingJobProperties(inputProperties);
-        if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
-                && S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
+        if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
             Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
             this.offsetProvider.updateOffset(offset);
             this.offsetProviderPersist = offsetProvider.getPersistInfo();
+            log.info("modifyPropertiesInternal: offset updated to {}, job {}",
+                    inputStreamProps.getOffsetProperty(), getJobId());
             if (Config.isCloudMode()) {
                 resetCloudProgress(offset);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 72c190b35ee4a0..49c718c59504c5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -114,6 +114,13 @@ default void initOnCreate() throws JobException {}
      */
     Offset deserializeOffsetProperty(String offset);
 
+    /**
+     * Validate the offset format for ALTER JOB.
+     * Each provider defines its own rules (e.g. CDC only allows JSON specific offset).
+     */
+    default void validateAlterOffset(String offset) throws Exception {
+    }
+
     /**
      * Replaying OffsetProvider is currently only required by JDBC.
      *
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java
index dea2c244d6d78c..d6a29900a6c524 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcOffset.java
@@ -32,6 +32,7 @@
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.MapUtils;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -48,7 +49,20 @@ public class JdbcOffset implements Offset {
 
     @Override
     public String toSerializedJson() {
-        return null;
+        if (splits == null || splits.isEmpty()) {
+            return null;
+        }
+        // Serialize to the same flat format that deserializeOffset() expects:
+        // [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]
+        Preconditions.checkState(splits.size() == 1 && splits.get(0) instanceof BinlogSplit,
+                "toSerializedJson only supports single BinlogSplit");
+        BinlogSplit binlog = (BinlogSplit) splits.get(0);
+        Map<String, String> map = new HashMap<>();
+        map.put(JdbcSourceOffsetProvider.SPLIT_ID, binlog.getSplitId());
+        if (binlog.getStartingOffset() != null) {
+            map.putAll(binlog.getStartingOffset());
+        }
+        return new Gson().toJson(Collections.singletonList(map));
     }
 
     @Override
@@ -58,7 +72,7 @@ public boolean isEmpty() {
 
     @Override
     public boolean isValidOffset() {
-        return false;
+        return splits != null && !splits.isEmpty();
     }
 
     @Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
index 56ec62c127ceb2..605de01bd2223c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/job/offset/jdbc/JdbcSourceOffsetProvider.java
@@ -18,6 +18,7 @@ package org.apache.doris.job.offset.jdbc;
 
 import org.apache.doris.catalog.Env;
+import org.apache.doris.common.AnalysisException;
 import org.apache.doris.httpv2.entity.ResponseBody;
 import org.apache.doris.httpv2.rest.RestApiStatusCode;
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
@@ -29,6 +30,7 @@
 import org.apache.doris.job.cdc.split.SnapshotSplit;
 import org.apache.doris.job.common.DataSourceType;
 import org.apache.doris.job.exception.JobException;
+import org.apache.doris.job.extensions.insert.streaming.DataSourceConfigValidator;
 import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.job.offset.Offset;
@@ -361,10 +363,33 @@ public Offset deserializeOffset(String offset) {
 
     @Override
     public Offset deserializeOffsetProperty(String offset) {
-        // no need cause cdc_stream has offset property
+        if (offset == null || offset.trim().isEmpty()) {
+            return null;
+        }
+        // JSON format: {"file":"binlog.000003","pos":154} or {"lsn":"123456"}
+        if (DataSourceConfigValidator.isJsonOffset(offset)) {
+            try {
+                Map<String, String> offsetMap = objectMapper.readValue(offset,
+                        new TypeReference<Map<String, String>>() {});
+                return new JdbcOffset(Collections.singletonList(new BinlogSplit(offsetMap)));
+            } catch (Exception e) {
+                log.warn("Failed to parse JSON offset: {}", offset, e);
+                return null;
+            }
+        }
         return null;
     }
 
+    @Override
+    public void validateAlterOffset(String offset) throws Exception {
+        if (!DataSourceConfigValidator.isJsonOffset(offset)) {
+            throw new AnalysisException(
+                    "ALTER JOB for CDC only supports JSON specific offset, "
+                            + "e.g. '{\"file\":\"binlog.000001\",\"pos\":\"154\"}' for MySQL "
+                            + "or '{\"lsn\":\"12345678\"}' for PostgreSQL");
+        }
+    }
+
     /**
      * Replay snapshot splits if needed
      */
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
index 089a00f354c839..88bbc9adc836b9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterJobCommand.java
@@ -166,7 +166,8 @@ private void validate() throws Exception {
             boolean sourcePropModified = isPropertiesModified(streamingJob.getSourceProperties(),
                     this.getSourceProperties());
             if (sourcePropModified) {
-                DataSourceConfigValidator.validateSource(this.getSourceProperties());
+                DataSourceConfigValidator.validateSource(this.getSourceProperties(),
+                        streamingJob.getDataSourceType().name());
                 checkUnmodifiableSourceProperties(streamingJob.getSourceProperties());
             }
 
@@ -213,6 +214,14 @@ private void checkUnmodifiableSourceProperties(Map<String, String> originSourceP
                     "The exclude_tables property cannot be modified in ALTER JOB");
         }
 
+        if (sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
+            Preconditions.checkArgument(Objects.equals(
+                    originSourceProperties.get(DataSourceConfigKeys.OFFSET),
+                    sourceProperties.get(DataSourceConfigKeys.OFFSET)),
+                    "The offset in source properties cannot be modified in ALTER JOB. "
+                            + "Use PROPERTIES('offset'='{...}') to alter offset");
+        }
+
         // slot_name / publication_name decide Doris-vs-user ownership at create time; flipping
         // them afterwards would orphan Doris-created resources or let Doris drop user-owned ones.
         if (sourceProperties.containsKey(DataSourceConfigKeys.SLOT_NAME)) {
@@ -233,9 +242,8 @@ private void checkUnmodifiableSourceProperties(Map<String, String> originSourceP
     private void validateProps(StreamingInsertJob streamingJob) throws AnalysisException {
         StreamingJobProperties jobProperties = new StreamingJobProperties(properties);
         jobProperties.validate();
-        // from to job no need valiate offset in job properties
-        if (streamingJob.getDataSourceType() == null
-                && jobProperties.getOffsetProperty() != null) {
+        if (jobProperties.getOffsetProperty() != null) {
+            streamingJob.validateAlterOffset(jobProperties.getOffsetProperty());
             streamingJob.validateOffset(jobProperties.getOffsetProperty());
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
index 1640143d277f01..ff278293e8216d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateJobCommand.java
@@ -74,7 +74,8 @@ private void validate() throws JobException {
         }
 
         if (StringUtils.isNotEmpty(createJobInfo.getSourceType())) {
-            DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties());
+            DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties(),
+                    createJobInfo.getSourceType());
             DataSourceConfigValidator.validateTarget(createJobInfo.getTargetProperties());
         }
     }
diff --git a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
index 50fd2ece020edd..2f71b7664a832d 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/job/extensions/insert/streaming/DataSourceConfigValidatorTest.java
@@ -18,6 +18,7 @@ package org.apache.doris.job.extensions.insert.streaming;
 
 import org.apache.doris.job.cdc.DataSourceConfigKeys;
+import org.apache.doris.job.common.DataSourceType;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -36,7 +37,7 @@ public void testSlotNameAndPublicationNameAllowed() {
         props.put(DataSourceConfigKeys.SLOT_NAME, "my_custom_slot");
         props.put(DataSourceConfigKeys.PUBLICATION_NAME, "my_custom_pub");
         // Should not throw
-        DataSourceConfigValidator.validateSource(props);
+        DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
     }
 
     @Test
@@ -44,7 +45,7 @@ public void testSlotNameAndPublicationNameNotRequired() {
         Map<String, String> props = new HashMap<>();
         props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
         // Should not throw without slot_name and publication_name
-        DataSourceConfigValidator.validateSource(props);
+        DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
     }
 
     @Test
@@ -90,7 +91,7 @@ public void testSlotNameRejectsInvalidPgIdentifiers() {
             props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
             props.put(DataSourceConfigKeys.SLOT_NAME, invalid);
             try {
-                DataSourceConfigValidator.validateSource(props);
+                DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
                 Assert.fail("Expected IllegalArgumentException for slot_name='" + invalid + "'");
             } catch (IllegalArgumentException expected) {
                 // ok
@@ -106,7 +107,7 @@ public void testPublicationNameRejectsInvalidPgIdentifiers() {
             props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
             props.put(DataSourceConfigKeys.PUBLICATION_NAME, invalid);
             try {
-                DataSourceConfigValidator.validateSource(props);
+                DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
                 Assert.fail("Expected IllegalArgumentException for publication_name='" + invalid + "'");
             } catch (IllegalArgumentException expected) {
                 // ok
@@ -124,7 +125,7 @@ public void testSlotNameRejectsOverlongIdentifier() {
             props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
             props.put(DataSourceConfigKeys.SLOT_NAME, sb.toString());
             try {
-                DataSourceConfigValidator.validateSource(props);
+                DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
                 Assert.fail("Expected IllegalArgumentException for slot_name exceeding "
                         + PG_MAX_IDENTIFIER_LENGTH + " chars");
             } catch (IllegalArgumentException expected) {
@@ -139,7 +140,7 @@ public void testSlotNameAcceptsValidPgIdentifiers() {
             Map<String, String> props = new HashMap<>();
             props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
             props.put(DataSourceConfigKeys.SLOT_NAME, valid);
-            DataSourceConfigValidator.validateSource(props);
+            DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
         }
     }
 }
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
index ca2bb7f83f731a..10cb98e448abae 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/JdbcIncrementalSourceReader.java
@@ -734,8 +734,10 @@ private Offset getStartOffsetFromConfig(JdbcSourceConfig sourceConfig) {
             case EARLIEST_OFFSET:
                 startingOffset = createInitialOffset();
                 break;
-            case TIMESTAMP:
             case SPECIFIC_OFFSETS:
+                startingOffset = createOffset(startupOptions.getOffset());
+                break;
+            case TIMESTAMP:
             case COMMITTED_OFFSETS:
             default:
                 throw new IllegalStateException(
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
index 6917c43ca4a30a..9aa268ef09bd81 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/mysql/MySqlSourceReader.java
@@ -48,6 +48,7 @@
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
+import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
 import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffsetUtils;
 import org.apache.flink.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
@@ -650,18 +651,30 @@ private Tuple2 createBinlogSplit(
         }
 
         BinlogOffset startOffset;
-        BinlogOffset lastOffset =
-                new BinlogOffset(
-                        binlogSplit.getStartingOffset() == null
-                                ? new HashMap<>()
-                                : binlogSplit.getStartingOffset());
+        Map<String, String> lastOffsetMap =
+                binlogSplit.getStartingOffset() == null
+                        ? new HashMap<>()
+                        : new HashMap<>(binlogSplit.getStartingOffset());
+        // ALTER offset may not contain "kind", supplement it for Flink CDC
+        if (lastOffsetMap.containsKey(BinlogOffset.BINLOG_FILENAME_OFFSET_KEY)
+                && !lastOffsetMap.containsKey(BinlogOffset.OFFSET_KIND_KEY)) {
+            lastOffsetMap.put(BinlogOffset.OFFSET_KIND_KEY, BinlogOffsetKind.SPECIFIC.name());
+        }
+        BinlogOffset lastOffset = new BinlogOffset(lastOffsetMap);
 
         if (minOffsetFinishSplits != null && lastOffset.getOffsetKind() == null) {
             startOffset = minOffsetFinishSplits;
         } else if (lastOffset.getOffsetKind() != null && lastOffset.getFilename() != null) {
             startOffset = lastOffset;
         } else if (offsetConfig != null) {
+            LOG.warn(
+                    "Falling back to startup config offset {}, meta offset was: {}",
+                    offsetConfig,
+                    lastOffsetMap);
             startOffset = offsetConfig;
         } else {
+            LOG.warn(
+                    "No valid offset found, falling back to earliest. meta offset was: {}",
+                    lastOffsetMap);
             startOffset = BinlogOffset.ofEarliest();
         }
 
diff --git a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
index 1d9ec6bd626822..6638787ea48155 100644
--- a/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
+++ b/fs_brokers/cdc_client/src/main/java/org/apache/doris/cdcclient/source/reader/postgres/PostgresSourceReader.java
@@ -212,7 +212,12 @@ private PostgresSourceConfig generatePostgresConfig(
         } else if (DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(startupMode)) {
             configFactory.startupOptions(StartupOptions.latest());
         } else if (ConfigUtil.isJson(startupMode)) {
-            throw new RuntimeException("Unsupported json offset " + startupMode);
+            Map<String, String> offsetMap = ConfigUtil.toStringMap(startupMode);
+            if (offsetMap == null || !offsetMap.containsKey(SourceInfo.LSN_KEY)) {
+                throw new RuntimeException(
+                        "JSON offset for PostgreSQL must contain 'lsn' key, got: " + startupMode);
+            }
+            configFactory.startupOptions(StartupOptions.specificOffset(offsetMap));
         } else if (ConfigUtil.is13Timestamp(startupMode)) {
             // start from timestamp
             Long ts = Long.parseLong(startupMode);
@@ -331,6 +336,13 @@ protected OffsetFactory getOffsetFactory() {
 
     @Override
     protected Offset createOffset(Map<String, String> offset) {
+        // ALTER offset may only contain lsn, supplement ts_usec for PostgresOffsetContext.Loader
+        if (offset.containsKey(SourceInfo.LSN_KEY)
+                && !offset.containsKey(SourceInfo.TIMESTAMP_USEC_KEY)) {
+            Map<String, String> supplemented = new HashMap<>(offset);
+            supplemented.put(SourceInfo.TIMESTAMP_USEC_KEY, "0");
+            return PostgresOffset.of(supplemented);
+        }
         return PostgresOffset.of(offset);
     }
 
diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.out
new file mode 100644
index 00000000000000..2fc5212efaf13b
--- /dev/null
+++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.out
@@ -0,0 +1,11 @@
+-- This file is automatically generated.
You should know what you did if you want to edit this +-- !select_after_create -- +10 specific1 +11 specific2 + +-- !select_after_alter -- +10 specific1 +11 specific2 +20 alter1 +21 alter2 + diff --git a/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.out b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.out new file mode 100644 index 00000000000000..d14feb4178551f --- /dev/null +++ b/regression-test/data/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.out @@ -0,0 +1,13 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !select_after_create -- +1 alice +2 bob +3 charlie + +-- !select_after_alter -- +1 alice +2 bob +3 charlie +30 after_lsn +31 after_lsn + diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy index 3cff806161640d..0b3a2d2eee85e9 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_create_alter.groovy @@ -405,19 +405,6 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke exception "target database can't be modified" } - sql """ALTER JOB ${jobName} - FROM MYSQL ( - "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", - "driver_url" = "${driver_url}", - "driver_class" = "com.mysql.cj.jdbc.Driver", - "user" = "root", - "password" = "123456", - "database" = "${mysqlDb}", - "include_tables" = "${table1}", - "offset" = "latest" - ) - TO DATABASE ${currentDb}""" - def jobInfoOrigin = sql """ select CurrentOffset,LoadStatistic from jobs("type"="insert") where Name='${jobName}' """ @@ -459,7 +446,6 @@ suite("test_streaming_mysql_job_create_alter", "p0,external,mysql,external_docke assert jobInfoCurrent.get(0).get(1) == jobInfoOrigin.get(0).get(1) assert jobInfoCurrent.get(0).get(2).contains("\"max_interval\":\"5\"") assert jobInfoCurrent.get(0).get(2).contains("\"__source_subtype\":\"aws_rds_mysql\"") - assert jobInfoCurrent.get(0).get(3).contains("latest") sql """ DROP JOB IF EXISTS where jobname = '${jobName}' diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy index 4909e3d0469c0c..540b96b04638d9 100644 --- a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_priv.groovy @@ -128,7 +128,7 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ log.info("jobSuccendCount: " + jobSuccendCount) // check job status and succeed task count larger than 2 - jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0) + jobSuccendCount.size() == 1 && '2' <= jobSuccendCount.get(0).get(0) } ) } catch (Exception ex) { @@ -155,14 +155,8 @@ suite("test_streaming_mysql_job_priv", "p0,external,mysql,external_docker,extern sql """ALTER JOB ${jobName} FROM MYSQL ( - "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}?allowPublicKeyRetrieval=true&useSSL=false", - "driver_url" = "${driver_url}", - "driver_class" = 
"com.mysql.cj.jdbc.Driver", "user" = "${newMysqlUser}", - "password" = "test123", - "database" = "${mysqlDb}", - "include_tables" = "${tableName}", - "offset" = "latest" + "password" = "test123" ) TO DATABASE ${currentDb}""" diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy new file mode 100644 index 00000000000000..3c988239225a65 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset.groovy @@ -0,0 +1,229 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_special_offset", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_special_offset" + def currentDb = (sql "select database()")[0][0] + def table1 = "special_offset_mysql_tbl" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // prepare MySQL source table + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `id` int NOT NULL, + `name` varchar(100) DEFAULT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'alice'), (2, 'bob')""" + } + + // ===== Test 1: offset = latest, then insert new data, verify synced ===== + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + // wait for job to be running + Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where 
Name='${jobName}'"""
+            return jobStatus.size() == 1 && jobStatus[0][0] == "RUNNING"
+        })
+        // insert new data after job started with latest offset
+        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'charlie')"""
+        }
+        Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
+            def result = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+            return result[0][0] >= 1
+        })
+        sql """DROP JOB IF EXISTS where jobname = '${jobName}'"""
+        sql """drop table if exists ${currentDb}.${table1} force"""
+
+        // ===== Test 3: CREATE with JSON binlog offset, then ALTER to a new binlog offset =====
+        // Pre-create a DUPLICATE KEY table so duplicate rows from re-consuming are visible
+        sql """
+        CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} (
+            `id` int NULL,
+            `name` varchar(100) NULL
+        ) ENGINE=OLAP
+        DUPLICATE KEY(`id`)
+        DISTRIBUTED BY HASH(`id`) BUCKETS AUTO
+        PROPERTIES ("replication_allocation" = "tag.location.default: 1")
+        """
+
+        // Step 1: Get binlog position, insert data, create job from that position
+        def binlogFile = ""
+        def binlogPos = ""
+        connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") {
+            def masterStatus = sql """SHOW MASTER STATUS"""
+            binlogFile = masterStatus[0][0]
+            binlogPos = masterStatus[0][1].toString()
+            log.info("CREATE binlog position: file=${binlogFile}, pos=${binlogPos}")
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (10, 'specific1')"""
+            sql """INSERT INTO ${mysqlDb}.${table1} VALUES (11, 'specific2')"""
+        }
+        def offsetJson = """{"file":"${binlogFile}","pos":"${binlogPos}"}"""
+        log.info("CREATE with JSON offset: ${offsetJson}")
+        sql """CREATE JOB ${jobName}
+            ON STREAMING
+            FROM MYSQL (
+                "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}",
+                "driver_url" = "${driver_url}",
+                "driver_class" = "com.mysql.cj.jdbc.Driver",
+                "user" = "root",
+                "password" = "123456",
+                "database" = "${mysqlDb}",
+                "include_tables" = "${table1}",
+                "offset" = '${offsetJson}'
+            )
+            TO DATABASE ${currentDb} (
+                "table.create.properties.replication_num" = "1"
+            )
+        """
+        Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({
+            def result = sql """SELECT count(*) FROM ${currentDb}.${table1}"""
+            return result[0][0] >= 2
+        })
+        // Verify data after CREATE with specific offset
+        qt_select_after_create """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """
+
+        // Wait for current task to complete (commit offset successfully) before PAUSE,
+        // otherwise PAUSE may race with a running task and cause commit offset failure.
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'""" + return cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2 + }) + + // Step 2: Get a new binlog position (different from CREATE), insert data, ALTER to it + def alterBinlogFile = "" + def alterBinlogPos = "" + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + def masterStatus = sql """SHOW MASTER STATUS""" + alterBinlogFile = masterStatus[0][0] + alterBinlogPos = masterStatus[0][1].toString() + log.info("ALTER binlog position: file=${alterBinlogFile}, pos=${alterBinlogPos}") + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (20, 'alter1')""" + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (21, 'alter2')""" + } + sql "PAUSE JOB where jobname = '${jobName}'" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus[0][0] == "PAUSED" + }) + def alterOffsetJson = """{"file":"${alterBinlogFile}","pos":"${alterBinlogPos}"}""" + log.info("ALTER to new offset: ${alterOffsetJson}") + sql """ALTER JOB ${jobName} + PROPERTIES('offset' = '${alterOffsetJson}') + """ + sql "RESUME JOB where jobname = '${jobName}'" + // After ALTER to new position, id 20,21 should be synced + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (20, 21)""" + return result[0][0] >= 2 + }) + qt_select_after_alter """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """ + + // Step 3: ALTER with named mode should fail for CDC + sql "PAUSE JOB where jobname = '${jobName}'" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus[0][0] == "PAUSED" + }) + test { + sql """ALTER JOB ${jobName} + PROPERTIES('offset' = 'initial') + """ + exception "ALTER JOB for CDC only supports JSON specific offset" + } + // ALTER offset via source properties should be rejected + test { + sql """ALTER JOB ${jobName} + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "The offset in source properties cannot be modified in ALTER JOB" + } + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // ===== Test 4: invalid offset format ===== + test { + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = "not_valid_offset" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "Invalid value for key 'offset'" + } + + // cleanup MySQL source table + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """DROP TABLE IF EXISTS 
${mysqlDb}.${table1}""" + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy new file mode 100644 index 00000000000000..ad6c79e93cddd5 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_mysql_job_special_offset_restart_fe.groovy @@ -0,0 +1,140 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import org.apache.doris.regression.suite.ClusterOptions +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_mysql_job_special_offset_restart_fe", "docker,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_mysql_job_special_offset_restart_fe" + def options = new ClusterOptions() + options.setFeNum(1) + // run in cloud and not cloud + options.cloudMode = null + + docker(options) { + def currentDb = (sql "select database()")[0][0] + def table1 = "special_offset_restart_tbl" + def mysqlDb = "test_cdc_db" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // prepare MySQL source table and get binlog position + def binlogFile = "" + def binlogPos = "" + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + sql """CREATE TABLE ${mysqlDb}.${table1} ( + `id` int NOT NULL, + `name` varchar(100) DEFAULT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + // get current binlog position + def masterStatus = sql """SHOW MASTER STATUS""" + binlogFile = masterStatus[0][0] + binlogPos = masterStatus[0][1].toString() + log.info("Binlog position: file=${binlogFile}, pos=${binlogPos}") + // insert data after this position + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (1, 'alice'), (2, 'bob')""" + } + + // create job with JSON binlog offset + def offsetJson = """{"file":"${binlogFile}","pos":"${binlogPos}"}""" + log.info("Creating job with offset: ${offsetJson}") + sql """CREATE JOB ${jobName} + ON STREAMING + FROM MYSQL ( + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = 
"${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "include_tables" = "${table1}", + "offset" = '${offsetJson}' + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + + // wait for data synced + try { + Awaitility.await().atMost(300, SECONDS) + .pollInterval(1, SECONDS).until( + { + def jobSuccendCount = sql """ select SucceedTaskCount from jobs("type"="insert") where Name = '${jobName}' and ExecuteType='STREAMING' """ + log.info("jobSuccendCount: " + jobSuccendCount) + jobSuccendCount.size() == 1 && '1' <= jobSuccendCount.get(0).get(0) + } + ) + } catch (Exception ex) { + def showjob = sql """select * from jobs("type"="insert") where Name='${jobName}'""" + def showtask = sql """select * from tasks("type"="insert") where JobName='${jobName}'""" + log.info("show job: " + showjob) + log.info("show task: " + showtask) + throw ex; + } + + def jobInfoBefore = sql """ + select loadStatistic, status, currentOffset from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfoBefore: " + jobInfoBefore) + assert jobInfoBefore.get(0).get(1) == "RUNNING" + + // Restart FE + cluster.restartFrontends() + sleep(60000) + context.reconnectFe() + + // check job is consistent after restart + def jobInfoAfter = sql """ + select loadStatistic, status, currentOffset from jobs("type"="insert") where Name='${jobName}' + """ + log.info("jobInfoAfter: " + jobInfoAfter) + assert jobInfoAfter.get(0).get(1) == "RUNNING" + assert jobInfoAfter.get(0).get(2) == jobInfoBefore.get(0).get(2) + + // insert more data and verify job still works after restart + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """INSERT INTO ${mysqlDb}.${table1} VALUES (3, 'charlie')""" + } + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + return result[0][0] >= 3 + }) + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """DROP TABLE IF EXISTS ${mysqlDb}.${table1}""" + } + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy new file mode 100644 index 00000000000000..1b64f111859aea --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/test_streaming_postgres_job_special_offset.groovy @@ -0,0 +1,249 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +suite("test_streaming_postgres_job_special_offset", "p0,external,pg,external_docker,external_docker_pg,nondatalake") { + def jobName = "test_streaming_pg_job_special_offset" + def currentDb = (sql "select database()")[0][0] + def table1 = "special_offset_pg_tbl" + def pgDB = "postgres" + def pgSchema = "cdc_test" + def pgUser = "postgres" + def pgPassword = "123456" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String pg_port = context.config.otherConfigs.get("pg_14_port"); + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/postgresql-42.5.0.jar" + + // prepare PG source table + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + sql """CREATE TABLE ${pgDB}.${pgSchema}.${table1} ( + "id" int, + "name" varchar(100), + PRIMARY KEY ("id") + )""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (1, 'alice'), (2, 'bob')""" + } + + // ===== Test 1: offset = latest, then insert new data, verify synced ===== + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus.size() == 1 && jobStatus[0][0] == "RUNNING" + }) + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (3, 'charlie')""" + } + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + return result[0][0] >= 1 + }) + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // ===== Test 2: CREATE with initial, then ALTER with JSON LSN offset ===== + // Pre-create a DUPLICATE KEY table so duplicate rows from re-consuming are visible + sql """ + CREATE TABLE IF NOT EXISTS ${currentDb}.${table1} ( + `id` int NULL, + `name` varchar(100) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS AUTO + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + + // Step 1: Create job with initial to establish replication slot and sync snapshot + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + 
"include_tables" = "${table1}", + "offset" = "initial" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1}""" + return result[0][0] >= 2 + }) + qt_select_after_create """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """ + + // Wait for current task to complete (commit offset successfully) before PAUSE, + // otherwise PAUSE may race with a running task and cause commit offset failure. + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'""" + return cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2 + }) + + // Step 2: PAUSE, insert data before and after a LSN mark, ALTER to that LSN + sql "PAUSE JOB where jobname = '${jobName}'" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus[0][0] == "PAUSED" + }) + def alterLsn = "" + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + // insert data BEFORE the LSN mark + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (20, 'before_lsn')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (21, 'before_lsn')""" + // record LSN mark + def lsnResult = sql """SELECT pg_current_wal_lsn()::text""" + def lsnStr = lsnResult[0][0].toString() + def parts = lsnStr.split("/") + def high = Long.parseLong(parts[0], 16) + def low = Long.parseLong(parts[1], 16) + alterLsn = String.valueOf((high << 32) + low) + log.info("ALTER LSN mark: ${lsnStr} -> numeric: ${alterLsn}") + // insert data AFTER the LSN mark + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (30, 'after_lsn')""" + sql """INSERT INTO ${pgDB}.${pgSchema}.${table1} VALUES (31, 'after_lsn')""" + } + def alterOffsetJson = """{"lsn":"${alterLsn}"}""" + log.info("ALTER to LSN: ${alterOffsetJson}") + sql """ALTER JOB ${jobName} + PROPERTIES('offset' = '${alterOffsetJson}') + """ + sql "RESUME JOB where jobname = '${jobName}'" + // After ALTER to LSN mark, only data AFTER that LSN (id 30,31) should be synced + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${table1} WHERE id IN (30, 31)""" + return result[0][0] >= 2 + }) + qt_select_after_alter """ SELECT * FROM ${currentDb}.${table1} ORDER BY id """ + + // Step 3: ALTER with named mode should fail for CDC + sql "PAUSE JOB where jobname = '${jobName}'" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus[0][0] == "PAUSED" + }) + test { + sql """ALTER JOB ${jobName} + PROPERTIES('offset' = 'initial') + """ + exception "ALTER JOB for CDC only supports JSON specific offset" + } + // ALTER offset via source properties should be rejected + test { + sql """ALTER JOB ${jobName} + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "latest" + ) + TO DATABASE 
${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "The offset in source properties cannot be modified in ALTER JOB" + } + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${table1} force""" + + // ===== Test 3: earliest should fail for PG ===== + test { + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "earliest" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "Invalid value for key 'offset'" + } + + // ===== Test 4: invalid offset format ===== + test { + sql """CREATE JOB ${jobName} + ON STREAMING + FROM POSTGRES ( + "jdbc_url" = "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}", + "driver_url" = "${driver_url}", + "driver_class" = "org.postgresql.Driver", + "user" = "${pgUser}", + "password" = "${pgPassword}", + "database" = "${pgDB}", + "schema" = "${pgSchema}", + "include_tables" = "${table1}", + "offset" = "not_valid" + ) + TO DATABASE ${currentDb} ( + "table.create.properties.replication_num" = "1" + ) + """ + exception "Invalid value for key 'offset'" + } + + // cleanup PG source table + connect("${pgUser}", "${pgPassword}", "jdbc:postgresql://${externalEnvIp}:${pg_port}/${pgDB}") { + sql """DROP TABLE IF EXISTS ${pgDB}.${pgSchema}.${table1}""" + } + } +} diff --git a/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy new file mode 100644 index 00000000000000..db674f305dfe63 --- /dev/null +++ b/regression-test/suites/job_p0/streaming_job/cdc/tvf/test_streaming_job_cdc_stream_mysql_alter_offset.groovy @@ -0,0 +1,164 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +import org.awaitility.Awaitility + +import static java.util.concurrent.TimeUnit.SECONDS + +/** + * Test ALTER JOB with JSON binlog offset for cdc_stream TVF path. + * + * Scenario: + * 1. Create job with initial offset, wait for snapshot sync. + * 2. Get current binlog position, insert new data. + * 3. PAUSE -> ALTER with JSON binlog offset via PROPERTIES -> RESUME. + * 4. Verify new data synced. 
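+ * The ALTER offset uses the JSON binlog form {"file":"<binlog file>","pos":"<position>"},
+ * captured via SHOW MASTER STATUS.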
+ */ +suite("test_streaming_job_cdc_stream_mysql_alter_offset", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") { + def jobName = "test_streaming_job_cdc_stream_mysql_alter_offset_name" + def currentDb = (sql "select database()")[0][0] + def dorisTable = "cdc_stream_alter_offset_tbl" + def mysqlDb = "test_cdc_db" + def mysqlTable = "cdc_stream_alter_offset_src" + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + + sql """ + CREATE TABLE IF NOT EXISTS ${currentDb}.${dorisTable} ( + `id` int NULL, + `name` varchar(200) NULL + ) ENGINE=OLAP + DUPLICATE KEY(`id`) + DISTRIBUTED BY HASH(`id`) BUCKETS AUTO + PROPERTIES ("replication_allocation" = "tag.location.default: 1") + """ + + String enabled = context.config.otherConfigs.get("enableJdbcTest") + if (enabled != null && enabled.equalsIgnoreCase("true")) { + String mysql_port = context.config.otherConfigs.get("mysql_57_port") + String externalEnvIp = context.config.otherConfigs.get("externalEnvIp") + String s3_endpoint = getS3Endpoint() + String bucket = getS3BucketName() + String driver_url = "https://${bucket}.${s3_endpoint}/regression/jdbc_driver/mysql-connector-j-8.4.0.jar" + + // prepare source table with snapshot data + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """CREATE DATABASE IF NOT EXISTS ${mysqlDb}""" + sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}""" + sql """CREATE TABLE ${mysqlDb}.${mysqlTable} ( + `id` int NOT NULL, + `name` varchar(200) DEFAULT NULL, + PRIMARY KEY (`id`) + ) ENGINE=InnoDB""" + sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (1, 'alice'), (2, 'bob')""" + } + + // Step 1: Create job with initial offset via cdc_stream TVF + sql """ + CREATE JOB ${jobName} + ON STREAMING DO INSERT INTO ${currentDb}.${dorisTable} (id, name) + SELECT id, name FROM cdc_stream( + "type" = "mysql", + "jdbc_url" = "jdbc:mysql://${externalEnvIp}:${mysql_port}", + "driver_url" = "${driver_url}", + "driver_class" = "com.mysql.cj.jdbc.Driver", + "user" = "root", + "password" = "123456", + "database" = "${mysqlDb}", + "table" = "${mysqlTable}", + "offset" = "initial" + ) + """ + + // wait for snapshot sync + try { + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'""" + log.info("SucceedTaskCount: " + cnt) + cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 1 + }) + } catch (Exception ex) { + log.info("job: " + (sql """select * from jobs("type"="insert") where Name='${jobName}'""")) + log.info("tasks: " + (sql """select * from tasks("type"="insert") where JobName='${jobName}'""")) + throw ex + } + + // verify snapshot data + Awaitility.await().atMost(60, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${dorisTable}""" + return result[0][0] >= 2 + }) + + // Wait for current task to complete (commit offset successfully) before PAUSE, + // otherwise PAUSE may race with a running task and cause commit offset failure. 
+ Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def cnt = sql """select SucceedTaskCount from jobs("type"="insert") where Name='${jobName}' and ExecuteType='STREAMING'""" + return cnt.size() == 1 && (cnt.get(0).get(0) as int) >= 2 + }) + + // Step 2: PAUSE, insert data before and after a binlog mark, ALTER to that mark + sql "PAUSE JOB where jobname = '${jobName}'" + Awaitility.await().atMost(30, SECONDS).pollInterval(1, SECONDS).until({ + def jobStatus = sql """select status from jobs("type"="insert") where Name='${jobName}'""" + return jobStatus[0][0] == "PAUSED" + }) + def binlogFile = "" + def binlogPos = "" + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + // insert data BEFORE the binlog mark + sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (10, 'before_mark')""" + sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (11, 'before_mark')""" + // record binlog mark + def masterStatus = sql """SHOW MASTER STATUS""" + binlogFile = masterStatus[0][0] + binlogPos = masterStatus[0][1].toString() + log.info("Binlog mark for ALTER: file=${binlogFile}, pos=${binlogPos}") + // insert data AFTER the binlog mark + sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (20, 'after_mark')""" + sql """INSERT INTO ${mysqlDb}.${mysqlTable} VALUES (21, 'after_mark')""" + } + def offsetJson = """{"file":"${binlogFile}","pos":"${binlogPos}"}""" + log.info("ALTER TVF job offset: ${offsetJson}") + sql """ALTER JOB ${jobName} + PROPERTIES('offset' = '${offsetJson}') + """ + sql "RESUME JOB where jobname = '${jobName}'" + + // Step 3: Verify only data AFTER the mark (id 20,21) is synced + Awaitility.await().atMost(300, SECONDS).pollInterval(2, SECONDS).until({ + def result = sql """SELECT count(*) FROM ${currentDb}.${dorisTable} WHERE id IN (20, 21)""" + return result[0][0] >= 2 + }) + def afterMarkRows = sql """SELECT * FROM ${currentDb}.${dorisTable} WHERE id IN (20, 21) order by id""" + log.info("afterMarkRows: " + afterMarkRows) + assert afterMarkRows.size() == 2 + // id 10,11 (before mark) should NOT be synced + def beforeMarkRows = sql """SELECT * FROM ${currentDb}.${dorisTable} WHERE id IN (10, 11)""" + log.info("beforeMarkRows: " + beforeMarkRows) + assert beforeMarkRows.size() == 0 + + sql """DROP JOB IF EXISTS where jobname = '${jobName}'""" + sql """drop table if exists ${currentDb}.${dorisTable} force""" + + // cleanup MySQL source table + connect("root", "123456", "jdbc:mysql://${externalEnvIp}:${mysql_port}") { + sql """DROP TABLE IF EXISTS ${mysqlDb}.${mysqlTable}""" + } + } +}