Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,24 @@
package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.nereids.trees.plans.commands.LoadCommand;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Sets;

import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

public class DataSourceConfigValidator {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

// PostgreSQL unquoted identifier: lowercase letters, digits, underscores, not starting with a digit.
private static final Pattern PG_IDENTIFIER_PATTERN = Pattern.compile("^[a-z_][a-z0-9_]*$");
private static final int PG_MAX_IDENTIFIER_LENGTH = 63;

private static final Set<String> ALLOW_SOURCE_KEYS = Sets.newHashSet(
DataSourceConfigKeys.JDBC_URL,
DataSourceConfigKeys.USER,
Expand Down Expand Up @@ -58,7 +63,8 @@ public class DataSourceConfigValidator {

private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";

public static void validateSource(Map<String, String> input) throws IllegalArgumentException {
public static void validateSource(Map<String, String> input,
String dataSourceType) throws IllegalArgumentException {
for (Map.Entry<String, String> entry : input.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
Expand Down Expand Up @@ -86,7 +92,7 @@ public static void validateSource(Map<String, String> input) throws IllegalArgum
throw new IllegalArgumentException("Unexpected key: '" + key + "'");
}

if (!isValidValue(key, value)) {
if (!isValidValue(key, value, dataSourceType)) {
throw new IllegalArgumentException("Invalid value for key '" + key + "': " + value);
}
}
Expand All @@ -110,16 +116,13 @@ public static void validateTarget(Map<String, String> input) throws IllegalArgum
}
}

private static boolean isValidValue(String key, String value) {
private static boolean isValidValue(String key, String value, String dataSourceType) {
if (value == null || value.isEmpty()) {
return false;
}

if (key.equals(DataSourceConfigKeys.OFFSET)
&& !(value.equals(DataSourceConfigKeys.OFFSET_INITIAL)
|| value.equals(DataSourceConfigKeys.OFFSET_LATEST)
|| value.equals(DataSourceConfigKeys.OFFSET_SNAPSHOT))) {
return false;
if (key.equals(DataSourceConfigKeys.OFFSET)) {
return isValidOffset(value, dataSourceType);
}

// slot_name / publication_name are interpolated into PG DDL without quoting,
Expand All @@ -132,4 +135,40 @@ private static boolean isValidValue(String key, String value) {
return true;
}

/**
 * Checks whether {@code offset} is an acceptable offset value for the given data source type.
 * Accepted values: the keywords initial / snapshot / latest (case-insensitive), a JSON
 * binlog/lsn position, and — for MySQL only — the keyword earliest.
 *
 * @param offset         the offset value to check (null/empty is rejected)
 * @param dataSourceType the data source type name, compared case-insensitively
 * @return true if the offset is valid for this data source type
 */
public static boolean isValidOffset(String offset, String dataSourceType) {
    if (offset == null || offset.isEmpty()) {
        return false;
    }
    boolean isCommonKeyword = DataSourceConfigKeys.OFFSET_INITIAL.equalsIgnoreCase(offset)
            || DataSourceConfigKeys.OFFSET_LATEST.equalsIgnoreCase(offset)
            || DataSourceConfigKeys.OFFSET_SNAPSHOT.equalsIgnoreCase(offset);
    if (isCommonKeyword) {
        return true;
    }
    // The "earliest" keyword is only accepted when the data source is MySQL.
    if (DataSourceConfigKeys.OFFSET_EARLIEST.equalsIgnoreCase(offset)) {
        return DataSourceType.MYSQL.name().equalsIgnoreCase(dataSourceType);
    }
    // Otherwise the value must be a JSON specific position, e.g. binlog file/pos or lsn.
    return isJsonOffset(offset);
}

/**
 * Returns true when {@code offset} parses as a JSON object (a specific binlog/lsn position).
 * Blank input, non-JSON text, and JSON scalars/arrays are all rejected.
 *
 * @param offset the raw offset string to probe
 * @return true only for a well-formed JSON object
 */
public static boolean isJsonOffset(String offset) {
    if (offset == null || offset.trim().isEmpty()) {
        return false;
    }
    try {
        return OBJECT_MAPPER.readTree(offset).isObject();
    } catch (Exception ignored) {
        // Not parseable as JSON at all.
        return false;
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,17 @@ public void initLogicalPlan(boolean regen) {
}
}

/**
 * Validate the offset format for ALTER JOB, delegating to the provider.
 *
 * @param offset the offset property value supplied by ALTER JOB
 * @throws AnalysisException if the provider rejects the offset format
 */
public void validateAlterOffset(String offset) throws AnalysisException {
    try {
        offsetProvider.validateAlterOffset(offset);
    } catch (AnalysisException ex) {
        // Already the expected type; rethrow as-is instead of re-wrapping it.
        throw ex;
    } catch (Exception ex) {
        // Preserve the original exception as the cause for easier debugging.
        throw new AnalysisException(ex.getMessage(), ex);
    }
}

/**
* Check whether Offset can be serialized into the corresponding data source
* */
Expand Down Expand Up @@ -793,11 +804,12 @@ public void replayOnUpdated(StreamingInsertJob replayJob) {
*/
private void modifyPropertiesInternal(Map<String, String> inputProperties) throws AnalysisException, JobException {
StreamingJobProperties inputStreamProps = new StreamingJobProperties(inputProperties);
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())
&& S3TableValuedFunction.NAME.equalsIgnoreCase(this.tvfType)) {
if (StringUtils.isNotEmpty(inputStreamProps.getOffsetProperty())) {
Offset offset = validateOffset(inputStreamProps.getOffsetProperty());
this.offsetProvider.updateOffset(offset);
this.offsetProviderPersist = offsetProvider.getPersistInfo();
log.info("modifyPropertiesInternal: offset updated to {}, job {}",
inputStreamProps.getOffsetProperty(), getJobId());
if (Config.isCloudMode()) {
resetCloudProgress(offset);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ default void initOnCreate() throws JobException {}
*/
Offset deserializeOffsetProperty(String offset);

/**
 * Validate the offset format supplied through ALTER JOB.
 * Each provider defines its own rules (e.g. CDC only allows JSON specific offset).
 * The default implementation performs no validation and accepts any value.
 *
 * @param offset the raw offset property value from ALTER JOB
 * @throws Exception if the offset format is not acceptable for this provider
 */
default void validateAlterOffset(String offset) throws Exception {
}

/**
* Replaying OffsetProvider is currently only required by JDBC.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -48,7 +49,20 @@ public class JdbcOffset implements Offset {

/**
 * Serializes this offset to the same flat JSON format that deserializeOffset() expects:
 * [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}].
 * Only a single BinlogSplit is supported; returns null when there are no splits.
 */
@Override
public String toSerializedJson() {
    if (splits == null || splits.isEmpty()) {
        return null;
    }
    // Serialize to the same flat format that deserializeOffset() expects:
    // [{"split_id":"binlog-split","file":"xxx","pos":"yyy"}]
    Preconditions.checkState(splits.size() == 1 && splits.get(0) instanceof BinlogSplit,
            "toSerializedJson only supports single BinlogSplit");
    BinlogSplit binlog = (BinlogSplit) splits.get(0);
    Map<String, String> map = new HashMap<>();
    map.put(JdbcSourceOffsetProvider.SPLIT_ID, binlog.getSplitId());
    if (binlog.getStartingOffset() != null) {
        map.putAll(binlog.getStartingOffset());
    }
    return new Gson().toJson(Collections.singletonList(map));
}

@Override
Expand All @@ -58,7 +72,7 @@ public boolean isEmpty() {

/** An offset is usable only when it carries at least one split. */
@Override
public boolean isValidOffset() {
    return splits != null && !splits.isEmpty();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.job.offset.jdbc;

import org.apache.doris.catalog.Env;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.rest.RestApiStatusCode;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
Expand All @@ -29,6 +30,7 @@
import org.apache.doris.job.cdc.split.SnapshotSplit;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.exception.JobException;
import org.apache.doris.job.extensions.insert.streaming.DataSourceConfigValidator;
import org.apache.doris.job.extensions.insert.streaming.StreamingInsertJob;
import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
import org.apache.doris.job.offset.Offset;
Expand Down Expand Up @@ -361,10 +363,33 @@ public Offset deserializeOffset(String offset) {

/**
 * Deserialize the user-supplied offset property into a JdbcOffset.
 * Only a JSON specific position is accepted here; anything else yields null.
 *
 * @param offset the raw offset property value (may be null/blank)
 * @return a JdbcOffset wrapping a single BinlogSplit, or null if the value is
 *         absent, not a JSON object, or fails to parse
 */
@Override
public Offset deserializeOffsetProperty(String offset) {
    if (offset == null || offset.trim().isEmpty()) {
        return null;
    }
    // JSON format: {"file":"binlog.000003","pos":154} or {"lsn":"123456"}
    if (DataSourceConfigValidator.isJsonOffset(offset)) {
        try {
            Map<String, String> offsetMap = objectMapper.readValue(offset,
                new TypeReference<Map<String, String>>() {});
            return new JdbcOffset(Collections.singletonList(new BinlogSplit(offsetMap)));
        } catch (Exception e) {
            // Best-effort: log and fall through to null rather than failing the caller.
            log.warn("Failed to parse JSON offset: {}", offset, e);
            return null;
        }
    }
    return null;
}

/**
 * For CDC jobs, ALTER JOB may only supply a JSON specific offset; any other
 * value (including the keyword offsets) is rejected.
 */
@Override
public void validateAlterOffset(String offset) throws Exception {
    if (DataSourceConfigValidator.isJsonOffset(offset)) {
        return;
    }
    throw new AnalysisException(
            "ALTER JOB for CDC only supports JSON specific offset, "
            + "e.g. '{\"file\":\"binlog.000001\",\"pos\":\"154\"}' for MySQL "
            + "or '{\"lsn\":\"12345678\"}' for PostgreSQL");
}

/**
* Replay snapshot splits if needed
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ private void validate() throws Exception {
boolean sourcePropModified =
isPropertiesModified(streamingJob.getSourceProperties(), this.getSourceProperties());
if (sourcePropModified) {
DataSourceConfigValidator.validateSource(this.getSourceProperties());
DataSourceConfigValidator.validateSource(this.getSourceProperties(),
streamingJob.getDataSourceType().name());
checkUnmodifiableSourceProperties(streamingJob.getSourceProperties());
}

Expand Down Expand Up @@ -213,6 +214,14 @@ private void checkUnmodifiableSourceProperties(Map<String, String> originSourceP
"The exclude_tables property cannot be modified in ALTER JOB");
}

if (sourceProperties.containsKey(DataSourceConfigKeys.OFFSET)) {
Preconditions.checkArgument(Objects.equals(
originSourceProperties.get(DataSourceConfigKeys.OFFSET),
sourceProperties.get(DataSourceConfigKeys.OFFSET)),
"The offset in source properties cannot be modified in ALTER JOB. "
+ "Use PROPERTIES('offset'='{...}') to alter offset");
}

// slot_name / publication_name decide Doris-vs-user ownership at create time; flipping
// them afterwards would orphan Doris-created resources or let Doris drop user-owned ones.
if (sourceProperties.containsKey(DataSourceConfigKeys.SLOT_NAME)) {
Expand All @@ -233,9 +242,8 @@ private void checkUnmodifiableSourceProperties(Map<String, String> originSourceP
private void validateProps(StreamingInsertJob streamingJob) throws AnalysisException {
StreamingJobProperties jobProperties = new StreamingJobProperties(properties);
jobProperties.validate();
// from to job no need valiate offset in job properties
if (streamingJob.getDataSourceType() == null
&& jobProperties.getOffsetProperty() != null) {
if (jobProperties.getOffsetProperty() != null) {
streamingJob.validateAlterOffset(jobProperties.getOffsetProperty());
streamingJob.validateOffset(jobProperties.getOffsetProperty());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ private void validate() throws JobException {
}

if (StringUtils.isNotEmpty(createJobInfo.getSourceType())) {
DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties());
DataSourceConfigValidator.validateSource(createJobInfo.getSourceProperties(),
createJobInfo.getSourceType());
DataSourceConfigValidator.validateTarget(createJobInfo.getTargetProperties());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.doris.job.extensions.insert.streaming;

import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.common.DataSourceType;

import org.junit.Assert;
import org.junit.Test;
Expand All @@ -36,15 +37,15 @@ public void testSlotNameAndPublicationNameAllowed() {
props.put(DataSourceConfigKeys.SLOT_NAME, "my_custom_slot");
props.put(DataSourceConfigKeys.PUBLICATION_NAME, "my_custom_pub");
// Should not throw
DataSourceConfigValidator.validateSource(props);
DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
}

@Test
public void testSlotNameAndPublicationNameNotRequired() {
    Map<String, String> sourceProps = new HashMap<>();
    sourceProps.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
    // slot_name / publication_name are optional: validation must pass without them.
    DataSourceConfigValidator.validateSource(sourceProps, DataSourceType.POSTGRES.name());
}

@Test
Expand Down Expand Up @@ -90,7 +91,7 @@ public void testSlotNameRejectsInvalidPgIdentifiers() {
props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
props.put(DataSourceConfigKeys.SLOT_NAME, invalid);
try {
DataSourceConfigValidator.validateSource(props);
DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
Assert.fail("Expected IllegalArgumentException for slot_name='" + invalid + "'");
} catch (IllegalArgumentException expected) {
// ok
Expand All @@ -106,7 +107,7 @@ public void testPublicationNameRejectsInvalidPgIdentifiers() {
props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
props.put(DataSourceConfigKeys.PUBLICATION_NAME, invalid);
try {
DataSourceConfigValidator.validateSource(props);
DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
Assert.fail("Expected IllegalArgumentException for publication_name='" + invalid + "'");
} catch (IllegalArgumentException expected) {
// ok
Expand All @@ -124,7 +125,7 @@ public void testSlotNameRejectsOverlongIdentifier() {
props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
props.put(DataSourceConfigKeys.SLOT_NAME, sb.toString());
try {
DataSourceConfigValidator.validateSource(props);
DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
Assert.fail("Expected IllegalArgumentException for slot_name exceeding "
+ PG_MAX_IDENTIFIER_LENGTH + " chars");
} catch (IllegalArgumentException expected) {
Expand All @@ -139,7 +140,7 @@ public void testSlotNameAcceptsValidPgIdentifiers() {
Map<String, String> props = new HashMap<>();
props.put(DataSourceConfigKeys.JDBC_URL, "jdbc:postgresql://localhost:5432/db");
props.put(DataSourceConfigKeys.SLOT_NAME, valid);
DataSourceConfigValidator.validateSource(props);
DataSourceConfigValidator.validateSource(props, DataSourceType.POSTGRES.name());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -734,8 +734,10 @@ private Offset getStartOffsetFromConfig(JdbcSourceConfig sourceConfig) {
case EARLIEST_OFFSET:
startingOffset = createInitialOffset();
break;
case TIMESTAMP:
case SPECIFIC_OFFSETS:
startingOffset = createOffset(startupOptions.getOffset());
break;
case TIMESTAMP:
case COMMITTED_OFFSETS:
default:
throw new IllegalStateException(
Expand Down
Loading
Loading