diff --git a/docs/content.zh/docs/connectors/table/jdbc.md b/docs/content.zh/docs/connectors/table/jdbc.md
index 9a812fc891815..f94ce03ca1b56 100644
--- a/docs/content.zh/docs/connectors/table/jdbc.md
+++ b/docs/content.zh/docs/connectors/table/jdbc.md
@@ -54,6 +54,7 @@ JDBC 连接器不是二进制发行版的一部分,请查阅[这里]({{< ref "
| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [下载](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
| PostgreSQL | `org.postgresql` | `postgresql` | [下载](https://jdbc.postgresql.org/download.html) |
| Derby | `org.apache.derby` | `derby` | [下载](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [下载](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
当前,JDBC 连接器和驱动不在 Flink 二进制发布包中,请参阅[这里]({{< ref "docs/dev/configuration" >}})了解在集群上执行时如何连接它们。
@@ -340,6 +341,13 @@ lookup cache 的主要目的是用于提高时态表关联 JDBC 连接器的性
        <td>PostgreSQL</td>
        <td>INSERT .. ON CONFLICT .. DO UPDATE SET ..</td>
    </tr>
+    <tr>
+        <td>MS SQL Server</td>
+        <td>MERGE INTO .. USING (..) ON (..) <br>
+            WHEN MATCHED THEN UPDATE SET (..) <br>
+            WHEN NOT MATCHED THEN INSERT (..) <br>
+            VALUES (..)</td>
+    </tr>
@@ -552,6 +560,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
        <th class="text-left">MySQL type</th>
        <th class="text-left">Oracle type</th>
        <th class="text-left">PostgreSQL type</th>
+       <th class="text-left">SQL Server type</th>
        <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
@@ -561,6 +570,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
        <td></td>
        <td></td>
        <td><code>TINYINT</code></td>
+       <td><code>TINYINT</code></td>
@@ -573,6 +583,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>SMALLSERIAL</code><br>
          <code>SERIAL2</code></td>
        <td><code>SMALLINT</code></td>
+       <td><code>SMALLINT</code></td>
@@ -584,6 +595,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>INTEGER</code><br>
          <code>SERIAL</code></td>
        <td><code>INT</code></td>
+       <td><code>INT</code></td>
@@ -594,11 +606,13 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>BIGINT</code><br>
          <code>BIGSERIAL</code></td>
        <td><code>BIGINT</code></td>
+       <td><code>BIGINT</code></td>
      </tr>
      <tr>
        <td><code>BIGINT UNSIGNED</code></td>
        <td></td>
        <td></td>
+       <td></td>
        <td><code>DECIMAL(20, 0)</code></td>
@@ -614,6 +628,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>REAL</code><br>
          <code>FLOAT4</code></td>
+       <td><code>REAL</code></td>
        <td><code>FLOAT</code></td>
@@ -624,6 +639,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>FLOAT8</code><br>
          <code>DOUBLE PRECISION</code></td>
+       <td><code>FLOAT</code></td>
        <td><code>DOUBLE</code></td>
@@ -640,6 +656,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>NUMERIC(p, s)</code><br>
          <code>DECIMAL(p, s)</code></td>
        <td><code>DECIMAL(p, s)</code></td>
+       <td><code>DECIMAL(p, s)</code></td>
@@ -647,6 +664,7 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>TINYINT(1)</code></td>
        <td></td>
        <td><code>BOOLEAN</code></td>
+       <td><code>BIT</code></td>
        <td><code>BOOLEAN</code></td>
@@ -654,17 +672,23 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
        <td><code>DATE</code></td>
        <td><code>DATE</code></td>
        <td><code>DATE</code></td>
+       <td><code>DATE</code></td>
      </tr>
      <tr>
        <td><code>TIME [(p)]</code></td>
        <td><code>DATE</code></td>
        <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
+       <td><code>TIME(0)</code></td>
        <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
      </tr>
      <tr>
        <td><code>DATETIME [(p)]</code></td>
        <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
        <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
+       <td>
+         <code>DATETIME</code><br>
+         <code>DATETIME2</code>
+       </td>
        <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
@@ -682,6 +706,13 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>VARCHAR(n)</code><br>
          <code>CHARACTER VARYING(n)</code><br>
          <code>TEXT</code></td>
+       <td>
+         <code>CHAR(n)</code><br>
+         <code>NCHAR(n)</code><br>
+         <code>VARCHAR(n)</code><br>
+         <code>NVARCHAR(n)</code><br>
+         <code>TEXT</code><br>
+         <code>NTEXT</code></td>
        <td><code>STRING</code></td>
@@ -693,12 +724,16 @@ Flink 支持连接到多个使用方言(dialect)的数据库,如 MySQL、O
          <code>RAW(s)</code><br>
          <code>BLOB</code></td>
        <td><code>BYTEA</code></td>
+       <td>
+         <code>BINARY(n)</code><br>
+         <code>VARBINARY(n)</code>
+       </td>
        <td><code>BYTES</code></td>
      </tr>
      <tr>
        <td></td>
        <td></td>
        <td><code>ARRAY</code></td>
+       <td></td>
        <td><code>ARRAY</code></td>
diff --git a/docs/content/docs/connectors/table/jdbc.md b/docs/content/docs/connectors/table/jdbc.md
index 9f38f44bd8230..255d7ae08d863 100644
--- a/docs/content/docs/connectors/table/jdbc.md
+++ b/docs/content/docs/connectors/table/jdbc.md
@@ -45,12 +45,13 @@ See how to link with it for cluster execution [here]({{< ref "docs/dev/configura
A driver dependency is also required to connect to a specified database. Here are the drivers currently supported:
-| Driver | Group Id | Artifact Id | JAR |
-| :-----------| :------------------| :----------------------| :----------------|
-| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
-| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
-| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download.html) |
-| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
+| Driver | Group Id | Artifact Id | JAR |
+|:-----------| :------------------| :----------------------| :----------------|
+| MySQL | `mysql` | `mysql-connector-java` | [Download](https://repo.maven.apache.org/maven2/mysql/mysql-connector-java/) |
+| Oracle | `com.oracle.database.jdbc` | `ojdbc8` | [Download](https://mvnrepository.com/artifact/com.oracle.database.jdbc/ojdbc8) |
+| PostgreSQL | `org.postgresql` | `postgresql` | [Download](https://jdbc.postgresql.org/download.html) |
+| Derby | `org.apache.derby` | `derby` | [Download](http://db.apache.org/derby/derby_downloads.html) |
+| SQL Server | `com.microsoft.sqlserver` | `mssql-jdbc` | [Download](https://docs.microsoft.com/en-us/sql/connect/jdbc/download-microsoft-jdbc-driver-for-sql-server?view=sql-server-ver16) |
JDBC connector and drivers are not part of Flink's binary distribution. See how to link with them for cluster execution [here]({{< ref "docs/dev/configuration" >}}).
@@ -347,6 +348,13 @@ As there is no standard syntax for upsert, the following table describes the dat
        <td>PostgreSQL</td>
        <td>INSERT .. ON CONFLICT .. DO UPDATE SET ..</td>
    </tr>
+    <tr>
+        <td>MS SQL Server</td>
+        <td>MERGE INTO .. USING (..) ON (..) <br>
+            WHEN MATCHED THEN UPDATE SET (..) <br>
+            WHEN NOT MATCHED THEN INSERT (..) <br>
+            VALUES (..)</td>
+    </tr>
@@ -546,6 +554,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
        <th class="text-left">MySQL type</th>
        <th class="text-left">Oracle type</th>
        <th class="text-left">PostgreSQL type</th>
+       <th class="text-left">SQL Server type</th>
        <th class="text-left"><a href="{{< ref "docs/dev/table/types" >}}">Flink SQL type</a></th>
@@ -555,6 +564,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
        <td></td>
        <td></td>
        <td><code>TINYINT</code></td>
+       <td><code>TINYINT</code></td>
@@ -567,6 +577,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>SMALLSERIAL</code><br>
          <code>SERIAL2</code></td>
        <td><code>SMALLINT</code></td>
+       <td><code>SMALLINT</code></td>
@@ -578,6 +589,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>INTEGER</code><br>
          <code>SERIAL</code></td>
        <td><code>INT</code></td>
+       <td><code>INT</code></td>
@@ -588,18 +600,14 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>BIGINT</code><br>
          <code>BIGSERIAL</code></td>
        <td><code>BIGINT</code></td>
+       <td><code>BIGINT</code></td>
      </tr>
      <tr>
        <td><code>BIGINT UNSIGNED</code></td>
        <td></td>
        <td></td>
-       <td><code>DECIMAL(20, 0)</code></td>
-     </tr>
-     <tr>
-       <td><code>BIGINT</code></td>
        <td></td>
-       <td><code>BIGINT</code></td>
-       <td><code>BIGINT</code></td>
+       <td><code>DECIMAL(20, 0)</code></td>
FLOAT |
@@ -608,6 +616,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>REAL</code><br>
          <code>FLOAT4</code></td>
+       <td><code>REAL</code></td>
        <td><code>FLOAT</code></td>
@@ -618,6 +627,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>FLOAT8</code><br>
          <code>DOUBLE PRECISION</code></td>
+       <td><code>FLOAT</code></td>
        <td><code>DOUBLE</code></td>
@@ -634,6 +644,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>NUMERIC(p, s)</code><br>
          <code>DECIMAL(p, s)</code></td>
        <td><code>DECIMAL(p, s)</code></td>
+       <td><code>DECIMAL(p, s)</code></td>
@@ -641,6 +652,7 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>TINYINT(1)</code></td>
        <td></td>
        <td><code>BOOLEAN</code></td>
+       <td><code>BIT</code></td>
        <td><code>BOOLEAN</code></td>
@@ -648,17 +660,23 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
        <td><code>DATE</code></td>
        <td><code>DATE</code></td>
        <td><code>DATE</code></td>
+       <td><code>DATE</code></td>
      </tr>
      <tr>
        <td><code>TIME [(p)]</code></td>
        <td><code>DATE</code></td>
        <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
+       <td><code>TIME(0)</code></td>
        <td><code>TIME [(p)] [WITHOUT TIMEZONE]</code></td>
      </tr>
      <tr>
        <td><code>DATETIME [(p)]</code></td>
        <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
        <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
+       <td>
+         <code>DATETIME</code><br>
+         <code>DATETIME2</code>
+       </td>
        <td><code>TIMESTAMP [(p)] [WITHOUT TIMEZONE]</code></td>
@@ -676,6 +694,13 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>VARCHAR(n)</code><br>
          <code>CHARACTER VARYING(n)</code><br>
          <code>TEXT</code></td>
+       <td>
+         <code>CHAR(n)</code><br>
+         <code>NCHAR(n)</code><br>
+         <code>VARCHAR(n)</code><br>
+         <code>NVARCHAR(n)</code><br>
+         <code>TEXT</code><br>
+         <code>NTEXT</code></td>
        <td><code>STRING</code></td>
@@ -687,12 +712,16 @@ Flink supports connect to several databases which uses dialect like MySQL, Oracl
          <code>RAW(s)</code><br>
          <code>BLOB</code></td>
        <td><code>BYTEA</code></td>
+       <td>
+         <code>BINARY(n)</code><br>
+         <code>VARBINARY(n)</code>
+       </td>
        <td><code>BYTES</code></td>
      </tr>
      <tr>
        <td></td>
        <td></td>
        <td><code>ARRAY</code></td>
+       <td></td>
        <td><code>ARRAY</code></td>
diff --git a/flink-connectors/flink-connector-jdbc/pom.xml b/flink-connectors/flink-connector-jdbc/pom.xml
index 87361b47673fd..157dcf54b7e68 100644
--- a/flink-connectors/flink-connector-jdbc/pom.xml
+++ b/flink-connectors/flink-connector-jdbc/pom.xml
@@ -68,6 +68,14 @@ under the License.
			<scope>provided</scope>
+
+		<dependency>
+			<groupId>com.microsoft.sqlserver</groupId>
+			<artifactId>mssql-jdbc</artifactId>
+			<version>10.2.1.jre8</version>
+			<scope>provided</scope>
+		</dependency>
+
@@ -157,6 +165,14 @@ under the License.
			<scope>test</scope>
+
+		<dependency>
+			<groupId>org.testcontainers</groupId>
+			<artifactId>mssqlserver</artifactId>
+			<version>1.17.3</version>
+			<scope>test</scope>
+		</dependency>
+
		<dependency>
			<groupId>com.h2database</groupId>
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java
new file mode 100644
index 0000000000000..22663c1132453
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialect.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.converter.JdbcRowConverter;
+import org.apache.flink.connector.jdbc.dialect.AbstractDialect;
+import org.apache.flink.connector.jdbc.internal.converter.SqlServerRowConverter;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/** JDBC dialect for SqlServer. */
+@Internal
+public class SqlServerDialect extends AbstractDialect {
+ @Override
+ public String dialectName() {
+ return "SqlServer";
+ }
+
+ /**
+ * The maximum timestamp precision supported by SQL Server's datetime2 type.
+ * https://docs.microsoft.com/en-us/sql/t-sql/data-types/datetime2-transact-sql?view=sql-server-ver16
+ */
+ @Override
+ public Optional<Range> timestampPrecisionRange() {
+ return Optional.of(Range.of(0, 7));
+ }
+
+ /**
+ * The maximum precision supported by SQL Server's decimal type.
+ * https://docs.microsoft.com/en-us/sql/t-sql/data-types/precision-scale-and-length-transact-sql?view=sql-server-ver16
+ */
+ @Override
+ public Optional<Range> decimalPrecisionRange() {
+ return Optional.of(Range.of(0, 38));
+ }
+
+ @Override
+ public Optional<String> defaultDriverName() {
+ return Optional.of("com.microsoft.sqlserver.jdbc.SQLServerDriver");
+ }
+
+ @Override
+ public String quoteIdentifier(String identifier) {
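+ // Returned as-is: this dialect leaves identifiers unquoted instead of wrapping
+ // them in SQL Server's [bracket] quoting.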
+ return identifier;
+ }
+
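+ /**
+  * SQL Server has no native upsert statement, so one is emulated with MERGE: the
+  * incoming row is exposed as a one-row SELECT ([SOURCE]) and matched against the
+  * target table on the unique key columns.
+  */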
+ @Override
+ public Optional<String> getUpsertStatement(
+ String tableName, String[] fieldNames, String[] uniqueKeyFields) {
+ List<String> nonUniqueKeyFields =
+ Arrays.stream(fieldNames)
+ .filter(f -> !Arrays.asList(uniqueKeyFields).contains(f))
+ .collect(Collectors.toList());
+ String fieldsProjection =
+ Arrays.stream(fieldNames)
+ .map(this::quoteIdentifier)
+ .collect(Collectors.joining(", "));
+
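+ // Bind every field as a named parameter and alias it with its column name, so the
+ // USING clause behaves like a one-row source table.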
+ String valuesBinding =
+ Arrays.stream(fieldNames)
+ .map(f -> ":" + f + " " + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+
+ String usingClause = String.format("SELECT %s", valuesBinding);
+ String onConditions =
+ Arrays.stream(uniqueKeyFields)
+ .map(
+ f ->
+ "[TARGET]."
+ + quoteIdentifier(f)
+ + "=[SOURCE]."
+ + quoteIdentifier(f))
+ .collect(Collectors.joining(" AND "));
+ String updateSetClause =
+ nonUniqueKeyFields.stream()
+ .map(
+ f ->
+ "[TARGET]."
+ + quoteIdentifier(f)
+ + "=[SOURCE]."
+ + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+
+ String insertValues =
+ Arrays.stream(fieldNames)
+ .map(f -> "[SOURCE]." + quoteIdentifier(f))
+ .collect(Collectors.joining(", "));
+ return Optional.of(
+ String.format(
+ "MERGE INTO %s AS [TARGET]"
+ + " USING (%s) AS [SOURCE]"
+ + " ON (%s)"
+ + " WHEN MATCHED THEN"
+ + " UPDATE SET %s"
+ + " WHEN NOT MATCHED THEN"
+ + " INSERT (%s) VALUES (%s);",
+ quoteIdentifier(tableName),
+ usingClause,
+ onConditions,
+ updateSetClause,
+ fieldsProjection,
+ insertValues));
+ }
+
+ @Override
+ public JdbcRowConverter getRowConverter(RowType rowType) {
+ return new SqlServerRowConverter(rowType);
+ }
+
+ @Override
+ public String getLimitClause(long limit) {
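+ // SQL Server has no LIMIT keyword; paging is written with TOP or OFFSET ... FETCH,
+ // which this dialect does not generate.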
+ throw new IllegalArgumentException("SqlServerDialect does not support limit clause");
+ }
+
+ @Override
+ public Set<LogicalTypeRoot> supportedTypes() {
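+ // Note that ARRAY is absent: SQL Server has no array type (see the type mapping
+ // table in the connector docs).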
+ return EnumSet.of(
+ LogicalTypeRoot.CHAR,
+ LogicalTypeRoot.VARCHAR,
+ LogicalTypeRoot.BOOLEAN,
+ LogicalTypeRoot.BINARY,
+ LogicalTypeRoot.VARBINARY,
+ LogicalTypeRoot.DECIMAL,
+ LogicalTypeRoot.TINYINT,
+ LogicalTypeRoot.SMALLINT,
+ LogicalTypeRoot.INTEGER,
+ LogicalTypeRoot.BIGINT,
+ LogicalTypeRoot.FLOAT,
+ LogicalTypeRoot.DOUBLE,
+ LogicalTypeRoot.DATE,
+ LogicalTypeRoot.TIME_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
+ LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE);
+ }
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialectFactory.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialectFactory.java
new file mode 100644
index 0000000000000..3e7116692eb2d
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerDialectFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory;
+
+/** Factory for {@link SqlServerDialect}. */
+@Internal
+public class SqlServerDialectFactory implements JdbcDialectFactory {
+ @Override
+ public boolean acceptsURL(String url) {
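+ // SQL Server JDBC URLs use the "jdbc:sqlserver:" sub-protocol, e.g.
+ // jdbc:sqlserver://host:1433;databaseName=db.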
+ return url.startsWith("jdbc:sqlserver:");
+ }
+
+ @Override
+ public JdbcDialect create() {
+ return new SqlServerDialect();
+ }
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/SqlServerRowConverter.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/SqlServerRowConverter.java
new file mode 100644
index 0000000000000..4e8a1af418609
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/converter/SqlServerRowConverter.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.internal.converter;
+
+import org.apache.flink.connector.jdbc.converter.AbstractJdbcRowConverter;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+/**
+ * Runtime converter that is responsible for converting between JDBC objects and the Flink
+ * internal object for SQL Server.
+ */
+public class SqlServerRowConverter extends AbstractJdbcRowConverter {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String converterName() {
+ return "SqlServer";
+ }
+
+ public SqlServerRowConverter(RowType rowType) {
+ super(rowType);
+ }
+
+ @Override
+ protected JdbcDeserializationConverter createInternalConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case TINYINT:
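+ // The mssql-jdbc driver surfaces TINYINT values as java.lang.Short, so narrow
+ // them to the byte that Flink's TINYINT expects.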
+ return val -> ((Short) val).byteValue();
+ default:
+ return super.createInternalConverter(type);
+ }
+ }
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
index 2142dc936b7bc..20c1f6ad067e6 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
+++ b/flink-connectors/flink-connector-jdbc/src/main/resources/META-INF/services/org.apache.flink.connector.jdbc.dialect.JdbcDialectFactory
@@ -17,3 +17,4 @@ org.apache.flink.connector.jdbc.dialect.derby.DerbyDialectFactory
org.apache.flink.connector.jdbc.dialect.mysql.MySqlDialectFactory
org.apache.flink.connector.jdbc.dialect.psql.PostgresDialectFactory
org.apache.flink.connector.jdbc.dialect.oracle.OracleDialectFactory
+org.apache.flink.connector.jdbc.dialect.sqlserver.SqlServerDialectFactory
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerPreparedStatementTest.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerPreparedStatementTest.java
new file mode 100644
index 0000000000000..a137dbe2d518a
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerPreparedStatementTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectLoader;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for the SQL statements generated by {@link SqlServerDialect}. */
+class SqlServerPreparedStatementTest {
+
+ private final JdbcDialect dialect =
+ JdbcDialectLoader.load("jdbc:sqlserver://localhost:3306/test");
+ private final String[] fieldNames =
+ new String[] {"id", "name", "email", "ts", "field1", "field_2", "__field_3__"};
+ private final String[] keyFields = new String[] {"id", "__field_3__"};
+ private final String tableName = "tbl";
+
+ @Test
+ void testInsertStatement() {
+ String insertStmt = dialect.getInsertIntoStatement(tableName, fieldNames);
+ assertThat(insertStmt)
+ .isEqualTo(
+ "INSERT INTO tbl(id, name, email, ts, field1, field_2, __field_3__) "
+ + "VALUES (:id, :name, :email, :ts, :field1, :field_2, :__field_3__)");
+ }
+
+ @Test
+ void testDeleteStatement() {
+ String deleteStmt = dialect.getDeleteStatement(tableName, keyFields);
+ assertThat(deleteStmt)
+ .isEqualTo("DELETE FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+
+ @Test
+ void testRowExistsStatement() {
+ String rowExistStmt = dialect.getRowExistsStatement(tableName, keyFields);
+ assertThat(rowExistStmt)
+ .isEqualTo("SELECT 1 FROM tbl WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+
+ @Test
+ void testUpdateStatement() {
+ String updateStmt = dialect.getUpdateStatement(tableName, fieldNames, keyFields);
+ assertThat(updateStmt)
+ .isEqualTo(
+ "UPDATE tbl SET id = :id, name = :name, email = :email, ts = :ts, "
+ + "field1 = :field1, field_2 = :field_2, __field_3__ = :__field_3__ "
+ + "WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+
+ @Test
+ void testUpsertStatement() {
+ String upsertStmt = dialect.getUpsertStatement(tableName, fieldNames, keyFields).get();
+ assertThat(upsertStmt)
+ .isEqualTo(
+ "MERGE INTO tbl AS [TARGET]"
+ + " USING (SELECT :id id, :name name, :email email, :ts ts, :field1 field1, :field_2 field_2, :__field_3__ __field_3__) AS [SOURCE]"
+ + " ON ([TARGET].id=[SOURCE].id AND [TARGET].__field_3__=[SOURCE].__field_3__)"
+ + " WHEN MATCHED THEN UPDATE SET [TARGET].name=[SOURCE].name, [TARGET].email=[SOURCE].email,"
+ + " [TARGET].ts=[SOURCE].ts, [TARGET].field1=[SOURCE].field1, [TARGET].field_2=[SOURCE].field_2"
+ + " WHEN NOT MATCHED THEN INSERT (id, name, email, ts, field1, field_2, __field_3__)"
+ + " VALUES ([SOURCE].id, [SOURCE].name, [SOURCE].email, [SOURCE].ts, [SOURCE].field1, [SOURCE].field_2, [SOURCE].__field_3__);");
+ }
+
+ @Test
+ void testSelectStatement() {
+ String selectStmt = dialect.getSelectFromStatement(tableName, fieldNames, keyFields);
+ assertThat(selectStmt)
+ .isEqualTo(
+ "SELECT id, name, email, ts, field1, field_2, __field_3__ FROM tbl "
+ + "WHERE id = :id AND __field_3__ = :__field_3__");
+ }
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
new file mode 100644
index 0000000000000..0227a5878dc9c
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSinkITCase.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.jdbc.internal.GenericJdbcSinkFunction;
+import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkContextUtil;
+import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
+import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.SinkFunctionProvider;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.factories.TestValuesTableFactory;
+import org.apache.flink.table.planner.runtime.utils.TestData;
+import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.connector.jdbc.internal.JdbcTableOutputFormatTest.check;
+import static org.apache.flink.table.api.Expressions.$;
+import static org.apache.flink.table.factories.utils.FactoryMocks.createTableSink;
+
+/** The Table Sink ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSinkITCase extends AbstractTestBase {
+
+ private static final MSSQLServerContainer container =
+ new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
+ .acceptLicense();
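+ // Testcontainers refuses to start the MS SQL Server image until its EULA
+ // has been explicitly accepted.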
+ private static String containerUrl;
+
+ public static final String OUTPUT_TABLE1 = "dynamicSinkForUpsert";
+ public static final String OUTPUT_TABLE2 = "dynamicSinkForAppend";
+ public static final String OUTPUT_TABLE3 = "dynamicSinkForBatch";
+ public static final String OUTPUT_TABLE4 = "REAL_TABLE";
+ public static final String OUTPUT_TABLE5 = "checkpointTable";
+ public static final String USER_TABLE = "USER_TABLE";
+
+ @BeforeClass
+ public static void beforeAll() throws ClassNotFoundException, SQLException {
+ container.start();
+ containerUrl =
+ String.format(
+ "%s;username=%s;password=%s",
+ container.getJdbcUrl(), container.getUsername(), container.getPassword());
+ Class.forName(container.getDriverClassName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ containerUrl, container.getUsername(), container.getPassword());
+ Statement stat = conn.createStatement()) {
+ stat.executeUpdate(
+ "CREATE TABLE "
+ + OUTPUT_TABLE1
+ + " ("
+ + "cnt FLOAT DEFAULT 0 NOT NULL,"
+ + "lencnt FLOAT DEFAULT 0 NOT NULL,"
+ + "cTag INT DEFAULT 0 NOT NULL,"
+ + "ts DATETIME2,"
+ + "CONSTRAINT PK1 PRIMARY KEY CLUSTERED (cnt, cTag))");
+
+ stat.executeUpdate(
+ "CREATE TABLE "
+ + OUTPUT_TABLE2
+ + " ("
+ + "id INT DEFAULT 0 NOT NULL,"
+ + "num INT DEFAULT 0 NOT NULL,"
+ + "ts DATETIME2)");
+
+ stat.executeUpdate(
+ "CREATE TABLE "
+ + OUTPUT_TABLE3
+ + " ("
+ + "NAME VARCHAR(20) NOT NULL,"
+ + "SCORE INT DEFAULT 0 NOT NULL)");
+
+ stat.executeUpdate("CREATE TABLE " + OUTPUT_TABLE4 + " (real_data REAL)");
+
+ stat.executeUpdate(
+ "CREATE TABLE " + OUTPUT_TABLE5 + " (" + "id BIGINT DEFAULT 0 NOT NULL)");
+
+ stat.executeUpdate(
+ "CREATE TABLE "
+ + USER_TABLE
+ + " ("
+ + "user_id VARCHAR(20) NOT NULL,"
+ + "user_name VARCHAR(20) NOT NULL,"
+ + "email VARCHAR(255),"
+ + "balance DECIMAL(18,2),"
+ + "balance2 DECIMAL(18,2),"
+ + "CONSTRAINT PK2 PRIMARY KEY CLUSTERED (user_id))");
+ }
+ }
+
+ @AfterClass
+ public static void afterAll() throws Exception {
+ TestValuesTableFactory.clearAllData();
+ Class.forName(container.getDriverClassName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ containerUrl, container.getUsername(), container.getPassword());
+ Statement stat = conn.createStatement()) {
+ stat.execute("DROP TABLE " + OUTPUT_TABLE1);
+ stat.execute("DROP TABLE " + OUTPUT_TABLE2);
+ stat.execute("DROP TABLE " + OUTPUT_TABLE3);
+ stat.execute("DROP TABLE " + OUTPUT_TABLE4);
+ stat.execute("DROP TABLE " + OUTPUT_TABLE5);
+ stat.execute("DROP TABLE " + USER_TABLE);
+ }
+ container.stop();
+ }
+
+ public static DataStream<Tuple4<Integer, Long, String, Timestamp>> get4TupleDataStream(
+ StreamExecutionEnvironment env) {
+ List<Tuple4<Integer, Long, String, Timestamp>> data = new ArrayList<>();
+ data.add(new Tuple4<>(1, 1L, "Hi", Timestamp.valueOf("1970-01-01 00:00:00.001")));
+ data.add(new Tuple4<>(2, 2L, "Hello", Timestamp.valueOf("1970-01-01 00:00:00.002")));
+ data.add(new Tuple4<>(3, 2L, "Hello world", Timestamp.valueOf("1970-01-01 00:00:00.003")));
+ data.add(
+ new Tuple4<>(
+ 4,
+ 3L,
+ "Hello world, how are you?",
+ Timestamp.valueOf("1970-01-01 00:00:00.004")));
+ data.add(new Tuple4<>(5, 3L, "I am fine.", Timestamp.valueOf("1970-01-01 00:00:00.005")));
+ data.add(
+ new Tuple4<>(
+ 6, 3L, "Luke Skywalker", Timestamp.valueOf("1970-01-01 00:00:00.006")));
+ data.add(new Tuple4<>(7, 4L, "Comment#1", Timestamp.valueOf("1970-01-01 00:00:00.007")));
+ data.add(new Tuple4<>(8, 4L, "Comment#2", Timestamp.valueOf("1970-01-01 00:00:00.008")));
+ data.add(new Tuple4<>(9, 4L, "Comment#3", Timestamp.valueOf("1970-01-01 00:00:00.009")));
+ data.add(new Tuple4<>(10, 4L, "Comment#4", Timestamp.valueOf("1970-01-01 00:00:00.010")));
+ data.add(new Tuple4<>(11, 5L, "Comment#5", Timestamp.valueOf("1970-01-01 00:00:00.011")));
+ data.add(new Tuple4<>(12, 5L, "Comment#6", Timestamp.valueOf("1970-01-01 00:00:00.012")));
+ data.add(new Tuple4<>(13, 5L, "Comment#7", Timestamp.valueOf("1970-01-01 00:00:00.013")));
+ data.add(new Tuple4<>(14, 5L, "Comment#8", Timestamp.valueOf("1970-01-01 00:00:00.014")));
+ data.add(new Tuple4<>(15, 5L, "Comment#9", Timestamp.valueOf("1970-01-01 00:00:00.015")));
+ data.add(new Tuple4<>(16, 6L, "Comment#10", Timestamp.valueOf("1970-01-01 00:00:00.016")));
+ data.add(new Tuple4<>(17, 6L, "Comment#11", Timestamp.valueOf("1970-01-01 00:00:00.017")));
+ data.add(new Tuple4<>(18, 6L, "Comment#12", Timestamp.valueOf("1970-01-01 00:00:00.018")));
+ data.add(new Tuple4<>(19, 6L, "Comment#13", Timestamp.valueOf("1970-01-01 00:00:00.019")));
+ data.add(new Tuple4<>(20, 6L, "Comment#14", Timestamp.valueOf("1970-01-01 00:00:00.020")));
+ data.add(new Tuple4<>(21, 6L, "Comment#15", Timestamp.valueOf("1970-01-01 00:00:00.021")));
+
+ Collections.shuffle(data);
+ return env.fromCollection(data);
+ }
+
+ @Test
+ public void testReal() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ StreamTableEnvironment tEnv =
+ StreamTableEnvironment.create(env, EnvironmentSettings.inStreamingMode());
+
+ tEnv.executeSql(
+ "CREATE TABLE upsertSink ("
+ + " real_data float"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='"
+ + containerUrl
+ + "',"
+ + " 'table-name'='"
+ + OUTPUT_TABLE4
+ + "',"
+ + " 'username'='"
+ + container.getUsername()
+ + "',"
+ + " 'password'='"
+ + container.getPassword()
+ + "'"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO upsertSink SELECT CAST(1.1 as FLOAT)").await();
+ check(new Row[] {Row.of(1.1f)}, containerUrl, "REAL_TABLE", new String[] {"real_data"});
+ }
+
+ @Test
+ public void testUpsert() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ Table t =
+ tEnv.fromDataStream(
+ get4TupleDataStream(env)
+ .assignTimestampsAndWatermarks(
+ new AscendingTimestampExtractor<
+ Tuple4>() {
+ @Override
+ public long extractAscendingTimestamp(
+ Tuple4
+ element) {
+ return element.f0;
+ }
+ }),
+ $("id"),
+ $("num"),
+ $("text"),
+ $("ts"));
+
+ tEnv.createTemporaryView("T", t);
+ tEnv.executeSql(
+ "CREATE TABLE upsertSink ("
+ + " cnt DECIMAL(18,2),"
+ + " lencnt DECIMAL(18,2),"
+ + " cTag INT,"
+ + " ts TIMESTAMP(3),"
+ + " PRIMARY KEY (cnt, cTag) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='"
+ + containerUrl
+ + "',"
+ + " 'table-name'='"
+ + OUTPUT_TABLE1
+ + "',"
+ + " 'username'='"
+ + container.getUsername()
+ + "',"
+ + " 'password'='"
+ + container.getPassword()
+ + "',"
+ + " 'sink.buffer-flush.max-rows' = '2',"
+ + " 'sink.buffer-flush.interval' = '0',"
+ + " 'sink.max-retries' = '0'"
+ + ")");
+
+ tEnv.executeSql(
+ "INSERT INTO upsertSink \n"
+ + "SELECT cnt, COUNT(len) AS lencnt, cTag, MAX(ts) AS ts\n"
+ + "FROM (\n"
+ + " SELECT len, COUNT(id) as cnt, cTag, MAX(ts) AS ts\n"
+ + " FROM (SELECT id, CHAR_LENGTH(text) AS len, (CASE WHEN id > 0 THEN 1 ELSE 0 END) cTag, ts FROM T)\n"
+ + " GROUP BY len, cTag\n"
+ + ")\n"
+ + "GROUP BY cnt, cTag")
+ .await();
+ check(
+ new Row[] {
+ Row.of(1.0, 5.0, 1, Timestamp.valueOf("1970-01-01 00:00:00.006")),
+ Row.of(7.0, 1.0, 1, Timestamp.valueOf("1970-01-01 00:00:00.021")),
+ Row.of(9.0, 1.0, 1, Timestamp.valueOf("1970-01-01 00:00:00.015"))
+ },
+ containerUrl,
+ OUTPUT_TABLE1,
+ new String[] {"cnt", "lencnt", "cTag", "ts"});
+ }
+
+ @Test
+ public void testAppend() throws Exception {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.getConfig().enableObjectReuse();
+ env.getConfig().setParallelism(1);
+ StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+
+ Table t =
+ tEnv.fromDataStream(
+ get4TupleDataStream(env), $("id"), $("num"), $("text"), $("ts"));
+
+ tEnv.registerTable("T", t);
+
+ tEnv.executeSql(
+ "CREATE TABLE upsertSink ("
+ + " id INT,"
+ + " num BIGINT,"
+ + " ts TIMESTAMP(3)"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='"
+ + containerUrl
+ + "',"
+ + " 'table-name'='"
+ + OUTPUT_TABLE2
+ + "',"
+ + " 'username'='"
+ + container.getUsername()
+ + "',"
+ + " 'password'='"
+ + container.getPassword()
+ + "'"
+ + ")");
+
+ tEnv.executeSql("INSERT INTO upsertSink SELECT id, num, ts FROM T WHERE id IN (2, 10, 20)")
+ .await();
+ check(
+ new Row[] {
+ Row.of(2, 2, Timestamp.valueOf("1970-01-01 00:00:00.002")),
+ Row.of(10, 4, Timestamp.valueOf("1970-01-01 00:00:00.01")),
+ Row.of(20, 6, Timestamp.valueOf("1970-01-01 00:00:00.02"))
+ },
+ containerUrl,
+ OUTPUT_TABLE2,
+ new String[] {"id", "num", "ts"});
+ }
+
+ @Test
+ public void testBatchSink() throws Exception {
+ TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());
+
+ tEnv.executeSql(
+ "CREATE TABLE USER_RESULT("
+ + "NAME VARCHAR,"
+ + "SCORE INT"
+ + ") WITH ( "
+ + "'connector' = 'jdbc',"
+ + "'url'='"
+ + containerUrl
+ + "',"
+ + "'table-name' = '"
+ + OUTPUT_TABLE3
+ + "',"
+ + " 'username'='"
+ + container.getUsername()
+ + "',"
+ + " 'password'='"
+ + container.getPassword()
+ + "',"
+ + "'sink.buffer-flush.max-rows' = '2',"
+ + "'sink.buffer-flush.interval' = '300ms',"
+ + "'sink.max-retries' = '4'"
+ + ")");
+
+ TableResult tableResult =
+ tEnv.executeSql(
+ "INSERT INTO USER_RESULT\n"
+ + "SELECT user_name, score "
+ + "FROM (VALUES (1, 'Bob'), (22, 'Tom'), (42, 'Kim'), "
+ + "(42, 'Kim'), (1, 'Bob')) "
+ + "AS UserCountTable(score, user_name)");
+ tableResult.await();
+
+ check(
+ new Row[] {
+ Row.of("Bob", 1),
+ Row.of("Tom", 22),
+ Row.of("Kim", 42),
+ Row.of("Kim", 42),
+ Row.of("Bob", 1)
+ },
+ containerUrl,
+ OUTPUT_TABLE3,
+ new String[] {"NAME", "SCORE"});
+ }
+
+ @Test
+ public void testReadingFromChangelogSource() throws Exception {
+ TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().build());
+ String dataId = TestValuesTableFactory.registerData(TestData.userChangelog());
+ tEnv.executeSql(
+ "CREATE TABLE user_logs (\n"
+ + " user_id STRING,\n"
+ + " user_name STRING,\n"
+ + " email STRING,\n"
+ + " balance DECIMAL(18,2),\n"
+ + " balance2 AS balance * 2\n"
+ + ") WITH (\n"
+ + " 'connector' = 'values',\n"
+ + " 'data-id' = '"
+ + dataId
+ + "',\n"
+ + " 'changelog-mode' = 'I,UA,UB,D'\n"
+ + ")");
+ tEnv.executeSql(
+ "CREATE TABLE user_sink (\n"
+ + " user_id STRING PRIMARY KEY NOT ENFORCED,\n"
+ + " user_name STRING,\n"
+ + " email STRING,\n"
+ + " balance DECIMAL(18,3),\n"
+ + " balance2 DECIMAL(18,3)\n"
+ + ") WITH (\n"
+ + " 'connector' = 'jdbc',"
+ + " 'url'='"
+ + containerUrl
+ + "',"
+ + " 'table-name' = '"
+ + USER_TABLE
+ + "',"
+ + " 'username'='"
+ + container.getUsername()
+ + "',"
+ + " 'password'='"
+ + container.getPassword()
+ + "',"
+ + " 'sink.buffer-flush.max-rows' = '2',"
+ + " 'sink.buffer-flush.interval' = '0'"
+ + // disable async flush
+ ")");
+ tEnv.executeSql("INSERT INTO user_sink SELECT * FROM user_logs").await();
+
+ check(
+ new Row[] {
+ Row.of(
+ "user1",
+ "Tom",
+ "tom123@gmail.com",
+ new BigDecimal("8.10"),
+ new BigDecimal("16.20")),
+ Row.of(
+ "user3",
+ "Bailey",
+ "bailey@qq.com",
+ new BigDecimal("9.99"),
+ new BigDecimal("19.98")),
+ Row.of(
+ "user4",
+ "Tina",
+ "tina@gmail.com",
+ new BigDecimal("11.30"),
+ new BigDecimal("22.60"))
+ },
+ containerUrl,
+ USER_TABLE,
+ new String[] {"user_id", "user_name", "email", "balance", "balance2"});
+ }
+
+ @Test
+ public void testFlushBufferWhenCheckpoint() throws Exception {
+ Map<String, String> options = new HashMap<>();
+ options.put("connector", "jdbc");
+ options.put("url", containerUrl);
+ options.put("table-name", OUTPUT_TABLE5);
+ options.put("sink.buffer-flush.interval", "0");
+ options.put("username", container.getUsername());
+ options.put("password", container.getPassword());
+
+ ResolvedSchema schema =
+ ResolvedSchema.of(Column.physical("id", DataTypes.BIGINT().notNull()));
+
+ DynamicTableSink tableSink = createTableSink(schema, options);
+
+ SinkRuntimeProviderContext context = new SinkRuntimeProviderContext(false);
+ SinkFunctionProvider sinkProvider =
+ (SinkFunctionProvider) tableSink.getSinkRuntimeProvider(context);
+ GenericJdbcSinkFunction<RowData> sinkFunction =
+ (GenericJdbcSinkFunction<RowData>) sinkProvider.createSinkFunction();
+ sinkFunction.setRuntimeContext(new MockStreamingRuntimeContext(true, 1, 0));
+ sinkFunction.open(new Configuration());
+ sinkFunction.invoke(GenericRowData.of(1L), SinkContextUtil.forTimestamp(1));
+ sinkFunction.invoke(GenericRowData.of(2L), SinkContextUtil.forTimestamp(1));
+
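+ // Nothing is visible yet: with 'sink.buffer-flush.interval' = '0' the buffered rows
+ // are only flushed when snapshotState() runs as part of a checkpoint.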
+ check(new Row[] {}, containerUrl, OUTPUT_TABLE5, new String[] {"id"});
+ sinkFunction.snapshotState(new StateSnapshotContextSynchronousImpl(1, 1));
+ check(new Row[] {Row.of(1L), Row.of(2L)}, containerUrl, OUTPUT_TABLE5, new String[] {"id"});
+ sinkFunction.close();
+ }
+}
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
new file mode 100644
index 0000000000000..2c9c73b27d7e0
--- /dev/null
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/dialect/sqlserver/SqlServerTableSourceITCase.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.sqlserver;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CollectionUtil;
+
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.testcontainers.containers.MSSQLServerContainer;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The Table Source ITCase for {@link SqlServerDialect}. */
+public class SqlServerTableSourceITCase extends AbstractTestBase {
+
+ private static final MSSQLServerContainer container =
+ new MSSQLServerContainer("mcr.microsoft.com/mssql/server:2019-GA-ubuntu-16.04")
+ .acceptLicense();
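+ // The MS SQL Server image only starts after its EULA has been accepted.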
+ private static String containerUrl;
+ private static final String INPUT_TABLE = "sql_test_table";
+
+ private static StreamExecutionEnvironment env;
+ private static TableEnvironment tEnv;
+
+ @BeforeClass
+ public static void beforeAll() throws ClassNotFoundException, SQLException {
+ container.start();
+ containerUrl = container.getJdbcUrl();
+ Class.forName(container.getDriverClassName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ containerUrl, container.getUsername(), container.getPassword());
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate(
+ "CREATE TABLE "
+ + INPUT_TABLE
+ + " ("
+ + "id INT NOT NULL,"
+ + "tiny_int TINYINT,"
+ + "small_int SMALLINT,"
+ + "big_int BIGINT,"
+ + "float_col REAL,"
+ + "double_col FLOAT ,"
+ + "decimal_col DECIMAL(10, 4) NOT NULL,"
+ + "bool BIT NOT NULL,"
+ + "date_col DATE NOT NULL,"
+ + "time_col TIME(5) NOT NULL,"
+ + "datetime_col DATETIME,"
+ + "datetime2_col DATETIME2,"
+ + "char_col CHAR NOT NULL,"
+ + "nchar_col NCHAR(3) NOT NULL,"
+ + "varchar2_col VARCHAR(30) NOT NULL,"
+ + "nvarchar2_col NVARCHAR(30) NOT NULL,"
+ + "text_col TEXT,"
+ + "ntext_col NTEXT,"
+ + "binary_col BINARY(10)"
+ + ")");
+ statement.executeUpdate(
+ "INSERT INTO "
+ + INPUT_TABLE
+ + " VALUES ("
+ + "1, 2, 4, 10000000000, 1.12345, 2.12345678791, 100.1234, 0, "
+ + "'1997-01-01', '05:20:20.222','2020-01-01 15:35:00.123',"
+ + "'2020-01-01 15:35:00.1234567', 'a', 'abc', 'abcdef', 'xyz',"
+ + "'Hello World', 'World Hello', 1024)");
+ statement.executeUpdate(
+ "INSERT INTO "
+ + INPUT_TABLE
+ + " VALUES ("
+ + "2, 2, 4, 10000000000, 1.12345, 2.12345678791, 101.1234, 1, "
+ + "'1997-01-02', '05:20:20.222','2020-01-01 15:36:01.123',"
+ + "'2020-01-01 15:36:01.1234567', 'a', 'abc', 'abcdef', 'xyz',"
+ + "'Hey Leonard', 'World Hello', 1024)");
+ }
+ }
+
+ @AfterClass
+ public static void afterAll() throws Exception {
+ Class.forName(container.getDriverClassName());
+ try (Connection conn =
+ DriverManager.getConnection(
+ containerUrl, container.getUsername(), container.getPassword());
+ Statement statement = conn.createStatement()) {
+ statement.executeUpdate("DROP TABLE " + INPUT_TABLE);
+ }
+ container.stop();
+ }
+
+ @Before
+ public void before() throws Exception {
+ env = StreamExecutionEnvironment.getExecutionEnvironment();
+ tEnv = StreamTableEnvironment.create(env);
+ }
+
+ @Test
+ public void testJdbcSource() throws Exception {
+ createFlinkTable();
+ Iterator<Row> collected = tEnv.executeSql("SELECT * FROM " + INPUT_TABLE).collect();
+ List<String> result =
+ CollectionUtil.iteratorToList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
+ List<String> expected =
+ Stream.of(
+ "+I[1, 2, 4, 10000000000, 1.12345, 2.12345678791, 100.1234, false, "
+ + "1997-01-01, 05:20:20, 2020-01-01T15:35:00.123, "
+ + "2020-01-01T15:35:00.123456700, a, abc, abcdef, xyz, "
+ + "Hello World, World Hello, [0, 0, 0, 0, 0, 0, 0, 0, 4, 0]]",
+ "+I[2, 2, 4, 10000000000, 1.12345, 2.12345678791, 101.1234, true, "
+ + "1997-01-02, 05:20:20, 2020-01-01T15:36:01.123, "
+ + "2020-01-01T15:36:01.123456700, a, abc, abcdef, xyz, "
+ + "Hey Leonard, World Hello, [0, 0, 0, 0, 0, 0, 0, 0, 4, 0]]")
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(result).isEqualTo(expected);
+ }
+
+ @Test
+ public void testProject() throws Exception {
+ createFlinkTable();
+ Iterator<Row> collected =
+ tEnv.executeSql("SELECT id,datetime_col,decimal_col FROM " + INPUT_TABLE).collect();
+ List<String> result =
+ CollectionUtil.iteratorToList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
+ List<String> expected =
+ Stream.of(
+ "+I[1, 2020-01-01T15:35:00.123, 100.1234]",
+ "+I[2, 2020-01-01T15:36:01.123, 101.1234]")
+ .sorted()
+ .collect(Collectors.toList());
+ assertThat(result).isEqualTo(expected);
+ }
+
+ @Test
+ public void testFilter() throws Exception {
+ createFlinkTable();
+ Iterator<Row> collected =
+ tEnv.executeSql(
+ "SELECT id,datetime_col,decimal_col FROM "
+ + INPUT_TABLE
+ + " WHERE id = 1")
+ .collect();
+ List<String> result =
+ CollectionUtil.iteratorToList(collected).stream()
+ .map(Row::toString)
+ .sorted()
+ .collect(Collectors.toList());
+ List<String> expected =
+ Stream.of("+I[1, 2020-01-01T15:35:00.123, 100.1234]").collect(Collectors.toList());
+ assertThat(result).isEqualTo(expected);
+ }
+
+ private void createFlinkTable() {
+ tEnv.executeSql(
+ "CREATE TABLE "
+ + INPUT_TABLE
+ + " ("
+ + "id INT NOT NULL,"
+ + "tiny_int TINYINT,"
+ + "small_int SMALLINT,"
+ + "big_int BIGINT,"
+ + "float_col FLOAT,"
+ + "double_col DOUBLE ,"
+ + "decimal_col DECIMAL(10, 4) NOT NULL,"
+ + "bool BOOLEAN NOT NULL,"
+ + "date_col DATE NOT NULL,"
+ + "time_col TIME(0) NOT NULL,"
+ + "datetime_col TIMESTAMP,"
+ + "datetime2_col TIMESTAMP WITHOUT TIME ZONE,"
+ + "char_col STRING NOT NULL,"
+ + "nchar_col STRING NOT NULL,"
+ + "varchar2_col STRING NOT NULL,"
+ + "nvarchar2_col STRING NOT NULL,"
+ + "text_col STRING,"
+ + "ntext_col STRING,"
+ + "binary_col BYTES"
+ + ") WITH ("
+ + " 'connector'='jdbc',"
+ + " 'url'='"
+ + containerUrl
+ + "',"
+ + " 'table-name'='"
+ + INPUT_TABLE
+ + "',"
+ + " 'username'='"
+ + container.getUsername()
+ + "',"
+ + " 'password'='"
+ + container.getPassword()
+ + "'"
+ + ")");
+ }
+}