Skip to content

Commit 51c28a3

Browse files
authored
[Feature][Connector-V2] Add mongodb connecter sink (#2694)
* [Feature][Connector-V2] Add mongodb connecter sink * Add license header * Add spark mongodb sink e2e * Add spark mongodb sink e2e * Fix * Fix * Fix
1 parent a496736 commit 51c28a3

File tree

11 files changed

+696
-0
lines changed

11 files changed

+696
-0
lines changed

docs/en/connector-v2/sink/MongoDB.md

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# MongoDB
2+
3+
> MongoDB sink connector
4+
5+
## Description
6+
7+
Write data to `MongoDB`
8+
9+
## Key features
10+
11+
- [x] [batch](../../concept/connector-v2-features.md)
12+
- [x] [stream](../../concept/connector-v2-features.md)
13+
- [ ] [exactly-once](../../concept/connector-v2-features.md)
14+
- [ ] [schema projection](../../concept/connector-v2-features.md)
15+
- [ ] [parallelism](../../concept/connector-v2-features.md)
16+
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
17+
18+
## Options
19+
20+
| name | type | required | default value |
21+
|------------| ------ |----------| ------------- |
22+
| uri | string | yes | - |
23+
| database | string | yes | - |
24+
| collection | string | yes | - |
25+
26+
### uri [string]
27+
28+
The MongoDB connection URI to write to
29+
30+
### database [string]
31+
32+
The name of the MongoDB database to write to
33+
34+
### collection [string]
35+
36+
The name of the MongoDB collection to write to
37+
38+
## Example
39+
40+
```bash
41+
mongodb {
42+
uri = "mongodb://username:password@127.0.0.1:27017/mypost?retryWrites=true&w=majority"
43+
database = "mydatabase"
44+
collection = "mycollection"
45+
}
46+
```

plugin-mapping.properties

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,4 +127,5 @@ seatunnel.sink.Redis = connector-redis
127127
seatunnel.sink.DataHub = connector-datahub
128128
seatunnel.sink.Sentry = connector-sentry
129129
seatunnel.source.MongoDB = connector-mongodb
130+
seatunnel.sink.MongoDB = connector-mongodb
130131

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
19+
20+
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.COLLECTION;
21+
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.DATABASE;
22+
import static org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbConfig.URI;
23+
24+
import org.apache.seatunnel.api.common.PrepareFailException;
25+
import org.apache.seatunnel.api.sink.SeaTunnelSink;
26+
import org.apache.seatunnel.api.sink.SinkWriter;
27+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
28+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
29+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
30+
import org.apache.seatunnel.common.config.CheckConfigUtil;
31+
import org.apache.seatunnel.common.config.CheckResult;
32+
import org.apache.seatunnel.common.constants.PluginType;
33+
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSimpleSink;
34+
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
35+
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
36+
37+
import org.apache.seatunnel.shade.com.typesafe.config.Config;
38+
import org.apache.seatunnel.shade.com.typesafe.config.ConfigBeanFactory;
39+
40+
import com.google.auto.service.AutoService;
41+
42+
import java.io.IOException;
43+
44+
@AutoService(SeaTunnelSink.class)
45+
public class MongodbSink extends AbstractSimpleSink<SeaTunnelRow, Void> {
46+
47+
private SeaTunnelRowType rowType;
48+
49+
private MongodbParameters params;
50+
51+
@Override
52+
public String getPluginName() {
53+
return "MongoDB";
54+
}
55+
56+
@Override
57+
public void prepare(Config config) throws PrepareFailException {
58+
CheckResult result = CheckConfigUtil.checkAllExists(config, URI, DATABASE, COLLECTION);
59+
if (!result.isSuccess()) {
60+
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
61+
}
62+
63+
this.params = ConfigBeanFactory.create(config, MongodbParameters.class);
64+
}
65+
66+
@Override
67+
public void setTypeInfo(SeaTunnelRowType rowType) {
68+
this.rowType = rowType;
69+
}
70+
71+
@Override
72+
public SeaTunnelDataType<SeaTunnelRow> getConsumedType() {
73+
return rowType;
74+
}
75+
76+
@Override
77+
public AbstractSinkWriter<SeaTunnelRow, Void> createWriter(SinkWriter.Context context) throws IOException {
78+
return new MongodbSinkWriter(rowType, params);
79+
}
80+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.mongodb.sink;
19+
20+
import org.apache.seatunnel.api.serialization.SerializationSchema;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23+
import org.apache.seatunnel.connectors.seatunnel.common.sink.AbstractSinkWriter;
24+
import org.apache.seatunnel.connectors.seatunnel.mongodb.config.MongodbParameters;
25+
import org.apache.seatunnel.format.json.JsonSerializationSchema;
26+
27+
import com.mongodb.client.MongoClient;
28+
import com.mongodb.client.MongoClients;
29+
import com.mongodb.client.MongoCollection;
30+
import org.bson.Document;
31+
32+
import java.io.IOException;
33+
34+
public class MongodbSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void> {
35+
36+
private final SeaTunnelRowType rowType;
37+
38+
private final SerializationSchema serializationSchema;
39+
40+
private MongoClient client;
41+
42+
private final String database;
43+
44+
private final String collection;
45+
46+
private final MongoCollection<Document> mongoCollection;
47+
48+
public MongodbSinkWriter(SeaTunnelRowType rowType, MongodbParameters params) {
49+
this.rowType = rowType;
50+
this.database = params.getDatabase();
51+
this.collection = params.getCollection();
52+
this.client = MongoClients.create(params.getUri());
53+
this.mongoCollection = this.client.getDatabase(database).getCollection(collection);
54+
this.serializationSchema = new JsonSerializationSchema(rowType);
55+
}
56+
57+
@Override
58+
public void write(SeaTunnelRow rows) throws IOException {
59+
byte[] serialize = serializationSchema.serialize(rows);
60+
String content = new String(serialize);
61+
62+
Document doc = Document.parse(content);
63+
mongoCollection.insertOne(doc);
64+
}
65+
66+
@Override
67+
public void close() throws IOException {
68+
if (client != null) {
69+
client.close();
70+
}
71+
}
72+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.e2e.flink.v2.mongodb;
19+
20+
import static java.net.HttpURLConnection.HTTP_OK;
21+
import static java.net.HttpURLConnection.HTTP_UNAUTHORIZED;
22+
23+
import org.apache.seatunnel.e2e.flink.FlinkContainer;
24+
25+
import com.google.common.collect.Lists;
26+
import com.mongodb.client.MongoClient;
27+
import com.mongodb.client.MongoClients;
28+
import com.mongodb.client.MongoCursor;
29+
import lombok.extern.slf4j.Slf4j;
30+
import org.bson.Document;
31+
import org.junit.jupiter.api.AfterEach;
32+
import org.junit.jupiter.api.Assertions;
33+
import org.junit.jupiter.api.BeforeEach;
34+
import org.junit.jupiter.api.Test;
35+
import org.testcontainers.containers.Container;
36+
import org.testcontainers.containers.GenericContainer;
37+
import org.testcontainers.containers.output.Slf4jLogConsumer;
38+
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
39+
import org.testcontainers.lifecycle.Startables;
40+
import org.testcontainers.shaded.org.awaitility.Awaitility;
41+
import org.testcontainers.utility.DockerImageName;
42+
43+
import java.io.IOException;
44+
import java.time.Duration;
45+
import java.util.ArrayList;
46+
import java.util.HashMap;
47+
import java.util.List;
48+
import java.util.Map;
49+
import java.util.Set;
50+
import java.util.concurrent.TimeUnit;
51+
import java.util.stream.Stream;
52+
53+
@Slf4j
54+
public class FakeSourceToMongodbIT extends FlinkContainer {
55+
56+
private static final String MONGODB_IMAGE = "mongo:latest";
57+
58+
private static final String MONGODB_CONTAINER_HOST = "flink_e2e_mongodb_sink";
59+
60+
private static final String MONGODB_HOST = "localhost";
61+
62+
private static final int MONGODB_PORT = 27017;
63+
64+
private static final String MONGODB_DATABASE = "test_db";
65+
66+
private static final String MONGODB_COLLECTION = "test_table";
67+
68+
private static final String MONGODB_URI = String.format("mongodb://%s:%d/%s", MONGODB_HOST, MONGODB_PORT, MONGODB_DATABASE);
69+
70+
private MongoClient client;
71+
72+
private GenericContainer<?> mongodbContainer;
73+
74+
@BeforeEach
75+
public void startMongoContainer() {
76+
DockerImageName imageName = DockerImageName.parse(MONGODB_IMAGE);
77+
mongodbContainer = new GenericContainer<>(imageName)
78+
.withNetwork(NETWORK)
79+
.withNetworkAliases(MONGODB_CONTAINER_HOST)
80+
.withExposedPorts(MONGODB_PORT)
81+
.waitingFor(new HttpWaitStrategy()
82+
.forPort(MONGODB_PORT)
83+
.forStatusCodeMatching(response -> response == HTTP_OK || response == HTTP_UNAUTHORIZED)
84+
.withStartupTimeout(Duration.ofMinutes(2)))
85+
.withLogConsumer(new Slf4jLogConsumer(log));
86+
mongodbContainer.setPortBindings(Lists.newArrayList(String.format("%s:%s", MONGODB_PORT, MONGODB_PORT)));
87+
Startables.deepStart(Stream.of(mongodbContainer)).join();
88+
log.info("Mongodb container started");
89+
Awaitility.given().ignoreExceptions()
90+
.await()
91+
.atMost(30, TimeUnit.SECONDS)
92+
.untilAsserted(this::initConnection);
93+
}
94+
95+
public void initConnection() {
96+
client = MongoClients.create(MONGODB_URI);
97+
}
98+
99+
@Test
100+
public void testMongodbSink() throws IOException, InterruptedException {
101+
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/mongodb/fake_to_mongodb.conf");
102+
Assertions.assertEquals(0, execResult.getExitCode());
103+
104+
List<Map<String, Object>> list = new ArrayList<>();
105+
try (MongoCursor<Document> mongoCursor = client.getDatabase(MONGODB_DATABASE)
106+
.getCollection(MONGODB_COLLECTION)
107+
.find()
108+
.iterator()
109+
) {
110+
while (mongoCursor.hasNext()) {
111+
Document doc = mongoCursor.next();
112+
HashMap<String, Object> map = new HashMap<>(doc.size());
113+
Set<Map.Entry<String, Object>> entries = doc.entrySet();
114+
for (Map.Entry<String, Object> entry : entries) {
115+
String key = entry.getKey();
116+
Object value = entry.getValue();
117+
map.put(key, value);
118+
}
119+
log.info("Document ===>>>: " + map);
120+
list.add(map);
121+
}
122+
}
123+
124+
Assertions.assertEquals(10, list.size());
125+
}
126+
127+
@AfterEach
128+
public void close() {
129+
super.close();
130+
if (client != null) {
131+
client.close();
132+
}
133+
if (mongodbContainer != null) {
134+
mongodbContainer.close();
135+
}
136+
}
137+
}

0 commit comments

Comments
 (0)