Skip to content

Commit a399ef4

Browse files
authored
[Improve][Connector-V2] Clean key name in catalog table (#6942)
1 parent 670bba0 commit a399ef4

File tree

6 files changed

+1929
-27
lines changed

6 files changed

+1929
-27
lines changed

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java

Lines changed: 35 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@
3535
import com.mysql.cj.MysqlType;
3636

3737
import java.util.ArrayList;
38+
import java.util.HashMap;
3839
import java.util.List;
40+
import java.util.Map;
3941
import java.util.stream.Collectors;
4042

4143
import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
@@ -150,8 +152,9 @@ public String build(String catalogName) {
150152

151153
private String buildColumnsIdentifySql(String catalogName) {
152154
List<String> columnSqls = new ArrayList<>();
155+
Map<String, String> columnTypeMap = new HashMap<>();
153156
for (Column column : columns) {
154-
columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName));
157+
columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, columnTypeMap));
155158
}
156159
if (primaryKey != null) {
157160
columnSqls.add("\t" + buildPrimaryKeySql());
@@ -161,28 +164,34 @@ private String buildColumnsIdentifySql(String catalogName) {
161164
if (StringUtils.isBlank(constraintKey.getConstraintName())) {
162165
continue;
163166
}
164-
// columnSqls.add("\t" + buildConstraintKeySql(constraintKey));
167+
String constraintKeyStr = buildConstraintKeySql(constraintKey, columnTypeMap);
168+
if (StringUtils.isNotBlank(constraintKeyStr)) {
169+
columnSqls.add("\t" + constraintKeyStr);
170+
}
165171
}
166172
}
167173
return String.join(", \n", columnSqls);
168174
}
169175

170-
private String buildColumnIdentifySql(Column column, String catalogName) {
176+
private String buildColumnIdentifySql(
177+
Column column, String catalogName, Map<String, String> columnTypeMap) {
171178
final List<String> columnSqls = new ArrayList<>();
172179
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
173-
boolean isSupportDef = true;
174-
180+
String type;
175181
if ((SqlType.TIME.equals(column.getDataType().getSqlType())
176182
|| SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
177183
&& column.getScale() != null) {
178184
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
179-
columnSqls.add(typeDefine.getColumnType());
180-
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
181-
columnSqls.add(column.getSourceType());
185+
type = typeDefine.getColumnType();
186+
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)
187+
&& StringUtils.isNotBlank(column.getSourceType())) {
188+
type = column.getSourceType();
182189
} else {
183190
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
184-
columnSqls.add(typeDefine.getColumnType());
191+
type = typeDefine.getColumnType();
185192
}
193+
columnSqls.add(type);
194+
columnTypeMap.put(column.getName(), type);
186195
// nullable
187196
if (column.isNullable()) {
188197
columnSqls.add("NULL");
@@ -206,19 +215,32 @@ private String buildPrimaryKeySql() {
206215
return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde));
207216
}
208217

209-
private String buildConstraintKeySql(ConstraintKey constraintKey) {
218+
private String buildConstraintKeySql(
219+
ConstraintKey constraintKey, Map<String, String> columnTypeMap) {
210220
ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType();
211221
String indexColumns =
212222
constraintKey.getColumnNames().stream()
213223
.map(
214224
constraintKeyColumn -> {
225+
String columnName = constraintKeyColumn.getColumnName();
226+
boolean withLength = false;
227+
if (columnTypeMap.containsKey(columnName)) {
228+
String columnType = columnTypeMap.get(columnName);
229+
if (columnType.endsWith("BLOB")
230+
|| columnType.endsWith("TEXT")) {
231+
withLength = true;
232+
}
233+
}
215234
if (constraintKeyColumn.getSortType() == null) {
216235
return String.format(
217-
"`%s`", constraintKeyColumn.getColumnName());
236+
"`%s`%s",
237+
CatalogUtils.getFieldIde(columnName, fieldIde),
238+
withLength ? "(255)" : "");
218239
}
219240
return String.format(
220-
"`%s` %s",
221-
constraintKeyColumn.getColumnName(),
241+
"`%s`%s %s",
242+
CatalogUtils.getFieldIde(columnName, fieldIde),
243+
withLength ? "(255)" : "",
222244
constraintKeyColumn.getSortType().name());
223245
})
224246
.collect(Collectors.joining(", "));

seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ public static Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, Tabl
118118
while (rs.next()) {
119119
String columnName = rs.getString("COLUMN_NAME");
120120
// all the PK_NAME should be the same
121-
pkName = rs.getString("PK_NAME");
121+
pkName = cleanKeyName(rs.getString("PK_NAME"));
122122
int keySeq = rs.getInt("KEY_SEQ");
123123
// KEY_SEQ is 1-based index
124124
primaryKeyColumns.add(Pair.of(keySeq, columnName));
@@ -152,7 +152,7 @@ public static List<ConstraintKey> getConstraintKeys(
152152
if (columnName == null) {
153153
continue;
154154
}
155-
String indexName = resultSet.getString("INDEX_NAME");
155+
String indexName = cleanKeyName(resultSet.getString("INDEX_NAME"));
156156
boolean noUnique = resultSet.getBoolean("NON_UNIQUE");
157157

158158
ConstraintKey constraintKey =
@@ -179,6 +179,15 @@ public static List<ConstraintKey> getConstraintKeys(
179179
return new ArrayList<>(constraintKeyMap.values());
180180
}
181181

182+
private static String cleanKeyName(String keyName) {
183+
if (keyName != null) {
184+
// only keep the characters that are valid in an index name
185+
keyName = keyName.replaceAll("[^a-zA-Z0-9_]", "");
186+
keyName = keyName.replaceAll("^_+", "");
187+
}
188+
return keyName;
189+
}
190+
182191
public static TableSchema getTableSchema(
183192
DatabaseMetaData metadata, TablePath tablePath, JdbcDialectTypeMapper typeMapper)
184193
throws SQLException {

seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.seatunnel.api.table.catalog.TableSchema;
2727
import org.apache.seatunnel.api.table.type.BasicType;
2828
import org.apache.seatunnel.api.table.type.LocalTimeType;
29+
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
2930
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
3031
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
3132
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
@@ -37,6 +38,7 @@
3738

3839
import java.io.PrintStream;
3940
import java.util.ArrayList;
41+
import java.util.Arrays;
4042
import java.util.HashMap;
4143

4244
public class MysqlCreateTableSqlBuilderTest {
@@ -58,6 +60,14 @@ public void testBuild() {
5860
.column(
5961
PhysicalColumn.of(
6062
"age", BasicType.INT_TYPE, (Long) null, true, null, "age"))
63+
.column(
64+
PhysicalColumn.of(
65+
"blob_v",
66+
PrimitiveByteArrayType.INSTANCE,
67+
Long.MAX_VALUE,
68+
true,
69+
null,
70+
"blob_v"))
6171
.column(
6272
PhysicalColumn.of(
6373
"createTime",
@@ -76,12 +86,19 @@ public void testBuild() {
7686
"lastUpdateTime"))
7787
.primaryKey(PrimaryKey.of("id", Lists.newArrayList("id")))
7888
.constraintKey(
79-
ConstraintKey.of(
80-
ConstraintKey.ConstraintType.INDEX_KEY,
81-
"name",
82-
Lists.newArrayList(
83-
ConstraintKey.ConstraintKeyColumn.of(
84-
"name", null))))
89+
Arrays.asList(
90+
ConstraintKey.of(
91+
ConstraintKey.ConstraintType.INDEX_KEY,
92+
"name",
93+
Lists.newArrayList(
94+
ConstraintKey.ConstraintKeyColumn.of(
95+
"name", null))),
96+
ConstraintKey.of(
97+
ConstraintKey.ConstraintType.INDEX_KEY,
98+
"blob_v",
99+
Lists.newArrayList(
100+
ConstraintKey.ConstraintKeyColumn.of(
101+
"blob_v", null)))))
85102
.build();
86103
CatalogTable catalogTable =
87104
CatalogTable.of(
@@ -98,12 +115,15 @@ public void testBuild() {
98115
// create table sql is change; The old unit tests are no longer applicable
99116
String expect =
100117
"CREATE TABLE `test_table` (\n"
101-
+ "\t`id` null NOT NULL COMMENT 'id', \n"
102-
+ "\t`name` null NOT NULL COMMENT 'name', \n"
103-
+ "\t`age` null NULL COMMENT 'age', \n"
104-
+ "\t`createTime` null NULL COMMENT 'createTime', \n"
105-
+ "\t`lastUpdateTime` null NULL COMMENT 'lastUpdateTime', \n"
106-
+ "\tPRIMARY KEY (`id`)\n"
118+
+ "\t`id` BIGINT NOT NULL COMMENT 'id', \n"
119+
+ "\t`name` VARCHAR(128) NOT NULL COMMENT 'name', \n"
120+
+ "\t`age` INT NULL COMMENT 'age', \n"
121+
+ "\t`blob_v` LONGBLOB NULL COMMENT 'blob_v', \n"
122+
+ "\t`createTime` DATETIME NULL COMMENT 'createTime', \n"
123+
+ "\t`lastUpdateTime` DATETIME NULL COMMENT 'lastUpdateTime', \n"
124+
+ "\tPRIMARY KEY (`id`), \n"
125+
+ "\tKEY `name` (`name`), \n"
126+
+ "\tKEY `blob_v` (`blob_v`(255))\n"
107127
+ ") COMMENT = 'User table';";
108128
CONSOLE.println(expect);
109129
Assertions.assertEquals(expect, createTableSql);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;
19+
20+
import org.apache.seatunnel.api.table.catalog.ConstraintKey;
21+
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
22+
import org.apache.seatunnel.api.table.catalog.TablePath;
23+
24+
import org.junit.jupiter.api.Assertions;
25+
import org.junit.jupiter.api.Test;
26+
27+
import java.sql.SQLException;
28+
import java.util.List;
29+
import java.util.Optional;
30+
31+
public class CatalogUtilsTest {
32+
33+
@Test
34+
void testPrimaryKeysNameWithOutSpecialChar() throws SQLException {
35+
Optional<PrimaryKey> primaryKey =
36+
CatalogUtils.getPrimaryKey(new TestDatabaseMetaData(), TablePath.of("test.test"));
37+
Assertions.assertEquals("testfdawe_", primaryKey.get().getPrimaryKey());
38+
}
39+
40+
@Test
41+
void testConstraintKeysNameWithOutSpecialChar() throws SQLException {
42+
List<ConstraintKey> constraintKeys =
43+
CatalogUtils.getConstraintKeys(
44+
new TestDatabaseMetaData(), TablePath.of("test.test"));
45+
Assertions.assertEquals("testfdawe_", constraintKeys.get(0).getConstraintName());
46+
}
47+
}

0 commit comments

Comments
 (0)