Skip to content

Commit 37fcff3

Browse files
authored
[Bug][CDC] Fix state recovery error when switching a single table to multiple tables (#5784)
1 parent d908f0a commit 37fcff3

File tree

7 files changed

+78
-35
lines changed

7 files changed

+78
-35
lines changed

seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/CatalogTableUtil.java

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -151,13 +151,17 @@ public static SeaTunnelDataType<SeaTunnelRow> convertToDataType(
151151
if (catalogTables.size() == 1) {
152152
return catalogTables.get(0).getTableSchema().toPhysicalRowDataType();
153153
} else {
154-
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
155-
for (CatalogTable catalogTable : catalogTables) {
156-
String tableId = catalogTable.getTableId().toTablePath().toString();
157-
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
158-
}
159-
return new MultipleRowType(rowTypeMap);
154+
return convertToMultipleRowType(catalogTables);
155+
}
156+
}
157+
158+
public static MultipleRowType convertToMultipleRowType(List<CatalogTable> catalogTables) {
159+
Map<String, SeaTunnelRowType> rowTypeMap = new HashMap<>();
160+
for (CatalogTable catalogTable : catalogTables) {
161+
String tableId = catalogTable.getTableId().toTablePath().toString();
162+
rowTypeMap.put(tableId, catalogTable.getTableSchema().toPhysicalRowDataType());
160163
}
164+
return new MultipleRowType(rowTypeMap);
161165
}
162166

163167
// We need to use buildWithConfig(String catalogName, ReadonlyConfig readonlyConfig);

seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/debezium/row/SeaTunnelRowDebeziumDeserializeSchema.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2828
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2929
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
30+
import org.apache.seatunnel.api.table.type.SqlType;
3031
import org.apache.seatunnel.connectors.cdc.base.schema.SchemaChangeResolver;
3132
import org.apache.seatunnel.connectors.cdc.base.utils.SourceRecordUtils;
3233
import org.apache.seatunnel.connectors.cdc.debezium.DebeziumDeserializationConverterFactory;
@@ -233,12 +234,14 @@ public SchemaChangeResolver getSchemaChangeResolver() {
233234

234235
@Override
235236
public void restoreCheckpointProducedType(SeaTunnelDataType<SeaTunnelRow> checkpointDataType) {
236-
if (!checkpointDataType.getSqlType().equals(resultTypeInfo.getSqlType())) {
237-
throw new IllegalStateException(
238-
String.format(
239-
"The produced type %s of the SeaTunnel deserialization schema "
240-
+ "doesn't match the type %s of the restored snapshot.",
241-
resultTypeInfo.getSqlType(), checkpointDataType.getSqlType()));
237+
if (SqlType.ROW.equals(checkpointDataType.getSqlType())
238+
&& SqlType.MULTIPLE_ROW.equals(resultTypeInfo.getSqlType())) {
239+
// TODO: Older versions may have this issue
240+
log.warn(
241+
"Skip incompatible restore type. produced type: {}, checkpoint type: {}",
242+
resultTypeInfo,
243+
checkpointDataType);
244+
return;
242245
}
243246
if (checkpointDataType instanceof MultipleRowType) {
244247
MultipleRowType latestDataType = (MultipleRowType) resultTypeInfo;

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
7777
CatalogTableUtil.getCatalogTables(
7878
context.getOptions(), context.getClassLoader());
7979
SeaTunnelDataType<SeaTunnelRow> dataType =
80-
CatalogTableUtil.convertToDataType(catalogTables);
80+
CatalogTableUtil.convertToMultipleRowType(catalogTables);
8181
return (SeaTunnelSource<T, SplitT, StateT>)
8282
new MongodbIncrementalSource<>(context.getOptions(), dataType, catalogTables);
8383
};

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java

Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.seatunnel.api.table.type.ArrayType;
2222
import org.apache.seatunnel.api.table.type.DecimalType;
2323
import org.apache.seatunnel.api.table.type.MapType;
24+
import org.apache.seatunnel.api.table.type.MultipleRowType;
2425
import org.apache.seatunnel.api.table.type.RowKind;
2526
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2627
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -41,6 +42,7 @@
4142
import org.bson.types.Decimal128;
4243

4344
import com.mongodb.client.model.changestream.OperationType;
45+
import lombok.extern.slf4j.Slf4j;
4446

4547
import javax.annotation.Nonnull;
4648

@@ -67,17 +69,17 @@
6769
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
6870
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
6971

72+
@Slf4j
7073
public class MongoDBConnectorDeserializationSchema
7174
implements DebeziumDeserializationSchema<SeaTunnelRow> {
72-
7375
private final SeaTunnelDataType<SeaTunnelRow> resultTypeInfo;
7476

75-
private final DeserializationRuntimeConverter physicalConverter;
77+
private final Map<String, DeserializationRuntimeConverter> tableRowConverters;
7678

7779
public MongoDBConnectorDeserializationSchema(
7880
SeaTunnelDataType<SeaTunnelRow> physicalDataType,
7981
SeaTunnelDataType<SeaTunnelRow> resultTypeInfo) {
80-
this.physicalConverter = createConverter(physicalDataType);
82+
this.tableRowConverters = createConverter(physicalDataType);
8183
this.resultTypeInfo = resultTypeInfo;
8284
}
8385

@@ -92,29 +94,44 @@ public void deserialize(@Nonnull SourceRecord record, Collector<SeaTunnelRow> ou
9294
Objects.requireNonNull(
9395
extractBsonDocument(value, valueSchema, DOCUMENT_KEY)));
9496
BsonDocument fullDocument = extractBsonDocument(value, valueSchema, FULL_DOCUMENT);
97+
String tableId = extractTableId(record);
98+
DeserializationRuntimeConverter tableRowConverter;
99+
if (tableId == null && tableRowConverters.size() == 1) {
100+
tableRowConverter = tableRowConverters.values().iterator().next();
101+
} else {
102+
tableRowConverter = tableRowConverters.get(tableId);
103+
}
104+
if (tableRowConverter == null) {
105+
log.debug("Ignore newly added table {}", tableId);
106+
return;
107+
}
95108

96109
switch (op) {
97110
case INSERT:
98-
SeaTunnelRow insert = extractRowData(fullDocument);
111+
SeaTunnelRow insert = extractRowData(tableRowConverter, fullDocument);
99112
insert.setRowKind(RowKind.INSERT);
113+
insert.setTableId(tableId);
100114
emit(record, insert, out);
101115
break;
102116
case DELETE:
103-
SeaTunnelRow delete = extractRowData(documentKey);
117+
SeaTunnelRow delete = extractRowData(tableRowConverter, documentKey);
104118
delete.setRowKind(RowKind.DELETE);
119+
delete.setTableId(tableId);
105120
emit(record, delete, out);
106121
break;
107122
case UPDATE:
108123
if (fullDocument == null) {
109124
break;
110125
}
111-
SeaTunnelRow updateAfter = extractRowData(fullDocument);
126+
SeaTunnelRow updateAfter = extractRowData(tableRowConverter, fullDocument);
112127
updateAfter.setRowKind(RowKind.UPDATE_AFTER);
128+
updateAfter.setTableId(tableId);
113129
emit(record, updateAfter, out);
114130
break;
115131
case REPLACE:
116-
SeaTunnelRow replaceAfter = extractRowData(fullDocument);
132+
SeaTunnelRow replaceAfter = extractRowData(tableRowConverter, fullDocument);
117133
replaceAfter.setRowKind(RowKind.UPDATE_AFTER);
134+
replaceAfter.setTableId(tableId);
118135
emit(record, replaceAfter, out);
119136
break;
120137
case INVALIDATE:
@@ -145,9 +162,15 @@ private void emit(
145162
collector.collect(physicalRow);
146163
}
147164

148-
private SeaTunnelRow extractRowData(BsonDocument document) {
165+
private SeaTunnelRow extractRowData(
166+
DeserializationRuntimeConverter tableRowConverter, BsonDocument document) {
149167
checkNotNull(document);
150-
return (SeaTunnelRow) physicalConverter.convert(document);
168+
return (SeaTunnelRow) tableRowConverter.convert(document);
169+
}
170+
171+
private String extractTableId(SourceRecord record) {
172+
// TODO extract table id from record
173+
return null;
151174
}
152175

153176
// -------------------------------------------------------------------------------------
@@ -159,17 +182,24 @@ public interface DeserializationRuntimeConverter extends Serializable {
159182
Object convert(BsonValue bsonValue);
160183
}
161184

162-
public DeserializationRuntimeConverter createConverter(SeaTunnelDataType<?> type) {
163-
SerializableFunction<BsonValue, Object> internalRowConverter =
164-
createNullSafeInternalConverter(type);
165-
return new DeserializationRuntimeConverter() {
166-
private static final long serialVersionUID = 1L;
167-
168-
@Override
169-
public Object convert(BsonValue bsonValue) {
170-
return internalRowConverter.apply(bsonValue);
171-
}
172-
};
185+
public Map<String, DeserializationRuntimeConverter> createConverter(
186+
SeaTunnelDataType<?> inputDataType) {
187+
Map<String, DeserializationRuntimeConverter> tableRowConverters = new HashMap<>();
188+
for (Map.Entry<String, SeaTunnelRowType> item : (MultipleRowType) inputDataType) {
189+
SerializableFunction<BsonValue, Object> internalRowConverter =
190+
createNullSafeInternalConverter(item.getValue());
191+
DeserializationRuntimeConverter itemRowConverter =
192+
new DeserializationRuntimeConverter() {
193+
private static final long serialVersionUID = 1L;
194+
195+
@Override
196+
public Object convert(BsonValue bsonValue) {
197+
return internalRowConverter.apply(bsonValue);
198+
}
199+
};
200+
tableRowConverters.put(item.getKey(), itemRowConverter);
201+
}
202+
return tableRowConverters;
173203
}
174204

175205
private static SerializableFunction<BsonValue, Object> createNullSafeInternalConverter(

seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/source/MySqlIncrementalSourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
9797
CatalogTableUtil.getCatalogTables(
9898
context.getOptions(), context.getClassLoader());
9999
SeaTunnelDataType<SeaTunnelRow> dataType =
100-
CatalogTableUtil.convertToDataType(catalogTables);
100+
CatalogTableUtil.convertToMultipleRowType(catalogTables);
101101
return (SeaTunnelSource<T, SplitT, StateT>)
102102
new MySqlIncrementalSource<>(context.getOptions(), dataType, catalogTables);
103103
};

seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/source/SqlServerIncrementalSourceFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
102102
CatalogTableUtil.getCatalogTables(
103103
context.getOptions(), context.getClassLoader());
104104
SeaTunnelDataType<SeaTunnelRow> dataType =
105-
CatalogTableUtil.convertToDataType(catalogTables);
105+
CatalogTableUtil.convertToMultipleRowType(catalogTables);
106106
return new SqlServerIncrementalSource(context.getOptions(), dataType, catalogTables);
107107
};
108108
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MysqlCDCIT.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,12 @@ public void testMultiTableWithRestore(TestContainer container)
324324
getSourceQuerySQL(
325325
MYSQL_DATABASE2,
326326
SOURCE_TABLE_2)))));
327+
328+
log.info("****************** container logs start ******************");
329+
String containerLogs = container.getServerLogs();
330+
log.info(containerLogs);
331+
Assertions.assertFalse(containerLogs.contains("ERROR"));
332+
log.info("****************** container logs end ******************");
327333
}
328334

329335
private Connection getJdbcConnection() throws SQLException {

0 commit comments

Comments
 (0)