From 811fb7e6cb4b0daf6e3033101a8d58a9e7f71d5d Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Thu, 8 Sep 2022 23:52:01 +0800 Subject: [PATCH 01/12] refactor sqlserver connector --- pom.xml | 1 + .../connector-jdbc/pom.xml | 7 + .../dialect/sqlserver/SqlServerDialect.java | 39 +++++ .../sqlserver/SqlServerDialectFactory.java | 40 +++++ .../sqlserver/SqlserverJdbcRowConverter.java | 39 +++++ .../sqlserver/SqlserverTypeMapper.java | 131 ++++++++++++++++ .../connector-jdbc-flink-e2e/pom.xml | 12 ++ .../e2e/flink/v2/jdbc/JdbcSqlserverIT.java | 148 ++++++++++++++++++ .../container-license-acceptance.txt | 1 + .../jdbc/jdbc_sqlserver_source_to_sink.conf | 60 +++++++ .../connector-jdbc-spark-e2e/pom.xml | 14 +- .../e2e/spark/v2/jdbc/JdbcSqlserverIT.java | 148 ++++++++++++++++++ .../container-license-acceptance.txt | 1 + .../jdbc/jdbc_sqlserver_source_to_sink.conf | 62 ++++++++ 14 files changed, 702 insertions(+), 1 deletion(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf diff --git a/pom.xml b/pom.xml index 4233ebcbfd2..5fa0d1815fd 100644 --- a/pom.xml +++ b/pom.xml @@ -146,6 +146,7 @@ 8.0.16 42.3.3 8.1.2.141 + 9.4.1.jre8 false false false diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index 3aa3e94043f..5f942737ebb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -62,6 +62,13 @@ ali-phoenix-shaded-thin-client ${phoenix.version} + + + com.microsoft.sqlserver + mssql-jdbc + ${sqlserver.version} + provided + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java new file mode 100644 index 00000000000..81c26593913 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialect.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +public class SqlServerDialect implements JdbcDialect { + @Override + public String dialectName() { + return "Sqlserver"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new SqlserverJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new SqlserverTypeMapper(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java new file mode 100644 index 00000000000..050082b7cfd --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlServerDialectFactory.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +/** + * Factory for {@link SqlServerDialect}. + */ + +@AutoService(JdbcDialectFactory.class) +public class SqlServerDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:sqlserver:"); + } + + @Override + public JdbcDialect create() { + return new SqlServerDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java new file mode 100644 index 00000000000..ed8d9db7120 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverJdbcRowConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class SqlserverJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return "Sqlserver"; + } + + @Override + public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { + return super.toInternal(rs, metaData, typeInfo); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java new file mode 100644 index 00000000000..913392cd2a2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +@Slf4j +public class SqlserverTypeMapper implements JdbcDialectTypeMapper { + + + // ============================data types===================== + + private static final String SQLSERVER_UNKNOWN = "UNKNOWN"; + + // -------------------------number---------------------------- + private static final String SQLSERVER_BIT = "BIT"; + private static final String SQLSERVER_TINYINT = "TINYINT"; + private static final String SQLSERVER_SMALLINT = "SMALLINT"; + private static final String SQLSERVER_INTEGER = "INTEGER"; + private static final String SQLSERVER_INT = "INT"; + private static final String SQLSERVER_BIGINT = "BIGINT"; + private static final String SQLSERVER_DECIMAL = "DECIMAL"; + private static final String SQLSERVER_FLOAT = "FLOAT"; + private static final String SQLSERVER_REAL = "REAL"; + private static final String SQLSERVER_NUMERIC = "NUMERIC"; + private static final String SQLSERVER_MONEY = "MONEY"; + private static final String SQLSERVER_SMALLMONEY = "SMALLMONEY"; + // -------------------------string---------------------------- + private static final String SQLSERVER_CHAR = "CHAR"; + private static final String SQLSERVER_VARCHAR = "VARCHAR"; + private static final String SQLSERVER_NTEXT = "NTEXT"; + private static final String SQLSERVER_NCHAR = "NCHAR"; + private static final String SQLSERVER_NVARCHAR = "NVARCHAR"; + private static final String SQLSERVER_TEXT = "TEXT"; + + // ------------------------------time------------------------- + private static final String SQLSERVER_DATE = "DATE"; + private static final String SQLSERVER_TIME = "TIME"; + private static final String SQLSERVER_DATETIME = "DATETIME"; + private static final String SQLSERVER_DATETIME2 = "DATETIME2"; + private static final String SQLSERVER_SMALLDATETIME = "SMALLDATETIME"; + private static final String SQLSERVER_DATETIMEOFFSET = "DATETIMEOFFSET"; + private static final String SQLSERVER_TIMESTAMP = "TIMESTAMP"; + + // ------------------------------blob------------------------- + private static final String SQLSERVER_BINARY = "BINARY"; + private static final String SQLSERVER_VARBINARY = "VARBINARY"; + private static final String SQLSERVER_IMAGE = "IMAGE"; + + @SuppressWarnings("checkstyle:MagicNumber") + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String sqlServerType = metadata.getColumnTypeName(colIndex).toUpperCase(); + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + switch (sqlServerType) { + case SQLSERVER_BIT: + return BasicType.BOOLEAN_TYPE; + case SQLSERVER_TINYINT: + case SQLSERVER_SMALLINT: + return BasicType.SHORT_TYPE; + case SQLSERVER_INTEGER: + case SQLSERVER_INT: + return BasicType.INT_TYPE; + case SQLSERVER_BIGINT: + return BasicType.LONG_TYPE; + case SQLSERVER_DECIMAL: + case SQLSERVER_NUMERIC: + case SQLSERVER_MONEY: + case SQLSERVER_SMALLMONEY: + return new DecimalType(precision, scale); + case SQLSERVER_FLOAT: + case SQLSERVER_REAL: + return BasicType.FLOAT_TYPE; + case SQLSERVER_CHAR: + case SQLSERVER_NCHAR: + case SQLSERVER_VARCHAR: + case SQLSERVER_NTEXT: + case SQLSERVER_NVARCHAR: + case SQLSERVER_TEXT: + return BasicType.STRING_TYPE; + case SQLSERVER_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case SQLSERVER_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case SQLSERVER_DATETIME: + case SQLSERVER_DATETIME2: + case SQLSERVER_TIMESTAMP: + case SQLSERVER_SMALLDATETIME: + case SQLSERVER_DATETIMEOFFSET: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case SQLSERVER_BINARY: + case SQLSERVER_VARBINARY: + case SQLSERVER_IMAGE: + return PrimitiveByteArrayType.INSTANCE; + //Doesn't support yet + case SQLSERVER_UNKNOWN: + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format( + "Doesn't support SQLSERVER type '%s' on column '%s' yet.", + sqlServerType, jdbcColumnName)); + } + } +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml index 1d46370b6b3..e694efe979f 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml @@ -79,6 +79,18 @@ DmJdbcDriver18 ${dm-jdbc.version} + + org.testcontainers + mssqlserver + 1.17.3 + test + + + com.microsoft.sqlserver + mssql-jdbc + ${sqlserver.version} + test + diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java new file mode 100644 index 00000000000..be3b038c56d --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.flink.v2.jdbc; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; + +@Slf4j +public class JdbcSqlserverIT extends FlinkContainer { + + private MSSQLServerContainer mssqlServerContainer; + + @SuppressWarnings("checkstyle:MagicNumber") + @BeforeEach + public void startSqlserverContainer() throws ClassNotFoundException, SQLException { + mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest")) + .withNetwork(NETWORK) + .withNetworkAliases("sqlserver") + .withLogConsumer(new Slf4jLogConsumer(log)); + Startables.deepStart(Stream.of(mssqlServerContainer)).join(); + log.info("Sqlserver container started"); + Class.forName(mssqlServerContainer.getDriverClassName()); + Awaitility.given().ignoreExceptions() + .await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(this::initializeJdbcTable); + batchInsertData(); + } + + private void initializeJdbcTable() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + String sourceSql = "CREATE TABLE [source] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + String sinkSql = "CREATE TABLE [sink] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + statement.execute(sourceSql); + statement.execute(sinkSql); + } catch (SQLException e) { + throw new RuntimeException("Initializing Sqlserver table failed!", e); + } + } + + @SuppressWarnings("checkstyle:RegexpSingleline") + private void batchInsertData() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + String sql = + "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " + + "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)"; + Statement statement = connection.createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Batch insert data failed!", e); + } + } + + @Test + public void tesSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + // query result + String sql = "select * from sink"; + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + List result = Lists.newArrayList(); + while (resultSet.next()) { + result.add(resultSet.getString("ids")); + } + Assertions.assertFalse(result.isEmpty()); + } + } + + @AfterEach + public void closeSqlserverContainer() { + if (mssqlServerContainer != null) { + mssqlServerContainer.stop(); + } + } + +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt new file mode 100644 index 00000000000..7f099b0aa4e --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt @@ -0,0 +1 @@ +mcr.microsoft.com/mssql/server:2022-latest diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf new file mode 100644 index 00000000000..52f18f1e083 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml index bd0a341fea9..c1bbdbc9ffd 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml @@ -61,6 +61,18 @@ DmJdbcDriver18 ${dm-jdbc.version} + + org.testcontainers + mssqlserver + 1.17.3 + test + + + com.microsoft.sqlserver + mssql-jdbc + ${sqlserver.version} + test + - \ No newline at end of file + diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java new file mode 100644 index 00000000000..219ca953bee --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.jdbc; + +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; + +@Slf4j +public class JdbcSqlserverIT extends SparkContainer { + + private MSSQLServerContainer mssqlServerContainer; + + @SuppressWarnings("checkstyle:MagicNumber") + @BeforeEach + public void startSqlServerContainer() throws ClassNotFoundException, SQLException { + mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest")) + .withNetwork(NETWORK) + .withNetworkAliases("sqlserver") + .withLogConsumer(new Slf4jLogConsumer(log)); + Startables.deepStart(Stream.of(mssqlServerContainer)).join(); + log.info("Sqlserver container started"); + Class.forName(mssqlServerContainer.getDriverClassName()); + Awaitility.given().ignoreExceptions() + .await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(this::initializeJdbcTable); + batchInsertData(); + } + + private void initializeJdbcTable() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + String sourceSql = "CREATE TABLE [source] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + String sinkSql = "CREATE TABLE [sink] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + statement.execute(sourceSql); + statement.execute(sinkSql); + } catch (SQLException e) { + throw new RuntimeException("Initializing Sqlserver table failed!", e); + } + } + + @SuppressWarnings("checkstyle:RegexpSingleline") + private void batchInsertData() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + String sql = + "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " + + "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)"; + Statement statement = connection.createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Batch insert data failed!", e); + } + } + + @Test + public void tesSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + // query result + String sql = "select * from sink"; + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + List result = Lists.newArrayList(); + while (resultSet.next()) { + result.add(resultSet.getString("ids")); + } + Assertions.assertFalse(result.isEmpty()); + } + } + + @AfterEach + public void closeSqlserverContainer() { + if (mssqlServerContainer != null) { + mssqlServerContainer.stop(); + } + } + +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt new file mode 100644 index 00000000000..7f099b0aa4e --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt @@ -0,0 +1 @@ +mcr.microsoft.com/mssql/server:2022-latest diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf new file mode 100644 index 00000000000..98d8451de93 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} From 6d2d7207e7874546b8dd9d4b2eb58e6ad6597ba6 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Fri, 9 Sep 2022 22:49:20 +0800 Subject: [PATCH 02/12] merge dev and fix some conflict --- seatunnel-connectors-v2/connector-jdbc/pom.xml | 1 - .../connector-jdbc-spark-e2e/pom.xml | 1 - .../e2e/spark/v2/jdbc/JdbcSqlserverIT.java | 13 ++++--------- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index 5f942737ebb..e190001a3f6 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -67,7 +67,6 @@ com.microsoft.sqlserver mssql-jdbc ${sqlserver.version} - provided diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml index c1bbdbc9ffd..76060634110 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml @@ -71,7 +71,6 @@ com.microsoft.sqlserver mssql-jdbc ${sqlserver.version} - test diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java index 219ca953bee..8e7f7e1c19a 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java @@ -28,7 +28,6 @@ import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; -import org.testcontainers.shaded.com.google.common.collect.Lists; import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; @@ -39,7 +38,6 @@ import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; -import java.util.List; import java.util.stream.Stream; @Slf4j @@ -59,7 +57,7 @@ public void startSqlServerContainer() throws ClassNotFoundException, SQLExceptio Class.forName(mssqlServerContainer.getDriverClassName()); Awaitility.given().ignoreExceptions() .await() - .atMost(Duration.ofMinutes(1)) + .atMost(Duration.ofMinutes(3)) .untilAsserted(this::initializeJdbcTable); batchInsertData(); } @@ -126,15 +124,12 @@ public void tesSqlserverSourceAndSink() throws SQLException, IOException, Interr Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf"); Assertions.assertEquals(0, execResult.getExitCode()); // query result - String sql = "select * from sink"; + String sql = "select * from sink minus select * from source;"; try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { Statement statement = connection.createStatement(); ResultSet resultSet = statement.executeQuery(sql); - List result = Lists.newArrayList(); - while (resultSet.next()) { - result.add(resultSet.getString("ids")); - } - Assertions.assertFalse(result.isEmpty()); + Assertions.assertTrue(resultSet.next()); + } } From ff16d7ccb60adc66e269852d6017773324cf86e1 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Fri, 9 Sep 2022 23:01:02 +0800 Subject: [PATCH 03/12] add sqlserver doc --- docs/en/connector-v2/sink/Jdbc.md | 48 +++++++++++++++++++++++------ docs/en/connector-v2/source/Jdbc.md | 3 +- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index ed6285eae12..4cbb9d94801 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -3,13 +3,16 @@ > JDBC sink connector ## Description -Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once semantics (using XA transaction guarantee). + +Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once +semantics (using XA transaction guarantee). ## Key features - [x] [exactly-once](../../concept/connector-v2-features.md) -Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` for the database which is support `Xa transactions`. You can set `is_exactly_once=true` to enable it. +Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` for the database which is +support `Xa transactions`. You can set `is_exactly_once=true` to enable it. - [ ] [schema projection](../../concept/connector-v2-features.md) @@ -32,19 +35,25 @@ Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` f | transaction_timeout_sec | Int | No | -1 | ### driver [string] + The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver. -Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy mysql-connector-java-xxx.jar to $SEATNUNNEL_HOME/lib for Standalone. +Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy mysql-connector-java-xxx.jar to +$SEATNUNNEL_HOME/lib for Standalone. ### user [string] + userName ### password [string] + password ### url [string] + The URL of the JDBC connection. Refer to a case: jdbc:postgresql://localhost/test ### query [string] + Query statement ### connection_check_timeout_sec [int] @@ -52,41 +61,59 @@ Query statement The time in seconds to wait for the database operation used to validate the connection to complete. ### max_retries[int] + The number of retries to submit failed (executeBatch) ### batch_size[int] -For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database + +For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms` +, the data will be flushed into the database ### batch_interval_ms[int] -For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms`, the data will be flushed into the database + +For batch writing, when the number of buffers reaches the number of `batch_size` or the time reaches `batch_interval_ms` +, the data will be flushed into the database ### is_exactly_once[boolean] -Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to set `xa_data_source_class_name`. + +Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to +set `xa_data_source_class_name`. ### xa_data_source_class_name[string] -The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and please refer to appendix for other data sources + +The xa data source class name of the database Driver, for example, mysql is `com.mysql.cj.jdbc.MysqlXADataSource`, and +please refer to appendix for other data sources ### max_commit_attempts[int] + The number of retries for transaction commit failures ### transaction_timeout_sec[int] -The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect exactly-once semantics + +The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect +exactly-once semantics ## tips -In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases require some setup. For example, postgres needs to set `max_prepared_transactions > 1` + +In the case of is_exactly_once = "true", Xa transactions are used. This requires database support, and some databases +require some setup. For example, postgres needs to set `max_prepared_transactions > 1` Such as `ALTER SYSTEM set max_prepared_transactions to 10`. ## appendix + there are some reference value for params above. | datasource | driver | url | xa_data_source_class_name | maven | |------------|--------------------------|-------------------------------------------|-------------------------------------|---------------------------------------------------------------| | mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | com.mysql.cj.jdbc.MysqlXADataSource | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | | +| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | org.postgresql.xa.PGXADataSource | https://mvnrepository.com/artifact/org.postgresql/postgresql | | dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | dm.jdbc.driver.DmdbXADataSource | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | com.microsoft.sqlserver.jdbc.SQLServerXADataSource | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | ## Example + Simple + ``` jdbc { url = "jdbc:mysql://localhost/test" @@ -99,6 +126,7 @@ jdbc { ``` Exactly-once + ``` jdbc { diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 784d2d2648d..4d8575a9943 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -73,8 +73,9 @@ there are some reference value for params above. | datasource | driver | url | maven | |------------|--------------------------|-------------------------------------------|---------------------------------------------------------------| | mysql | com.mysql.cj.jdbc.Driver | jdbc:mysql://localhost:3306/test | https://mvnrepository.com/artifact/mysql/mysql-connector-java | -| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | | +| postgresql | org.postgresql.Driver | jdbc:postgresql://localhost:5432/postgres | https://mvnrepository.com/artifact/org.postgresql/postgresql | | dm | dm.jdbc.driver.DmDriver | jdbc:dm://localhost:5236 | https://mvnrepository.com/artifact/com.dameng/DmJdbcDriver18 | +| sqlserver | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:microsoft:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | ## Example simple: From 113a0bbe3be6f4f6c55ef23adcd6a042b101f2e7 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sun, 18 Sep 2022 17:17:06 +0800 Subject: [PATCH 04/12] [Feature][Connector-V2] add sqlserver jdbc driver into container. --- .../e2e/flink/v2/jdbc/JdbcSqlserverIT.java | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java index be3b038c56d..5318ed8ebbe 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java @@ -25,14 +25,18 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; @@ -40,12 +44,14 @@ import java.sql.Statement; import java.time.Duration; import java.util.List; +import java.util.Objects; import java.util.stream.Stream; @Slf4j public class JdbcSqlserverIT extends FlinkContainer { private MSSQLServerContainer mssqlServerContainer; + private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar"; @SuppressWarnings("checkstyle:MagicNumber") @BeforeEach @@ -126,15 +132,33 @@ public void tesSqlserverSourceAndSink() throws SQLException, IOException, Interr Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf"); Assertions.assertEquals(0, execResult.getExitCode()); // query result - String sql = "select * from sink"; + String sourceSql = "select * from source"; + String sinkSql = "select * from sink"; + List columns = Lists.newArrayList("ids", "name", "sfzh", "sort", "dz", "xchar", "xdecimal", "xfloat", "xnumeric", "xsmall", "xbit", "rq", "xrq", "xreal", "ximage"); + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - List result = Lists.newArrayList(); - while (resultSet.next()) { - result.add(resultSet.getString("ids")); + ResultSet sourceResultSet = statement.executeQuery(sourceSql); + ResultSet sinkResultSet = statement.executeQuery(sinkSql); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columns) { + Object source = sourceResultSet.getObject(column); + int sourceIndex = sourceResultSet.findColumn(column); + int sinkIndex = sinkResultSet.findColumn(column); + Object sink = sinkResultSet.getObject(column); + sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(sourceIndex); + InputStream sinkAsciiStream = sourceResultSet.getBinaryStream(sinkIndex); + String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + Assertions.assertTrue(true); + } + } } - Assertions.assertFalse(result.isEmpty()); } } @@ -145,4 +169,10 @@ public void closeSqlserverContainer() { } } + @Override + protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/spark/seatunnel/plugins/Jdbc/lib && cd /tmp/spark/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); + Assertions.assertEquals(0, extraCommands.getExitCode()); + } + } From a7f26c528d6e831ade24e2f045b84d8505ebbafc Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sun, 18 Sep 2022 18:16:52 +0800 Subject: [PATCH 05/12] [Feature][Connector-V2] fix some bug --- .../connector-jdbc/pom.xml | 1 + .../sqlserver/SqlserverTypeMapper.java | 3 +- .../e2e/flink/v2/jdbc/JdbcSqlserverIT.java | 10 ++--- .../e2e/spark/v2/jdbc/JdbcSqlserverIT.java | 45 ++++++++++++++++--- 4 files changed, 48 insertions(+), 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index e190001a3f6..5f942737ebb 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -67,6 +67,7 @@ com.microsoft.sqlserver mssql-jdbc ${sqlserver.version} + provided diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java index 913392cd2a2..d7a11563fb8 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/sqlserver/SqlserverTypeMapper.java @@ -94,9 +94,10 @@ public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) th case SQLSERVER_MONEY: case SQLSERVER_SMALLMONEY: return new DecimalType(precision, scale); - case SQLSERVER_FLOAT: case SQLSERVER_REAL: return BasicType.FLOAT_TYPE; + case SQLSERVER_FLOAT: + return BasicType.DOUBLE_TYPE; case SQLSERVER_CHAR: case SQLSERVER_NCHAR: case SQLSERVER_VARCHAR: diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java index 5318ed8ebbe..3cbeac8dfd0 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java @@ -137,9 +137,10 @@ public void tesSqlserverSourceAndSink() throws SQLException, IOException, Interr List columns = Lists.newArrayList("ids", "name", "sfzh", "sort", "dz", "xchar", "xdecimal", "xfloat", "xnumeric", "xsmall", "xbit", "rq", "xrq", "xreal", "ximage"); try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { - Statement statement = connection.createStatement(); - ResultSet sourceResultSet = statement.executeQuery(sourceSql); - ResultSet sinkResultSet = statement.executeQuery(sinkSql); + Statement sourceStatement = connection.createStatement(); + Statement sinkStatement = connection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); while (sourceResultSet.next()) { if (sinkResultSet.next()) { for (String column : columns) { @@ -147,7 +148,6 @@ public void tesSqlserverSourceAndSink() throws SQLException, IOException, Interr int sourceIndex = sourceResultSet.findColumn(column); int sinkIndex = sinkResultSet.findColumn(column); Object sink = sinkResultSet.getObject(column); - sinkResultSet.getObject(column); if (!Objects.deepEquals(source, sink)) { InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(sourceIndex); InputStream sinkAsciiStream = sourceResultSet.getBinaryStream(sinkIndex); @@ -171,7 +171,7 @@ public void closeSqlserverContainer() { @Override protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { - Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/spark/seatunnel/plugins/Jdbc/lib && cd /tmp/spark/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); Assertions.assertEquals(0, extraCommands.getExitCode()); } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java index 8e7f7e1c19a..cd933069aa7 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java @@ -25,25 +25,33 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.MSSQLServerContainer; import org.testcontainers.containers.output.Slf4jLogConsumer; import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; import org.testcontainers.shaded.org.awaitility.Awaitility; import org.testcontainers.utility.DockerImageName; import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; import java.sql.Connection; import java.sql.DriverManager; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; +import java.util.List; +import java.util.Objects; import java.util.stream.Stream; @Slf4j public class JdbcSqlserverIT extends SparkContainer { private MSSQLServerContainer mssqlServerContainer; + private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/9.4.1.jre8/mssql-jdbc-9.4.1.jre8.jar"; @SuppressWarnings("checkstyle:MagicNumber") @BeforeEach @@ -124,12 +132,33 @@ public void tesSqlserverSourceAndSink() throws SQLException, IOException, Interr Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf"); Assertions.assertEquals(0, execResult.getExitCode()); // query result - String sql = "select * from sink minus select * from source;"; - try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { - Statement statement = connection.createStatement(); - ResultSet resultSet = statement.executeQuery(sql); - Assertions.assertTrue(resultSet.next()); + String sourceSql = "select * from source"; + String sinkSql = "select * from sink"; + List columns = Lists.newArrayList("ids", "name", "sfzh", "sort", "dz", "xchar", "xdecimal", "xfloat", "xnumeric", "xsmall", "xbit", "rq", "xrq", "xreal", "ximage"); + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement sourceStatement = connection.createStatement(); + Statement sinkStatement = connection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columns) { + Object source = sourceResultSet.getObject(column); + int sourceIndex = sourceResultSet.findColumn(column); + int sinkIndex = sinkResultSet.findColumn(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(sourceIndex); + InputStream sinkAsciiStream = sourceResultSet.getBinaryStream(sinkIndex); + String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + Assertions.assertTrue(true); + } + } + } } } @@ -140,4 +169,10 @@ public void closeSqlserverContainer() { } } + @Override + protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/spark/seatunnel/plugins/Jdbc/lib && cd /tmp/spark/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); + Assertions.assertEquals(0, extraCommands.getExitCode()); + } + } From d3759a19ae769452477cf5f3c9df7d8ee62cc0ed Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Tue, 20 Sep 2022 14:27:25 +0800 Subject: [PATCH 06/12] Update docs/en/connector-v2/sink/Jdbc.md Co-authored-by: Hisoka --- docs/en/connector-v2/sink/Jdbc.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index 4cbb9d94801..a778a97533e 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -37,7 +37,7 @@ support `Xa transactions`. You can set `is_exactly_once=true` to enable it. ### driver [string] The jdbc class name used to connect to the remote data source, if you use MySQL the value is com.mysql.cj.jdbc.Driver. -Warn: for license compliance, you have to provide MySQL JDBC driver yourself, e.g. copy mysql-connector-java-xxx.jar to +Warn: for license compliance, you have to provide any driver yourself like MySQL JDBC Driver, e.g. copy mysql-connector-java-xxx.jar to $SEATNUNNEL_HOME/lib for Standalone. ### user [string] From f7ca0aab68e4afcf0b5940480f3f43c135b12cc1 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Wed, 21 Sep 2022 11:13:27 +0800 Subject: [PATCH 07/12] [Feature][Connector-V2]jdbc-sqlserver change seatunnel home path --- .../org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java | 2 +- .../org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java index 3cbeac8dfd0..85360a2751a 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java @@ -171,7 +171,7 @@ public void closeSqlserverContainer() { @Override protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { - Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/flink/seatunnel/plugins/Jdbc/lib && cd /tmp/flink/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); Assertions.assertEquals(0, extraCommands.getExitCode()); } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java index cd933069aa7..053ea0b3779 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java @@ -171,7 +171,7 @@ public void closeSqlserverContainer() { @Override protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { - Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/spark/seatunnel/plugins/Jdbc/lib && cd /tmp/spark/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); + Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); Assertions.assertEquals(0, extraCommands.getExitCode()); } From c72699d5d12ceff36ea10730bd5664f6cb2812df Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Thu, 29 Sep 2022 12:36:58 +0800 Subject: [PATCH 08/12] fix some merge error --- seatunnel-connectors-v2/connector-jdbc/pom.xml | 13 +++++++------ .../connector-jdbc-flink-e2e/pom.xml | 1 - .../connector-jdbc-spark-e2e/pom.xml | 3 +-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index d05d7556283..25a94511793 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -33,6 +33,7 @@ 8.0.16 42.3.3 8.1.2.141 + 9.2.1.jre8 5.2.5-HBase-2.x @@ -62,6 +63,12 @@ ${dm-jdbc.version} provided + + com.microsoft.sqlserver + mssql-jdbc + ${sqlserver.version} + provided + @@ -70,19 +77,15 @@ mysql mysql-connector-java - ${mysql.version} org.postgresql postgresql - ${postgresql.version} com.dameng DmJdbcDriver18 - ${dm-jdbc.version} - provided com.aliyun.phoenix @@ -92,8 +95,6 @@ com.microsoft.sqlserver mssql-jdbc - ${sqlserver.version} - provided diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml index 1b8bec3f07c..e2706ce8717 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml @@ -115,7 +115,6 @@ com.microsoft.sqlserver mssql-jdbc - ${sqlserver.version} test diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml index ccd4ab0b0d6..8ff933504da 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml @@ -78,8 +78,8 @@ mysql mysql-connector-java + test - org.postgresql postgresql @@ -104,7 +104,6 @@ com.microsoft.sqlserver mssql-jdbc - ${sqlserver.version} From 4e75da256f7d5d07f9746dd0f5e3c47d5d57f160 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Thu, 29 Sep 2022 13:12:06 +0800 Subject: [PATCH 09/12] fix some merge error --- .../apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java | 7 ------- .../apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java | 8 -------- 2 files changed, 15 deletions(-) diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java index 39004a35bbb..1fbc1e5ac01 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcDmdbIT.java @@ -60,7 +60,6 @@ public class JdbcDmdbIT extends FlinkContainer { private static final String DM_DRIVER_JAR = "https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar"; private Connection jdbcConnection; private GenericContainer dbServer; - private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.2.141/DmJdbcDriver18-8.1.2.141.jar"; @BeforeEach public void startDmdbContainer() throws ClassNotFoundException { @@ -156,10 +155,4 @@ public void testJdbcDmdbSourceAndSink() throws IOException, InterruptedException + "DM_DECIMAL, DM_DEC, DM_REAL, DM_FLOAT, DM_DOUBLE_PRECISION, DM_DOUBLE, DM_CHAR, DM_CHARACTER, DM_VARCHAR, DM_VARCHAR2," + " DM_TEXT, DM_LONG, DM_LONGVARCHAR, DM_CLOB, DM_TIMESTAMP, DM_DATETIME, DM_TIME, DM_DATE, DM_BLOB, DM_BINARY, DM_VARBINARY, DM_LONGVARBINARY"); } - - @Override - protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { - Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); - Assertions.assertEquals(0, extraCommands.getExitCode()); - } } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java index 68a5a061350..2c0c8628a21 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcDmdbIT.java @@ -61,7 +61,6 @@ public class JdbcDmdbIT extends SparkContainer { public static final String DM_DRIVER_JAR = "https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.1.193/DmJdbcDriver18-8.1.1.193.jar"; private GenericContainer dbServer; private Connection jdbcConnection; - private static final String THIRD_PARTY_PLUGINS_URL = "https://repo1.maven.org/maven2/com/dameng/DmJdbcDriver18/8.1.2.141/DmJdbcDriver18-8.1.2.141.jar"; @BeforeEach public void beforeAllForDM() { @@ -181,11 +180,4 @@ public void testDMDBSourceToJdbcSink() throws SQLException, IOException, Interru "DM_LONG", "DM_LONGVARCHAR")); } - - @Override - protected void executeExtraCommands(GenericContainer container) throws IOException, InterruptedException { - Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + THIRD_PARTY_PLUGINS_URL); - Assertions.assertEquals(0, extraCommands.getExitCode()); - } - } From 9210881c1faae480d92603f0e643eda746062691 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Thu, 29 Sep 2022 13:37:02 +0800 Subject: [PATCH 10/12] fix `log has private access` --- .../apache/seatunnel/e2e/common/AbstractFlinkContainer.java | 4 ---- .../apache/seatunnel/e2e/common/AbstractSparkContainer.java | 4 ---- 2 files changed, 8 deletions(-) diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java index 7e6a1342c9f..aa185bafbaa 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer; -import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; @@ -32,7 +31,6 @@ * The before method will create a Flink cluster, and after method will close the Flink cluster. * You can use {@link AbstractFlinkContainer#executeJob} to submit a seatunnel config and run a seatunnel job. */ -@Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractFlinkContainer extends AbstractTestFlinkContainer { @@ -40,14 +38,12 @@ public abstract class AbstractFlinkContainer extends AbstractTestFlinkContainer @BeforeAll public void startUp() throws Exception { super.startUp(); - log.info("The TestContainer[{}] is running.", identifier()); } @Override @AfterAll public void tearDown() throws Exception { super.tearDown(); - log.info("The TestContainer[{}] is closed.", identifier()); } public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java index 41829ca43b8..54c7ace5f72 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java @@ -19,7 +19,6 @@ import org.apache.seatunnel.e2e.common.container.spark.AbstractTestSparkContainer; -import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; @@ -27,7 +26,6 @@ import java.io.IOException; -@Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractSparkContainer extends AbstractTestSparkContainer { @@ -35,13 +33,11 @@ public abstract class AbstractSparkContainer extends AbstractTestSparkContainer @BeforeAll public void startUp() throws Exception { super.startUp(); - log.info("The TestContainer[{}] is running.", identifier()); } @AfterAll public void tearDown() throws Exception { super.tearDown(); - log.info("The TestContainer[{}] is closed.", identifier()); } public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException { From 7ca1e2b8f510e6cc523a6916e7993d8e30714414 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Fri, 30 Sep 2022 15:07:27 +0800 Subject: [PATCH 11/12] revert --- .../apache/seatunnel/e2e/common/AbstractFlinkContainer.java | 4 ++++ .../apache/seatunnel/e2e/common/AbstractSparkContainer.java | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java index aa185bafbaa..7e6a1342c9f 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractFlinkContainer.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.e2e.common.container.flink.AbstractTestFlinkContainer; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; @@ -31,6 +32,7 @@ * The before method will create a Flink cluster, and after method will close the Flink cluster. * You can use {@link AbstractFlinkContainer#executeJob} to submit a seatunnel config and run a seatunnel job. */ +@Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractFlinkContainer extends AbstractTestFlinkContainer { @@ -38,12 +40,14 @@ public abstract class AbstractFlinkContainer extends AbstractTestFlinkContainer @BeforeAll public void startUp() throws Exception { super.startUp(); + log.info("The TestContainer[{}] is running.", identifier()); } @Override @AfterAll public void tearDown() throws Exception { super.tearDown(); + log.info("The TestContainer[{}] is closed.", identifier()); } public Container.ExecResult executeSeaTunnelFlinkJob(String confFile) throws IOException, InterruptedException { diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java index 54c7ace5f72..41829ca43b8 100644 --- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java +++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/AbstractSparkContainer.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.e2e.common.container.spark.AbstractTestSparkContainer; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.TestInstance; @@ -26,6 +27,7 @@ import java.io.IOException; +@Slf4j @TestInstance(TestInstance.Lifecycle.PER_CLASS) public abstract class AbstractSparkContainer extends AbstractTestSparkContainer { @@ -33,11 +35,13 @@ public abstract class AbstractSparkContainer extends AbstractTestSparkContainer @BeforeAll public void startUp() throws Exception { super.startUp(); + log.info("The TestContainer[{}] is running.", identifier()); } @AfterAll public void tearDown() throws Exception { super.tearDown(); + log.info("The TestContainer[{}] is closed.", identifier()); } public Container.ExecResult executeSeaTunnelSparkJob(String confFile) throws IOException, InterruptedException { From 7e2fed4838b184067ff4268945816ec3625baf61 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Fri, 30 Sep 2022 15:10:29 +0800 Subject: [PATCH 12/12] fix some review problem --- .../connector-jdbc-flink-e2e/pom.xml | 2 +- .../connector-jdbc-spark-e2e/pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml index c38a3e6e908..f6220d9b07d 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml @@ -100,7 +100,7 @@ org.testcontainers mssqlserver - 1.17.3 + ${testcontainer.version} test diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml index 047e8d729e6..ee7629f1498 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml @@ -89,7 +89,7 @@ org.testcontainers mssqlserver - 1.17.3 + ${testcontainer.version} test