Skip to content

Commit 21ef45f

Browse files
authored
[Feature][CDC] MySQL CDC supports deserialization of multi-tables (#4067)
1 parent 74666c4 commit 21ef45f

File tree

9 files changed

+181
-27
lines changed

9 files changed

+181
-27
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TableIdentifier.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public String getDatabaseName() {
4747
return databaseName;
4848
}
4949

50-
public String gettableName() {
50+
public String getTableName() {
5151
return tableName;
5252
}
5353

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.api.table.type;
19+
20+
import lombok.RequiredArgsConstructor;
21+
22+
import java.util.HashMap;
23+
import java.util.Iterator;
24+
import java.util.Map;
25+
26+
@RequiredArgsConstructor
27+
public class MultipleRowType implements SeaTunnelDataType<SeaTunnelRow>, Iterable<Map.Entry<String, SeaTunnelRowType>> {
28+
private final Map<String, SeaTunnelRowType> rowTypeMap;
29+
30+
public MultipleRowType(String[] tableIds, SeaTunnelRowType[] rowTypes) {
31+
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
32+
for (int i = 0; i < tableIds.length; i++) {
33+
rowTypeMap.put(tableIds[i], rowTypes[i]);
34+
}
35+
this.rowTypeMap = rowTypeMap;
36+
}
37+
38+
public SeaTunnelRowType getRowType(String tableId) {
39+
return rowTypeMap.get(tableId);
40+
}
41+
42+
@Override
43+
public Class<SeaTunnelRow> getTypeClass() {
44+
return SeaTunnelRow.class;
45+
}
46+
47+
@Override
48+
public SqlType getSqlType() {
49+
return SqlType.MULTIPLE_ROW;
50+
}
51+
52+
@Override
53+
public Iterator<Map.Entry<String, SeaTunnelRowType>> iterator() {
54+
return rowTypeMap.entrySet().iterator();
55+
}
56+
}

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SeaTunnelRow.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
public final class SeaTunnelRow implements Serializable {
3030
private static final long serialVersionUID = -1L;
3131
/** Table identifier, used for the source connector that {@link SupportMultipleTable}. */
32-
private int tableId = -1;
32+
private String tableId;
3333
/** The kind of change that a row describes in a changelog. */
3434
private RowKind kind = RowKind.INSERT;
3535
/** The array to store the actual internal format values. */
@@ -47,7 +47,7 @@ public void setField(int pos, Object value) {
4747
this.fields[pos] = value;
4848
}
4949

50-
public void setTableId(int tableId) {
50+
public void setTableId(String tableId) {
5151
this.tableId = tableId;
5252
}
5353

@@ -59,7 +59,7 @@ public int getArity() {
5959
return fields.length;
6060
}
6161

62-
public int getTableId() {
62+
public String getTableId() {
6363
return tableId;
6464
}
6565

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlType.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,5 +37,6 @@ public enum SqlType {
3737
DATE,
3838
TIME,
3939
TIMESTAMP,
40-
ROW;
40+
ROW,
41+
MULTIPLE_ROW;
4142
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/source/IncrementalSource.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.seatunnel.api.source.SourceReader;
2525
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
2626
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
27+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2728
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
2829
import org.apache.seatunnel.connectors.cdc.base.config.StartupConfig;
2930
import org.apache.seatunnel.connectors.cdc.base.config.StopConfig;
@@ -53,6 +54,7 @@
5354
import org.apache.seatunnel.shade.com.typesafe.config.Config;
5455

5556
import io.debezium.relational.TableId;
57+
import lombok.NoArgsConstructor;
5658

5759
import java.util.HashMap;
5860
import java.util.HashSet;
@@ -61,6 +63,7 @@
6163
import java.util.concurrent.LinkedBlockingQueue;
6264
import java.util.function.Supplier;
6365

66+
@NoArgsConstructor
6467
public abstract class IncrementalSource<T, C extends SourceConfig> implements SeaTunnelSource<T, SourceSplitBase, PendingSplitsState> {
6568

6669
protected ReadonlyConfig readonlyConfig;
@@ -76,6 +79,21 @@ public abstract class IncrementalSource<T, C extends SourceConfig> implements Se
7679
protected StopMode stopMode;
7780
protected DebeziumDeserializationSchema<T> deserializationSchema;
7881

82+
protected SeaTunnelDataType<SeaTunnelRow> dataType;
83+
84+
protected IncrementalSource(ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
85+
this.dataType = dataType;
86+
this.readonlyConfig = options;
87+
this.startupConfig = getStartupConfig(readonlyConfig);
88+
this.stopConfig = getStopConfig(readonlyConfig);
89+
this.stopMode = stopConfig.getStopMode();
90+
this.incrementalParallelism = readonlyConfig.get(SourceOptions.INCREMENTAL_PARALLELISM);
91+
this.configFactory = createSourceConfigFactory(readonlyConfig);
92+
this.dataSourceDialect = createDataSourceDialect(readonlyConfig);
93+
this.deserializationSchema = createDebeziumDeserializationSchema(readonlyConfig);
94+
this.offsetFactory = createOffsetFactory(readonlyConfig);
95+
}
96+
7997
@Override
8098
public final void prepare(Config pluginConfig) throws PrepareFailException {
8199
this.readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig);

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java

Lines changed: 44 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,10 @@
1818
package org.apache.seatunnel.connectors.cdc.debezium.row;
1919

2020
import static com.google.common.base.Preconditions.checkNotNull;
21+
import static io.debezium.connector.AbstractSourceInfo.DATABASE_NAME_KEY;
2122

2223
import org.apache.seatunnel.api.source.Collector;
24+
import org.apache.seatunnel.api.table.type.MultipleRowType;
2325
import org.apache.seatunnel.api.table.type.RowKind;
2426
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2527
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -36,6 +38,9 @@
3638

3739
import java.io.Serializable;
3840
import java.time.ZoneId;
41+
import java.util.Collections;
42+
import java.util.HashMap;
43+
import java.util.Map;
3944

4045
/**
4146
* Deserialization schema from Debezium object to {@link SeaTunnelRow}.
@@ -52,7 +57,9 @@ public final class SeaTunnelRowDebeziumDeserializeSchema
5257
/**
5358
* Runtime converter that converts Kafka {@link SourceRecord}s into {@link SeaTunnelRow} consisted of
5459
*/
55-
private final SeaTunnelRowDebeziumDeserializationConverters converters;
60+
private final SeaTunnelRowDebeziumDeserializationConverters singleRowConverter;
61+
62+
private final Map<String, SeaTunnelRowDebeziumDeserializationConverters> multipleRowConverters;
5663

5764
/**
5865
* Validator to validate the row value.
@@ -67,18 +74,35 @@ public static Builder builder() {
6774
}
6875

6976
SeaTunnelRowDebeziumDeserializeSchema(
70-
SeaTunnelRowType physicalDataType,
77+
SeaTunnelDataType<SeaTunnelRow> physicalDataType,
7178
MetadataConverter[] metadataConverters,
72-
SeaTunnelRowType resultType,
79+
SeaTunnelDataType<SeaTunnelRow> resultType,
7380
ValueValidator validator,
7481
ZoneId serverTimeZone,
7582
DebeziumDeserializationConverterFactory userDefinedConverterFactory) {
76-
this.converters = new SeaTunnelRowDebeziumDeserializationConverters(
77-
physicalDataType,
78-
metadataConverters,
79-
serverTimeZone,
80-
userDefinedConverterFactory
81-
);
83+
84+
SeaTunnelRowDebeziumDeserializationConverters singleRowConverter = null;
85+
Map<String, SeaTunnelRowDebeziumDeserializationConverters> multipleRowConverters = Collections.emptyMap();
86+
if (physicalDataType instanceof MultipleRowType) {
87+
multipleRowConverters = new HashMap<>();
88+
for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType) physicalDataType) {
89+
SeaTunnelRowDebeziumDeserializationConverters itemRowConverter = new SeaTunnelRowDebeziumDeserializationConverters(
90+
item.getValue(),
91+
metadataConverters,
92+
serverTimeZone,
93+
userDefinedConverterFactory);
94+
multipleRowConverters.put(item.getKey(), itemRowConverter);
95+
}
96+
} else {
97+
singleRowConverter = new SeaTunnelRowDebeziumDeserializationConverters(
98+
(SeaTunnelRowType) physicalDataType,
99+
metadataConverters,
100+
serverTimeZone,
101+
userDefinedConverterFactory
102+
);
103+
}
104+
this.singleRowConverter = singleRowConverter;
105+
this.multipleRowConverters = multipleRowConverters;
82106
this.resultTypeInfo = checkNotNull(resultType);
83107
this.validator = checkNotNull(validator);
84108
}
@@ -90,28 +114,34 @@ public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector)
90114
Schema valueSchema = record.valueSchema();
91115

92116
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
93-
// TODO: multi-table
117+
String database = sourceStruct.getString(DATABASE_NAME_KEY);
94118
String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
119+
String tableId = database + ":" + tableName;
120+
SeaTunnelRowDebeziumDeserializationConverters converters = multipleRowConverters.getOrDefault(tableId, singleRowConverter);
95121

96122
if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
97123
SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema);
98124
insert.setRowKind(RowKind.INSERT);
125+
insert.setTableId(tableId);
99126
validator.validate(insert, RowKind.INSERT);
100127
collector.collect(insert);
101128
} else if (operation == Envelope.Operation.DELETE) {
102129
SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema);
103130
validator.validate(delete, RowKind.DELETE);
104131
delete.setRowKind(RowKind.DELETE);
132+
delete.setTableId(tableId);
105133
collector.collect(delete);
106134
} else {
107135
SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema);
108136
validator.validate(before, RowKind.UPDATE_BEFORE);
109137
before.setRowKind(RowKind.UPDATE_BEFORE);
138+
before.setTableId(tableId);
110139
collector.collect(before);
111140

112141
SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema);
113142
validator.validate(after, RowKind.UPDATE_AFTER);
114143
after.setRowKind(RowKind.UPDATE_AFTER);
144+
after.setTableId(tableId);
115145
collector.collect(after);
116146
}
117147
}
@@ -159,16 +189,16 @@ public interface ValueValidator extends Serializable {
159189
* Builder of {@link SeaTunnelRowDebeziumDeserializeSchema}.
160190
*/
161191
public static class Builder {
162-
private SeaTunnelRowType physicalRowType;
163-
private SeaTunnelRowType resultTypeInfo;
192+
private SeaTunnelDataType<SeaTunnelRow> physicalRowType;
193+
private SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
164194
private MetadataConverter[] metadataConverters = new MetadataConverter[0];
165195
private ValueValidator validator = (rowData, rowKind) -> {
166196
};
167197
private ZoneId serverTimeZone = ZoneId.of("UTC");
168198
private DebeziumDeserializationConverterFactory userDefinedConverterFactory =
169199
DebeziumDeserializationConverterFactory.DEFAULT;
170200

171-
public Builder setPhysicalRowType(SeaTunnelRowType physicalRowType) {
201+
public Builder setPhysicalRowType(SeaTunnelDataType<SeaTunnelRow> physicalRowType) {
172202
this.physicalRowType = physicalRowType;
173203
return this;
174204
}
@@ -178,7 +208,7 @@ public Builder setMetadataConverters(MetadataConverter[] metadataConverters) {
178208
return this;
179209
}
180210

181-
public Builder setResultTypeInfo(SeaTunnelRowType resultTypeInfo) {
211+
public Builder setResultTypeInfo(SeaTunnelDataType<SeaTunnelRow> resultTypeInfo) {
182212
this.resultTypeInfo = resultTypeInfo;
183213
return this;
184214
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSource.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@
2222
import org.apache.seatunnel.api.source.SupportParallelism;
2323
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2424
import org.apache.seatunnel.api.table.catalog.TablePath;
25-
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
25+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2627
import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfig;
2728
import org.apache.seatunnel.connectors.cdc.base.config.SourceConfig;
2829
import org.apache.seatunnel.connectors.cdc.base.dialect.DataSourceDialect;
@@ -45,6 +46,10 @@
4546
public class MySqlIncrementalSource<T> extends IncrementalSource<T, JdbcSourceConfig> implements SupportParallelism {
4647
static final String IDENTIFIER = "MySQL-CDC";
4748

49+
public MySqlIncrementalSource(ReadonlyConfig options, SeaTunnelDataType<SeaTunnelRow> dataType) {
50+
super(options, dataType);
51+
}
52+
4853
@Override
4954
public String getPluginName() {
5055
return IDENTIFIER;
@@ -65,11 +70,15 @@ public SourceConfig.Factory<JdbcSourceConfig> createSourceConfigFactory(Readonly
6570
public DebeziumDeserializationSchema<T> createDebeziumDeserializationSchema(ReadonlyConfig config) {
6671
JdbcSourceConfig jdbcSourceConfig = configFactory.create(0);
6772
String baseUrl = config.get(JdbcCatalogOptions.BASE_URL);
68-
// TODO: support multi-table
69-
// TODO: support metadata keys
70-
MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), jdbcSourceConfig.getPassword(), baseUrl);
71-
CatalogTable table = mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), config.get(JdbcSourceOptions.TABLE_NAME)));
72-
SeaTunnelRowType physicalRowType = table.getTableSchema().toPhysicalRowDataType();
73+
SeaTunnelDataType<SeaTunnelRow> physicalRowType;
74+
if (dataType == null) {
75+
// TODO: support metadata keys
76+
MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql", jdbcSourceConfig.getDatabaseList().get(0), jdbcSourceConfig.getUsername(), jdbcSourceConfig.getPassword(), baseUrl);
77+
CatalogTable table = mySqlCatalog.getTable(TablePath.of(jdbcSourceConfig.getDatabaseList().get(0), config.get(JdbcSourceOptions.TABLE_NAME)));
78+
physicalRowType = table.getTableSchema().toPhysicalRowDataType();
79+
} else {
80+
physicalRowType = dataType;
81+
}
7382
String zoneId = config.get(JdbcSourceOptions.SERVER_TIME_ZONE);
7483
return (DebeziumDeserializationSchema<T>) SeaTunnelRowDebeziumDeserializeSchema.builder()
7584
.setPhysicalRowType(physicalRowType)

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,29 @@
1919

2020
import org.apache.seatunnel.api.configuration.util.OptionRule;
2121
import org.apache.seatunnel.api.source.SeaTunnelSource;
22+
import org.apache.seatunnel.api.source.SourceSplit;
23+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
24+
import org.apache.seatunnel.api.table.connector.TableSource;
2225
import org.apache.seatunnel.api.table.factory.Factory;
26+
import org.apache.seatunnel.api.table.factory.SupportMultipleTable;
27+
import org.apache.seatunnel.api.table.factory.TableFactoryContext;
2328
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
29+
import org.apache.seatunnel.api.table.type.MultipleRowType;
30+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
31+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
32+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2433
import org.apache.seatunnel.connectors.cdc.base.option.JdbcSourceOptions;
2534
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.JdbcCatalogOptions;
2635

2736
import com.google.auto.service.AutoService;
2837

38+
import java.io.Serializable;
39+
import java.util.Collections;
40+
import java.util.HashMap;
41+
import java.util.Map;
42+
2943
@AutoService(Factory.class)
30-
public class MySqlIncrementalSourceFactory implements TableSourceFactory {
44+
public class MySqlIncrementalSourceFactory implements TableSourceFactory, SupportMultipleTable {
3145
@Override
3246
public String factoryIdentifier() {
3347
return MySqlIncrementalSource.IDENTIFIER;
@@ -57,4 +71,30 @@ public OptionRule optionRule() {
5771
public Class<? extends SeaTunnelSource> getSourceClass() {
5872
return MySqlIncrementalSource.class;
5973
}
74+
75+
@Override
76+
public <T, SplitT extends SourceSplit, StateT extends Serializable> TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
77+
return () -> {
78+
SeaTunnelDataType<SeaTunnelRow> dataType;
79+
if (context.getCatalogTables().size() == 1) {
80+
dataType = context.getCatalogTables()
81+
.get(0)
82+
.getTableSchema()
83+
.toPhysicalRowDataType();
84+
} else {
85+
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
86+
for (CatalogTable catalogTable : context.getCatalogTables()) {
87+
String tableId = catalogTable.getTableId().getDatabaseName() + ":" + catalogTable.getTableId().getTableName();
88+
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
89+
}
90+
dataType = new MultipleRowType(rowTypeMap);
91+
}
92+
return (SeaTunnelSource<T, SplitT, StateT>) new MySqlIncrementalSource<>(context.getOptions(), dataType);
93+
};
94+
}
95+
96+
@Override
97+
public Result applyTables(TableFactoryContext context) {
98+
return Result.of(context.getCatalogTables(), Collections.emptyList());
99+
}
60100
}

0 commit comments

Comments
 (0)