Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Hotfix][Connector-V2][Jdbc] Flink engine, source is LocalFile, sink … #6982

Closed
wants to merge 6 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import com.mysql.cj.MysqlType;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -150,8 +152,9 @@ public String build(String catalogName) {

private String buildColumnsIdentifySql(String catalogName) {
List<String> columnSqls = new ArrayList<>();
Map<String, String> columnTypeMap = new HashMap<>();
for (Column column : columns) {
columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName));
columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, columnTypeMap));
}
if (primaryKey != null) {
columnSqls.add("\t" + buildPrimaryKeySql());
Expand All @@ -161,28 +164,34 @@ private String buildColumnsIdentifySql(String catalogName) {
if (StringUtils.isBlank(constraintKey.getConstraintName())) {
continue;
}
// columnSqls.add("\t" + buildConstraintKeySql(constraintKey));
String constraintKeyStr = buildConstraintKeySql(constraintKey, columnTypeMap);
if (StringUtils.isNotBlank(constraintKeyStr)) {
columnSqls.add("\t" + constraintKeyStr);
}
}
}
return String.join(", \n", columnSqls);
}

private String buildColumnIdentifySql(Column column, String catalogName) {
private String buildColumnIdentifySql(
Column column, String catalogName, Map<String, String> columnTypeMap) {
final List<String> columnSqls = new ArrayList<>();
columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`"));
boolean isSupportDef = true;

String type;
if ((SqlType.TIME.equals(column.getDataType().getSqlType())
|| SqlType.TIMESTAMP.equals(column.getDataType().getSqlType()))
&& column.getScale() != null) {
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) {
columnSqls.add(column.getSourceType());
type = typeDefine.getColumnType();
} else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)
&& StringUtils.isNotBlank(column.getSourceType())) {
type = column.getSourceType();
} else {
BasicTypeDefine<MysqlType> typeDefine = typeConverter.reconvert(column);
columnSqls.add(typeDefine.getColumnType());
type = typeDefine.getColumnType();
}
columnSqls.add(type);
columnTypeMap.put(column.getName(), type);
// nullable
if (column.isNullable()) {
columnSqls.add("NULL");
Expand All @@ -206,19 +215,32 @@ private String buildPrimaryKeySql() {
return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde));
}

private String buildConstraintKeySql(ConstraintKey constraintKey) {
private String buildConstraintKeySql(
ConstraintKey constraintKey, Map<String, String> columnTypeMap) {
ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType();
String indexColumns =
constraintKey.getColumnNames().stream()
.map(
constraintKeyColumn -> {
String columnName = constraintKeyColumn.getColumnName();
boolean withLength = false;
if (columnTypeMap.containsKey(columnName)) {
String columnType = columnTypeMap.get(columnName);
if (columnType.endsWith("BLOB")
|| columnType.endsWith("TEXT")) {
withLength = true;
}
}
if (constraintKeyColumn.getSortType() == null) {
return String.format(
"`%s`", constraintKeyColumn.getColumnName());
"`%s`%s",
CatalogUtils.getFieldIde(columnName, fieldIde),
withLength ? "(255)" : "");
}
return String.format(
"`%s` %s",
constraintKeyColumn.getColumnName(),
"`%s`%s %s",
CatalogUtils.getFieldIde(columnName, fieldIde),
withLength ? "(255)" : "",
constraintKeyColumn.getSortType().name());
})
.collect(Collectors.joining(", "));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public static Optional<PrimaryKey> getPrimaryKey(DatabaseMetaData metaData, Tabl
while (rs.next()) {
String columnName = rs.getString("COLUMN_NAME");
// all the PK_NAME should be the same
pkName = rs.getString("PK_NAME");
pkName = cleanKeyName(rs.getString("PK_NAME"));
int keySeq = rs.getInt("KEY_SEQ");
// KEY_SEQ is 1-based index
primaryKeyColumns.add(Pair.of(keySeq, columnName));
Expand Down Expand Up @@ -152,7 +152,7 @@ public static List<ConstraintKey> getConstraintKeys(
if (columnName == null) {
continue;
}
String indexName = resultSet.getString("INDEX_NAME");
String indexName = cleanKeyName(resultSet.getString("INDEX_NAME"));
boolean noUnique = resultSet.getBoolean("NON_UNIQUE");

ConstraintKey constraintKey =
Expand All @@ -179,6 +179,15 @@ public static List<ConstraintKey> getConstraintKeys(
return new ArrayList<>(constraintKeyMap.values());
}

private static String cleanKeyName(String keyName) {
if (keyName != null) {
// only keep the characters that are valid in an index name
keyName = keyName.replaceAll("[^a-zA-Z0-9_]", "");
keyName = keyName.replaceAll("^_+", "");
}
return keyName;
}

public static TableSchema getTableSchema(
DatabaseMetaData metadata, TablePath tablePath, JdbcDialectTypeMapper typeMapper)
throws SQLException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ public MultiTableResourceManager<ConnectionPoolManager> initMultiTableResourceMa
ds.setPassword(jdbcSinkConfig.getJdbcConnectionConfig().getPassword().get());
}
ds.setAutoCommit(jdbcSinkConfig.getJdbcConnectionConfig().isAutoCommit());
ds.setDriverClassName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName());
return new JdbcMultiTableResourceManager(new ConnectionPoolManager(ds));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter;
Expand All @@ -37,6 +38,7 @@

import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;

public class MysqlCreateTableSqlBuilderTest {
Expand All @@ -58,6 +60,14 @@ public void testBuild() {
.column(
PhysicalColumn.of(
"age", BasicType.INT_TYPE, (Long) null, true, null, "age"))
.column(
PhysicalColumn.of(
"blob_v",
PrimitiveByteArrayType.INSTANCE,
Long.MAX_VALUE,
true,
null,
"blob_v"))
.column(
PhysicalColumn.of(
"createTime",
Expand All @@ -76,12 +86,19 @@ public void testBuild() {
"lastUpdateTime"))
.primaryKey(PrimaryKey.of("id", Lists.newArrayList("id")))
.constraintKey(
ConstraintKey.of(
ConstraintKey.ConstraintType.INDEX_KEY,
"name",
Lists.newArrayList(
ConstraintKey.ConstraintKeyColumn.of(
"name", null))))
Arrays.asList(
ConstraintKey.of(
ConstraintKey.ConstraintType.INDEX_KEY,
"name",
Lists.newArrayList(
ConstraintKey.ConstraintKeyColumn.of(
"name", null))),
ConstraintKey.of(
ConstraintKey.ConstraintType.INDEX_KEY,
"blob_v",
Lists.newArrayList(
ConstraintKey.ConstraintKeyColumn.of(
"blob_v", null)))))
.build();
CatalogTable catalogTable =
CatalogTable.of(
Expand All @@ -98,12 +115,15 @@ public void testBuild() {
// create table sql is change; The old unit tests are no longer applicable
String expect =
"CREATE TABLE `test_table` (\n"
+ "\t`id` null NOT NULL COMMENT 'id', \n"
+ "\t`name` null NOT NULL COMMENT 'name', \n"
+ "\t`age` null NULL COMMENT 'age', \n"
+ "\t`createTime` null NULL COMMENT 'createTime', \n"
+ "\t`lastUpdateTime` null NULL COMMENT 'lastUpdateTime', \n"
+ "\tPRIMARY KEY (`id`)\n"
+ "\t`id` BIGINT NOT NULL COMMENT 'id', \n"
+ "\t`name` VARCHAR(128) NOT NULL COMMENT 'name', \n"
+ "\t`age` INT NULL COMMENT 'age', \n"
+ "\t`blob_v` LONGBLOB NULL COMMENT 'blob_v', \n"
+ "\t`createTime` DATETIME NULL COMMENT 'createTime', \n"
+ "\t`lastUpdateTime` DATETIME NULL COMMENT 'lastUpdateTime', \n"
+ "\tPRIMARY KEY (`id`), \n"
+ "\tKEY `name` (`name`), \n"
+ "\tKEY `blob_v` (`blob_v`(255))\n"
+ ") COMMENT = 'User table';";
CONSOLE.println(expect);
Assertions.assertEquals(expect, createTableSql);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils;

import org.apache.seatunnel.api.table.catalog.ConstraintKey;
import org.apache.seatunnel.api.table.catalog.PrimaryKey;
import org.apache.seatunnel.api.table.catalog.TablePath;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.sql.SQLException;
import java.util.List;
import java.util.Optional;

public class CatalogUtilsTest {

@Test
void testPrimaryKeysNameWithOutSpecialChar() throws SQLException {
Optional<PrimaryKey> primaryKey =
CatalogUtils.getPrimaryKey(new TestDatabaseMetaData(), TablePath.of("test.test"));
Assertions.assertEquals("testfdawe_", primaryKey.get().getPrimaryKey());
}

@Test
void testConstraintKeysNameWithOutSpecialChar() throws SQLException {
List<ConstraintKey> constraintKeys =
CatalogUtils.getConstraintKeys(
new TestDatabaseMetaData(), TablePath.of("test.test"));
Assertions.assertEquals("testfdawe_", constraintKeys.get(0).getConstraintName());
}
}
Loading
Loading