diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index a35767640f8..6e3b386f442 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -35,7 +35,9 @@ import com.mysql.cj.MysqlType; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import static org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument; @@ -150,8 +152,9 @@ public String build(String catalogName) { private String buildColumnsIdentifySql(String catalogName) { List columnSqls = new ArrayList<>(); + Map columnTypeMap = new HashMap<>(); for (Column column : columns) { - columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName)); + columnSqls.add("\t" + buildColumnIdentifySql(column, catalogName, columnTypeMap)); } if (primaryKey != null) { columnSqls.add("\t" + buildPrimaryKeySql()); @@ -161,28 +164,34 @@ private String buildColumnsIdentifySql(String catalogName) { if (StringUtils.isBlank(constraintKey.getConstraintName())) { continue; } - // columnSqls.add("\t" + buildConstraintKeySql(constraintKey)); + String constraintKeyStr = buildConstraintKeySql(constraintKey, columnTypeMap); + if (StringUtils.isNotBlank(constraintKeyStr)) { + columnSqls.add("\t" + constraintKeyStr); + } } } return String.join(", \n", columnSqls); } - private String buildColumnIdentifySql(Column column, String catalogName) { + private String buildColumnIdentifySql( + Column column, String catalogName, Map columnTypeMap) { final List columnSqls = new ArrayList<>(); columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`")); - boolean isSupportDef = true; - + String type; if ((SqlType.TIME.equals(column.getDataType().getSqlType()) || SqlType.TIMESTAMP.equals(column.getDataType().getSqlType())) && column.getScale() != null) { BasicTypeDefine typeDefine = typeConverter.reconvert(column); - columnSqls.add(typeDefine.getColumnType()); - } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL)) { - columnSqls.add(column.getSourceType()); + type = typeDefine.getColumnType(); + } else if (StringUtils.equals(catalogName, DatabaseIdentifier.MYSQL) + && StringUtils.isNotBlank(column.getSourceType())) { + type = column.getSourceType(); } else { BasicTypeDefine typeDefine = typeConverter.reconvert(column); - columnSqls.add(typeDefine.getColumnType()); + type = typeDefine.getColumnType(); } + columnSqls.add(type); + columnTypeMap.put(column.getName(), type); // nullable if (column.isNullable()) { columnSqls.add("NULL"); @@ -206,19 +215,32 @@ private String buildPrimaryKeySql() { return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde)); } - private String buildConstraintKeySql(ConstraintKey constraintKey) { + private String buildConstraintKeySql( + ConstraintKey constraintKey, Map columnTypeMap) { ConstraintKey.ConstraintType constraintType = constraintKey.getConstraintType(); String indexColumns = constraintKey.getColumnNames().stream() .map( constraintKeyColumn -> { + String columnName = constraintKeyColumn.getColumnName(); + boolean withLength = false; + if (columnTypeMap.containsKey(columnName)) { + String columnType = columnTypeMap.get(columnName); + if (columnType.endsWith("BLOB") + || columnType.endsWith("TEXT")) { + withLength = true; + } + } if (constraintKeyColumn.getSortType() == null) { return String.format( - "`%s`", constraintKeyColumn.getColumnName()); + "`%s`%s", + CatalogUtils.getFieldIde(columnName, fieldIde), + withLength ? "(255)" : ""); } return String.format( - "`%s` %s", - constraintKeyColumn.getColumnName(), + "`%s`%s %s", + CatalogUtils.getFieldIde(columnName, fieldIde), + withLength ? "(255)" : "", constraintKeyColumn.getSortType().name()); }) .collect(Collectors.joining(", ")); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java index f45d03854ee..ed4490092dc 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -118,7 +118,7 @@ public static Optional getPrimaryKey(DatabaseMetaData metaData, Tabl while (rs.next()) { String columnName = rs.getString("COLUMN_NAME"); // all the PK_NAME should be the same - pkName = rs.getString("PK_NAME"); + pkName = cleanKeyName(rs.getString("PK_NAME")); int keySeq = rs.getInt("KEY_SEQ"); // KEY_SEQ is 1-based index primaryKeyColumns.add(Pair.of(keySeq, columnName)); @@ -152,7 +152,7 @@ public static List getConstraintKeys( if (columnName == null) { continue; } - String indexName = resultSet.getString("INDEX_NAME"); + String indexName = cleanKeyName(resultSet.getString("INDEX_NAME")); boolean noUnique = resultSet.getBoolean("NON_UNIQUE"); ConstraintKey constraintKey = @@ -179,6 +179,15 @@ public static List getConstraintKeys( return new ArrayList<>(constraintKeyMap.values()); } + private static String cleanKeyName(String keyName) { + if (keyName != null) { + // only keep the characters that are valid in an index name + keyName = keyName.replaceAll("[^a-zA-Z0-9_]", ""); + keyName = keyName.replaceAll("^_+", ""); + } + return keyName; + } + public static TableSchema getTableSchema( DatabaseMetaData metadata, TablePath tablePath, JdbcDialectTypeMapper typeMapper) throws SQLException { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java index d8c7b661c59..ee5e08d8f84 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java @@ -92,6 +92,7 @@ public MultiTableResourceManager initMultiTableResourceMa ds.setPassword(jdbcSinkConfig.getJdbcConnectionConfig().getPassword().get()); } ds.setAutoCommit(jdbcSinkConfig.getJdbcConnectionConfig().isAutoCommit()); + ds.setDriverClassName(jdbcSinkConfig.getJdbcConnectionConfig().getDriverName()); return new JdbcMultiTableResourceManager(new ConnectionPoolManager(ds)); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java index 2e7a72589f7..745c7031f8d 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java @@ -26,6 +26,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.mysql.MysqlCreateTableSqlBuilder; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.MySqlTypeConverter; @@ -37,6 +38,7 @@ import java.io.PrintStream; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; public class MysqlCreateTableSqlBuilderTest { @@ -58,6 +60,14 @@ public void testBuild() { .column( PhysicalColumn.of( "age", BasicType.INT_TYPE, (Long) null, true, null, "age")) + .column( + PhysicalColumn.of( + "blob_v", + PrimitiveByteArrayType.INSTANCE, + Long.MAX_VALUE, + true, + null, + "blob_v")) .column( PhysicalColumn.of( "createTime", @@ -76,12 +86,19 @@ public void testBuild() { "lastUpdateTime")) .primaryKey(PrimaryKey.of("id", Lists.newArrayList("id"))) .constraintKey( - ConstraintKey.of( - ConstraintKey.ConstraintType.INDEX_KEY, - "name", - Lists.newArrayList( - ConstraintKey.ConstraintKeyColumn.of( - "name", null)))) + Arrays.asList( + ConstraintKey.of( + ConstraintKey.ConstraintType.INDEX_KEY, + "name", + Lists.newArrayList( + ConstraintKey.ConstraintKeyColumn.of( + "name", null))), + ConstraintKey.of( + ConstraintKey.ConstraintType.INDEX_KEY, + "blob_v", + Lists.newArrayList( + ConstraintKey.ConstraintKeyColumn.of( + "blob_v", null))))) .build(); CatalogTable catalogTable = CatalogTable.of( @@ -98,12 +115,15 @@ public void testBuild() { // create table sql is change; The old unit tests are no longer applicable String expect = "CREATE TABLE `test_table` (\n" - + "\t`id` null NOT NULL COMMENT 'id', \n" - + "\t`name` null NOT NULL COMMENT 'name', \n" - + "\t`age` null NULL COMMENT 'age', \n" - + "\t`createTime` null NULL COMMENT 'createTime', \n" - + "\t`lastUpdateTime` null NULL COMMENT 'lastUpdateTime', \n" - + "\tPRIMARY KEY (`id`)\n" + + "\t`id` BIGINT NOT NULL COMMENT 'id', \n" + + "\t`name` VARCHAR(128) NOT NULL COMMENT 'name', \n" + + "\t`age` INT NULL COMMENT 'age', \n" + + "\t`blob_v` LONGBLOB NULL COMMENT 'blob_v', \n" + + "\t`createTime` DATETIME NULL COMMENT 'createTime', \n" + + "\t`lastUpdateTime` DATETIME NULL COMMENT 'lastUpdateTime', \n" + + "\tPRIMARY KEY (`id`), \n" + + "\tKEY `name` (`name`), \n" + + "\tKEY `blob_v` (`blob_v`(255))\n" + ") COMMENT = 'User table';"; CONSOLE.println(expect); Assertions.assertEquals(expect, createTableSql); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java new file mode 100644 index 00000000000..25f256fc04e --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtilsTest.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils; + +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TablePath; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.sql.SQLException; +import java.util.List; +import java.util.Optional; + +public class CatalogUtilsTest { + + @Test + void testPrimaryKeysNameWithOutSpecialChar() throws SQLException { + Optional primaryKey = + CatalogUtils.getPrimaryKey(new TestDatabaseMetaData(), TablePath.of("test.test")); + Assertions.assertEquals("testfdawe_", primaryKey.get().getPrimaryKey()); + } + + @Test + void testConstraintKeysNameWithOutSpecialChar() throws SQLException { + List constraintKeys = + CatalogUtils.getConstraintKeys( + new TestDatabaseMetaData(), TablePath.of("test.test")); + Assertions.assertEquals("testfdawe_", constraintKeys.get(0).getConstraintName()); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java new file mode 100644 index 00000000000..c0ea1c911e8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestDatabaseMetaData.java @@ -0,0 +1,974 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; +import java.sql.RowIdLifetime; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class TestDatabaseMetaData implements DatabaseMetaData { + @Override + public boolean allProceduresAreCallable() throws SQLException { + return false; + } + + @Override + public boolean allTablesAreSelectable() throws SQLException { + return false; + } + + @Override + public String getURL() throws SQLException { + return null; + } + + @Override + public String getUserName() throws SQLException { + return null; + } + + @Override + public boolean isReadOnly() throws SQLException { + return false; + } + + @Override + public boolean nullsAreSortedHigh() throws SQLException { + return false; + } + + @Override + public boolean nullsAreSortedLow() throws SQLException { + return false; + } + + @Override + public boolean nullsAreSortedAtStart() throws SQLException { + return false; + } + + @Override + public boolean nullsAreSortedAtEnd() throws SQLException { + return false; + } + + @Override + public String getDatabaseProductName() throws SQLException { + return null; + } + + @Override + public String getDatabaseProductVersion() throws SQLException { + return null; + } + + @Override + public String getDriverName() throws SQLException { + return null; + } + + @Override + public String getDriverVersion() throws SQLException { + return null; + } + + @Override + public int getDriverMajorVersion() { + return 0; + } + + @Override + public int getDriverMinorVersion() { + return 0; + } + + @Override + public boolean usesLocalFiles() throws SQLException { + return false; + } + + @Override + public boolean usesLocalFilePerTable() throws SQLException { + return false; + } + + @Override + public boolean supportsMixedCaseIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesUpperCaseIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesLowerCaseIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesMixedCaseIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean supportsMixedCaseQuotedIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesUpperCaseQuotedIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesLowerCaseQuotedIdentifiers() throws SQLException { + return false; + } + + @Override + public boolean storesMixedCaseQuotedIdentifiers() throws SQLException { + return false; + } + + @Override + public String getIdentifierQuoteString() throws SQLException { + return null; + } + + @Override + public String getSQLKeywords() throws SQLException { + return null; + } + + @Override + public String getNumericFunctions() throws SQLException { + return null; + } + + @Override + public String getStringFunctions() throws SQLException { + return null; + } + + @Override + public String getSystemFunctions() throws SQLException { + return null; + } + + @Override + public String getTimeDateFunctions() throws SQLException { + return null; + } + + @Override + public String getSearchStringEscape() throws SQLException { + return null; + } + + @Override + public String getExtraNameCharacters() throws SQLException { + return null; + } + + @Override + public boolean supportsAlterTableWithAddColumn() throws SQLException { + return false; + } + + @Override + public boolean supportsAlterTableWithDropColumn() throws SQLException { + return false; + } + + @Override + public boolean supportsColumnAliasing() throws SQLException { + return false; + } + + @Override + public boolean nullPlusNonNullIsNull() throws SQLException { + return false; + } + + @Override + public boolean supportsConvert() throws SQLException { + return false; + } + + @Override + public boolean supportsConvert(int fromType, int toType) throws SQLException { + return false; + } + + @Override + public boolean supportsTableCorrelationNames() throws SQLException { + return false; + } + + @Override + public boolean supportsDifferentTableCorrelationNames() throws SQLException { + return false; + } + + @Override + public boolean supportsExpressionsInOrderBy() throws SQLException { + return false; + } + + @Override + public boolean supportsOrderByUnrelated() throws SQLException { + return false; + } + + @Override + public boolean supportsGroupBy() throws SQLException { + return false; + } + + @Override + public boolean supportsGroupByUnrelated() throws SQLException { + return false; + } + + @Override + public boolean supportsGroupByBeyondSelect() throws SQLException { + return false; + } + + @Override + public boolean supportsLikeEscapeClause() throws SQLException { + return false; + } + + @Override + public boolean supportsMultipleResultSets() throws SQLException { + return false; + } + + @Override + public boolean supportsMultipleTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsNonNullableColumns() throws SQLException { + return false; + } + + @Override + public boolean supportsMinimumSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsCoreSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsExtendedSQLGrammar() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92EntryLevelSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92IntermediateSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsANSI92FullSQL() throws SQLException { + return false; + } + + @Override + public boolean supportsIntegrityEnhancementFacility() throws SQLException { + return false; + } + + @Override + public boolean supportsOuterJoins() throws SQLException { + return false; + } + + @Override + public boolean supportsFullOuterJoins() throws SQLException { + return false; + } + + @Override + public boolean supportsLimitedOuterJoins() throws SQLException { + return false; + } + + @Override + public String getSchemaTerm() throws SQLException { + return null; + } + + @Override + public String getProcedureTerm() throws SQLException { + return null; + } + + @Override + public String getCatalogTerm() throws SQLException { + return null; + } + + @Override + public boolean isCatalogAtStart() throws SQLException { + return false; + } + + @Override + public String getCatalogSeparator() throws SQLException { + return null; + } + + @Override + public boolean supportsSchemasInDataManipulation() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInProcedureCalls() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInTableDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInIndexDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsSchemasInPrivilegeDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInDataManipulation() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInProcedureCalls() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInTableDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInIndexDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsCatalogsInPrivilegeDefinitions() throws SQLException { + return false; + } + + @Override + public boolean supportsPositionedDelete() throws SQLException { + return false; + } + + @Override + public boolean supportsPositionedUpdate() throws SQLException { + return false; + } + + @Override + public boolean supportsSelectForUpdate() throws SQLException { + return false; + } + + @Override + public boolean supportsStoredProcedures() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInComparisons() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInExists() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInIns() throws SQLException { + return false; + } + + @Override + public boolean supportsSubqueriesInQuantifieds() throws SQLException { + return false; + } + + @Override + public boolean supportsCorrelatedSubqueries() throws SQLException { + return false; + } + + @Override + public boolean supportsUnion() throws SQLException { + return false; + } + + @Override + public boolean supportsUnionAll() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenCursorsAcrossCommit() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenCursorsAcrossRollback() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenStatementsAcrossCommit() throws SQLException { + return false; + } + + @Override + public boolean supportsOpenStatementsAcrossRollback() throws SQLException { + return false; + } + + @Override + public int getMaxBinaryLiteralLength() throws SQLException { + return 0; + } + + @Override + public int getMaxCharLiteralLength() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInGroupBy() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInIndex() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInOrderBy() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInSelect() throws SQLException { + return 0; + } + + @Override + public int getMaxColumnsInTable() throws SQLException { + return 0; + } + + @Override + public int getMaxConnections() throws SQLException { + return 0; + } + + @Override + public int getMaxCursorNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxIndexLength() throws SQLException { + return 0; + } + + @Override + public int getMaxSchemaNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxProcedureNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxCatalogNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxRowSize() throws SQLException { + return 0; + } + + @Override + public boolean doesMaxRowSizeIncludeBlobs() throws SQLException { + return false; + } + + @Override + public int getMaxStatementLength() throws SQLException { + return 0; + } + + @Override + public int getMaxStatements() throws SQLException { + return 0; + } + + @Override + public int getMaxTableNameLength() throws SQLException { + return 0; + } + + @Override + public int getMaxTablesInSelect() throws SQLException { + return 0; + } + + @Override + public int getMaxUserNameLength() throws SQLException { + return 0; + } + + @Override + public int getDefaultTransactionIsolation() throws SQLException { + return 0; + } + + @Override + public boolean supportsTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsTransactionIsolationLevel(int level) throws SQLException { + return false; + } + + @Override + public boolean supportsDataDefinitionAndDataManipulationTransactions() throws SQLException { + return false; + } + + @Override + public boolean supportsDataManipulationTransactionsOnly() throws SQLException { + return false; + } + + @Override + public boolean dataDefinitionCausesTransactionCommit() throws SQLException { + return false; + } + + @Override + public boolean dataDefinitionIgnoredInTransactions() throws SQLException { + return false; + } + + @Override + public ResultSet getProcedures( + String catalog, String schemaPattern, String procedureNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getProcedureColumns( + String catalog, + String schemaPattern, + String procedureNamePattern, + String columnNamePattern) + throws SQLException { + return null; + } + + @Override + public ResultSet getTables( + String catalog, String schemaPattern, String tableNamePattern, String[] types) + throws SQLException { + return null; + } + + @Override + public ResultSet getSchemas() throws SQLException { + return null; + } + + @Override + public ResultSet getCatalogs() throws SQLException { + return null; + } + + @Override + public ResultSet getTableTypes() throws SQLException { + return null; + } + + @Override + public ResultSet getColumns( + String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) + throws SQLException { + return null; + } + + @Override + public ResultSet getColumnPrivileges( + String catalog, String schema, String table, String columnNamePattern) + throws SQLException { + return null; + } + + @Override + public ResultSet getTablePrivileges( + String catalog, String schemaPattern, String tableNamePattern) throws SQLException { + return null; + } + + @Override + public ResultSet getBestRowIdentifier( + String catalog, String schema, String table, int scope, boolean nullable) + throws SQLException { + return null; + } + + @Override + public ResultSet getVersionColumns(String catalog, String schema, String table) + throws SQLException { + return null; + } + + @Override + public ResultSet getPrimaryKeys(String catalog, String schema, String table) + throws SQLException { + List> value = new ArrayList<>(); + value.add( + new HashMap() { + { + put("COLUMN_NAME", "id"); + put("PK_NAME", "_test!#$#@fdawe_"); + put("KEY_SEQ", 1); + } + }); + return new TestResultSet(value); + } + + @Override + public ResultSet getImportedKeys(String catalog, String schema, String table) + throws SQLException { + return null; + } + + @Override + public ResultSet getExportedKeys(String catalog, String schema, String table) + throws SQLException { + return null; + } + + @Override + public ResultSet getCrossReference( + String parentCatalog, + String parentSchema, + String parentTable, + String foreignCatalog, + String foreignSchema, + String foreignTable) + throws SQLException { + return null; + } + + @Override + public ResultSet getTypeInfo() throws SQLException { + return null; + } + + @Override + public ResultSet getIndexInfo( + String catalog, String schema, String table, boolean unique, boolean approximate) + throws SQLException { + List> value = new ArrayList<>(); + value.add( + new HashMap() { + { + put("COLUMN_NAME", "id"); + put("INDEX_NAME", "_test!#$#@fdawe_"); + put("NON_UNIQUE", true); + put("ASC_OR_DESC", "A"); + } + }); + return new TestResultSet(value); + } + + @Override + public boolean supportsResultSetType(int type) throws SQLException { + return false; + } + + @Override + public boolean supportsResultSetConcurrency(int type, int concurrency) throws SQLException { + return false; + } + + @Override + public boolean ownUpdatesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean ownDeletesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean ownInsertsAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersUpdatesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersDeletesAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean othersInsertsAreVisible(int type) throws SQLException { + return false; + } + + @Override + public boolean updatesAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean deletesAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean insertsAreDetected(int type) throws SQLException { + return false; + } + + @Override + public boolean supportsBatchUpdates() throws SQLException { + return false; + } + + @Override + public ResultSet getUDTs( + String catalog, String schemaPattern, String typeNamePattern, int[] types) + throws SQLException { + return null; + } + + @Override + public Connection getConnection() throws SQLException { + return null; + } + + @Override + public boolean supportsSavepoints() throws SQLException { + return false; + } + + @Override + public boolean supportsNamedParameters() throws SQLException { + return false; + } + + @Override + public boolean supportsMultipleOpenResults() throws SQLException { + return false; + } + + @Override + public boolean supportsGetGeneratedKeys() throws SQLException { + return false; + } + + @Override + public ResultSet getSuperTypes(String catalog, String schemaPattern, String typeNamePattern) + throws SQLException { + return null; + } + + @Override + public ResultSet getSuperTables(String catalog, String schemaPattern, String tableNamePattern) + throws SQLException { + return null; + } + + @Override + public ResultSet getAttributes( + String catalog, + String schemaPattern, + String typeNamePattern, + String attributeNamePattern) + throws SQLException { + return null; + } + + @Override + public boolean supportsResultSetHoldability(int holdability) throws SQLException { + return false; + } + + @Override + public int getResultSetHoldability() throws SQLException { + return 0; + } + + @Override + public int getDatabaseMajorVersion() throws SQLException { + return 0; + } + + @Override + public int getDatabaseMinorVersion() throws SQLException { + return 0; + } + + @Override + public int getJDBCMajorVersion() throws SQLException { + return 0; + } + + @Override + public int getJDBCMinorVersion() throws SQLException { + return 0; + } + + @Override + public int getSQLStateType() throws SQLException { + return 0; + } + + @Override + public boolean locatorsUpdateCopy() throws SQLException { + return false; + } + + @Override + public boolean supportsStatementPooling() throws SQLException { + return false; + } + + @Override + public RowIdLifetime getRowIdLifetime() throws SQLException { + return null; + } + + @Override + public ResultSet getSchemas(String catalog, String schemaPattern) throws SQLException { + return null; + } + + @Override + public boolean supportsStoredFunctionsUsingCallSyntax() throws SQLException { + return false; + } + + @Override + public boolean autoCommitFailureClosesAllResultSets() throws SQLException { + return false; + } + + @Override + public ResultSet getClientInfoProperties() throws SQLException { + return null; + } + + @Override + public ResultSet getFunctions(String catalog, String schemaPattern, String functionNamePattern) + throws SQLException { + return null; + } + + @Override + public ResultSet getFunctionColumns( + String catalog, + String schemaPattern, + String functionNamePattern, + String columnNamePattern) + throws SQLException { + return null; + } + + @Override + public ResultSet getPseudoColumns( + String catalog, String schemaPattern, String tableNamePattern, String columnNamePattern) + throws SQLException { + return null; + } + + @Override + public boolean generatedKeyAlwaysReturned() throws SQLException { + return false; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestResultSet.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestResultSet.java new file mode 100644 index 00000000000..f3a67a4e590 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/TestResultSet.java @@ -0,0 +1,830 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils; + +import java.io.InputStream; +import java.io.Reader; +import java.math.BigDecimal; +import java.net.URL; +import java.sql.Array; +import java.sql.Blob; +import java.sql.Clob; +import java.sql.Date; +import java.sql.NClob; +import java.sql.Ref; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.RowId; +import java.sql.SQLException; +import java.sql.SQLWarning; +import java.sql.SQLXML; +import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Calendar; +import java.util.List; +import java.util.Map; + +public class TestResultSet implements ResultSet { + + private final List> value; + + private int index = -1; + + public TestResultSet(List> value) { + this.value = value; + } + + @Override + public boolean next() throws SQLException { + return value.size() > ++index; + } + + @Override + public void close() throws SQLException {} + + @Override + public boolean wasNull() throws SQLException { + return false; + } + + @Override + public String getString(int columnIndex) throws SQLException { + return null; + } + + @Override + public boolean getBoolean(int columnIndex) throws SQLException { + return false; + } + + @Override + public byte getByte(int columnIndex) throws SQLException { + return 0; + } + + @Override + public short getShort(int columnIndex) throws SQLException { + return 0; + } + + @Override + public int getInt(int columnIndex) throws SQLException { + return 0; + } + + @Override + public long getLong(int columnIndex) throws SQLException { + return 0; + } + + @Override + public float getFloat(int columnIndex) throws SQLException { + return 0; + } + + @Override + public double getDouble(int columnIndex) throws SQLException { + return 0; + } + + @Override + public BigDecimal getBigDecimal(int columnIndex, int scale) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(int columnIndex) throws SQLException { + return new byte[0]; + } + + @Override + public Date getDate(int columnIndex) throws SQLException { + return null; + } + + @Override + public Time getTime(int columnIndex) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(int columnIndex) throws SQLException { + return null; + } + + @Override + public InputStream getAsciiStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public InputStream getBinaryStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getString(String columnLabel) throws SQLException { + return value.get(index).get(columnLabel).toString(); + } + + @Override + public boolean getBoolean(String columnLabel) throws SQLException { + return (boolean) value.get(index).get(columnLabel); + } + + @Override + public byte getByte(String columnLabel) throws SQLException { + return 0; + } + + @Override + public short getShort(String columnLabel) throws SQLException { + return 0; + } + + @Override + public int getInt(String columnLabel) throws SQLException { + return (int) value.get(index).get(columnLabel); + } + + @Override + public long getLong(String columnLabel) throws SQLException { + return 0; + } + + @Override + public float getFloat(String columnLabel) throws SQLException { + return 0; + } + + @Override + public double getDouble(String columnLabel) throws SQLException { + return 0; + } + + @Override + public BigDecimal getBigDecimal(String columnLabel, int scale) throws SQLException { + return null; + } + + @Override + public byte[] getBytes(String columnLabel) throws SQLException { + return new byte[0]; + } + + @Override + public Date getDate(String columnLabel) throws SQLException { + return null; + } + + @Override + public Time getTime(String columnLabel) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(String columnLabel) throws SQLException { + return null; + } + + @Override + public InputStream getAsciiStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public InputStream getUnicodeStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public InputStream getBinaryStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLWarning getWarnings() throws SQLException { + return null; + } + + @Override + public void clearWarnings() throws SQLException {} + + @Override + public String getCursorName() throws SQLException { + return null; + } + + @Override + public ResultSetMetaData getMetaData() throws SQLException { + return null; + } + + @Override + public Object getObject(int columnIndex) throws SQLException { + return null; + } + + @Override + public Object getObject(String columnLabel) throws SQLException { + return null; + } + + @Override + public int findColumn(String columnLabel) throws SQLException { + return 0; + } + + @Override + public Reader getCharacterStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public Reader getCharacterStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(int columnIndex) throws SQLException { + return null; + } + + @Override + public BigDecimal getBigDecimal(String columnLabel) throws SQLException { + return null; + } + + @Override + public boolean isBeforeFirst() throws SQLException { + return false; + } + + @Override + public boolean isAfterLast() throws SQLException { + return false; + } + + @Override + public boolean isFirst() throws SQLException { + return false; + } + + @Override + public boolean isLast() throws SQLException { + return false; + } + + @Override + public void beforeFirst() throws SQLException {} + + @Override + public void afterLast() throws SQLException {} + + @Override + public boolean first() throws SQLException { + return false; + } + + @Override + public boolean last() throws SQLException { + return false; + } + + @Override + public int getRow() throws SQLException { + return 0; + } + + @Override + public boolean absolute(int row) throws SQLException { + return false; + } + + @Override + public boolean relative(int rows) throws SQLException { + return false; + } + + @Override + public boolean previous() throws SQLException { + return false; + } + + @Override + public void setFetchDirection(int direction) throws SQLException {} + + @Override + public int getFetchDirection() throws SQLException { + return 0; + } + + @Override + public void setFetchSize(int rows) throws SQLException {} + + @Override + public int getFetchSize() throws SQLException { + return 0; + } + + @Override + public int getType() throws SQLException { + return 0; + } + + @Override + public int getConcurrency() throws SQLException { + return 0; + } + + @Override + public boolean rowUpdated() throws SQLException { + return false; + } + + @Override + public boolean rowInserted() throws SQLException { + return false; + } + + @Override + public boolean rowDeleted() throws SQLException { + return false; + } + + @Override + public void updateNull(int columnIndex) throws SQLException {} + + @Override + public void updateBoolean(int columnIndex, boolean x) throws SQLException {} + + @Override + public void updateByte(int columnIndex, byte x) throws SQLException {} + + @Override + public void updateShort(int columnIndex, short x) throws SQLException {} + + @Override + public void updateInt(int columnIndex, int x) throws SQLException {} + + @Override + public void updateLong(int columnIndex, long x) throws SQLException {} + + @Override + public void updateFloat(int columnIndex, float x) throws SQLException {} + + @Override + public void updateDouble(int columnIndex, double x) throws SQLException {} + + @Override + public void updateBigDecimal(int columnIndex, BigDecimal x) throws SQLException {} + + @Override + public void updateString(int columnIndex, String x) throws SQLException {} + + @Override + public void updateBytes(int columnIndex, byte[] x) throws SQLException {} + + @Override + public void updateDate(int columnIndex, Date x) throws SQLException {} + + @Override + public void updateTime(int columnIndex, Time x) throws SQLException {} + + @Override + public void updateTimestamp(int columnIndex, Timestamp x) throws SQLException {} + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, int length) throws SQLException {} + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, int length) + throws SQLException {} + + @Override + public void updateCharacterStream(int columnIndex, Reader x, int length) throws SQLException {} + + @Override + public void updateObject(int columnIndex, Object x, int scaleOrLength) throws SQLException {} + + @Override + public void updateObject(int columnIndex, Object x) throws SQLException {} + + @Override + public void updateNull(String columnLabel) throws SQLException {} + + @Override + public void updateBoolean(String columnLabel, boolean x) throws SQLException {} + + @Override + public void updateByte(String columnLabel, byte x) throws SQLException {} + + @Override + public void updateShort(String columnLabel, short x) throws SQLException {} + + @Override + public void updateInt(String columnLabel, int x) throws SQLException {} + + @Override + public void updateLong(String columnLabel, long x) throws SQLException {} + + @Override + public void updateFloat(String columnLabel, float x) throws SQLException {} + + @Override + public void updateDouble(String columnLabel, double x) throws SQLException {} + + @Override + public void updateBigDecimal(String columnLabel, BigDecimal x) throws SQLException {} + + @Override + public void updateString(String columnLabel, String x) throws SQLException {} + + @Override + public void updateBytes(String columnLabel, byte[] x) throws SQLException {} + + @Override + public void updateDate(String columnLabel, Date x) throws SQLException {} + + @Override + public void updateTime(String columnLabel, Time x) throws SQLException {} + + @Override + public void updateTimestamp(String columnLabel, Timestamp x) throws SQLException {} + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, int length) + throws SQLException {} + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, int length) + throws SQLException {} + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, int length) + throws SQLException {} + + @Override + public void updateObject(String columnLabel, Object x, int scaleOrLength) throws SQLException {} + + @Override + public void updateObject(String columnLabel, Object x) throws SQLException {} + + @Override + public void insertRow() throws SQLException {} + + @Override + public void updateRow() throws SQLException {} + + @Override + public void deleteRow() throws SQLException {} + + @Override + public void refreshRow() throws SQLException {} + + @Override + public void cancelRowUpdates() throws SQLException {} + + @Override + public void moveToInsertRow() throws SQLException {} + + @Override + public void moveToCurrentRow() throws SQLException {} + + @Override + public Statement getStatement() throws SQLException { + return null; + } + + @Override + public Object getObject(int columnIndex, Map> map) throws SQLException { + return null; + } + + @Override + public Ref getRef(int columnIndex) throws SQLException { + return null; + } + + @Override + public Blob getBlob(int columnIndex) throws SQLException { + return null; + } + + @Override + public Clob getClob(int columnIndex) throws SQLException { + return null; + } + + @Override + public Array getArray(int columnIndex) throws SQLException { + return null; + } + + @Override + public Object getObject(String columnLabel, Map> map) throws SQLException { + return null; + } + + @Override + public Ref getRef(String columnLabel) throws SQLException { + return null; + } + + @Override + public Blob getBlob(String columnLabel) throws SQLException { + return null; + } + + @Override + public Clob getClob(String columnLabel) throws SQLException { + return null; + } + + @Override + public Array getArray(String columnLabel) throws SQLException { + return null; + } + + @Override + public Date getDate(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Date getDate(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public Time getTime(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Time getTime(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException { + return null; + } + + @Override + public Timestamp getTimestamp(String columnLabel, Calendar cal) throws SQLException { + return null; + } + + @Override + public URL getURL(int columnIndex) throws SQLException { + return null; + } + + @Override + public URL getURL(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateRef(int columnIndex, Ref x) throws SQLException {} + + @Override + public void updateRef(String columnLabel, Ref x) throws SQLException {} + + @Override + public void updateBlob(int columnIndex, Blob x) throws SQLException {} + + @Override + public void updateBlob(String columnLabel, Blob x) throws SQLException {} + + @Override + public void updateClob(int columnIndex, Clob x) throws SQLException {} + + @Override + public void updateClob(String columnLabel, Clob x) throws SQLException {} + + @Override + public void updateArray(int columnIndex, Array x) throws SQLException {} + + @Override + public void updateArray(String columnLabel, Array x) throws SQLException {} + + @Override + public RowId getRowId(int columnIndex) throws SQLException { + return null; + } + + @Override + public RowId getRowId(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateRowId(int columnIndex, RowId x) throws SQLException {} + + @Override + public void updateRowId(String columnLabel, RowId x) throws SQLException {} + + @Override + public int getHoldability() throws SQLException { + return 0; + } + + @Override + public boolean isClosed() throws SQLException { + return false; + } + + @Override + public void updateNString(int columnIndex, String nString) throws SQLException {} + + @Override + public void updateNString(String columnLabel, String nString) throws SQLException {} + + @Override + public void updateNClob(int columnIndex, NClob nClob) throws SQLException {} + + @Override + public void updateNClob(String columnLabel, NClob nClob) throws SQLException {} + + @Override + public NClob getNClob(int columnIndex) throws SQLException { + return null; + } + + @Override + public NClob getNClob(String columnLabel) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(int columnIndex) throws SQLException { + return null; + } + + @Override + public SQLXML getSQLXML(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateSQLXML(int columnIndex, SQLXML xmlObject) throws SQLException {} + + @Override + public void updateSQLXML(String columnLabel, SQLXML xmlObject) throws SQLException {} + + @Override + public String getNString(int columnIndex) throws SQLException { + return null; + } + + @Override + public String getNString(String columnLabel) throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(int columnIndex) throws SQLException { + return null; + } + + @Override + public Reader getNCharacterStream(String columnLabel) throws SQLException { + return null; + } + + @Override + public void updateNCharacterStream(int columnIndex, Reader x, long length) + throws SQLException {} + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader, long length) + throws SQLException {} + + @Override + public void updateAsciiStream(int columnIndex, InputStream x, long length) + throws SQLException {} + + @Override + public void updateBinaryStream(int columnIndex, InputStream x, long length) + throws SQLException {} + + @Override + public void updateCharacterStream(int columnIndex, Reader x, long length) throws SQLException {} + + @Override + public void updateAsciiStream(String columnLabel, InputStream x, long length) + throws SQLException {} + + @Override + public void updateBinaryStream(String columnLabel, InputStream x, long length) + throws SQLException {} + + @Override + public void updateCharacterStream(String columnLabel, Reader reader, long length) + throws SQLException {} + + @Override + public void updateBlob(int columnIndex, InputStream inputStream, long length) + throws SQLException {} + + @Override + public void updateBlob(String columnLabel, InputStream inputStream, long length) + throws SQLException {} + + @Override + public void updateClob(int columnIndex, Reader reader, long length) throws SQLException {} + + @Override + public void updateClob(String columnLabel, Reader reader, long length) throws SQLException {} + + @Override + public void updateNClob(int columnIndex, Reader reader, long length) throws SQLException {} + + @Override + public void updateNClob(String columnLabel, Reader reader, long length) throws SQLException {} + + @Override + public void updateNCharacterStream(int columnIndex, Reader x) throws SQLException {} + + @Override + public void updateNCharacterStream(String columnLabel, Reader reader) throws SQLException {} + + @Override + public void updateAsciiStream(int columnIndex, InputStream x) throws SQLException {} + + @Override + public void updateBinaryStream(int columnIndex, InputStream x) throws SQLException {} + + @Override + public void updateCharacterStream(int columnIndex, Reader x) throws SQLException {} + + @Override + public void updateAsciiStream(String columnLabel, InputStream x) throws SQLException {} + + @Override + public void updateBinaryStream(String columnLabel, InputStream x) throws SQLException {} + + @Override + public void updateCharacterStream(String columnLabel, Reader reader) throws SQLException {} + + @Override + public void updateBlob(int columnIndex, InputStream inputStream) throws SQLException {} + + @Override + public void updateBlob(String columnLabel, InputStream inputStream) throws SQLException {} + + @Override + public void updateClob(int columnIndex, Reader reader) throws SQLException {} + + @Override + public void updateClob(String columnLabel, Reader reader) throws SQLException {} + + @Override + public void updateNClob(int columnIndex, Reader reader) throws SQLException {} + + @Override + public void updateNClob(String columnLabel, Reader reader) throws SQLException {} + + @Override + public T getObject(int columnIndex, Class type) throws SQLException { + return null; + } + + @Override + public T getObject(String columnLabel, Class type) throws SQLException { + return null; + } + + @Override + public T unwrap(Class iface) throws SQLException { + return null; + } + + @Override + public boolean isWrapperFor(Class iface) throws SQLException { + return false; + } +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java index 12f5acd597d..2b7498f4856 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointCoordinator.java @@ -441,10 +441,6 @@ protected void readyToCloseIdleTask(TaskLocation taskLocation) { subTask.getJobId()); } } - if (subTaskList.size() != 2) { - throw new UnsupportedOperationException( - "Unsupported close not reader/writer task group: " + subTaskList); - } readyToCloseIdleTask.addAll(subTaskList); tryTriggerPendingCheckpoint(CheckpointType.CHECKPOINT_TYPE); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java index a38703dee8b..0142d8a6a4b 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManager.java @@ -241,12 +241,11 @@ public CompletableFuture listenPipeline(int pipelineId, PipelineStatus pip * Called by the JobMaster.
* Listen to the {@link JobStatus} of the {@link Job}. */ - public CompletableFuture shutdown(JobStatus jobStatus) { + public void clearCheckpointIfNeed(JobStatus jobStatus) { if ((jobStatus == JobStatus.FINISHED || jobStatus == JobStatus.CANCELED) && !isSavePointEnd()) { checkpointStorage.deleteCheckpoint(jobId + ""); } - return CompletableFuture.completedFuture(null); } /** diff --git a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java index 8f54402d80d..1b7bf6bdad5 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java +++ b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/master/JobMaster.java @@ -518,6 +518,7 @@ public void releasePipelineResource(SubPlan subPlan) { } public void cleanJob() { + checkpointManager.clearCheckpointIfNeed(physicalPlan.getJobStatus()); jobHistoryService.storeJobInfo(jobImmutableInformation.getJobId(), getJobDAGInfo()); jobHistoryService.storeFinishedJobState(this); removeJobIMap(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java index c4b7b8af7d3..ce5cca1780d 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointManagerTest.java @@ -41,7 +41,6 @@ import java.time.Instant; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import static org.apache.seatunnel.engine.common.Constant.IMAP_CHECKPOINT_ID; import static org.apache.seatunnel.engine.common.Constant.IMAP_RUNNING_JOB_STATE; @@ -94,8 +93,7 @@ public void testHAByIMapCheckpointIDCounter() throws CheckpointStorageException Assertions.assertTrue(checkpointManager.isCompletedPipeline(1)); checkpointManager.listenPipeline(1, PipelineStatus.FINISHED); Assertions.assertNull(checkpointIdMap.get(1)); - CompletableFuture future = checkpointManager.shutdown(JobStatus.FINISHED); - future.join(); + checkpointManager.clearCheckpointIfNeed(JobStatus.FINISHED); Assertions.assertTrue(checkpointStorage.getAllCheckpoints(jobId + "").isEmpty()); } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java index 2a334fe3849..63e12778276 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/checkpoint/CheckpointStorageTest.java @@ -43,6 +43,11 @@ public class CheckpointStorageTest extends AbstractSeaTunnelServerTest { public static String STREAM_CONF_PATH = "stream_fake_to_console_biginterval.conf"; public static String BATCH_CONF_PATH = "batch_fakesource_to_file.conf"; + public static String BATCH_CONF_WITH_CHECKPOINT_PATH = + "batch_fakesource_to_file_with_checkpoint.conf"; + + public static String STREAM_CONF_WITH_CHECKPOINT_PATH = + "stream_fake_to_console_with_checkpoint.conf"; @Override public SeaTunnelConfig loadSeaTunnelConfig() { @@ -113,4 +118,63 @@ public void testBatchJob() throws CheckpointStorageException { checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); Assertions.assertEquals(0, allCheckpoints.size()); } + + @Test + public void testBatchJobWithCheckpoint() throws CheckpointStorageException { + long jobId = System.currentTimeMillis(); + CheckpointConfig checkpointConfig = + server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig(); + server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig); + + CheckpointStorage checkpointStorage = + FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + CheckpointStorageFactory.class, + checkpointConfig.getStorage().getStorage()) + .create(checkpointConfig.getStorage().getStoragePluginConfig()); + startJob(jobId, BATCH_CONF_WITH_CHECKPOINT_PATH, false); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.FINISHED)); + List allCheckpoints = + checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); + Assertions.assertEquals(0, allCheckpoints.size()); + } + + @Test + public void testStreamJobWithCancel() throws CheckpointStorageException, InterruptedException { + long jobId = System.currentTimeMillis(); + CheckpointConfig checkpointConfig = + server.getSeaTunnelConfig().getEngineConfig().getCheckpointConfig(); + server.getSeaTunnelConfig().getEngineConfig().setCheckpointConfig(checkpointConfig); + + CheckpointStorage checkpointStorage = + FactoryUtil.discoverFactory( + Thread.currentThread().getContextClassLoader(), + CheckpointStorageFactory.class, + checkpointConfig.getStorage().getStorage()) + .create(checkpointConfig.getStorage().getStoragePluginConfig()); + startJob(jobId, STREAM_CONF_WITH_CHECKPOINT_PATH, false); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.RUNNING)); + // wait for checkpoint + Thread.sleep(10 * 1000); + server.getCoordinatorService().getJobMaster(jobId).cancelJob(); + await().atMost(120000, TimeUnit.MILLISECONDS) + .untilAsserted( + () -> + Assertions.assertEquals( + server.getCoordinatorService().getJobStatus(jobId), + JobStatus.CANCELED)); + List allCheckpoints = + checkpointStorage.getAllCheckpoints(String.valueOf(jobId)); + Assertions.assertEquals(0, allCheckpoints.size()); + } } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java index cbfde91b370..bc1e8f06f26 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMasterTest.java @@ -278,7 +278,7 @@ private void assertCloseIdleTask(JobMaster jobMaster) { .getCheckpointManager() .getCheckpointCoordinator(seaTunnelTask.getTaskLocation().getPipelineId()); await().atMost(60, TimeUnit.SECONDS) - .until(() -> checkpointCoordinator.getClosedIdleTask().size() == 2); + .until(() -> checkpointCoordinator.getClosedIdleTask().size() == 3); await().atMost(60, TimeUnit.SECONDS) .until(() -> slotService.getWorkerProfile().getAssignedSlots().length == 3); } diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf new file mode 100644 index 00000000000..721f89fe946 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/batch_fakesource_to_file_with_checkpoint.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + parallelism = 1 + job.mode = "BATCH" + checkpoint.interval = 1000 +} + +source { + FakeSource { + row.num = 100 + split.num = 5 + split.read-interval = 3000 + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } + } + parallelism = 1 + } +} + +transform { +} + +sink { + LocalFile { + path="/tmp/hive/warehouse/test2" + field_delimiter="\t" + row_delimiter="\n" + partition_by=["age"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format_type="text" + sink_columns=["name","age"] + filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + save_mode="error" + + } +} \ No newline at end of file diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf new file mode 100644 index 00000000000..de02ec9624f --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fake_to_console_with_checkpoint.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in SeaTunnel config +###### + +env { + # You can set SeaTunnel environment configuration here + parallelism = 2 + job.mode = "STREAMING" + checkpoint.interval = 1000 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 2 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + name = "string" + age = "int" + } + } + } + + # If you would like to get more information about how to configure SeaTunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +sink { + Console { + } + + # If you would like to get more information about how to configure SeaTunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf index 2cbcf14bd9a..a9515ca00bd 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf +++ b/seatunnel-engine/seatunnel-engine-server/src/test/resources/stream_fakesource_to_file.conf @@ -41,7 +41,11 @@ transform { } sink { + console { + source_table_name = "fake" + } LocalFile { + source_table_name = "fake" path="/tmp/hive/warehouse/test2" field_delimiter="\t" row_delimiter="\n"