Skip to content

Commit f941953

Browse files
authored
Fix StarRocksJsonSerializer will transform array/map/row to string (#5281)
1 parent 358fc78 commit f941953

File tree

2 files changed

+72
-3
lines changed

2 files changed

+72
-3
lines changed

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@
1919

2020
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2121
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
22+
import org.apache.seatunnel.api.table.type.SqlType;
2223
import org.apache.seatunnel.common.utils.JsonUtils;
2324

24-
import java.util.HashMap;
25+
import java.util.LinkedHashMap;
2526
import java.util.Map;
2627

2728
public class StarRocksJsonSerializer extends StarRocksBaseSerializer
@@ -38,10 +39,22 @@ public StarRocksJsonSerializer(SeaTunnelRowType seaTunnelRowType, boolean enable
3839

3940
@Override
4041
public String serialize(SeaTunnelRow row) {
41-
Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
42+
Map<String, Object> rowMap = new LinkedHashMap<>(row.getFields().length);
4243

4344
for (int i = 0; i < row.getFields().length; i++) {
44-
Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
45+
SqlType sqlType = seaTunnelRowType.getFieldType(i).getSqlType();
46+
Object value;
47+
if (sqlType == SqlType.ARRAY
48+
|| sqlType == SqlType.MAP
49+
|| sqlType == SqlType.ROW
50+
|| sqlType == SqlType.MULTIPLE_ROW) {
51+
// If the field type is complex type, we should keep the origin value.
52+
// It will be transformed to json string in the next step
53+
// JsonUtils.toJsonString(rowMap).
54+
value = row.getField(i);
55+
} else {
56+
value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
57+
}
4558
rowMap.put(seaTunnelRowType.getFieldName(i), value);
4659
}
4760
if (enableUpsertDelete) {
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;
19+
20+
import org.apache.seatunnel.api.table.type.ArrayType;
21+
import org.apache.seatunnel.api.table.type.BasicType;
22+
import org.apache.seatunnel.api.table.type.MapType;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
24+
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
25+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
26+
27+
import org.junit.jupiter.api.Assertions;
28+
import org.junit.jupiter.api.Test;
29+
30+
import java.util.Collections;
31+
32+
public class StarRocksJsonSerializerTest {
33+
34+
@Test
35+
public void serialize() {
36+
String[] filedNames = {"id", "name", "array", "map"};
37+
SeaTunnelDataType<?>[] filedTypes = {
38+
BasicType.LONG_TYPE,
39+
BasicType.STRING_TYPE,
40+
ArrayType.STRING_ARRAY_TYPE,
41+
new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)
42+
};
43+
44+
SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(filedNames, filedTypes);
45+
StarRocksJsonSerializer starRocksJsonSerializer =
46+
new StarRocksJsonSerializer(seaTunnelRowType, false);
47+
Object[] fields = {
48+
1, "Tom", new String[] {"tag1", "tag2"}, Collections.singletonMap("key1", "value1")
49+
};
50+
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
51+
String jsonString = starRocksJsonSerializer.serialize(seaTunnelRow);
52+
Assertions.assertEquals(
53+
"{\"id\":1,\"name\":\"Tom\",\"array\":[\"tag1\",\"tag2\"],\"map\":{\"key1\":\"value1\"}}",
54+
jsonString);
55+
}
56+
}

0 commit comments

Comments
 (0)