Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public class DataSourceConfigKeys {
public static final String SSL_MODE = "ssl_mode";
public static final String SSL_ROOTCERT = "ssl_rootcert";

// per-table config: key format is "table.<tableName>.<suffix>"
public static final String TABLE = "table";
public static final String TABLE_EXCLUDE_COLUMNS_SUFFIX = "exclude_columns";

// target properties
public static final String TABLE_PROPS_PREFIX = "table.create.properties.";
public static final String LOAD_PROPERTIES = "load.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,33 @@ public class DataSourceConfigValidator {
DataSourceConfigKeys.SSL_ROOTCERT
);

// Known suffixes for per-table config keys (format: "table.<tableName>.<suffix>")
private static final Set<String> ALLOW_TABLE_LEVEL_SUFFIXES = Sets.newHashSet(
DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX
);

private static final String TABLE_LEVEL_PREFIX = DataSourceConfigKeys.TABLE + ".";

public static void validateSource(Map<String, String> input) throws IllegalArgumentException {
for (Map.Entry<String, String> entry : input.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();

if (key.startsWith(TABLE_LEVEL_PREFIX)) {
// per-table config key must be exactly: table.<tableName>.<suffix>
// reject malformed keys like "table.exclude_columns" (missing tableName)
String[] parts = key.split("\\.", -1);
if (parts.length != 3 || parts[1].isEmpty()) {
throw new IllegalArgumentException("Malformed per-table config key: '" + key
+ "'. Expected format: table.<tableName>.<suffix>");
}
String suffix = parts[parts.length - 1];
if (!ALLOW_TABLE_LEVEL_SUFFIXES.contains(suffix)) {
throw new IllegalArgumentException("Unknown per-table config key: '" + key + "'");
}
continue;
}

if (!ALLOW_SOURCE_KEYS.contains(key)) {
throw new IllegalArgumentException("Unexpected key: '" + key + "'");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,14 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -319,6 +321,16 @@ public static List<CreateTableCommand> generateCreateTableCmds(String targetDb,
if (primaryKeys.isEmpty()) {
noPrimaryKeyTables.add(table);
}

// Validate and apply exclude_columns for this table
Set<String> excludeColumns = parseExcludeColumns(properties, table);
if (!excludeColumns.isEmpty()) {
validateExcludeColumns(excludeColumns, table, columns, primaryKeys);
columns = columns.stream()
.filter(col -> !excludeColumns.contains(col.getName()))
.collect(Collectors.toList());
}

// Convert Column to ColumnDefinition
List<ColumnDefinition> columnDefinitions = columns.stream().map(col -> {
DataType dataType = DataType.fromCatalogType(col.getType());
Expand Down Expand Up @@ -437,6 +449,37 @@ private static String getRemoteDbName(DataSourceType sourceType, Map<String, Str
return remoteDb;
}

/**
 * Reads the comma-separated exclude-column list configured for {@code tableName}.
 *
 * <p>The lookup key has the form {@code table.<tableName>.exclude_columns}. Each entry is
 * trimmed; blank entries produced by stray commas or whitespace are dropped.
 *
 * @param properties full data-source configuration
 * @param tableName  source table whose exclusions are requested
 * @return the set of excluded column names; empty when the key is absent or blank
 */
private static Set<String> parseExcludeColumns(Map<String, String> properties, String tableName) {
    final String configKey = String.join(".",
            DataSourceConfigKeys.TABLE, tableName, DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX);
    final String configured = properties.get(configKey);
    if (configured == null || configured.isEmpty()) {
        return Collections.emptySet();
    }
    // Collect into a Set backed by a LinkedHashMap; only membership is observed by callers.
    final Set<String> excluded = Collections.newSetFromMap(new LinkedHashMap<>());
    for (String entry : configured.split(",")) {
        final String columnName = entry.trim();
        if (!columnName.isEmpty()) {
            excluded.add(columnName);
        }
    }
    return excluded;
}

/**
 * Ensures every excluded column actually exists in the source table and is not part of the
 * primary key (primary-key columns are required to locate rows and may not be dropped).
 *
 * @param excludeColumns column names the user asked to exclude from sync
 * @param tableName      source table name, used only in error messages
 * @param columns        full column list of the source table
 * @param primaryKeys    primary-key column names of the source table
 * @throws JobException if an excluded column is unknown or is a primary-key column
 */
private static void validateExcludeColumns(Set<String> excludeColumns, String tableName,
        List<Column> columns, List<String> primaryKeys) throws JobException {
    final Set<String> knownColumns =
            columns.stream().map(Column::getName).collect(Collectors.toSet());
    for (String excluded : excludeColumns) {
        final boolean exists = knownColumns.contains(excluded);
        if (!exists) {
            throw new JobException(String.format(
                    "exclude_columns validation failed: column '%s' does not exist in table '%s'",
                    excluded, tableName));
        }
        final boolean isPrimaryKey = primaryKeys.contains(excluded);
        if (isPrimaryKey) {
            throw new JobException(String.format(
                    "exclude_columns validation failed: column '%s' in table '%s'"
                            + " is a primary key column and cannot be excluded",
                    excluded, tableName));
        }
    }
}

private static Map<String, String> getTableCreateProperties(Map<String, String> properties) {
final Map<String, String> tableCreateProps = new HashMap<>();
for (Map.Entry<String, String> entry : properties.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,15 @@
import java.time.LocalTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import static io.debezium.connector.AbstractSourceInfo.TABLE_NAME_KEY;
import static org.apache.doris.cdcclient.common.Constants.DORIS_DELETE_SIGN;

import com.esri.core.geometry.ogc.OGCGeometry;
Expand Down Expand Up @@ -79,13 +82,16 @@ public class DebeziumJsonDeserializer
private static ObjectMapper objectMapper = new ObjectMapper();
@Setter private ZoneId serverTimeZone = ZoneId.systemDefault();
@Getter @Setter protected Map<TableId, TableChanges.TableChange> tableSchemas;
// Parsed exclude-column sets per table, populated once in init() from config
protected Map<String, Set<String>> excludeColumnsCache = new HashMap<>();

public DebeziumJsonDeserializer() {}

/**
 * Initializes per-job state from the connector properties: the source server time zone
 * (derived from the JDBC URL) and the per-table exclude-column cache, both resolved once so
 * per-record deserialization only performs map lookups.
 */
@Override
public void init(Map<String, String> props) {
    String jdbcUrl = props.get(DataSourceConfigKeys.JDBC_URL);
    this.serverTimeZone = ConfigUtil.getServerTimeZoneFromJdbcUrl(jdbcUrl);
    this.excludeColumnsCache = ConfigUtil.parseAllExcludeColumns(props);
}

@Override
Expand All @@ -102,26 +108,34 @@ public DeserializeResult deserialize(Map<String, String> context, SourceRecord r

/**
 * Converts a Debezium data-change record into zero or one JSON row strings.
 *
 * <p>DELETE operations serialize the BEFORE image (with the Doris delete sign set), while
 * READ/CREATE/UPDATE serialize the AFTER image. Columns excluded from sync for the record's
 * table are filtered out by the row extractors. Unknown operations yield no rows.
 *
 * @param record the Debezium change record
 * @return a list with at most one serialized row
 * @throws IOException if JSON serialization fails
 */
private List<String> deserializeDataChangeRecord(SourceRecord record) throws IOException {
    List<String> rows = new ArrayList<>();
    String tableName = extractTableName(record);
    Set<String> excludeColumns =
            excludeColumnsCache.getOrDefault(tableName, Collections.emptySet());
    Struct value = (Struct) record.value();
    Schema valueSchema = record.valueSchema();
    Envelope.Operation op = Envelope.operationFor(record);

    String row = null;
    if (Envelope.Operation.DELETE.equals(op)) {
        row = extractBeforeRow(value, valueSchema, excludeColumns);
    } else if (Envelope.Operation.READ.equals(op)
            || Envelope.Operation.CREATE.equals(op)
            || Envelope.Operation.UPDATE.equals(op)) {
        row = extractAfterRow(value, valueSchema, excludeColumns);
    }
    if (StringUtils.isNotEmpty(row)) {
        rows.add(row);
    }
    return rows;
}

private String extractAfterRow(Struct value, Schema valueSchema)
/**
 * Reads the source table name from the Debezium envelope's {@code source} metadata struct.
 * Assumes the record is a data-change event carrying a source block — TODO confirm callers
 * never pass schema-change/heartbeat records here.
 */
private String extractTableName(SourceRecord record) {
    Struct envelope = (Struct) record.value();
    Struct sourceInfo = envelope.getStruct(Envelope.FieldName.SOURCE);
    return sourceInfo.getString(TABLE_NAME_KEY);
}

/**
 * Serializes the record's AFTER image into a JSON row representing an upsert.
 *
 * <p>Columns listed in {@code excludeColumns} are omitted from the output. The Doris hidden
 * delete-sign column is set to 0 so the row is applied as an insert/update on load.
 * NOTE(review): the lines between the AFTER lookup and the field iteration are collapsed in
 * this diff view — presumably the after schema's fields are being iterated; confirm in file.
 */
private String extractAfterRow(Struct value, Schema valueSchema, Set<String> excludeColumns)
throws JsonProcessingException {
Map<String, Object> record = new HashMap<>();
Struct after = value.getStruct(Envelope.FieldName.AFTER);
Expand All @@ -133,15 +147,19 @@ private String extractAfterRow(Struct value, Schema valueSchema)
.fields()
.forEach(
field -> {
// Skip columns excluded from sync; they never reach the emitted JSON row.
if (!excludeColumns.contains(field.name())) {
Object valueConverted =
convert(
field.schema(),
after.getWithoutDefault(field.name()));
record.put(field.name(), valueConverted);
}
});
// 0 = not a delete for Doris's hidden delete-sign column.
record.put(DORIS_DELETE_SIGN, 0);
return objectMapper.writeValueAsString(record);
}

private String extractBeforeRow(Struct value, Schema valueSchema)
/**
 * Serializes the record's BEFORE image into a JSON row representing a delete.
 *
 * <p>Columns listed in {@code excludeColumns} are omitted from the output. The Doris hidden
 * delete-sign column is set to 1 so the row deletes its key on load.
 * NOTE(review): the lines between the BEFORE lookup and the field iteration are collapsed in
 * this diff view — presumably the before schema's fields are being iterated; confirm in file.
 */
private String extractBeforeRow(Struct value, Schema valueSchema, Set<String> excludeColumns)
throws JsonProcessingException {
Map<String, Object> record = new HashMap<>();
Struct before = value.getStruct(Envelope.FieldName.BEFORE);
Expand All @@ -153,9 +171,13 @@ private String extractBeforeRow(Struct value, Schema valueSchema)
.fields()
.forEach(
field -> {
// Skip columns excluded from sync; they never reach the emitted JSON row.
if (!excludeColumns.contains(field.name())) {
Object valueConverted =
convert(
field.schema(),
before.getWithoutDefault(field.name()));
record.put(field.name(), valueConverted);
}
});
// 1 = delete for Doris's hidden delete-sign column.
record.put(DORIS_DELETE_SIGN, 1);
return objectMapper.writeValueAsString(record);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -203,12 +204,35 @@ public DeserializeResult deserialize(Map<String, String> context, SourceRecord r
// Generate DDLs using accurate PG column types
String db = context.get(Constants.DORIS_TARGET_DB);
List<String> ddls = new ArrayList<>();
Set<String> excludedCols =
excludeColumnsCache.getOrDefault(tableId.table(), Collections.emptySet());

for (String colName : pgDropped) {
if (excludedCols.contains(colName)) {
// The column is excluded from sync — skip DDL; updatedSchemas already
// reflects the drop since it is built from afterSchema.
LOG.info(
"[SCHEMA-CHANGE] Table {}: dropped column '{}' is excluded from sync,"
+ " skipping DROP DDL",
tableId.identifier(),
colName);
continue;
}
ddls.add(SchemaChangeHelper.buildDropColumnSql(db, tableId.table(), colName));
}

for (Column col : pgAdded) {
if (excludedCols.contains(col.name())) {
// The column is excluded from sync — Doris table does not have it,
// so skip the ADD DDL.
// case: An excluded column was dropped and then re-added.
LOG.info(
"[SCHEMA-CHANGE] Table {}: added column '{}' is excluded from sync,"
+ " skipping ADD DDL",
tableId.identifier(),
col.name());
continue;
}
String colType = SchemaChangeHelper.columnToDorisType(col);
String nullable = col.isOptional() ? "" : " NOT NULL";
// pgAdded only contains columns present in afterSchema, so field lookup is safe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,18 @@

package org.apache.doris.cdcclient.utils;

import org.apache.doris.job.cdc.DataSourceConfigKeys;

import org.apache.commons.lang3.StringUtils;

import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand Down Expand Up @@ -116,6 +123,52 @@ public static boolean isJson(String str) {
}
}

/**
 * Parse the exclude-column set for a specific table from config.
 *
 * <p>Looks for key {@code "table.<tableName>.exclude_columns"} whose value is a comma-separated
 * column list, e.g. {@code "secret,internal_note"}. Entries are trimmed and blank entries
 * (from stray commas or whitespace) are dropped.
 *
 * @param config full job configuration
 * @param tableName source table whose exclusions are requested
 * @return column name set (original case preserved); empty set when the key is absent
 */
public static Set<String> parseExcludeColumns(Map<String, String> config, String tableName) {
    String configKey =
            String.join(
                    ".",
                    DataSourceConfigKeys.TABLE,
                    tableName,
                    DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX);
    String configured = config.get(configKey);
    if (StringUtils.isEmpty(configured)) {
        return Collections.emptySet();
    }
    // Collect into a Set backed by a HashMap; callers only observe membership.
    Set<String> excluded = Collections.newSetFromMap(new HashMap<>());
    for (String entry : configured.split(",")) {
        String columnName = entry.trim();
        if (!columnName.isEmpty()) {
            excluded.add(columnName);
        }
    }
    return excluded;
}

/**
 * Parse all per-table exclude-column sets from config at once.
 *
 * <p>Scans all keys matching {@code "table.<tableName>.exclude_columns"} and returns a map from
 * table name to its excluded column set. Intended to be called once during initialization.
 *
 * <p>Malformed keys whose table-name segment is empty (e.g. {@code "table.exclude_columns"})
 * are ignored here; strict rejection of such keys is the validator's job.
 *
 * @param config full job configuration
 * @return map from table name to excluded column names; empty map when nothing is configured
 */
public static Map<String, Set<String>> parseAllExcludeColumns(Map<String, String> config) {
    String prefix = DataSourceConfigKeys.TABLE + ".";
    String suffix = "." + DataSourceConfigKeys.TABLE_EXCLUDE_COLUMNS_SUFFIX;
    Map<String, Set<String>> result = new HashMap<>();
    for (String key : config.keySet()) {
        // The key must be strictly longer than prefix + suffix so the table-name segment is
        // non-empty. Without this length guard, the malformed key "table.exclude_columns"
        // satisfies both startsWith(prefix) and endsWith(suffix) (the two matches overlap on
        // the same characters), making key.length() - suffix.length() smaller than
        // prefix.length(), and substring() below would throw StringIndexOutOfBoundsException.
        if (key.length() > prefix.length() + suffix.length()
                && key.startsWith(prefix)
                && key.endsWith(suffix)) {
            String tableName = key.substring(prefix.length(), key.length() - suffix.length());
            result.put(tableName, parseExcludeColumns(config, tableName));
        }
    }
    return result;
}

public static Map<String, String> toStringMap(String json) {
if (!isJson(json)) {
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !select_snapshot --
A1 1
B1 2

-- !select_incremental --
B1 20
C1 3

-- !select_after_drop_excluded --
B1 20
C1 3
D1 4

-- !select_after_readd_excluded --
B1 20
C1 3
D1 4
E1 5

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
// under the License.

suite("test_streaming_mysql_job_dup", "p0,external,mysql,external_docker,external_docker_mysql,nondatalake") {
def jobName = "test_streaming_mysql_job_name"
def jobName = "test_streaming_mysql_job_name_dup"
def currentDb = (sql "select database()")[0][0]
def table1 = "test_streaming_mysql_job_dup"
def mysqlDb = "test_cdc_db"
Expand Down
Loading
Loading