Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ public class DataSourceConfigKeys {
public static final String SSL_MODE = "ssl_mode";
public static final String SSL_ROOTCERT = "ssl_rootcert";

// per-table config: key format is "table.<tableName>.<suffix>"
public static final String TABLE = "table";
public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX = "exclude_columns";
public static final String TABLE_TARGET_TABLE_SUFFIX = "target_table";

// target properties
public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
public static final String LOAD_PROPERTIES = "load.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,37 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.SSL_ROOTCERT
);

// Known suffixes for per-table config keys (format: "table.<tableName>.<suffix>")
private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = Sets.newHashSet(
DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX
);

private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";

public static void validateSource(Map<String, String> input) throws IllegalArgumentException {
for (Map.Entry<String, String> entry : input.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();

if (key.startsWith(TABLE_LEVEL_PREFIX)) {
// per-table config key must be exactly: table.<tableName>.<suffix>
// reject malformed keys like "table.exclude_columns" (missing tableName)
String[] parts = key.split("\\.", -1);
if (parts.length != 3 || parts[1].isEmpty()) {
throw new IllegalArgumentException("Malformed per-table config key: '" + key
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] The split("\\.", -1) + parts.length != 3 approach assumes table names never contain dots. While this is typically true for PG/MySQL, note that ConfigUtil.parseAllTargetTableMappings on the cdc_client side uses a prefix/suffix substring approach that correctly handles dots in table names. The two modules use inconsistent parsing strategies for the same key format.

This is a minor issue in practice (PG/MySQL identifiers rarely contain dots), but worth noting as a design inconsistency. Consider aligning both sides to use the same parsing strategy — the prefix/suffix approach is strictly more correct.

+ "'. Expected format: table.<tableName>.<suffix>");
}
String suffix = parts[parts.length - 1];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Minor] Table names with dots will be rejected: split("\\.", -1) with parts.length != 3 means that a table name containing a dot (e.g., table.my.dotted.table.target_table) will produce more than 3 parts and fail validation. PostgreSQL allows dots in quoted identifiers.

Consider using indexOf/lastIndexOf instead:

int lastDot = key.lastIndexOf('.');
// table name is everything between the prefix and the last dot
if (lastDot <= TABLE_LEVEL_PREFIX.length()) { /* malformed: missing table name */ }
String tableName = key.substring(TABLE_LEVEL_PREFIX.length(), lastDot);

Or at minimum, document this limitation (no dots in source table names).

if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
throw new IllegalArgumentException("Unknown per-table config key: '" + key + "'");
}
if (value == null || value.trim().isEmpty()) {
throw new IllegalArgumentException(
"Value for per-table config key '" + key + "' must not be empty");
}
continue;
}

if (!ALLOW_SOURCE_KEYS.contains(key)) {
throw new IllegalArgumentException("Unexpected key: '" + key + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
Expand Down Expand Up @@ -255,15 +256,21 @@ private void checkRequiredSourceProperties() {

private List<String> createTableIfNotExists() throws Exception {
List<String> syncTbls = new ArrayList<>();
List<CreateTableCommand> createTblCmds = StreamingJobUtils.generateCreateTableCmds(targetDb,
dataSourceType, sourceProperties, targetProperties);
// Key: source table name (PG/MySQL); Value: CreateTableCommand for the Doris target table.
// The two names differ when "table.<src>.target_table" is configured.
LinkedHashMap<String, CreateTableCommand> createTblCmds =
StreamingJobUtils.generateCreateTableCmds(targetDb,
dataSourceType, sourceProperties, targetProperties);
Database db = Env.getCurrentEnv().getInternalCatalog().getDbNullable(targetDb);
Preconditions.checkNotNull(db, "target database %s does not exist", targetDb);
for (CreateTableCommand createTblCmd : createTblCmds) {
for (Map.Entry<String, CreateTableCommand> entry : createTblCmds.entrySet()) {
String srcTable = entry.getKey();
CreateTableCommand createTblCmd = entry.getValue();
if (!db.isTableExist(createTblCmd.getCreateTableInfo().getTableName())) {
createTblCmd.run(ConnectContext.get(), null);
}
syncTbls.add(createTblCmd.getCreateTableInfo().getTableName());
// Use the source (upstream) table name so CDC monitors the correct PG/MySQL table
syncTbls.add(srcTable);
}
return syncTbls;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,10 +275,20 @@ public static Map<String, String> convertCertFile(long dbId, Map<String, String>
return newProps;
}

public static List<CreateTableCommand> generateCreateTableCmds(String targetDb, DataSourceType sourceType,
/**
* Generate CREATE TABLE commands for the Doris target tables.
*
* <p>Returns a {@link LinkedHashMap} whose key is the <b>source</b> (upstream) table name and
* whose value is the corresponding {@link CreateTableCommand} that creates the Doris target
* table (which may have a different name when {@code table.<src>.target_table} is configured).
* Callers must use the map key as the PG/MySQL source table identifier for CDC monitoring and
* the {@link CreateTableCommand} value for the actual DDL execution.
*/
public static LinkedHashMap<String, CreateTableCommand> generateCreateTableCmds(String targetDb,
DataSourceType sourceType,
Map<String, String> properties, Map<String, String> targetProperties)
throws JobException {
List<CreateTableCommand> createtblCmds = new ArrayList<>();
LinkedHashMap<String, CreateTableCommand> createtblCmds = new LinkedHashMap<>();
String includeTables = properties.get(DataSourceConfigKeys.INCLUDE_TABLES);
String excludeTables = properties.get(DataSourceConfigKeys.EXCLUDE_TABLES);
List<String> includeTablesList = new ArrayList<>();
Expand Down Expand Up @@ -319,6 +329,13 @@ public static List<CreateTableCommand> generateCreateTableCmds(String targetDb,
if (primaryKeys.isEmpty()) {
noPrimaryKeyTables.add(table);
}

// Resolve target (Doris) table name; defaults to source table name if not configured
String targetTableName = properties.getOrDefault(
DataSourceConfigKeys.TABLE + "." + table + "."
+ DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX,
table);

// Convert Column to ColumnDefinition
List<ColumnDefinition> columnDefinitions = columns.stream().map(col -> {
DataType dataType = DataType.fromCatalogType(col.getType());
Expand All @@ -340,7 +357,7 @@ public static List<CreateTableCommand> generateCreateTableCmds(String targetDb,
false, // isTemp
InternalCatalog.INTERNAL_CATALOG_NAME, // ctlName
targetDb, // dbName
table, // tableName
targetTableName, // tableName
columnDefinitions, // columns
ImmutableList.of(), // indexes
"olap", // engineName
Expand All @@ -355,7 +372,8 @@ public static List<CreateTableCommand> generateCreateTableCmds(String targetDb,
ImmutableList.of() // clusterKeyColumnNames
);
CreateTableCommand createtblCmd = new CreateTableCommand(Optional.empty(), createtblInfo);
createtblCmds.add(createtblCmd);
// Key: source (PG/MySQL) table name; Value: command that creates the Doris target table
createtblCmds.put(table, createtblCmd);
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium] Multi-table merge schema safety: When two source tables map to the same Doris target table, this LinkedHashMap will contain two entries with different keys but both CreateTableCommand values targeting the same Doris table name. In createTableIfNotExists(), the second entry's CreateTableCommand is silently skipped (because the table already exists after the first entry creates it).

This means the Doris target table is created with only the first source table's schema. If the two source tables have different columns, data from the second source may fail at stream-load time.

Consider either:

  1. Validating that all source tables mapping to the same target have compatible schemas (at least same column names and compatible types), or
  2. Documenting this limitation clearly, or
  3. Merging column definitions (union of columns) when building the CreateTableCommand for shared targets.

The current test works because both PG source tables have identical schemas (id int, name varchar(200)).

if (createtblCmds.isEmpty()) {
throw new JobException("Can not found match table in database " + database);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.doris.cdcclient.source.deserialize.DeserializeResult;
import org.apache.doris.cdcclient.source.reader.SourceReader;
import org.apache.doris.cdcclient.source.reader.SplitReadResult;
import org.apache.doris.cdcclient.utils.ConfigUtil;
import org.apache.doris.cdcclient.utils.SchemaChangeManager;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.cdc.request.WriteRecordRequest;
Expand Down Expand Up @@ -249,6 +250,10 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
Map<String, String> deserializeContext = new HashMap<>(writeRecordRequest.getConfig());
deserializeContext.put(Constants.DORIS_TARGET_DB, targetDb);

// Pre-parse source->target table name mappings once for this request
Map<String, String> targetTableMappings =
ConfigUtil.parseAllTargetTableMappings(writeRecordRequest.getConfig());

SourceReader sourceReader = Env.getCurrentEnv().getReader(writeRecordRequest);
DorisBatchStreamLoad batchStreamLoad = null;
long scannedRows = 0L;
Expand Down Expand Up @@ -338,9 +343,10 @@ public void writeRecords(WriteRecordRequest writeRecordRequest) throws Exception
}
if (!CollectionUtils.isEmpty(result.getRecords())) {
String table = extractTable(element);
String dorisTable = targetTableMappings.getOrDefault(table, table);
for (String record : result.getRecords()) {
scannedRows++;
batchStreamLoad.writeRecord(targetDb, table, record.getBytes());
batchStreamLoad.writeRecord(targetDb, dorisTable, record.getBytes());
}
// Mark last message as data (not heartbeat)
lastMessageIsHeartbeat = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,24 @@ public class DebeziumJsonDeserializer
// Shared JSON mapper reused across deserializations.
private static ObjectMapper objectMapper = new ObjectMapper();
// Time zone of the upstream server; overwritten in init() from the JDBC URL.
@Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
// Schema for each captured table, keyed by table id; populated/updated externally via setter.
@Getter @Setter protected Map<TableId, TableChanges.TableChange> tableSchemas;
// Parsed source->target table name mappings, populated once in init() from config
protected Map<String, String> targetTableMappingsCache = new HashMap<>();

// No-arg constructor: all state is supplied later via init() and setters.
public DebeziumJsonDeserializer() {}

/**
 * Initializes this deserializer from the job configuration: derives the upstream
 * server time zone from the JDBC URL and pre-parses all source->target table
 * name mappings so per-record resolution is a cheap map lookup.
 */
@Override
public void init(Map<String, String> props) {
    String jdbcUrl = props.get(DataSourceConfigKeys.JDBC_URL);
    this.serverTimeZone = ConfigUtil.getServerTimeZoneFromJdbcUrl(jdbcUrl);
    this.targetTableMappingsCache = ConfigUtil.parseAllTargetTableMappings(props);
}

/**
 * Maps an upstream (PG) source table name to its configured Doris target table name.
 * Falls back to the source name itself when no mapping is configured.
 */
protected String resolveTargetTable(String srcTable) {
    if (targetTableMappingsCache.containsKey(srcTable)) {
        return targetTableMappingsCache.get(srcTable);
    }
    return srcTable;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,9 @@ public DeserializeResult deserialize(Map<String, String> context, SourceRecord r
List<String> ddls = new ArrayList<>();

for (String colName : pgDropped) {
ddls.add(SchemaChangeHelper.buildDropColumnSql(db, tableId.table(), colName));
ddls.add(
SchemaChangeHelper.buildDropColumnSql(
db, resolveTargetTable(tableId.table()), colName));
}

for (Column col : pgAdded) {
Expand All @@ -219,7 +221,7 @@ public DeserializeResult deserialize(Map<String, String> context, SourceRecord r
ddls.add(
SchemaChangeHelper.buildAddColumnSql(
db,
tableId.table(),
resolveTargetTable(tableId.table()),
col.name(),
colType + nullable,
defaultObj != null ? String.valueOf(defaultObj) : null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

package org.apache.doris.cdcclient.utils;

import org.apache.doris.job.cdc.DataSourceConfigKeys;

import org.apache.commons.lang3.StringUtils;

import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

Expand Down Expand Up @@ -116,6 +119,31 @@ public static boolean isJson(String str) {
}
}

/**
 * Parse all target-table name mappings from config.
 *
 * <p>Scans all keys matching {@code "table.<srcTableName>.target_table"} and returns a map from
 * source table name to target (Doris) table name. Tables without a mapping are NOT included;
 * callers should use {@code getOrDefault(srcTable, srcTable)}.
 *
 * <p>Malformed keys — in particular {@code "table.target_table"}, which has no source table
 * name — are ignored rather than causing an exception.
 */
public static Map<String, String> parseAllTargetTableMappings(Map<String, String> config) {
    String prefix = DataSourceConfigKeys.TABLE + ".";
    String suffix = "." + DataSourceConfigKeys.TABLE_TARGET_TABLE_SUFFIX;
    Map<String, String> result = new HashMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
        String key = entry.getKey();
        // The key must be strictly longer than prefix + suffix so the matched prefix and
        // suffix regions cannot overlap. Without this guard, a key like
        // "table.target_table" passes both startsWith and endsWith but yields
        // substring(begin > end), throwing StringIndexOutOfBoundsException.
        if (key.length() > prefix.length() + suffix.length()
                && key.startsWith(prefix) && key.endsWith(suffix)) {
            String srcTable = key.substring(prefix.length(), key.length() - suffix.length());
            String rawValue = entry.getValue();
            String dstTable = rawValue != null ? rawValue.trim() : "";
            // Skip mappings with a blank target; callers then fall back to the source name.
            if (!srcTable.isEmpty() && !dstTable.isEmpty()) {
                result.put(srcTable, dstTable);
            }
        }
    }
    return result;
}

public static Map<String, String> toStringMap(String json) {
if (!isJson(json)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_snapshot --
1 Alice
2 Bob

-- !select_incremental --
2 Bob_v2
3 Carol

-- !select_merge_snapshot --
100 Src1_A
200 Src2_A

-- !select_merge_incremental --
100 Src1_A
101 Src1_B
200 Src2_A
201 Src2_B

Loading
Loading