diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
index e4361c3592e..dfe5b5dd51e 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/pom.xml
@@ -46,6 +46,16 @@
1.17.3
test
+
+ org.testcontainers
+ mssqlserver
+ 1.17.3
+ test
+
+
+ com.microsoft.sqlserver
+ mssql-jdbc
+ test
+
-
-
\ No newline at end of file
+
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
new file mode 100644
index 00000000000..be3b038c56d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/jdbc/JdbcSqlserverIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.jdbc;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSqlserverIT extends FlinkContainer {
+
+ private MSSQLServerContainer> mssqlServerContainer;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startSqlserverContainer() throws ClassNotFoundException, SQLException {
+ mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("sqlserver")
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ Startables.deepStart(Stream.of(mssqlServerContainer)).join();
+ log.info("Sqlserver container started");
+ Class.forName(mssqlServerContainer.getDriverClassName());
+ Awaitility.given().ignoreExceptions()
+ .await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(this::initializeJdbcTable);
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sourceSql = "CREATE TABLE [source] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ String sinkSql = "CREATE TABLE [sink] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ statement.execute(sourceSql);
+ statement.execute(sinkSql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Sqlserver table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ private void batchInsertData() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ String sql =
+ "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " +
+ "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)";
+ Statement statement = connection.createStatement();
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void tesSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelFlinkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // query result
+ String sql = "select * from sink";
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("ids"));
+ }
+ Assertions.assertFalse(result.isEmpty());
+ }
+ }
+
+ @AfterEach
+ public void closeSqlserverContainer() {
+ if (mssqlServerContainer != null) {
+ mssqlServerContainer.stop();
+ }
+ }
+
+}
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt
new file mode 100644
index 00000000000..7f099b0aa4e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/container-license-acceptance.txt
@@ -0,0 +1 @@
+mcr.microsoft.com/mssql/server:2022-latest
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
new file mode 100644
index 00000000000..52f18f1e083
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-jdbc-flink-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
@@ -0,0 +1,60 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set flink configuration here
+ execution.parallelism = 1
+ job.mode = "BATCH"
+ #execution.checkpoint.interval = 10000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
index 8d96fb66a38..8a7dac7c0d1 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/pom.xml
@@ -40,6 +40,17 @@
${project.version}
test
+
+ org.testcontainers
+ mssqlserver
+ 1.17.3
+ test
+
+
+ com.microsoft.sqlserver
+ mssql-jdbc
+ test
+
-
\ No newline at end of file
+
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
new file mode 100644
index 00000000000..219ca953bee
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/jdbc/JdbcSqlserverIT.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.jdbc;
+
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.MSSQLServerContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.shaded.com.google.common.collect.Lists;
+import org.testcontainers.shaded.org.awaitility.Awaitility;
+import org.testcontainers.utility.DockerImageName;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.List;
+import java.util.stream.Stream;
+
+@Slf4j
+public class JdbcSqlserverIT extends SparkContainer {
+
+ private MSSQLServerContainer> mssqlServerContainer;
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ @BeforeEach
+ public void startSqlServerContainer() throws ClassNotFoundException, SQLException {
+ mssqlServerContainer = new MSSQLServerContainer<>(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"))
+ .withNetwork(NETWORK)
+ .withNetworkAliases("sqlserver")
+ .withLogConsumer(new Slf4jLogConsumer(log));
+ Startables.deepStart(Stream.of(mssqlServerContainer)).join();
+ log.info("Sqlserver container started");
+ Class.forName(mssqlServerContainer.getDriverClassName());
+ Awaitility.given().ignoreExceptions()
+ .await()
+ .atMost(Duration.ofMinutes(1))
+ .untilAsserted(this::initializeJdbcTable);
+ batchInsertData();
+ }
+
+ private void initializeJdbcTable() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ String sourceSql = "CREATE TABLE [source] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ String sinkSql = "CREATE TABLE [sink] (\n" +
+ " [ids] bigint NOT NULL,\n" +
+ " [name] text COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sfzh] varchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [sort] int NULL,\n" +
+ " [dz] nvarchar(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xchar] char(255) COLLATE Chinese_PRC_CI_AS NULL,\n" +
+ " [xdecimal] decimal(18) NULL,\n" +
+ " [xfloat] float(53) NULL,\n" +
+ " [xnumeric] numeric(18) NULL,\n" +
+ " [xsmall] smallint NULL,\n" +
+ " [xbit] bit NULL,\n" +
+ " [rq] datetime DEFAULT NULL NULL,\n" +
+ " [xrq] smalldatetime NULL,\n" +
+ " [xreal] real NULL,\n" +
+ " [ximage] image NULL\n" +
+ ")";
+ statement.execute(sourceSql);
+ statement.execute(sinkSql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Initializing Sqlserver table failed!", e);
+ }
+ }
+
+ @SuppressWarnings("checkstyle:RegexpSingleline")
+ private void batchInsertData() {
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ String sql =
+ "INSERT INTO [source] ([ids], [name], [sfzh], [sort], [dz], [xchar], [xdecimal], [xfloat], [xnumeric], [xsmall], [xbit], [rq], [xrq], [xreal], [ximage]) " +
+ "VALUES (1504057, '张三', '3ee98c990e2011eda8fd00ff27b3340d', 1, N'3232', 'qwq', 1, 19.1, 2, 1, '0', '2022-07-26 11:58:46.000', '2022-07-26 13:49:00', 2, 0x)";
+ Statement statement = connection.createStatement();
+ statement.execute(sql);
+ } catch (SQLException e) {
+ throw new RuntimeException("Batch insert data failed!", e);
+ }
+ }
+
+ @Test
+ public void tesSqlserverSourceAndSink() throws SQLException, IOException, InterruptedException {
+ Container.ExecResult execResult = executeSeaTunnelSparkJob("/jdbc/jdbc_sqlserver_source_to_sink.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ // query result
+ String sql = "select * from sink";
+ try (Connection connection = DriverManager.getConnection(mssqlServerContainer.getJdbcUrl(), mssqlServerContainer.getUsername(), mssqlServerContainer.getPassword())) {
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(sql);
+ List result = Lists.newArrayList();
+ while (resultSet.next()) {
+ result.add(resultSet.getString("ids"));
+ }
+ Assertions.assertFalse(result.isEmpty());
+ }
+ }
+
+ @AfterEach
+ public void closeSqlserverContainer() {
+ if (mssqlServerContainer != null) {
+ mssqlServerContainer.stop();
+ }
+ }
+
+}
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt
new file mode 100644
index 00000000000..7f099b0aa4e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/container-license-acceptance.txt
@@ -0,0 +1 @@
+mcr.microsoft.com/mssql/server:2022-latest
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
new file mode 100644
index 00000000000..98d8451de93
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-jdbc-spark-e2e/src/test/resources/jdbc/jdbc_sqlserver_source_to_sink.conf
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the feature source plugin**
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "select name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage from source"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of source plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc
+}
+
+transform {
+
+ # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
+ # please go to https://seatunnel.apache.org/docs/transform/sql
+}
+
+sink {
+ Jdbc {
+ driver = com.microsoft.sqlserver.jdbc.SQLServerDriver
+ url = "jdbc:sqlserver://sqlserver;encrypt=false;"
+ user = SA
+ password = "A_Str0ng_Required_Password"
+ query = "insert into sink(name,ids,sfzh,sort,dz,xchar,xdecimal,xfloat,xnumeric,xsmall,xbit,rq,xrq,xreal,ximage) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"
+ }
+
+ # If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
+ # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc
+}