Skip to content

Commit c4cb1fc

Browse files
authored
[Fix][Connector-V2] fix starRocks automatically creates tables with comment (#8568)
1 parent a05ba93 commit c4cb1fc

File tree

12 files changed

+116
-21
lines changed

12 files changed

+116
-21
lines changed

seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/ClickhouseCatalogUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ public String columnToConnectorType(Column column) {
3838
ClickhouseTypeConverter.INSTANCE.reconvert(column).getColumnType(),
3939
StringUtils.isEmpty(column.getComment())
4040
? ""
41-
: "COMMENT '" + column.getComment() + "'");
41+
: "COMMENT '"
42+
+ column.getComment().replace("'", "''").replace("\\", "\\\\")
43+
+ "'");
4244
}
4345

4446
public String getDropTableSql(TablePath tablePath, boolean ignoreIfNotExists) {

seatunnel-connectors-v2/connector-clickhouse/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseCreateTableTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ public void test() {
5353
columns.add(
5454
PhysicalColumn.of(
5555
"age", BasicType.INT_TYPE, (Long) null, true, null, "test comment"));
56-
columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, ""));
56+
columns.add(
57+
PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, "'N'-N"));
5758
columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) null, true, null, ""));
5859
columns.add(
5960
PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, ""));
@@ -103,7 +104,7 @@ public void test() {
103104
"CREATE TABLE IF NOT EXISTS `test1`.`test2` (\n"
104105
+ " `id` Int64 ,`age` Int32 COMMENT 'test comment',\n"
105106
+ " `name` String ,\n"
106-
+ "`score` Int32 ,\n"
107+
+ "`score` Int32 COMMENT '''N''-N',\n"
107108
+ "`gender` Int8 ,\n"
108109
+ "`create_time` Int64 \n"
109110
+ ") ENGINE = MergeTree()\n"

seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/util/CatalogUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,9 @@ public String getCreateTableSql(
106106
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields)
107107
.replaceAll(
108108
SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(),
109-
Objects.isNull(comment) ? "" : comment);
109+
Objects.isNull(comment)
110+
? ""
111+
: comment.replace("'", "''").replace("\\", "\\\\"));
110112
}
111113

112114
private String mergeColumnInTemplate(

seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/util/DorisCatalogUtil.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,12 @@ public static String getCreateTableStatement(
209209
SaveModePlaceHolder.ROWTYPE_FIELDS.getReplacePlaceHolder(), rowTypeFields)
210210
.replaceAll(
211211
SaveModePlaceHolder.COMMENT.getReplacePlaceHolder(),
212-
Objects.isNull(catalogTable.getComment()) ? "" : catalogTable.getComment());
212+
Objects.isNull(catalogTable.getComment())
213+
? ""
214+
: catalogTable
215+
.getComment()
216+
.replace("'", "''")
217+
.replace("\\", "\\\\"));
213218
}
214219

215220
private static String mergeColumnInTemplate(
@@ -263,6 +268,8 @@ private static String columnToDorisType(
263268
column.isNullable() ? "NULL" : "NOT NULL",
264269
StringUtils.isEmpty(column.getComment())
265270
? ""
266-
: "COMMENT '" + column.getComment() + "'");
271+
: "COMMENT '"
272+
+ column.getComment().replace("'", "''").replace("\\", "\\\\")
273+
+ "'");
267274
}
268275
}

seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/catalog/DorisCreateTableTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,8 @@ public void test() {
6060
columns.add(
6161
PhysicalColumn.of(
6262
"age", BasicType.INT_TYPE, (Long) null, true, null, "test comment"));
63-
columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, ""));
63+
columns.add(
64+
PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, "'N'-N"));
6465
columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) null, true, null, ""));
6566
columns.add(
6667
PhysicalColumn.of("create_time", BasicType.LONG_TYPE, (Long) null, true, null, ""));
@@ -125,7 +126,7 @@ public void test() {
125126
result,
126127
"CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n"
127128
+ "`id` BIGINT NULL ,`age` INT NULL COMMENT 'test comment' , \n"
128-
+ "`name` STRING NULL ,`score` INT NULL , \n"
129+
+ "`name` STRING NULL ,`score` INT NULL COMMENT '''N''-N' , \n"
129130
+ "`create_time` DATETIME NOT NULL , \n"
130131
+ "`gender` TINYINT NULL \n"
131132
+ ") ENGINE=OLAP \n"

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSaveModeUtil.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ public String columnToConnectorType(Column column) {
4545
column.isNullable() ? "NULL" : "NOT NULL",
4646
StringUtils.isEmpty(column.getComment())
4747
? ""
48-
: "COMMENT '" + column.getComment() + "'");
48+
: "COMMENT '"
49+
+ column.getComment().replace("'", "''").replace("\\", "\\\\")
50+
+ "'");
4951
}
5052

5153
private static String dataTypeToStarrocksType(SeaTunnelDataType<?> dataType, long length) {

seatunnel-connectors-v2/connector-starrocks/src/test/java/org/apache/seatunnel/connectors/seatunnel/starrocks/catalog/StarRocksCreateTableTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public void test() {
5858
columns.add(
5959
PhysicalColumn.of(
6060
"name", BasicType.STRING_TYPE, (Long) null, true, null, "test comment"));
61-
columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null, true, null, ""));
61+
columns.add(PhysicalColumn.of("age", BasicType.INT_TYPE, (Long) null, true, null, "'N'-N"));
6262
columns.add(PhysicalColumn.of("score", BasicType.INT_TYPE, (Long) null, true, null, ""));
6363
columns.add(PhysicalColumn.of("gender", BasicType.BYTE_TYPE, (Long) null, true, null, ""));
6464
columns.add(
@@ -115,7 +115,7 @@ public void test() {
115115
StarRocksSinkOptions.SAVE_MODE_CREATE_TEMPLATE.key());
116116
Assertions.assertEquals(
117117
"CREATE TABLE IF NOT EXISTS `test1`.`test2` ( \n"
118-
+ "`id` BIGINT NULL ,`age` INT NULL , \n"
118+
+ "`id` BIGINT NULL ,`age` INT NULL COMMENT '''N''-N' , \n"
119119
+ "`name` STRING NULL COMMENT 'test comment',`score` INT NULL , \n"
120120
+ "`create_time` DATETIME NOT NULL , \n"
121121
+ "`gender` TINYINT NULL \n"

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/ClickhouseIT.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,14 @@ public void testSourceParallelism(TestContainer container) throws Exception {
107107
Assertions.assertEquals(0, execResult.getExitCode());
108108
}
109109

110+
@TestTemplate
111+
public void testClickhouseWithCreateSchemaWhenComment(TestContainer container)
112+
throws Exception {
113+
Container.ExecResult execResult =
114+
container.executeJob("/clickhouse_with_create_schema_when_comment.conf");
115+
Assertions.assertEquals(0, execResult.getExitCode());
116+
}
117+
110118
@TestTemplate
111119
public void clickhouseWithCreateSchemaWhenNotExist(TestContainer container) throws Exception {
112120
String tableName = "default.sink_table_for_schema";
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of streaming processing in seatunnel config
19+
######
20+
21+
env {
22+
parallelism = 1
23+
job.mode = "BATCH"
24+
checkpoint.interval = 10
25+
}
26+
27+
source {
28+
# This is a example source plugin **only for test and demonstrate the feature source plugin**
29+
Clickhouse {
30+
host = "clickhouse:8123"
31+
database = "default"
32+
sql = "select * from source_table"
33+
username = "default"
34+
password = ""
35+
plugin_output = "source_table"
36+
}
37+
# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
38+
# please go to https://seatunnel.apache.org/docs/connector-v2/source/ClickhouseSource
39+
}
40+
41+
sink {
42+
Clickhouse {
43+
host = "clickhouse:8123"
44+
database = "default"
45+
table = "clickhouse_with_create_schema_when_comment"
46+
username = "default"
47+
password = ""
48+
"schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
49+
"data_save_mode"="APPEND_DATA"
50+
"save_mode_create_template" = """
51+
CREATE TABLE IF NOT EXISTS `${database}`.`${table}` (
52+
${rowtype_fields}
53+
) ENGINE =Memory
54+
COMMENT '${comment}';
55+
"""
56+
support_upsert = true
57+
allow_experimental_lightweight_delete = true
58+
}
59+
60+
# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
61+
# please go to https://seatunnel.apache.org/docs/connector-v2/sink
62+
}

seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/init/clickhouse_init.conf

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@ source_table = """
1919
set allow_experimental_geo_types = 1;
2020
create table if not exists `default`.source_table(
2121
`id` Int64,
22-
`c_map` Map(String, Int32),
23-
`c_array_string` Array(String),
22+
`c_map` Map(String, Int32) COMMENT '''N''-N',
23+
`c_array_string` Array(String) COMMENT '\\N\\-N',
2424
`c_array_short` Array(Int16),
2525
`c_array_int` Array(Int32),
2626
`c_array_long` Array(Int64),
@@ -51,14 +51,15 @@ create table if not exists `default`.source_table(
5151
`c_uint256` UInt256,
5252
`c_point` Point,
5353
`c_ring` Ring
54-
)engine=Memory;
54+
)engine=Memory
55+
comment '''N''-N';
5556
"""
5657

5758
sink_table = """
5859
create table if not exists `default`.sink_table(
5960
`id` Int64,
60-
`c_map` Map(String, Int32),
61-
`c_array_string` Array(String),
61+
`c_map` Map(String, Int32) COMMENT '''N''-N',
62+
`c_array_string` Array(String) COMMENT '\\N\\-N',
6263
`c_array_short` Array(Int16),
6364
`c_array_int` Array(Int32),
6465
`c_array_long` Array(Int64),
@@ -89,7 +90,8 @@ create table if not exists `default`.sink_table(
8990
`c_uint256` UInt256,
9091
`c_point` Point,
9192
`c_ring` Ring
92-
)engine=Memory;
93+
)engine=Memory
94+
comment '''N''-N';
9395
"""
9496

9597
insert_sql = """

0 commit comments

Comments
 (0)