From 769f2a75b9a65580ca253ee2ab0cd65803e6cce5 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Tue, 6 Sep 2022 20:40:33 +0800 Subject: [PATCH] [Feature][Connector-V2] add sqlserver connector e2e test --- .../connector-jdbc-flink-e2e/pom.xml | 14 +- .../e2e/flink/v2/jdbc/JdbcSqlserverIT.java | 148 ++++++++++++++++++ .../container-license-acceptance.txt | 1 + .../jdbc/jdbc_sqlserver_source_to_sink.conf | 60 +++++++ .../connector-jdbc-spark-e2e/pom.xml | 13 +- .../e2e/spark/v2/jdbc/JdbcSqlserverIT.java | 148 ++++++++++++++++++ .../container-license-acceptance.txt | 1 + .../jdbc/jdbc_sqlserver_source_to_sink.conf | 62 ++++++++ 8 files changed, 444 insertions(+), 3 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt create mode 100644 seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml index e4361c3592e..dfe5b5dd51e 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml @@ -46,6 +46,16 @@ 1.17.3 test + + org.testcontainers + mssqlserver + 1.17.3 + test + + + com.microsoft.sqlserver + mssql-jdbc + test + - - \ No newline at end of file + diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java new file mode 100644 index 00000000000..be3b038c56d --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.flink.v2.jdbc; + +import org.apache.seatunnel.e2e.flink.FlinkContainer; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; + +@Slf4j +public class JdbcSqlserverIT extends FlinkContainer { + + private MSSQLServerContainer mssqlServerContainer; + + @SuppressWarnings("checkstyle:MagicNumber") + @BeforeEach + public void startSqlserverContainer() throws ClassNotFoundException, SQLException { + mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest")) + .withNetwork(NETWORK) + .withNetworkAliases("sqlserver") + .withLogConsumer(new Slf4jLogConsumer(log)); + Startables.deepStart(Stream.of(mssqlServerContainer)).join(); + log.info("Sqlserver container started"); + Class.forName(mssqlServerContainer.getDriverClassName()); + Awaitility.given().ignoreExceptions() + .await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(this::initializeJdbcTable); + batchInsertData(); + } + + private void initializeJdbcTable() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + String sourceSql = "CREATE TABLE [source] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + String sinkSql = "CREATE TABLE [sink] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + statement.execute(sourceSql); + statement.execute(sinkSql); + } catch (SQLException e) { + throw new RuntimeException("Initializing Sqlserver table failed!", e); + } + } + + @SuppressWarnings("checkstyle:RegexpSingleline") + private void batchInsertData() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + String sql = + "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " + + "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)"; + Statement statement = connection.createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Batch insert data failed!", e); + } + } + + @Test + public void tesSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + // query result + String sql = "select * from sink"; + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + List result = Lists.newArrayList(); + while (resultSet.next()) { + result.add(resultSet.getString("ids")); + } + Assertions.assertFalse(result.isEmpty()); + } + } + + @AfterEach + public void closeSqlserverContainer() { + if (mssqlServerContainer != null) { + mssqlServerContainer.stop(); + } + } + +} diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt new file mode 100644 index 00000000000..7f099b0aa4e --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt @@ -0,0 +1 @@ +mcr.microsoft.com/mssql/server:2022-latest diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf new file mode 100644 index 00000000000..52f18f1e083 --- /dev/null +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml index 8d96fb66a38..8a7dac7c0d1 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml @@ -40,6 +40,17 @@ ${project.version} test + + org.testcontainers + mssqlserver + 1.17.3 + test + + + com.microsoft.sqlserver + mssql-jdbc + test + - \ No newline at end of file + diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java new file mode 100644 index 00000000000..219ca953bee --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.spark.v2.jdbc; + +import org.apache.seatunnel.e2e.spark.SparkContainer; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.MSSQLServerContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testcontainers.utility.DockerImageName; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; + +@Slf4j +public class JdbcSqlserverIT extends SparkContainer { + + private MSSQLServerContainer mssqlServerContainer; + + @SuppressWarnings("checkstyle:MagicNumber") + @BeforeEach + public void startSqlServerContainer() throws ClassNotFoundException, SQLException { + mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest")) + .withNetwork(NETWORK) + .withNetworkAliases("sqlserver") + .withLogConsumer(new Slf4jLogConsumer(log)); + Startables.deepStart(Stream.of(mssqlServerContainer)).join(); + log.info("Sqlserver container started"); + Class.forName(mssqlServerContainer.getDriverClassName()); + Awaitility.given().ignoreExceptions() + .await() + .atMost(Duration.ofMinutes(1)) + .untilAsserted(this::initializeJdbcTable); + batchInsertData(); + } + + private void initializeJdbcTable() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + String sourceSql = "CREATE TABLE [source] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + String sinkSql = "CREATE TABLE [sink] (\n" + + " [ids] bigint NOT NULL,\n" + + " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [sort] int NULL,\n" + + " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" + + " [xdecimal] decimal(18) NULL,\n" + + " [xfloat] float(53) NULL,\n" + + " [xnumeric] numeric(18) NULL,\n" + + " [xsmall] smallint NULL,\n" + + " [xbit] bit NULL,\n" + + " [rq] datetime DEFAULT NULL NULL,\n" + + " [xrq] smalldatetime NULL,\n" + + " [xreal] real NULL,\n" + + " [ximage] image NULL\n" + + ")"; + statement.execute(sourceSql); + statement.execute(sinkSql); + } catch (SQLException e) { + throw new RuntimeException("Initializing Sqlserver table failed!", e); + } + } + + @SuppressWarnings("checkstyle:RegexpSingleline") + private void batchInsertData() { + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + String sql = + "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " + + "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)"; + Statement statement = connection.createStatement(); + statement.execute(sql); + } catch (SQLException e) { + throw new RuntimeException("Batch insert data failed!", e); + } + } + + @Test + public void tesSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + // query result + String sql = "select * from sink"; + try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) { + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(sql); + List result = Lists.newArrayList(); + while (resultSet.next()) { + result.add(resultSet.getString("ids")); + } + Assertions.assertFalse(result.isEmpty()); + } + } + + @AfterEach + public void closeSqlserverContainer() { + if (mssqlServerContainer != null) { + mssqlServerContainer.stop(); + } + } + +} diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt new file mode 100644 index 00000000000..7f099b0aa4e --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt @@ -0,0 +1 @@ +mcr.microsoft.com/mssql/server:2022-latest diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf new file mode 100644 index 00000000000..98d8451de93 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf @@ -0,0 +1,62 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://sqlserver;encrypt=false;" + user = SA + password = "A_Str0ng_Required_Password" + query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +}