Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class CdcActionCommonUtils {
public static final String METADATA_COLUMN = "metadata_column";
public static final String MULTIPLE_TABLE_PARTITION_KEYS = "multiple_table_partition_keys";
public static final String EAGER_INIT = "eager_init";
public static final String COMPOSITE_PRIMARY_KEY = "composite_primary_key";

public static void assertSchemaCompatible(
TableSchema paimonSchema, List<DataField> sourceTableFields) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ public SyncDatabaseActionBase withComputedColumnArgs(List<String> computedColumn
@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(
this.computedColumns, typeMapping, metadataConverters);
this.computedColumns, typeMapping, metadataConverters, compositePrimaryKey);
}

public SyncDatabaseActionBase withPartitionKeyMultiple(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,14 +199,23 @@ public void checkRequiredOption() {
public FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> provideRecordParser(
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
CdcMetadataConverter[] metadataConverters,
String compositePrimaryKey) {
switch (sourceType) {
case MYSQL:
return new MySqlRecordParser(
cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
cdcSourceConfig,
computedColumns,
typeMapping,
metadataConverters,
compositePrimaryKey);
case POSTGRES:
return new PostgresRecordParser(
cdcSourceConfig, computedColumns, typeMapping, metadataConverters);
cdcSourceConfig,
computedColumns,
typeMapping,
metadataConverters,
compositePrimaryKey);
case KAFKA:
case PULSAR:
DataFormat dataFormat = provideDataFormat();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ protected void beforeBuildingSourceSink() throws Exception {

@Override
protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> recordParse() {
return syncJobHandler.provideRecordParser(computedColumns, typeMapping, metadataConverters);
return syncJobHandler.provideRecordParser(
computedColumns, typeMapping, metadataConverters, compositePrimaryKey);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;
import java.util.Optional;

import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPOSITE_PRIMARY_KEY;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.COMPUTED_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.METADATA_COLUMN;
import static org.apache.paimon.flink.action.cdc.CdcActionCommonUtils.PARTITION_KEYS;
Expand Down Expand Up @@ -76,5 +77,9 @@ protected void withParams(MultipleParameterToolAdapter params, SyncTableActionBa
String[] options = params.get(TYPE_MAPPING).split(",");
action.withTypeMapping(TypeMapping.parse(options));
}

if (params.has(COMPOSITE_PRIMARY_KEY)) {
action.withCompositePrimaryKey(params.get(COMPOSITE_PRIMARY_KEY));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public abstract class SynchronizationActionBase extends ActionBase {
protected Map<String, String> tableConfig = new HashMap<>();
protected TypeMapping typeMapping = TypeMapping.defaultMapping();
protected CdcMetadataConverter[] metadataConverters = new CdcMetadataConverter[] {};
protected String compositePrimaryKey;

public SynchronizationActionBase(
String database,
Expand Down Expand Up @@ -102,6 +103,11 @@ public SynchronizationActionBase withMetadataColumns(List<String> metadataColumn
return this;
}

public SynchronizationActionBase withCompositePrimaryKey(String compositePrimaryKey) {
this.compositePrimaryKey = compositePrimaryKey;
return this;
}

@VisibleForTesting
public Map<String, String> tableConfig() {
return tableConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,8 @@ public static JdbcSchemasInfo getMySqlTableInfos(
Configuration mySqlConfig,
Predicate<String> monitorTablePredication,
List<Identifier> excludedTables,
TypeMapping typeMapping)
TypeMapping typeMapping,
String compositePrimaryKey)
throws Exception {
Pattern databasePattern =
Pattern.compile(mySqlConfig.get(MySqlSourceOptions.DATABASE_NAME));
Expand Down Expand Up @@ -131,7 +132,8 @@ public static JdbcSchemasInfo getMySqlTableInfos(
tableName,
tableComment,
typeMapping,
toPaimonTypeVisitor());
toPaimonTypeVisitor(),
compositePrimaryKey);
mySqlSchemasInfo.addSchema(identifier, schema);
} else {
excludedTables.add(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class MySqlRecordParser implements FlatMapFunction<CdcSourceRecord, RichC
// NOTE: current table name is not converted by tableNameConverter
private String currentTable;
private String databaseName;
private String compositePrimaryKey;
private final CdcMetadataConverter[] metadataConverters;

private final Set<String> nonPkTables = new HashSet<>();
Expand All @@ -91,10 +92,12 @@ public MySqlRecordParser(
Configuration mySqlConfig,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
CdcMetadataConverter[] metadataConverters,
String compositePrimaryKey) {
this.computedColumns = computedColumns;
this.typeMapping = typeMapping;
this.metadataConverters = metadataConverters;
this.compositePrimaryKey = compositePrimaryKey;
objectMapper
.configure(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS, true)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
Expand Down Expand Up @@ -254,6 +257,10 @@ private Map<String, String> extractRow(JsonNode recordRow) {
typeMapping,
objectValue,
serverTimeZone);
if (fieldName.equals(compositePrimaryKey)) {
newValue = String.format("%s_%s_%s", databaseName, currentTable, newValue);
}

resultMap.put(fieldName, newValue);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ protected void beforeBuildingSourceSink() throws Exception {
tableName ->
shouldMonitorTable(tableName, includingPattern, excludingPattern),
excludedTables,
typeMapping);
typeMapping,
compositePrimaryKey);

logNonPkTables(mySqlSchemasInfo.nonPkTables());
List<JdbcTableInfo> jdbcTableInfos = mySqlSchemasInfo.toMySqlTableInfos(mergeShards);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public MySqlSyncTableAction(
protected Schema retrieveSchema() throws Exception {
this.mySqlSchemasInfo =
MySqlActionUtils.getMySqlTableInfos(
cdcSourceConfig, monitorTablePredication(), new ArrayList<>(), typeMapping);
cdcSourceConfig,
monitorTablePredication(),
new ArrayList<>(),
typeMapping,
compositePrimaryKey);
validateMySqlTableInfos(mySqlSchemasInfo);
JdbcTableInfo tableInfo = mySqlSchemasInfo.mergeAll();
return tableInfo.schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ public static JdbcSchemasInfo getPostgresTableInfos(
Configuration postgresConfig,
Predicate<String> monitorTablePredication,
List<Identifier> excludedTables,
TypeMapping typeMapping)
TypeMapping typeMapping,
String compositePrimaryKey)
throws Exception {

String databaseName = postgresConfig.get(PostgresSourceOptions.DATABASE_NAME);
Expand Down Expand Up @@ -103,7 +104,8 @@ public static JdbcSchemasInfo getPostgresTableInfos(
tableName,
tableComment,
typeMapping,
toPaimonTypeVisitor());
toPaimonTypeVisitor(),
compositePrimaryKey);
jdbcSchemasInfo.addSchema(identifier, schemaName, schema);
} else {
excludedTables.add(identifier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,15 @@ public class PostgresRecordParser
// NOTE: current table name is not converted by tableNameConverter
private String currentTable;
private String databaseName;
private String compositePrimaryKey;
private final CdcMetadataConverter[] metadataConverters;

public PostgresRecordParser(
Configuration postgresConfig,
List<ComputedColumn> computedColumns,
TypeMapping typeMapping,
CdcMetadataConverter[] metadataConverters) {
CdcMetadataConverter[] metadataConverters,
String compositePrimaryKey) {
this.computedColumns = computedColumns;
this.typeMapping = typeMapping;
this.metadataConverters = metadataConverters;
Expand All @@ -108,6 +110,7 @@ public PostgresRecordParser(
stringifyServerTimeZone == null
? ZoneId.systemDefault()
: ZoneId.of(stringifyServerTimeZone);
this.compositePrimaryKey = compositePrimaryKey;
}

@Override
Expand All @@ -132,7 +135,10 @@ private CdcSchema extractSchema(DebeziumEvent.Field schema) {
CdcSchema.Builder schemaBuilder = CdcSchema.newBuilder();
afterFields.forEach(
(key, value) -> {
DataType dataType = extractFieldType(value);
DataType dataType =
key.equals(compositePrimaryKey)
? DataTypes.STRING()
: extractFieldType(value);
dataType =
dataType.copy(
typeMapping.containsMode(TO_NULLABLE) || value.optional());
Expand Down Expand Up @@ -252,6 +258,9 @@ private Map<String, String> extractRow(JsonNode recordRow) {

String className = field.getValue().name();
String oldValue = objectValue.asText();
if (fieldName.equals(compositePrimaryKey)) {
oldValue = String.format("%s_%s_%s", databaseName, currentTable, oldValue);
}
String newValue = oldValue;

if (Bits.LOGICAL_NAME.equals(className)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ public PostgresSyncTableAction(
protected Schema retrieveSchema() throws Exception {
this.postgresSchemasInfo =
PostgresActionUtils.getPostgresTableInfos(
cdcSourceConfig, monitorTablePredication(), new ArrayList<>(), typeMapping);
cdcSourceConfig,
monitorTablePredication(),
new ArrayList<>(),
typeMapping,
compositePrimaryKey);
validatePostgresTableInfos(postgresSchemasInfo);
JdbcTableInfo tableInfo = postgresSchemasInfo.mergeAll();
return tableInfo.schema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.paimon.schema.Schema;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -49,7 +50,8 @@ public static Schema buildSchema(
String tableName,
String tableComment,
TypeMapping typeMapping,
JdbcToPaimonTypeVisitor jdbcToPaimonTypeVisitor)
JdbcToPaimonTypeVisitor jdbcToPaimonTypeVisitor,
String compositePrimaryKey)
throws SQLException {
return buildSchema(
metaData,
Expand All @@ -58,7 +60,8 @@ public static Schema buildSchema(
tableName,
tableComment,
typeMapping,
jdbcToPaimonTypeVisitor);
jdbcToPaimonTypeVisitor,
compositePrimaryKey);
}

public static Schema buildSchema(
Expand All @@ -68,7 +71,8 @@ public static Schema buildSchema(
String tableName,
String tableComment,
TypeMapping typeMapping,
JdbcToPaimonTypeVisitor jdbcToPaimonTypeVisitor)
JdbcToPaimonTypeVisitor jdbcToPaimonTypeVisitor,
String compositePrimaryKey)
throws SQLException {
Schema.Builder builder = Schema.newBuilder();
try (ResultSet rs = metaData.getColumns(databaseName, schemaName, tableName, null)) {
Expand All @@ -90,9 +94,11 @@ public static Schema buildSchema(
typeMapping.containsMode(TO_NULLABLE)
|| isNullableColumn(rs.getString("IS_NULLABLE"));
DataType paimonType =
jdbcToPaimonTypeVisitor
.visit(fieldType, precision, scale, typeMapping)
.copy(isNullable);
fieldName.equals(compositePrimaryKey)
? DataTypes.STRING()
: jdbcToPaimonTypeVisitor
.visit(fieldType, precision, scale, typeMapping)
.copy(isNullable);

builder.column(fieldName, paimonType, fieldComment);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,7 @@ protected abstract class SyncTableActionBuilder<T extends SynchronizationActionB
private final List<String> computedColumnArgs = new ArrayList<>();
private final List<String> typeMappingModes = new ArrayList<>();
private final List<String> metadataColumns = new ArrayList<>();
private final List<String> compositePrimaryKey = new ArrayList<>();

public SyncTableActionBuilder(Class<T> clazz, Map<String, String> sourceConfig) {
this.clazz = clazz;
Expand Down Expand Up @@ -371,6 +372,11 @@ public SyncTableActionBuilder<T> withMetadataColumns(String... metadataColumns)
return this;
}

public SyncTableActionBuilder<T> withCompositePrimaryKey(String... compositePrimaryKey) {
this.compositePrimaryKey.addAll(Arrays.asList(compositePrimaryKey));
return this;
}

public T build() {
List<String> args =
new ArrayList<>(
Expand All @@ -393,6 +399,7 @@ public T build() {

args.addAll(listToMultiArgs("--computed-column", computedColumnArgs));
args.addAll(listToMultiArgs("--metadata-column", metadataColumns));
args.addAll(listToMultiArgs("--composite_primary_key", compositePrimaryKey));

return createAction(clazz, args);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1588,4 +1588,62 @@ public void testRuntimeExecutionModeCheckForCdcSync() throws Exception {
waitForResult(expected, table, rowType, primaryKeys);
}
}

@Test
@Timeout(60)
public void testCompositePrimaryKey() throws Exception {
Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "composite_primary_key_\\d+");

MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
.withTableConfig(getBasicTableConfig())
.withPartitionKeys("pt")
.withPrimaryKeys("pt", "_id")
.withCompositePrimaryKey("pt")
.build();
runActionWithDefaultEnv(action);

Schema expectedSchema =
Schema.newBuilder()
.column("pt", DataTypes.STRING(), "primary")
.column("_id", DataTypes.INT(), "_id")
.column("v1", DataTypes.VARCHAR(10), "v1")
.primaryKey("pt", "_id")
.build();

checkTableSchema(expectedSchema);

try (Statement statement = getStatement()) {
FileStoreTable table = getFileStoreTable();
String table1 = "composite_primary_key_1";
String table2 = "composite_primary_key_2";
statement.executeUpdate("USE " + DATABASE_NAME);

statement.executeUpdate("INSERT INTO " + table1 + " VALUES (1, 1, 'one')");
statement.executeUpdate(
"INSERT INTO " + table2 + " VALUES (1, 2, 'one'), (1, 1, 'one')");

RowType rowType =
RowType.of(
new DataType[] {
DataTypes.STRING().notNull(),
DataTypes.INT().notNull(),
DataTypes.VARCHAR(10)
},
new String[] {"pt", "_id", "v1"});
List<String> primaryKeys = Arrays.asList("pt", "_id");
List<String> expected =
Arrays.asList(
"+I[" + getPrefix(table1) + "1, 1, one]",
"+I[" + getPrefix(table2) + "1, 1, one]",
"+I[" + getPrefix(table2) + "1, 2, one]");
waitForResult(expected, table, rowType, primaryKeys);
}
}

private String getPrefix(String table) {
return DATABASE_NAME + "_" + table + "_";
}
}
Loading