[DOC ] Update data type mapping #1802
Merged
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,11 +1,13 @@ | ||
# SQLServer CDC Connector | ||
|
||
The SQLServer CDC connector reads snapshot data and incremental data from a SQLServer database. This document describes how to set up the SQLServer CDC connector to run SQL queries against SQLServer databases. | ||
The SQLServer CDC connector reads snapshot data and incremental data from a SQLServer database. This document | ||
describes how to set up the SQLServer CDC connector to run SQL queries against SQLServer databases. | ||
|
||
Dependencies | ||
------------ | ||
|
||
To set up the SQLServer CDC connector, use the dependency information below, which covers both projects using a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles. | ||
To set up the SQLServer CDC connector, use the dependency information below, which covers both projects | ||
using a build automation tool (such as Maven or SBT) and the SQL Client with SQL JAR bundles. | ||
|
||
### Maven dependency | ||
|
||
|
@@ -22,24 +24,34 @@ In order to setup the SQLServer CDC connector, the following table provides depe | |
|
||
```Download link is available only for stable releases.``` | ||
|
||
Download [flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.4-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`. | ||
Download [flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.4-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar) | ||
and put it under `<FLINK_HOME>/lib/`. | ||
|
||
**Note:** The flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT version corresponds to the development branch; to use it, users need to download the source code and compile the jar themselves. Users should instead use a released version, such as [flink-sql-connector-sqlserver-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc). Released versions are available in the Maven Central repository. | ||
**Note:** The flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT version corresponds to the development branch; to use it, | ||
users need to download the source code and compile the jar themselves. Users should instead use a released version, such | ||
as [flink-sql-connector-sqlserver-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc). | ||
Released versions are available in the Maven Central repository. | ||
|
||
Setup SQLServer Database | ||
---------------- | ||
A SQL Server administrator must enable change data capture on the source tables that you want to capture. The database must already be enabled for CDC. To enable CDC on a table, a SQL Server administrator runs the stored procedure ```sys.sp_cdc_enable_table``` for the table. | ||
A SQL Server administrator must enable change data capture on the source tables that you want to capture. The database | ||
must already be enabled for CDC. To enable CDC on a table, a SQL Server administrator runs the stored | ||
procedure ```sys.sp_cdc_enable_table``` for the table. | ||
|
||
**Prerequisites:** | ||
|
||
* CDC is enabled on the SQL Server database. | ||
* The SQL Server Agent is running. | ||
* You are a member of the db_owner fixed database role for the database. | ||
|
||
**Procedure:** | ||
|
||
* Connect to the SQL Server database by database management studio. | ||
* Run the following SQL statement to enable CDC on the table. | ||
|
||
```sql | ||
USE MyDB | ||
USE | ||
MyDB | ||
GO | ||
|
||
EXEC sys.sp_cdc_enable_table | ||
|
@@ -50,15 +62,21 @@ EXEC sys.sp_cdc_enable_table | |
@supports_net_changes = 0 | ||
GO | ||
``` | ||
|
||
* Verifying that the user has access to the CDC table | ||
|
||
```sql | ||
--The following example runs the stored procedure sys.sp_cdc_help_change_data_capture on the database MyDB: | ||
USE MyDB; | ||
USE | ||
MyDB; | ||
GO | ||
EXEC sys.sp_cdc_help_change_data_capture | ||
GO | ||
``` | ||
The query returns configuration information for each table in the database that is enabled for CDC and that contains change data that the caller is authorized to access. If the result is empty, verify that the user has privileges to access both the capture instance and the CDC tables. | ||
|
||
The query returns configuration information for each table in the database that is enabled for CDC and that contains | ||
change data that the caller is authorized to access. If the result is empty, verify that the user has privileges to | ||
access both the capture instance and the CDC tables. | ||
|
||
How to create a SQLServer CDC table | ||
---------------- | ||
|
@@ -67,26 +85,28 @@ The SqlServer CDC table can be defined as following: | |
|
||
```sql | ||
-- register a SqlServer table 'orders' in Flink SQL | ||
CREATE TABLE orders ( | ||
id INT, | ||
CREATE TABLE orders | ||
( | ||
id INT, | ||
order_date DATE, | ||
purchaser INT, | ||
quantity INT, | ||
purchaser INT, | ||
quantity INT, | ||
product_id INT, | ||
PRIMARY KEY (id) NOT ENFORCED | ||
) WITH ( | ||
'connector' = 'sqlserver-cdc', | ||
'hostname' = 'localhost', | ||
'port' = '1433', | ||
'username' = 'sa', | ||
'password' = 'Password!', | ||
'database-name' = 'inventory', | ||
'schema-name' = 'dbo', | ||
'table-name' = 'orders' | ||
); | ||
'connector' = 'sqlserver-cdc', | ||
'hostname' = 'localhost', | ||
'port' = '1433', | ||
'username' = 'sa', | ||
'password' = 'Password!', | ||
'database-name' = 'inventory', | ||
'schema-name' = 'dbo', | ||
'table-name' = 'orders' | ||
); | ||
|
||
-- read the snapshot and subsequent change events from the orders table | ||
SELECT * FROM orders; | ||
SELECT * | ||
FROM orders; | ||
``` | ||
|
||
Connector Options | ||
|
@@ -221,7 +241,12 @@ Limitation | |
-------- | ||
|
||
### Can't perform checkpoint during scanning snapshot of tables | ||
During the snapshot scan of database tables there is no recoverable position, so checkpoints cannot be performed. To avoid performing them, the SqlServer CDC source keeps each checkpoint waiting until it times out. A timed-out checkpoint is treated as a failed checkpoint, which by default triggers a failover of the Flink job. If the database table is large, it is therefore recommended to add the following Flink configurations to avoid failovers caused by timed-out checkpoints: | ||
|
||
During the snapshot scan of database tables there is no recoverable position, so checkpoints cannot be performed. To | ||
avoid performing them, the SqlServer CDC source keeps each checkpoint waiting until it times out. A timed-out | ||
checkpoint is treated as a failed checkpoint, which by default triggers a failover of the Flink job. If | ||
the database table is large, it is therefore recommended to add the following Flink configurations to avoid failovers | ||
caused by timed-out checkpoints: | ||
|
||
``` | ||
execution.checkpointing.interval: 10min | ||
|
@@ -231,44 +256,54 @@ restart-strategy.fixed-delay.attempts: 2147483647 | |
``` | ||
|
||
The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields: | ||
|
||
```sql | ||
CREATE TABLE products ( | ||
table_name STRING METADATA FROM 'table_name' VIRTUAL, | ||
schema_name STRING METADATA FROM 'schema_name' VIRTUAL, | ||
db_name STRING METADATA FROM 'database_name' VIRTUAL, | ||
CREATE TABLE products | ||
( | ||
table_name STRING METADATA FROM 'table_name' VIRTUAL, | ||
schema_name STRING METADATA FROM 'schema_name' VIRTUAL, | ||
db_name STRING METADATA FROM 'database_name' VIRTUAL, | ||
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL, | ||
id INT NOT NULL, | ||
name STRING, | ||
description STRING, | ||
weight DECIMAL(10,3) | ||
id INT NOT NULL, | ||
name STRING, | ||
description STRING, | ||
weight DECIMAL(10, 3) | ||
) WITH ( | ||
'connector' = 'sqlserver-cdc', | ||
'hostname' = 'localhost', | ||
'port' = '1433', | ||
'username' = 'sa', | ||
'password' = 'Password!', | ||
'database-name' = 'inventory', | ||
'schema-name' = 'dbo', | ||
'table-name' = 'products' | ||
); | ||
'connector' = 'sqlserver-cdc', | ||
'hostname' = 'localhost', | ||
'port' = '1433', | ||
'username' = 'sa', | ||
'password' = 'Password!', | ||
'database-name' = 'inventory', | ||
'schema-name' = 'dbo', | ||
'table-name' = 'products' | ||
); | ||
``` | ||
|
||
Features | ||
-------- | ||
|
||
### Exactly-Once Processing | ||
|
||
The SQLServer CDC connector is a Flink Source connector that first reads a database snapshot and then continues to read change events with **exactly-once processing**, even if failures happen. For details, see [How the connector works](https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html#how-the-sqlserver-connector-works). | ||
The SQLServer CDC connector is a Flink Source connector that first reads a database snapshot and then continues to | ||
read change events with **exactly-once processing**, even if failures happen. For details, | ||
see [How the connector works](https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html#how-the-sqlserver-connector-works) | ||
. | ||
|
||
### Startup Reading Position | ||
|
||
The config option `scan.startup.mode` specifies the startup mode for SQLServer CDC consumer. The valid enumerations are: | ||
|
||
- `initial` (default): Takes a snapshot of structure and data of captured tables; useful if topics should be populated with a complete representation of the data from the captured tables. | ||
- `initial-only`: Takes a snapshot of structure and data like initial but instead does not transition into streaming changes once the snapshot has completed. | ||
- `latest-offset`: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now onwards should be propagated to topics. | ||
- `initial` (default): Takes a snapshot of structure and data of captured tables; useful if topics should be populated | ||
with a complete representation of the data from the captured tables. | ||
- `initial-only`: Takes a snapshot of structure and data like initial but instead does not transition into streaming | ||
changes once the snapshot has completed. | ||
- `latest-offset`: Takes a snapshot of the structure of captured tables only; useful if only changes happening from now | ||
onwards should be propagated to topics. | ||
|
||
_Note: the `scan.startup.mode` option relies on Debezium's `snapshot.mode` configuration, so do not use the two together. If you specify both `scan.startup.mode` and `debezium.snapshot.mode` in the table DDL, `scan.startup.mode` may not take effect._ | ||
_Note: the `scan.startup.mode` option relies on Debezium's `snapshot.mode` configuration, so do not use the two | ||
together. If you specify both `scan.startup.mode` and `debezium.snapshot.mode` in the table DDL, | ||
`scan.startup.mode` may not take effect._ | ||
|
||
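The note above about not combining `scan.startup.mode` with `debezium.snapshot.mode` could be guarded against before a DDL is submitted. The following is a minimal sketch, not connector code: the class `StartupOptionCheck` and its method `hasConflict` are hypothetical names introduced here, and the option map simply stands in for the `WITH (...)` clause of a table definition.

```java
import java.util.Map;

/** Hypothetical helper for illustration; it is not part of the connector's API. */
final class StartupOptionCheck {
    static final String SCAN_STARTUP_MODE = "scan.startup.mode";
    static final String DEBEZIUM_SNAPSHOT_MODE = "debezium.snapshot.mode";

    /** Returns true when both options appear in the same set of table options. */
    static boolean hasConflict(Map<String, String> withOptions) {
        return withOptions.containsKey(SCAN_STARTUP_MODE)
                && withOptions.containsKey(DEBEZIUM_SNAPSHOT_MODE);
    }

    public static void main(String[] args) {
        // Stand-in for the WITH clause of a CREATE TABLE statement.
        Map<String, String> options = Map.of(
                "connector", "sqlserver-cdc",
                SCAN_STARTUP_MODE, "latest-offset",
                DEBEZIUM_SNAPSHOT_MODE, "initial");

        if (hasConflict(options)) {
            System.out.println("Set only one of " + SCAN_STARTUP_MODE
                    + " and " + DEBEZIUM_SNAPSHOT_MODE);
        }
    }
}
```

The check only looks at which keys are present; it does not try to decide which of the two settings would win.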
### Single Thread Reading | ||
|
||
|
@@ -285,27 +320,28 @@ import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; | |
import com.ververica.cdc.connectors.sqlserver.SqlServerSource; | ||
|
||
public class SqlServerSourceExample { | ||
public static void main(String[] args) throws Exception { | ||
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder() | ||
.hostname("localhost") | ||
.port(1433) | ||
.database("sqlserver") // monitor sqlserver database | ||
.tableList("dbo.products") // monitor products table | ||
.username("sa") | ||
.password("Password!") | ||
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String | ||
.build(); | ||
|
||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||
|
||
env | ||
.addSource(sourceFunction) | ||
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering | ||
|
||
env.execute(); | ||
} | ||
public static void main(String[] args) throws Exception { | ||
SourceFunction<String> sourceFunction = SqlServerSource.<String>builder() | ||
.hostname("localhost") | ||
.port(1433) | ||
.database("sqlserver") // monitor sqlserver database | ||
.tableList("dbo.products") // monitor products table | ||
.username("sa") | ||
.password("Password!") | ||
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String | ||
.build(); | ||
|
||
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | ||
|
||
env | ||
.addSource(sourceFunction) | ||
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering | ||
|
||
env.execute(); | ||
} | ||
} | ||
``` | ||
|
||
**Note:** Please refer to [Deserialization](../about.html#deserialization) for more details about the JSON deserialization. | ||
|
||
Data Type Mapping | ||
|
@@ -357,7 +393,7 @@ Data Type Mapping | |
float<br> | ||
real | ||
</td> | ||
<td>FLOAT</td> | ||
<td>DOUBLE</td> | ||
</tr> | ||
<tr> | ||
<td>bit</td> | ||
|
@@ -369,7 +405,7 @@ Data Type Mapping | |
</tr> | ||
<tr> | ||
<td>tinyint</td> | ||
<td>TINYINT</td> | ||
<td>SMALLINT</td> | ||
Please fix corresponding |
||
</tr> | ||
<tr> | ||
<td>smallint</td> | ||
|
Do not format files.
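
For context on the `tinyint` row in the data type mapping hunk: the PR does not state its reasoning, so the following is an assumption. SQL Server's `tinyint` is unsigned (0 to 255), while Flink's `TINYINT` is a signed 1-byte integer (-128 to 127), so values above 127 need the wider `SMALLINT`. A small sketch of that range argument, using a hypothetical class `TinyIntRange` that is unrelated to the connector's code:

```java
/** Illustrative only: shows the range argument, not connector code. */
final class TinyIntRange {
    /** SQL Server tinyint is unsigned and spans 0..255. */
    static final int SQLSERVER_TINYINT_MAX = 255;

    /** True when the value survives a round trip through a signed 8-bit byte (Flink TINYINT). */
    static boolean fitsFlinkTinyInt(int value) {
        return value == (byte) value;
    }

    /** True when the value survives a round trip through a signed 16-bit short (Flink SMALLINT). */
    static boolean fitsFlinkSmallInt(int value) {
        return value == (short) value;
    }

    public static void main(String[] args) {
        System.out.println(fitsFlinkTinyInt(SQLSERVER_TINYINT_MAX));  // prints false
        System.out.println(fitsFlinkSmallInt(SQLSERVER_TINYINT_MAX)); // prints true
    }
}
```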