44 changes: 22 additions & 22 deletions docs/content/connectors/db2-cdc.md
@@ -23,15 +23,15 @@ using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR
<groupId>com.ververica</groupId>
<artifactId>flink-connector-db2-cdc</artifactId>
<!-- The dependency is available only for stable releases; a SNAPSHOT dependency needs to be built by yourself. -->
<version>2.4-SNAPSHOT</version>
<version>2.3.0</version>
</dependency>
```

### SQL Client JAR

```Download link is available only for stable releases.```

Download [flink-sql-connector-db2-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.4-SNAPSHOT/flink-sql-connector-db2-cdc-2.4-SNAPSHOT.jar) and
Download [flink-sql-connector-db2-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.3.0/flink-sql-connector-db2-cdc-2.3.0.jar) and
put it under `<FLINK_HOME>/lib/`.

**Note:** flink-sql-connector-db2-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users
@@ -50,8 +50,8 @@ Notes

### BOOLEAN type not supported in SQL Replication on Db2

Only snapshots can be taken from tables with BOOLEAN type columns. Currently, SQL Replication on Db2 does not support BOOLEAN, so Debezium cannot perform CDC on those tables.
Consider using another type to replace the BOOLEAN type.
Only snapshots can be taken from tables with BOOLEAN type columns. Currently SQL Replication on Db2 does not support BOOLEAN, so Debezium cannot perform CDC on those tables.
Consider using a different type.
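
For illustration, one way to apply this advice when defining a new Db2 table is to model the flag as a SMALLINT holding 0/1 instead of BOOLEAN. This is a hedged sketch, not part of the connector docs: the schema, connection URL, and credentials below are hypothetical, and it assumes the Db2 JDBC (jcc) driver is on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CaptureFriendlyTable {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; requires the Db2 jcc driver on the classpath.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:db2://yourHostname:50000/yourDatabaseName",
                        "yourUsername", "yourPassword");
                Statement stmt = conn.createStatement()) {
            // SMALLINT 0/1 stands in for BOOLEAN so SQL Replication can capture changes.
            stmt.execute(
                    "CREATE TABLE DB2INST1.PRODUCTS ("
                            + "  ID INT NOT NULL PRIMARY KEY,"
                            + "  NAME VARCHAR(255),"
                            + "  IS_ACTIVE SMALLINT NOT NULL DEFAULT 0)");
        }
    }
}
```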


How to create a Db2 CDC table
@@ -203,36 +203,36 @@ _Note: the mechanism of `scan.startup.mode` option relying on Debezium's `snapshot.mode`_
### DataStream Source

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import org.apache.kafka.connect.source.SourceRecord;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.db2.Db2Source;

public class Db2SourceExample {
public static void main(String[] args) throws Exception {
SourceFunction<String> db2Source =
Db2Source.<String>builder()
.hostname("yourHostname")
.port(50000)
.database("yourDatabaseName") // set captured database
.tableList("yourSchemaName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(
new JsonDebeziumDeserializationSchema()) // converts SourceRecord to
// JSON String
.build();
Db2Source<SourceRecord> db2Source = Db2Source.<SourceRecord>builder()
.hostname("yourHostname")
.port(yourPort)
.database("yourDatabaseName") // set captured database
.tableList("yourSchemaName.yourTableName") // set captured table
.username("yourUsername")
.password("yourPassword")
.deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
.build();

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// enable checkpoint
env.enableCheckpointing(3000);

env.addSource(db2Source)
.print()
.setParallelism(1); // use parallelism 1 for sink to keep message ordering
env
.fromSource(db2Source, WatermarkStrategy.noWatermarks(), "Db2 Source")
// set 1 parallel source task
.setParallelism(1)
.print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

env.execute("Print Db2 Snapshot + Change Stream");
env.execute("Print Db2 Snapshot + Binlog");
}
}
```
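
Because the diff interleaves two revisions of this example, neither variant reads cleanly above. Reassembled purely from the SourceFunction-based lines as they appear in the diff (nothing taken from the other revision), a single compilable version is:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.ververica.cdc.connectors.db2.Db2Source;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class Db2SourceExample {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> db2Source =
                Db2Source.<String>builder()
                        .hostname("yourHostname")
                        .port(50000)
                        .database("yourDatabaseName") // set captured database
                        .tableList("yourSchemaName.yourTableName") // set captured table
                        .username("yourUsername")
                        .password("yourPassword")
                        .deserializer(
                                new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // enable checkpoint
        env.enableCheckpointing(3000);

        env.addSource(db2Source)
                .print()
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute("Print Db2 Snapshot + Change Stream");
    }
}
```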
13 changes: 10 additions & 3 deletions docs/content/connectors/mongodb-cdc.md
@@ -13,17 +13,17 @@ In order to setup the MongoDB CDC connector, the following table provides dependency information
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<!-- The dependency is available only for stable releases; a SNAPSHOT dependency needs to be built by yourself. -->
<version>2.4-SNAPSHOT</version>
<version>2.3.0</version>
</dependency>
```

### SQL Client JAR

```Download link is available only for stable releases.```

Download [flink-sql-connector-mongodb-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.4-SNAPSHOT/flink-sql-connector-mongodb-cdc-2.4-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.
Download [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3.0/flink-sql-connector-mongodb-cdc-2.3.0.jar) and put it under `<FLINK_HOME>/lib/`.

**Note:** The flink-sql-connector-mongodb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use a released version, such as [flink-sql-connector-mongodb-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mongodb-cdc); released versions are available in the Maven Central repository.
**Note:** The flink-sql-connector-mongodb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use a released version, such as [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mongodb-cdc); released versions are available in the Maven Central repository.

Setup MongoDB
----------------
@@ -219,6 +219,13 @@ Connector Options
<td>Integer</td>
<td>The cursor batch size.</td>
</tr>
<tr>
<td>batch.size</td>
<td>optional</td>
<td style="word-wrap: break-word;">0</td>
<td>Integer</td>
<td>Change stream cursor batch size. Specifies the maximum number of change events to return in each batch of the response from the MongoDB cluster. The default is 0 meaning it uses the server's default value.</td>
</tr>
<tr>
<td>poll.max.batch.size</td>
<td>optional</td>
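
The `batch.size` rows added above slot into a table DDL like any other connector option. A minimal sketch follows; the table name, columns, and connection values are hypothetical, and only `batch.size` itself comes from the option table above.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MongoDbBatchSizeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // 'batch.size' = '1024' overrides the default of 0 (server-chosen batch size).
        tEnv.executeSql(
                "CREATE TABLE orders ("
                        + "  _id STRING,"
                        + "  total DOUBLE,"
                        + "  PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mongodb-cdc',"
                        + "  'hosts' = 'localhost:27017',"
                        + "  'username' = 'yourUsername',"
                        + "  'password' = 'yourPassword',"
                        + "  'database' = 'yourDatabase',"
                        + "  'collection' = 'orders',"
                        + "  'batch.size' = '1024'"
                        + ")");
    }
}
```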
20 changes: 9 additions & 11 deletions docs/content/connectors/mysql-cdc(ZH).md
@@ -6,7 +6,7 @@ The MySQL CDC connector allows reading snapshot data and incremental data from the MySQL database.

| Connector | Database | Driver |
|-----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
| [mysql-cdc](mysql-cdc(ZH).md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |
| [mysql-cdc](mysql-cdc(ZH).md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21 |

Dependencies
------------
@@ -20,17 +20,17 @@ The MySQL CDC connector allows reading snapshot data and incremental data from the MySQL database.
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<!-- Please use a released version dependency; SNAPSHOT dependencies need to be compiled locally. -->
<version>2.4-SNAPSHOT</version>
<version>2.3.0</version>
</dependency>
```

### SQL Client JAR

```The download link is available only for released versions; please switch to a released version via the selector at the lower left of the documentation site.```

Download [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.
Download [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) and put it under `<FLINK_HOME>/lib/`.

**Note:** The flink-sql-connector-mysql-cdc-XXX-SNAPSHOT version is the snapshot version corresponding to the `release-XXX` development branch; for snapshot versions, users need to download the source code and compile the corresponding jar. Users should use a released version, such as [flink-sql-connector-mysql-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc); all released versions are available in the Maven Central repository.
**Note:** The flink-sql-connector-mysql-cdc-XXX-SNAPSHOT version is the snapshot version corresponding to the `release-XXX` development branch; for snapshot versions, users need to download the source code and compile the corresponding jar. Users should use a released version, such as [flink-sql-connector-mysql-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc); all released versions are available in the Maven Central repository.

Setup MySQL server
----------------
@@ -236,21 +236,21 @@ Flink SQL> SELECT * FROM orders;
<td>scan.startup.specific-offset.gtid-set</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Long</td>
<td>在 "specific-offset" 启动模式下,启动位点的 GTID 集合。</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-events</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>String</td>
<td>在指定的启动位点后需要跳过的事件数量。</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-rows</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>String</td>
<td>在指定的启动位点后需要跳过的数据行数量。</td>
</tr>
<tr>
@@ -589,10 +589,8 @@ CREATE TABLE mysql_source (...) WITH (
)
```

**Notes**:
1. The MySQL source prints the current binlog offset to the logs at INFO level on each checkpoint, with the prefix "Binlog offset on checkpoint {checkpoint-id}".
**Note**: The MySQL source prints the current binlog offset to the logs at INFO level on each checkpoint, with the prefix "Binlog offset on checkpoint {checkpoint-id}".
This log line helps when you want to start the job from the offset of a particular checkpoint.
2. If the schema of the captured tables has changed in the past, starting from the earliest offset, a specific offset, or a timestamp may fail, because the Debezium reader internally keeps the current latest table schema, and earlier records with a mismatched schema cannot be parsed correctly.
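
As a sketch of how the logged offset feeds back into a job (not from the original doc): the binlog file and position below are placeholders you would copy from the "Binlog offset on checkpoint {checkpoint-id}" log line, and all connection values are hypothetical. The option names come from the table earlier in this file.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RestartFromLoggedOffset {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Binlog file/position are placeholders: copy them from the
        // "Binlog offset on checkpoint {checkpoint-id}" log line of the previous run.
        tEnv.executeSql(
                "CREATE TABLE mysql_source ("
                        + "  id BIGINT,"
                        + "  name STRING,"
                        + "  PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'yourHostname',"
                        + "  'port' = '3306',"
                        + "  'username' = 'yourUsername',"
                        + "  'password' = 'yourPassword',"
                        + "  'database-name' = 'yourDatabaseName',"
                        + "  'table-name' = 'yourTableName',"
                        + "  'scan.startup.mode' = 'specific-offset',"
                        + "  'scan.startup.specific-offset.file' = 'mysql-bin.000003',"
                        + "  'scan.startup.specific-offset.pos' = '4'"
                        + ")");

        // Prints the snapshot followed by the change stream (runs continuously).
        tEnv.executeSql("SELECT * FROM mysql_source").print();
    }
}
```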


### DataStream Source
@@ -697,7 +695,7 @@ $ ./bin/flink run \
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width:30%;">MySQL type<a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html"></a></th>
<th class="text-left" style="width:30%;"><a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html">MySQL type</a></th>
<th class="text-left" style="width:10%;">Flink SQL type<a href="{% link dev/table/types.md %}"></a></th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
19 changes: 8 additions & 11 deletions docs/content/connectors/mysql-cdc.md
@@ -7,7 +7,7 @@ The MySQL CDC connector allows for reading snapshot data and incremental data from MySQL database.

| Connector | Database | Driver |
|-----------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------|
| [mysql-cdc](mysql-cdc.md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 |
| [mysql-cdc](mysql-cdc.md) | <li> [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x <li> [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x <li> [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x <li> [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x <li> [MariaDB](https://mariadb.org): 10.x <li> [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.21 |

Dependencies
------------
@@ -21,15 +21,15 @@ In order to setup the MySQL CDC connector, the following table provides dependency information
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<!-- The dependency is available only for stable releases; a SNAPSHOT dependency needs to be built by yourself. -->
<version>2.4-SNAPSHOT</version>
<version>2.3.0</version>
</dependency>
```

### SQL Client JAR

```Download link is available only for stable releases.```

Download [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar) and put it under `<FLINK_HOME>/lib/`.
Download [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) and put it under `<FLINK_HOME>/lib/`.

**Note:** The flink-sql-connector-mysql-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use a released version, such as [flink-sql-connector-mysql-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc); released versions are available in the Maven Central repository.

@@ -246,14 +246,14 @@ Connector Options
<td>scan.startup.specific-offset.skip-events</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>String</td>
<td>Optional number of events to skip after the specific starting offset</td>
</tr>
<tr>
<td>scan.startup.specific-offset.skip-rows</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>Long</td>
<td>String</td>
<td>Optional number of rows to skip after the specific starting offset</td>
</tr>
<tr>
@@ -597,11 +597,8 @@ CREATE TABLE mysql_source (...) WITH (
)
```

**Notes:**
1. MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
**Note:** MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
"Binlog offset on checkpoint {checkpoint-id}". It could be useful if you want to restart the job from a specific checkpointed position.
2. If the schema of the captured tables was changed previously, starting from the earliest offset, a specific offset, or a timestamp
could fail, as the Debezium reader keeps the current latest table schema internally, and earlier records with a mismatched schema cannot be correctly parsed.
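
To tie the option table above together, here is a hedged sketch of a 'specific-offset' start expressed as a GTID set, combined with the skip options from the table. The GTID set, skip counts, schema, and connection details are all placeholder values, not from the original doc.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StartFromGtidSet {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // GTID set and skip counts below are illustrative placeholders.
        tEnv.executeSql(
                "CREATE TABLE mysql_source ("
                        + "  order_id BIGINT,"
                        + "  price DECIMAL(10, 2),"
                        + "  PRIMARY KEY (order_id) NOT ENFORCED"
                        + ") WITH ("
                        + "  'connector' = 'mysql-cdc',"
                        + "  'hostname' = 'yourHostname',"
                        + "  'port' = '3306',"
                        + "  'username' = 'yourUsername',"
                        + "  'password' = 'yourPassword',"
                        + "  'database-name' = 'yourDatabaseName',"
                        + "  'table-name' = 'yourTableName',"
                        + "  'scan.startup.mode' = 'specific-offset',"
                        + "  'scan.startup.specific-offset.gtid-set' = "
                        + "    '24DA167-0C0C-11E8-8442-00059A3C7B00:1-19',"
                        + "  'scan.startup.specific-offset.skip-events' = '2',"
                        + "  'scan.startup.specific-offset.skip-rows' = '10'"
                        + ")");
    }
}
```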

### DataStream Source

@@ -705,7 +702,7 @@ Data Type Mapping
<table class="colwidths-auto docutils">
<thead>
<tr>
<th class="text-left" style="width:30%;">MySQL type<a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html"></a></th>
<th class="text-left" style="width:30%;"><a href="https://dev.mysql.com/doc/man/8.0/en/data-types.html">MySQL type</a></th>
<th class="text-left" style="width:10%;">Flink SQL type<a href="{% link dev/table/types.md %}"></a></th>
<th class="text-left" style="width:60%;">NOTE</th>
</tr>
@@ -1011,4 +1008,4 @@ The example for different spatial data types mapping is as follows:
FAQ
--------
* [FAQ(English)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ)
* [FAQ(中文)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH))
* [FAQ(中文)](https://github.com/ververica/flink-cdc-connectors/wiki/FAQ(ZH))