Skip to content

Commit 3ca60c6

Browse files
authored
[Feature][CDC][Mysql] Support read database list (#4255)
1 parent b4e6f78 commit 3ca60c6

File tree

3 files changed

+13
-9
lines changed

3 files changed

+13
-9
lines changed

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.seatunnel.connectors.cdc.debezium.row;
1919

2020
import org.apache.seatunnel.api.source.Collector;
21+
import org.apache.seatunnel.api.table.catalog.TablePath;
2122
import org.apache.seatunnel.api.table.type.MultipleRowType;
2223
import org.apache.seatunnel.api.table.type.RowKind;
2324
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -113,12 +114,14 @@ public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector)
113114
Schema valueSchema = record.valueSchema();
114115

115116
Struct sourceStruct = messageStruct.getStruct(Envelope.FieldName.SOURCE);
117+
String databaseName = sourceStruct.getString(AbstractSourceInfo.DATABASE_NAME_KEY);
116118
String tableName = sourceStruct.getString(AbstractSourceInfo.TABLE_NAME_KEY);
119+
String tableId = TablePath.of(databaseName, tableName).toString();
117120
SeaTunnelRowDebeziumDeserializationConverters converters;
118121
if (!multipleTableRowConverters.isEmpty()) {
119-
converters = multipleTableRowConverters.get(tableName);
122+
converters = multipleTableRowConverters.get(tableId);
120123
if (converters == null) {
121-
log.debug("Ignore newly added table {}", tableName);
124+
log.debug("Ignore newly added table {}", tableId);
122125
return;
123126
}
124127
} else {
@@ -128,26 +131,26 @@ public void deserialize(SourceRecord record, Collector<SeaTunnelRow> collector)
128131
if (operation == Envelope.Operation.CREATE || operation == Envelope.Operation.READ) {
129132
SeaTunnelRow insert = extractAfterRow(converters, record, messageStruct, valueSchema);
130133
insert.setRowKind(RowKind.INSERT);
131-
insert.setTableId(tableName);
134+
insert.setTableId(tableId);
132135
validator.validate(insert, RowKind.INSERT);
133136
collector.collect(insert);
134137
} else if (operation == Envelope.Operation.DELETE) {
135138
SeaTunnelRow delete = extractBeforeRow(converters, record, messageStruct, valueSchema);
136139
validator.validate(delete, RowKind.DELETE);
137140
delete.setRowKind(RowKind.DELETE);
138-
delete.setTableId(tableName);
141+
delete.setTableId(tableId);
139142
collector.collect(delete);
140143
} else {
141144
SeaTunnelRow before = extractBeforeRow(converters, record, messageStruct, valueSchema);
142145
validator.validate(before, RowKind.UPDATE_BEFORE);
143146
before.setRowKind(RowKind.UPDATE_BEFORE);
144-
before.setTableId(tableName);
147+
before.setTableId(tableId);
145148
collector.collect(before);
146149

147150
SeaTunnelRow after = extractAfterRow(converters, record, messageStruct, valueSchema);
148151
validator.validate(after, RowKind.UPDATE_AFTER);
149152
after.setRowKind(RowKind.UPDATE_AFTER);
150-
after.setTableId(tableName);
153+
after.setTableId(tableId);
151154
collector.collect(after);
152155
}
153156
}

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,9 @@ TableSource<T, SplitT, StateT> createSource(TableFactoryContext context) {
8282
} else {
8383
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
8484
for (CatalogTable catalogTable : context.getCatalogTables()) {
85-
String tableId = catalogTable.getTableId().getTableName();
86-
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
85+
rowTypeMap.put(
86+
catalogTable.getTableId().toTablePath().toString(),
87+
catalogTable.getTableSchema().toPhysicalRowDataType());
8788
}
8889
dataType = new MultipleRowType(rowTypeMap);
8990
}

seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ private int getParallelism(ReadonlyConfig config) {
298298
Collections.singletonList(leftAction),
299299
sink,
300300
factoryUrls,
301-
new SinkConfig(catalogTable.getTableId().getTableName()));
301+
new SinkConfig(catalogTable.getTableId().toTablePath().toString()));
302302
handleSaveMode(sink);
303303
sinkAction.setParallelism(leftAction.getParallelism());
304304
sinkActions.add(sinkAction);

0 commit comments

Comments
 (0)