Skip to content

Commit f3970d6

Browse files
authored
[Bug][Connector-v2] MongoDB CDC Set SeatunnelRow's tableId (#7935)
1 parent 5089d8a commit f3970d6

File tree

4 files changed

+169
-76
lines changed

4 files changed

+169
-76
lines changed

docs/en/connector-v2/source/MongoDB-CDC.md

Lines changed: 10 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -105,13 +105,14 @@ For specific types in MongoDB, we use Extended JSON format to map them to Seatun
105105
106106
## Source Options
107107
108-
| Name | Type | Required | Default | Description |
108+
| Name | Type | Required | Default | Description |
109109
|------------------------------------|--------|----------|---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
110110
| hosts | String | Yes | - | The comma-separated list of hostname and port pairs of the MongoDB servers. eg. `localhost:27017,localhost:27018` |
111111
| username | String | No | - | Name of the database user to be used when connecting to MongoDB. |
112112
| password | String | No | - | Password to be used when connecting to MongoDB. |
113113
| database | List | Yes | - | Name of the database to watch for changes. If not set then all databases will be captured. The database also supports regular expressions to monitor multiple databases matching the regular expression. eg. `db1,db2`. |
114114
| collection | List | Yes | - | Name of the collection in the database to watch for changes. If not set then all collections will be captured. The collection also supports regular expressions to monitor multiple collections matching fully-qualified collection identifiers. eg. `db1.coll1,db2.coll2`. |
115+
| schema | | yes | - | The structure of the data, including field names and field types. |
115116
| connection.options | String | No | - | The ampersand-separated connection options of MongoDB. eg. `replicaSet=test&connectTimeoutMS=300000`. |
116117
| batch.size | Long | No | 1024 | The cursor batch size. |
117118
| poll.max.batch.size | Enum | No | 1024 | Maximum number of change stream documents to include in a single batch when polling for new data. |
@@ -185,6 +186,14 @@ source {
185186
collection = ["inventory.products"]
186187
username = stuser
187188
password = stpw
189+
schema = {
190+
fields {
191+
"_id" : string,
192+
"name" : string,
193+
"description" : string,
194+
"weight" : string
195+
}
196+
}
188197
}
189198
}
190199
@@ -204,76 +213,6 @@ sink {
204213
}
205214
```
206215
207-
## Multi-table Synchronization
208-
209-
The following example demonstrates how to create a data synchronization job that read the cdc data of multiple library tables mongodb and prints it on the local client:
210-
211-
```hocon
212-
env {
213-
# You can set engine configuration here
214-
parallelism = 1
215-
job.mode = "STREAMING"
216-
checkpoint.interval = 5000
217-
}
218-
219-
source {
220-
MongoDB-CDC {
221-
hosts = "mongo0:27017"
222-
database = ["inventory","crm"]
223-
collection = ["inventory.products","crm.test"]
224-
username = stuser
225-
password = stpw
226-
}
227-
}
228-
229-
# Console printing of the read Mongodb data
230-
sink {
231-
Console {
232-
parallelism = 1
233-
}
234-
}
235-
```
236-
237-
### Tips:
238-
239-
> 1.The cdc synchronization of multiple library tables cannot specify the schema, and can only output json data downstream.
240-
> This is because MongoDB does not provide metadata information for querying, so if you want to support multiple tables, all tables can only be read as one structure.
241-
242-
## Regular Expression Matching for Multiple Tables
243-
244-
The following example demonstrates how to create a data synchronization job that through regular expression read the data of multiple library tables mongodb and prints it on the local client:
245-
246-
| Matching example | Expressions | | Describe |
247-
|------------------|-------------|---|----------------------------------------------------------------------------------------|
248-
| Prefix matching | ^(test).* | | Match the database name or table name with the prefix test, such as test1, test2, etc. |
249-
| Suffix matching | .*[p$] | | Match the database name or table name with the suffix p, such as cdcp, edcp, etc. |
250-
251-
```hocon
252-
env {
253-
# You can set engine configuration here
254-
parallelism = 1
255-
job.mode = "STREAMING"
256-
checkpoint.interval = 5000
257-
}
258-
259-
source {
260-
MongoDB-CDC {
261-
hosts = "mongo0:27017"
262-
# So this example is used (^(test).*|^(tpc).*|txc|.*[p$]|t{2}).(t[5-8]|tt),matching txc.tt、test2.test5.
263-
database = ["(^(test).*|^(tpc).*|txc|.*[p$]|t{2})"]
264-
collection = ["(t[5-8]|tt)"]
265-
username = stuser
266-
password = stpw
267-
}
268-
}
269-
270-
# Console printing of the read Mongodb data
271-
sink {
272-
Console {
273-
parallelism = 1
274-
}
275-
}
276-
```
277216
278217
## Format of real-time streaming data
279218
@@ -309,4 +248,3 @@ sink {
309248
}
310249
}
311250
```
312-

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/MongodbIncrementalSourceFactory.java

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222
import org.apache.seatunnel.api.source.SourceSplit;
2323
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2424
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
25+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
26+
import org.apache.seatunnel.api.table.catalog.TablePath;
27+
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
2528
import org.apache.seatunnel.api.table.connector.TableSource;
2629
import org.apache.seatunnel.api.table.factory.Factory;
2730
import org.apache.seatunnel.api.table.factory.TableSourceFactory;
@@ -31,11 +34,16 @@
3134
import org.apache.seatunnel.connectors.cdc.base.option.SourceOptions;
3235
import org.apache.seatunnel.connectors.cdc.base.option.StartupMode;
3336
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions;
37+
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
3438

3539
import com.google.auto.service.AutoService;
3640

3741
import java.io.Serializable;
3842
import java.util.List;
43+
import java.util.stream.Collectors;
44+
import java.util.stream.IntStream;
45+
46+
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
3947

4048
@AutoService(Factory.class)
4149
public class MongodbIncrementalSourceFactory implements TableSourceFactory {
@@ -50,7 +58,8 @@ public OptionRule optionRule() {
5058
.required(
5159
MongodbSourceOptions.HOSTS,
5260
MongodbSourceOptions.DATABASE,
53-
MongodbSourceOptions.COLLECTION)
61+
MongodbSourceOptions.COLLECTION,
62+
TableSchemaOptions.SCHEMA)
5463
.optional(
5564
MongodbSourceOptions.USERNAME,
5665
MongodbSourceOptions.PASSWORD,
@@ -79,9 +88,28 @@ public Class<? extends SeaTunnelSource> getSourceClass() {
7988
public <T, SplitT extends SourceSplit, StateT extends Serializable>
8089
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
8190
return () -> {
82-
List<CatalogTable> catalogTables =
91+
List<CatalogTable> configCatalog =
8392
CatalogTableUtil.getCatalogTables(
8493
context.getOptions(), context.getClassLoader());
94+
List<String> collections = context.getOptions().get(MongodbSourceOptions.COLLECTION);
95+
if (collections.size() != configCatalog.size()) {
96+
throw new MongodbConnectorException(
97+
ILLEGAL_ARGUMENT,
98+
"The number of collections must be equal to the number of schema tables");
99+
}
100+
List<CatalogTable> catalogTables =
101+
IntStream.range(0, configCatalog.size())
102+
.mapToObj(
103+
i -> {
104+
CatalogTable catalogTable = configCatalog.get(i);
105+
String fullName = collections.get(i);
106+
TableIdentifier tableIdentifier =
107+
TableIdentifier.of(
108+
catalogTable.getCatalogName(),
109+
TablePath.of(fullName));
110+
return CatalogTable.of(tableIdentifier, catalogTable);
111+
})
112+
.collect(Collectors.toList());
85113
SeaTunnelDataType<SeaTunnelRow> dataType =
86114
CatalogTableUtil.convertToMultipleRowType(catalogTables);
87115
return (SeaTunnelSource<T, SplitT, StateT>)

seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender;
1919

20+
import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;
21+
2022
import org.apache.seatunnel.api.source.Collector;
23+
import org.apache.seatunnel.api.table.catalog.TablePath;
2124
import org.apache.seatunnel.api.table.type.ArrayType;
2225
import org.apache.seatunnel.api.table.type.DecimalType;
2326
import org.apache.seatunnel.api.table.type.MapType;
@@ -62,10 +65,13 @@
6265
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT;
6366
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.UNSUPPORTED_DATA_TYPE;
6467
import static org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION;
68+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
69+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
6570
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DEFAULT_JSON_WRITER_SETTINGS;
6671
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
6772
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ENCODE_VALUE_FIELD;
6873
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
74+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
6975
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.extractBsonDocument;
7076
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
7177

@@ -169,8 +175,16 @@ private SeaTunnelRow extractRowData(
169175
}
170176

171177
private String extractTableId(SourceRecord record) {
172-
// TODO extract table id from record
173-
return null;
178+
Struct messageStruct = (Struct) record.value();
179+
Struct nsStruct = (Struct) messageStruct.get(NS_FIELD);
180+
String databaseName = nsStruct.getString(DB_FIELD);
181+
String tableName = nsStruct.getString(COLL_FIELD);
182+
return TablePath.of(databaseName, null, tableName).toString();
183+
}
184+
185+
@VisibleForTesting
186+
public String extractTableIdForTest(SourceRecord record) {
187+
return extractTableId(record);
174188
}
175189

176190
// -------------------------------------------------------------------------------------
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package mongodb.sender;
19+
20+
import org.apache.seatunnel.api.table.catalog.CatalogTable;
21+
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
22+
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
23+
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
24+
import org.apache.seatunnel.api.table.catalog.TableSchema;
25+
import org.apache.seatunnel.api.table.type.BasicType;
26+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
27+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
28+
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
29+
import org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
30+
31+
import org.apache.kafka.connect.source.SourceRecord;
32+
33+
import org.bson.BsonDocument;
34+
import org.bson.BsonInt64;
35+
import org.bson.BsonString;
36+
import org.junit.jupiter.api.Assertions;
37+
import org.junit.jupiter.api.Test;
38+
39+
import java.util.Collections;
40+
import java.util.Map;
41+
42+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
43+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
44+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DOCUMENT_KEY;
45+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.FULL_DOCUMENT;
46+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.ID_FIELD;
47+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.NS_FIELD;
48+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE;
49+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.OPERATION_TYPE_INSERT;
50+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_FIELD;
51+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SNAPSHOT_TRUE;
52+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.SOURCE_FIELD;
53+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.TS_MS_FIELD;
54+
import static org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils.createSourceOffsetMap;
55+
56+
public class MongoDBConnectorDeserializationSchemaTest {
57+
58+
@Test
59+
public void extractTableId() {
60+
CatalogTable catalogTable =
61+
CatalogTable.of(
62+
TableIdentifier.of("catalog", "database", "table"),
63+
TableSchema.builder()
64+
.column(
65+
PhysicalColumn.of(
66+
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
67+
.column(
68+
PhysicalColumn.of(
69+
"name1", BasicType.STRING_TYPE, 1L, true, null, ""))
70+
.build(),
71+
Collections.emptyMap(),
72+
Collections.emptyList(),
73+
"comment");
74+
SeaTunnelDataType<SeaTunnelRow> dataType =
75+
CatalogTableUtil.convertToMultipleRowType(Collections.singletonList(catalogTable));
76+
MongoDBConnectorDeserializationSchema schema =
77+
new MongoDBConnectorDeserializationSchema(dataType, dataType);
78+
79+
// Build SourceRecord
80+
Map<String, String> partitionMap =
81+
MongodbRecordUtils.createPartitionMap("localhost:27017", "inventory", "products");
82+
83+
BsonDocument valueDocument =
84+
new BsonDocument()
85+
.append(
86+
ID_FIELD,
87+
new BsonDocument(ID_FIELD, new BsonInt64(10000000000001L)))
88+
.append(OPERATION_TYPE, new BsonString(OPERATION_TYPE_INSERT))
89+
.append(
90+
NS_FIELD,
91+
new BsonDocument(DB_FIELD, new BsonString("inventory"))
92+
.append(COLL_FIELD, new BsonString("products")))
93+
.append(
94+
DOCUMENT_KEY,
95+
new BsonDocument(ID_FIELD, new BsonInt64(10000000000001L)))
96+
.append(FULL_DOCUMENT, new BsonDocument())
97+
.append(TS_MS_FIELD, new BsonInt64(System.currentTimeMillis()))
98+
.append(
99+
SOURCE_FIELD,
100+
new BsonDocument(SNAPSHOT_FIELD, new BsonString(SNAPSHOT_TRUE))
101+
.append(TS_MS_FIELD, new BsonInt64(0L)));
102+
BsonDocument keyDocument = new BsonDocument(ID_FIELD, valueDocument.get(ID_FIELD));
103+
SourceRecord sourceRecord =
104+
MongodbRecordUtils.buildSourceRecord(
105+
partitionMap,
106+
createSourceOffsetMap(keyDocument.getDocument(ID_FIELD), true),
107+
"inventory.products",
108+
keyDocument,
109+
valueDocument);
110+
Object tableId = schema.extractTableIdForTest(sourceRecord);
111+
Assertions.assertEquals("inventory.products", tableId);
112+
}
113+
}

0 commit comments

Comments
 (0)