Skip to content

Commit

Permalink
[sqlserver] Sqlserver incremental source. (apache#1732)
Browse files Browse the repository at this point in the history
* [sqlserver] Sqlserver incremental source

---------

Co-authored-by: gongzhongqiang <gongzhongqiang@gigacloudtech.com>
  • Loading branch information
GOODBOY008 and GOODBOY008 committed Jun 2, 2023
1 parent c8c8107 commit 8cef4af
Show file tree
Hide file tree
Showing 35 changed files with 4,440 additions and 227 deletions.
88 changes: 76 additions & 12 deletions docs/content/connectors/sqlserver-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ CREATE TABLE orders (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'orders'
'table-name' = 'dob.orders'
);

-- read snapshot and binlogs from orders table
Expand Down Expand Up @@ -139,19 +138,12 @@ Connector Options
<td>String</td>
<td>Database name of the SQLServer database to monitor.</td>
</tr>
<tr>
<td>schema-name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Schema name of the SQLServer database to monitor.</td>
</tr>
<tr>
<td>table-name</td>
<td>required</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Table name of the SQLServer database to monitor.</td>
<td>Table name of the SQLServer database to monitor, e.g.: "db1.table1"</td>
</tr>
<tr>
<td>port</td>
Expand All @@ -167,6 +159,38 @@ Connector Options
<td>String</td>
<td>The session time zone in database server, e.g. "Asia/Shanghai".</td>
</tr>
<tr>
<td>scan.incremental.snapshot.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">true</td>
<td>Boolean</td>
<td>Whether enable parallelism snapshot.</td>
</tr>
<tr>
<td>chunk-meta.group.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">1000</td>
<td>Integer</td>
<td>The group size of chunk meta, if the meta size exceeds the group size, the meta will be divided into multiple groups.</td>
</tr>
<tr>
<td>chunk-key.even-distribution.factor.lower-bound</td>
<td>optional</td>
<td style="word-wrap: break-word;">0.05d</td>
<td>Double</td>
<td>The lower bound of chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distribution or not.
The table chunks would use evenly calculation optimization when the data distribution is even, and the query for splitting would happen when it is uneven.
The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.</td>
</tr>
<tr>
<td>chunk-key.even-distribution.factor.upper-bound</td>
<td>optional</td>
<td style="word-wrap: break-word;">1000.0d</td>
<td>Double</td>
<td>The upper bound of chunk key distribution factor. The distribution factor is used to determine whether the table is evenly distribution or not.
The table chunks would use evenly calculation optimization when the data distribution is even, and the query for splitting would happen when it is uneven.
The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.</td>
</tr>
<tr>
<td>debezium.*</td>
<td>optional</td>
Expand Down Expand Up @@ -248,8 +272,7 @@ CREATE TABLE products (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'products'
'table-name' = 'dbo.products'
);
```

Expand Down Expand Up @@ -306,6 +329,47 @@ public class SqlServerSourceExample {
}
}
```

The SQLServer CDC incremental connector (after 2.4.0) can be used as the following shows:
```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder;
import com.ververica.cdc.connectors.sqlserver.source.SqlServerSourceBuilder.SqlServerIncrementalSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class SqlServerIncrementalSourceExample {
public static void main(String[] args) throws Exception {
SqlServerIncrementalSource<String> sqlServerSource =
new SqlServerSourceBuilder()
.hostname("localhost")
.port(1433)
.databaseList("inventory")
.tableList("dbo.products")
.username("sa")
.password("Password!")
.deserializer(new JsonDebeziumDeserializationSchema())
.startupOptions(StartupOptions.initial())
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// enable checkpoint
env.enableCheckpointing(3000);
// set the source parallelism to 2
env.fromSource(
sqlServerSource,
WatermarkStrategy.noWatermarks(),
"SqlServerIncrementalSource")
.setParallelism(2)
.print()
.setParallelism(1);

env.execute("Print SqlServer Snapshot + Change Stream");
}
}
```
**Note:** Please refer [Deserialization](../about.html#deserialization) for more details about the JSON deserialization.

Data Type Mapping
Expand Down
6 changes: 2 additions & 4 deletions docs/content/quickstart/sqlserver-tutorial.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ Flink SQL> CREATE TABLE products (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'products'
'table-name' = 'dbo.products'
);
Flink SQL> CREATE TABLE orders (
Expand All @@ -161,8 +160,7 @@ Flink SQL> CREATE TABLE orders (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'orders'
'table-name' = 'dbo.orders'
);
Flink SQL> CREATE TABLE enriched_orders (
Expand Down
6 changes: 2 additions & 4 deletions docs/content/快速上手/sqlserver-tutorial-zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,7 @@ Flink SQL> CREATE TABLE products (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'products'
'table-name' = 'dbo.products'
);
Flink SQL> CREATE TABLE orders (
Expand All @@ -161,8 +160,7 @@ Flink SQL> CREATE TABLE orders (
'username' = 'sa',
'password' = 'Password!',
'database-name' = 'inventory',
'schema-name' = 'dbo',
'table-name' = 'orders'
'table-name' = 'dbo.orders'
);
Flink SQL> CREATE TABLE enriched_orders (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
public static boolean isDataChangeRecord(SourceRecord record) {
Schema valueSchema = record.valueSchema();
Struct value = (Struct) record.value();
return valueSchema.field(Envelope.FieldName.OPERATION) != null
return valueSchema != null
&& valueSchema.field(Envelope.FieldName.OPERATION) != null
&& value.getString(Envelope.FieldName.OPERATION) != null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MSSQLServerContainer;
Expand All @@ -38,6 +39,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Matcher;
Expand All @@ -57,6 +59,20 @@ public class SqlServerE2eITCase extends FlinkContainerTestEnvironment {
TestUtils.getResource("sqlserver-cdc-connector.jar");
private static final Path mysqlDriverJar = TestUtils.getResource("mysql-driver.jar");

@Parameterized.Parameter(1)
public boolean parallelismSnapshot;

@Parameterized.Parameters(name = "flinkVersion: {0}, parallelismSnapshot: {1}")
public static List<Object[]> parameters() {
final List<String> flinkVersions = getFlinkVersion();
List<Object[]> params = new ArrayList<>();
for (String flinkVersion : flinkVersions) {
params.add(new Object[] {flinkVersion, true});
params.add(new Object[] {flinkVersion, false});
}
return params;
}

@Rule
public MSSQLServerContainer sqlServer =
new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest")
Expand Down Expand Up @@ -101,8 +117,9 @@ public void testSqlServerCDC() throws Exception {
" 'username' = '" + sqlServer.getUsername() + "',",
" 'password' = '" + sqlServer.getPassword() + "',",
" 'database-name' = 'inventory',",
" 'schema-name' = 'dbo',",
" 'table-name' = 'products'",
" 'table-name' = 'dbo.products',",
" 'scan.incremental.snapshot.enabled' = '" + parallelismSnapshot + "',",
" 'scan.incremental.snapshot.chunk.size' = '4'",
");",
"CREATE TABLE products_sink (",
" `id` INT NOT NULL,",
Expand All @@ -126,7 +143,7 @@ public void testSqlServerCDC() throws Exception {
submitSQLJob(sqlLines, sqlServerCdcJar, jdbcJar, mysqlDriverJar);
waitUntilJobRunning(Duration.ofSeconds(30));

// generate binlogs
// generate change stream
try (Connection conn = getSqlServerJdbcConnection();
Statement statement = conn.createStatement()) {

Expand Down Expand Up @@ -171,7 +188,7 @@ public void testSqlServerCDC() throws Exception {
expectResult,
"products_sink",
new String[] {"id", "name", "description", "weight"},
60000L);
80000L);
}

private void initializeSqlServerTable(String sqlFile) {
Expand Down
7 changes: 7 additions & 0 deletions flink-connector-sqlserver-cdc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ under the License.

<dependencies>

<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-cdc-base</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Debezium dependencies -->
<dependency>
<groupId>com.ververica</groupId>
Expand Down Expand Up @@ -138,6 +144,7 @@ under the License.
<scope>test</scope>
</dependency>


</dependencies>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.ververica.cdc.connectors.sqlserver;

import com.ververica.cdc.connectors.sqlserver.table.StartupOptions;
import com.ververica.cdc.connectors.base.options.StartupOptions;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import com.ververica.cdc.debezium.DebeziumSourceFunction;
import io.debezium.connector.sqlserver.SqlServerConnector;
Expand Down Expand Up @@ -137,9 +137,6 @@ public DebeziumSourceFunction<T> build() {
case INITIAL:
props.setProperty("snapshot.mode", "initial");
break;
case INITIAL_ONLY:
props.setProperty("snapshot.mode", "initial_only");
break;
case LATEST_OFFSET:
props.setProperty("snapshot.mode", "schema_only");
break;
Expand Down
Loading

0 comments on commit 8cef4af

Please sign in to comment.