Skip to content

Commit efbf793

Browse files
authored
[Improve][Connector-V2] Improve mongodb connector (#2778)
* Fix schema projection * Fix datatype conversion between MongoDB and SeaTunnel * Support use `SimpleTextSchema` read & write
1 parent 4bb2beb commit efbf793

File tree

20 files changed

+1158
-513
lines changed

20 files changed

+1158
-513
lines changed

seatunnel-connectors-v2/connector-mongodb/pom.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,6 @@
4040
<version>${project.version}</version>
4141
</dependency>
4242

43-
<dependency>
44-
<groupId>org.apache.seatunnel</groupId>
45-
<artifactId>seatunnel-format-json</artifactId>
46-
<version>${project.version}</version>
47-
</dependency>
48-
4943
<dependency>
5044
<groupId>org.apache.seatunnel</groupId>
5145
<artifactId>seatunnel-api</artifactId>
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.mongodb.data;
19+
20+
import org.apache.seatunnel.api.table.type.MapType;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
23+
import org.apache.seatunnel.api.table.type.SqlType;
24+
25+
public class DataTypeValidator {
26+
27+
public static void validateDataType(SeaTunnelDataType dataType) throws IllegalArgumentException {
28+
switch (dataType.getSqlType()) {
29+
case TIME:
30+
throw new IllegalArgumentException("Unsupported data type: " + dataType);
31+
case MAP:
32+
MapType mapType = (MapType) dataType;
33+
if (!SqlType.STRING.equals(mapType.getKeyType().getSqlType())) {
34+
throw new IllegalArgumentException("Unsupported map key type: " + mapType.getKeyType());
35+
}
36+
break;
37+
case ROW:
38+
SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
39+
for (int i = 0; i < rowType.getTotalFields(); i++) {
40+
validateDataType(rowType.getFieldType(i));
41+
}
42+
break;
43+
default:
44+
}
45+
}
46+
}
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.mongodb.data;
19+
20+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
21+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
22+
23+
import lombok.NonNull;
24+
import org.bson.BsonWriter;
25+
import org.bson.Document;
26+
import org.bson.codecs.DocumentCodec;
27+
import org.bson.codecs.Encoder;
28+
import org.bson.codecs.EncoderContext;
29+
30+
public class DefaultDeserializer implements Deserializer {
31+
32+
private final SeaTunnelRowType rowType;
33+
private final Encoder encoder;
34+
35+
public DefaultDeserializer(@NonNull SeaTunnelRowType rowType) {
36+
DataTypeValidator.validateDataType(rowType);
37+
this.rowType = rowType;
38+
this.encoder = new DocumentCodec();
39+
}
40+
41+
@Override
42+
public SeaTunnelRow deserialize(Document document) {
43+
return convert(document);
44+
}
45+
46+
private SeaTunnelRow convert(Document document) {
47+
SeaTunnelRow row = new SeaTunnelRow(rowType.getTotalFields());
48+
49+
BsonWriter writer = new SeaTunnelRowBsonWriter(rowType, row);
50+
encoder.encode(writer, document, EncoderContext.builder().build());
51+
writer.flush();
52+
53+
return row;
54+
}
55+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.mongodb.data;
19+
20+
import org.apache.seatunnel.api.table.type.ArrayType;
21+
import org.apache.seatunnel.api.table.type.MapType;
22+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
25+
26+
import lombok.NonNull;
27+
import org.bson.BsonTimestamp;
28+
import org.bson.Document;
29+
import org.bson.types.Binary;
30+
import org.bson.types.Decimal128;
31+
32+
import java.math.BigDecimal;
33+
import java.time.LocalDate;
34+
import java.time.LocalDateTime;
35+
import java.time.ZoneOffset;
36+
import java.util.ArrayList;
37+
import java.util.Date;
38+
import java.util.List;
39+
import java.util.Map;
40+
41+
public class DefaultSerializer implements Serializer {
42+
43+
private final SeaTunnelRowType rowType;
44+
45+
public DefaultSerializer(@NonNull SeaTunnelRowType rowType) {
46+
DataTypeValidator.validateDataType(rowType);
47+
this.rowType = rowType;
48+
}
49+
50+
public Document serialize(@NonNull SeaTunnelRow row) {
51+
return convert(rowType, row);
52+
}
53+
54+
private static Document convert(SeaTunnelRowType rowType, SeaTunnelRow row) {
55+
Document document = new Document();
56+
for (int i = 0; i < rowType.getTotalFields(); i++) {
57+
String fieldName = rowType.getFieldName(i);
58+
SeaTunnelDataType<?> fieldType = rowType.getFieldType(i);
59+
Object fieldValue = row.getField(i);
60+
document.append(fieldName, convert(fieldType, fieldValue));
61+
}
62+
return document;
63+
}
64+
65+
private static Object convert(SeaTunnelDataType<?> fieldType, Object fieldValue) {
66+
if (fieldValue == null) {
67+
return null;
68+
}
69+
switch (fieldType.getSqlType()) {
70+
case TINYINT:
71+
case SMALLINT:
72+
Number number = (Number) fieldValue;
73+
return number.intValue();
74+
case FLOAT:
75+
Float floatValue = (Float) fieldValue;
76+
return Double.parseDouble(String.valueOf(floatValue));
77+
case DECIMAL:
78+
BigDecimal bigDecimal = (BigDecimal) fieldValue;
79+
return new Decimal128(bigDecimal);
80+
case DATE:
81+
LocalDate localDate = (LocalDate) fieldValue;
82+
return Date.from(localDate.atStartOfDay(ZoneOffset.UTC).toInstant());
83+
case TIMESTAMP:
84+
LocalDateTime localDateTime = (LocalDateTime) fieldValue;
85+
return new BsonTimestamp(localDateTime.toInstant(ZoneOffset.UTC).toEpochMilli());
86+
case BYTES:
87+
byte[] bytes = (byte[]) fieldValue;
88+
return new Binary(bytes);
89+
case ARRAY:
90+
ArrayType arrayType = (ArrayType) fieldType;
91+
Object[] array = (Object[]) fieldValue;
92+
List<Object> listValues = new ArrayList();
93+
for (Object item : array) {
94+
listValues.add(convert(arrayType.getElementType(), item));
95+
}
96+
return listValues;
97+
case MAP:
98+
MapType mapType = (MapType) fieldType;
99+
Map<String, Object> map = (Map) fieldValue;
100+
Document mapDocument = new Document();
101+
for (Map.Entry<String, Object> entry : map.entrySet()) {
102+
String mapKeyName = entry.getKey();
103+
mapDocument.append(mapKeyName,
104+
convert(mapType.getValueType(), entry.getValue()));
105+
}
106+
return mapDocument;
107+
case ROW:
108+
SeaTunnelRowType rowType = (SeaTunnelRowType) fieldType;
109+
SeaTunnelRow row = (SeaTunnelRow) fieldValue;
110+
Document rowDocument = new Document();
111+
for (int i = 0; i < rowType.getTotalFields(); i++) {
112+
String rowFieldName = rowType.getFieldName(i);
113+
SeaTunnelDataType rowFieldType = rowType.getFieldType(i);
114+
Object rowValue = row.getField(i);
115+
rowDocument.append(rowFieldName, convert(rowFieldType, rowValue));
116+
}
117+
return rowDocument;
118+
default:
119+
return fieldValue;
120+
}
121+
}
122+
}
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.mongodb.data;
19+
20+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
21+
22+
import org.bson.Document;
23+
24+
import java.io.Serializable;
25+
26+
public interface Deserializer extends Serializable {
27+
SeaTunnelRow deserialize(Document document);
28+
}

0 commit comments

Comments
 (0)