Skip to content

Commit d33b0da

Browse files
authored
[Feature][Connector-V2] StarRocks-sink support schema evolution (#8082)
1 parent 50f0c5f commit d33b0da

File tree

21 files changed

+3282
-39
lines changed

21 files changed

+3282
-39
lines changed

docs/en/concept/schema-evolution.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ Now we only support the operation about `add column`、`drop column`、`rename c
1212
### Sink
1313
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
1414
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
15+
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md)
1516

1617
Note: The schema evolution is not support the transform at now. The schema evolution of different types of databases(Oracle-CDC -> Jdbc-Mysql)is currently not supported the default value of the column in ddl.
1718

@@ -151,3 +152,55 @@ sink {
151152
}
152153
}
153154
```
155+
156+
### Mysql-cdc -> StarRocks
157+
```
158+
env {
159+
# You can set engine configuration here
160+
parallelism = 1
161+
job.mode = "STREAMING"
162+
checkpoint.interval = 5000
163+
}
164+
165+
source {
166+
MySQL-CDC {
167+
username = "st_user_source"
168+
password = "mysqlpw"
169+
table-names = ["shop.products"]
170+
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
171+
debezium = {
172+
include.schema.changes = true
173+
}
174+
}
175+
}
176+
177+
sink {
178+
StarRocks {
179+
nodeUrls = ["starrocks_cdc_e2e:8030"]
180+
username = "root"
181+
password = ""
182+
database = "shop"
183+
table = "${table_name}"
184+
base-url = "jdbc:mysql://starrocks_cdc_e2e:9030/shop"
185+
max_retries = 3
186+
enable_upsert_delete = true
187+
schema_save_mode="RECREATE_SCHEMA"
188+
data_save_mode="DROP_DATA"
189+
save_mode_create_template = """
190+
CREATE TABLE IF NOT EXISTS shop.`${table_name}` (
191+
${rowtype_primary_key},
192+
${rowtype_fields}
193+
) ENGINE=OLAP
194+
PRIMARY KEY (${rowtype_primary_key})
195+
DISTRIBUTED BY HASH (${rowtype_primary_key})
196+
PROPERTIES (
197+
"replication_num" = "1",
198+
"in_memory" = "false",
199+
"enable_persistent_index" = "true",
200+
"replicated_storage" = "true",
201+
"compression" = "LZ4"
202+
)
203+
"""
204+
}
205+
}
206+
```

docs/zh/concept/schema-evolution.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
### 目标
1212
[Jdbc-Mysql](https://github.com/apache/seatunnel/blob/dev/docs/zh/connector-v2/sink/Jdbc.md)
1313
[Jdbc-Oracle](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/Jdbc.md)
14+
[StarRocks](https://github.com/apache/seatunnel/blob/dev/docs/en/connector-v2/sink/StarRocks.md)
1415

1516
注意: 目前模式演进不支持transform。不同类型数据库(Oracle-CDC -> Jdbc-Mysql)的模式演进目前不支持ddl中列的默认值。
1617

@@ -150,3 +151,55 @@ sink {
150151
}
151152
}
152153
```
154+
155+
### Mysql-cdc -> StarRocks
156+
```
157+
env {
158+
# You can set engine configuration here
159+
parallelism = 1
160+
job.mode = "STREAMING"
161+
checkpoint.interval = 5000
162+
}
163+
164+
source {
165+
MySQL-CDC {
166+
username = "st_user_source"
167+
password = "mysqlpw"
168+
table-names = ["shop.products"]
169+
base-url = "jdbc:mysql://mysql_cdc_e2e:3306/shop"
170+
debezium = {
171+
include.schema.changes = true
172+
}
173+
}
174+
}
175+
176+
sink {
177+
StarRocks {
178+
nodeUrls = ["starrocks_cdc_e2e:8030"]
179+
username = "root"
180+
password = ""
181+
database = "shop"
182+
table = "${table_name}"
183+
base-url = "jdbc:mysql://starrocks_cdc_e2e:9030/shop"
184+
max_retries = 3
185+
enable_upsert_delete = true
186+
schema_save_mode="RECREATE_SCHEMA"
187+
data_save_mode="DROP_DATA"
188+
save_mode_create_template = """
189+
CREATE TABLE IF NOT EXISTS shop.`${table_name}` (
190+
${rowtype_primary_key},
191+
${rowtype_fields}
192+
) ENGINE=OLAP
193+
PRIMARY KEY (${rowtype_primary_key})
194+
DISTRIBUTED BY HASH (${rowtype_primary_key})
195+
PROPERTIES (
196+
"replication_num" = "1",
197+
"in_memory" = "false",
198+
"enable_persistent_index" = "true",
199+
"replicated_storage" = "true",
200+
"compression" = "LZ4"
201+
)
202+
"""
203+
}
204+
}
205+
```

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksSinkManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.seatunnel.shade.com.google.common.base.Strings;
2121

22+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2223
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
2324
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
2425
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
@@ -43,10 +44,10 @@ public class StarRocksSinkManager {
4344
private int batchRowCount = 0;
4445
private long batchBytesSize = 0;
4546

46-
public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames) {
47+
public StarRocksSinkManager(SinkConfig sinkConfig, TableSchema tableSchema) {
4748
this.sinkConfig = sinkConfig;
4849
this.batchList = new ArrayList<>();
49-
starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, fileNames);
50+
starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, tableSchema);
5051
}
5152

5253
private void tryInit() throws IOException {

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/StarRocksStreamLoadVisitor.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.starrocks.client;
1919

20+
import org.apache.seatunnel.api.table.catalog.Column;
21+
import org.apache.seatunnel.api.table.catalog.TableSchema;
2022
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
2123
import org.apache.seatunnel.common.utils.JsonUtils;
2224
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SinkConfig;
2325
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode;
2426
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
2527
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksDelimiterParser;
28+
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP;
2629

2730
import org.apache.commons.codec.binary.Base64;
2831

@@ -56,11 +59,11 @@ public class StarRocksStreamLoadVisitor {
5659
private static final String RESULT_LABEL_ABORTED = "ABORTED";
5760
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
5861

59-
private List<String> fieldNames;
62+
private final TableSchema tableSchema;
6063

61-
public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, List<String> fieldNames) {
64+
public StarRocksStreamLoadVisitor(SinkConfig sinkConfig, TableSchema tableSchema) {
6265
this.sinkConfig = sinkConfig;
63-
this.fieldNames = fieldNames;
66+
this.tableSchema = tableSchema;
6467
this.httpHelper = new HttpHelper(sinkConfig);
6568
}
6669

@@ -260,16 +263,19 @@ private String getBasicAuthHeader(String username, String password) {
260263

261264
private Map<String, String> getStreamLoadHttpHeader(String label) {
262265
Map<String, String> headerMap = new HashMap<>();
263-
if (null != fieldNames
264-
&& !fieldNames.isEmpty()
266+
List<Column> columns = tableSchema.getColumns();
267+
List<String> fieldNames =
268+
columns.stream().map(Column::getName).collect(Collectors.toList());
269+
if (sinkConfig.isEnableUpsertDelete()) {
270+
fieldNames.add(StarRocksSinkOP.COLUMN_KEY);
271+
}
272+
if (!fieldNames.isEmpty()
265273
&& SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
266274
headerMap.put(
267275
"columns",
268-
String.join(
269-
",",
270-
fieldNames.stream()
271-
.map(f -> String.format("`%s`", f))
272-
.collect(Collectors.toList())));
276+
fieldNames.stream()
277+
.map(f -> String.format("`%s`", f))
278+
.collect(Collectors.joining(",")));
273279
}
274280
if (null != sinkConfig.getStreamLoadProps()) {
275281
for (Map.Entry<String, Object> entry : sinkConfig.getStreamLoadProps().entrySet()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.seatunnel.connectors.seatunnel.starrocks.datatypes;
18+
19+
import lombok.AllArgsConstructor;
20+
import lombok.Getter;
21+
22+
@Getter
23+
@AllArgsConstructor
24+
public class StarRocksType {
25+
public static final String SR_NULL = "NULL";
26+
public static final String SR_BOOLEAN = "BOOLEAN";
27+
public static final String SR_TINYINT = "TINYINT";
28+
public static final String SR_SMALLINT = "SMALLINT";
29+
public static final String SR_INT = "INT";
30+
public static final String SR_BIGINT = "BIGINT";
31+
public static final String SR_LARGEINT = "LARGEINT";
32+
public static final String SR_FLOAT = "FLOAT";
33+
public static final String SR_DOUBLE = "DOUBLE";
34+
public static final String SR_DECIMAL = "DECIMAL";
35+
public static final String SR_DECIMALV3 = "DECIMALV3";
36+
public static final String SR_DATE = "DATE";
37+
public static final String SR_DATETIME = "DATETIME";
38+
public static final String SR_CHAR = "CHAR";
39+
public static final String SR_VARCHAR = "VARCHAR";
40+
public static final String SR_STRING = "STRING";
41+
42+
public static final String SR_BOOLEAN_ARRAY = "ARRAY<boolean>";
43+
public static final String SR_TINYINT_ARRAY = "ARRAY<tinyint>";
44+
public static final String SR_SMALLINT_ARRAY = "ARRAY<smallint>";
45+
public static final String SR_INT_ARRAY = "ARRAY<int(11)>";
46+
public static final String SR_BIGINT_ARRAY = "ARRAY<bigint>";
47+
public static final String SR_FLOAT_ARRAY = "ARRAY<float>";
48+
public static final String SR_DOUBLE_ARRAY = "ARRAY<double>";
49+
public static final String SR_DECIMALV3_ARRAY = "ARRAY<DECIMALV3>";
50+
public static final String SR_DECIMALV3_ARRAY_COLUMN_TYPE_TMP = "ARRAY<DECIMALV3(%s, %s)>";
51+
public static final String SR_DATEV2_ARRAY = "ARRAY<DATEV2>";
52+
public static final String SR_DATETIMEV2_ARRAY = "ARRAY<DATETIMEV2>";
53+
public static final String SR_STRING_ARRAY = "ARRAY<STRING>";
54+
55+
// Because can not get the column length from array, So the following types of arrays cannot be
56+
// generated properly.
57+
public static final String SR_LARGEINT_ARRAY = "ARRAY<largeint>";
58+
public static final String SR_CHAR_ARRAY = "ARRAY<CHAR>";
59+
public static final String SR_CHAR_ARRAY_COLUMN_TYPE_TMP = "ARRAY<CHAR(%s)>";
60+
public static final String SR_VARCHAR_ARRAY = "ARRAY<VARCHAR>";
61+
public static final String SR_VARCHAR_ARRAY_COLUMN_TYPE_TMP = "ARRAY<VARCHAR(%s)>";
62+
63+
public static final String SR_JSON = "JSON";
64+
public static final String SR_JSONB = "JSONB";
65+
66+
public static final String SR_ARRAY = "ARRAY";
67+
68+
public static final String SR_ARRAY_BOOLEAN_INTER = "tinyint(1)";
69+
public static final String SR_ARRAY_TINYINT_INTER = "tinyint(4)";
70+
public static final String SR_ARRAY_SMALLINT_INTER = "smallint(6)";
71+
public static final String SR_ARRAY_INT_INTER = "int(11)";
72+
public static final String SR_ARRAY_BIGINT_INTER = "bigint(20)";
73+
public static final String SR_ARRAY_DECIMAL_PRE = "DECIMAL";
74+
public static final String SR_ARRAY_DATE_INTER = "date";
75+
public static final String SR_ARRAY_DATEV2_INTER = "DATEV2";
76+
public static final String SR_ARRAY_DATETIME_INTER = "DATETIME";
77+
public static final String SR_ARRAY_DATETIMEV2_INTER = "DATETIMEV2";
78+
79+
public static final String SR_MAP = "MAP";
80+
public static final String SR_MAP_COLUMN_TYPE = "MAP<%s, %s>";
81+
82+
public static final String SR_BOOLEAN_INDENTFIER = "TINYINT(1)";
83+
84+
private String type;
85+
}

0 commit comments

Comments
 (0)