From 75ec91392896969d1e45a692a0942bc75bb41395 Mon Sep 17 00:00:00 2001 From: rookiegao Date: Mon, 7 Nov 2022 18:15:21 +0800 Subject: [PATCH 1/8] [docs] Update the connector download link in the document --- docs/content/connectors/db2-cdc.md | 4 ++-- docs/content/connectors/mongodb-cdc.md | 6 +++--- docs/content/connectors/mysql-cdc(ZH).md | 6 +++--- docs/content/connectors/mysql-cdc.md | 6 +++--- docs/content/connectors/oceanbase-cdc.md | 8 ++++---- docs/content/connectors/oracle-cdc.md | 6 +++--- docs/content/connectors/postgres-cdc.md | 6 +++--- docs/content/connectors/sqlserver-cdc.md | 6 +++--- docs/content/connectors/tidb-cdc.md | 6 +++--- .../quickstart/build-real-time-data-lake-tutorial.md | 2 +- docs/content/quickstart/db2-tutorial.md | 2 +- docs/content/quickstart/mongodb-tutorial.md | 2 +- docs/content/quickstart/mysql-postgres-tutorial.md | 4 ++-- docs/content/quickstart/oceanbase-tutorial.md | 2 +- docs/content/quickstart/oracle-tutorial.md | 2 +- docs/content/quickstart/polardbx-tutorial.md | 2 +- docs/content/quickstart/sqlserver-tutorial.md | 2 +- docs/content/quickstart/tidb-tutorial.md | 2 +- .../build-real-time-data-lake-tutorial-zh.md" | 2 +- .../mongodb-tutorial-zh.md" | 2 +- .../mysql-postgres-tutorial-zh.md" | 4 ++-- .../oceanbase-tutorial-zh.md" | 2 +- .../oracle-tutorial-zh.md" | 2 +- .../polardbx-tutorial-zh.md" | 2 +- .../sqlserver-tutorial-zh.md" | 2 +- .../tidb-tutorial-zh.md" | 2 +- 26 files changed, 46 insertions(+), 46 deletions(-) diff --git a/docs/content/connectors/db2-cdc.md b/docs/content/connectors/db2-cdc.md index a5ef3cfd628..cce5559d421 100644 --- a/docs/content/connectors/db2-cdc.md +++ b/docs/content/connectors/db2-cdc.md @@ -23,7 +23,7 @@ using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR com.ververica flink-connector-db2-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -31,7 +31,7 @@ using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR ```Download link is available only for stable releases.``` -Download [flink-sql-connector-db2-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.3-SNAPSHOT/flink-sql-connector-db2-cdc-2.3-SNAPSHOT.jar) and +Download [flink-sql-connector-db2-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.3.0/flink-sql-connector-db2-cdc-2.3.0.jar) and put it under `/lib/`. **Note:** flink-sql-connector-db2-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users diff --git a/docs/content/connectors/mongodb-cdc.md b/docs/content/connectors/mongodb-cdc.md index 56497d58342..0e5c21e59c0 100644 --- a/docs/content/connectors/mongodb-cdc.md +++ b/docs/content/connectors/mongodb-cdc.md @@ -13,7 +13,7 @@ In order to setup the MongoDB CDC connector, the following table provides depend com.ververica flink-connector-mongodb-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -21,9 +21,9 @@ In order to setup the MongoDB CDC connector, the following table provides depend ```Download link is available only for stable releases.``` -Download [flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3-SNAPSHOT/flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar) and put it under `/lib/`. +Download [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3.0/flink-sql-connector-mongodb-cdc-2.3.0.jar) and put it under `/lib/`. 
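A minimal DataStream job wired against the MongoDB connector described above looks roughly like the sketch below. It is an illustration, not the connector's documented contract: the builder methods (`hosts`, `databaseList`, `collectionList`), the `JsonDebeziumDeserializationSchema` class, and all host/credential values are recalled from the 2.x API and may differ between releases.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.ververica.cdc.connectors.mongodb.MongoDBSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MongoDBSourceSketch {
    public static void main(String[] args) throws Exception {
        // Emits every change-stream event of the watched collections as a JSON string.
        SourceFunction<String> source =
                MongoDBSource.<String>builder()
                        .hosts("localhost:27017")
                        .username("flinkuser")                // sample credentials
                        .password("flinkpw")
                        .databaseList("inventory")            // databases to watch
                        .collectionList("inventory.products") // collections to watch
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // checkpointing drives failure recovery
        env.addSource(source).print();
        env.execute("mongodb-cdc-sketch");
    }
}
```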
-**Note:** flink-sql-connector-mongodb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-mongodb-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mongodb-cdc), the released version will be available in the Maven central warehouse. +**Note:** flink-sql-connector-mongodb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mongodb-cdc), the released version will be available in the Maven central warehouse. Setup MongoDB ---------------- diff --git a/docs/content/connectors/mysql-cdc(ZH).md b/docs/content/connectors/mysql-cdc(ZH).md index 3c0ad47dc73..bcbf5c3a7f4 100644 --- a/docs/content/connectors/mysql-cdc(ZH).md +++ b/docs/content/connectors/mysql-cdc(ZH).md @@ -20,7 +20,7 @@ MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。 com.ververica flink-connector-mysql-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -28,9 +28,9 @@ MySQL CDC 连接器允许从 MySQL 数据库读取快照数据和增量数据。 ```下载链接仅在已发布版本可用,请在文档网站左下角选择浏览已发布的版本。``` -下载 [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) 到 `/lib/` 目录下。 +下载 [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) 到 `/lib/` 目录下。 -**注意:** flink-sql-connector-mysql-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 [flink-sql-connector-mysql-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc) 当前已发布的所有版本都可以在 Maven 中央仓库获取。 +**注意:** flink-sql-connector-mysql-cdc-XXX-SNAPSHOT 版本是开发分支`release-XXX`对应的快照版本,快照版本用户需要下载源代码并编译相应的 jar。用户应使用已经发布的版本,例如 [flink-sql-connector-mysql-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-mysql-cdc) 当前已发布的所有版本都可以在 Maven 中央仓库获取。 配置 MySQL 服务器 ---------------- diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md index 46034cb1704..83f5e5574bd 100644 --- a/docs/content/connectors/mysql-cdc.md +++ b/docs/content/connectors/mysql-cdc.md @@ -21,7 +21,7 @@ In order to setup the MySQL CDC connector, the following table provides dependen com.ververica flink-connector-mysql-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -29,9 +29,9 @@ In order to setup the MySQL CDC connector, the following table provides dependen ```Download link is available only for stable releases.``` -Download [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) and put it under `/lib/`. +Download [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) and put it under `/lib/`. -**Note:** flink-sql-connector-mysql-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. 
Users should use the released version, such as [flink-sql-connector-mysql-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc), the released version will be available in the Maven central warehouse. +**Note:** flink-sql-connector-mysql-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-mysql-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-connector-mysql-cdc), the released version will be available in the Maven central warehouse. Setup MySQL server ---------------- diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md index 9705db26a8a..50f5b1aee92 100644 --- a/docs/content/connectors/oceanbase-cdc.md +++ b/docs/content/connectors/oceanbase-cdc.md @@ -12,7 +12,7 @@ In order to setup the OceanBase CDC connector, the following table provides depe com.ververica flink-connector-oceanbase-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -20,9 +20,9 @@ In order to setup the OceanBase CDC connector, the following table provides depe ```Download link is available only for stable releases.``` -Download [flink-sql-connector-oceanbase-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.3-SNAPSHOT.jar) and put it under `/lib/`. +Download [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3.0/flink-sql-connector-oceanbase-cdc-2.3.0.jar) and put it under `/lib/`. -**Note:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-oceanbase-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc), the released version will be available in the Maven central warehouse. +**Note:** flink-sql-connector-oceanbase-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oceanbase-cdc), the released version will be available in the Maven central warehouse. Setup OceanBase and LogProxy Server ---------------------- @@ -425,7 +425,7 @@ Data Type Mapping - + diff --git a/docs/content/connectors/oracle-cdc.md b/docs/content/connectors/oracle-cdc.md index 32e59844e4a..6fdf0f7dfc6 100644 --- a/docs/content/connectors/oracle-cdc.md +++ b/docs/content/connectors/oracle-cdc.md @@ -14,7 +14,7 @@ In order to setup the Oracle CDC connector, the following table provides depende com.ververicaflink-connector-oracle-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -22,9 +22,9 @@ In order to setup the Oracle CDC connector, the following table provides depende **Download link is available only for stable releases.** -Download [flink-sql-connector-oracle-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3-SNAPSHOT/flink-sql-connector-oracle-cdc-2.3-SNAPSHOT.jar) and put it under `/lib/`. 
+Download [flink-sql-connector-oracle-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3.0/flink-sql-connector-oracle-cdc-2.3.0.jar) and put it under `/lib/`. -**Note:** flink-sql-connector-oracle-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-oracle-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oracle-cdc), the released version will be available in the Maven central warehouse. +**Note:** flink-sql-connector-oracle-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-oracle-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-oracle-cdc), the released version will be available in the Maven central warehouse. Setup Oracle ---------------- diff --git a/docs/content/connectors/postgres-cdc.md b/docs/content/connectors/postgres-cdc.md index f281064fdf5..9d754dba489 100644 --- a/docs/content/connectors/postgres-cdc.md +++ b/docs/content/connectors/postgres-cdc.md @@ -14,7 +14,7 @@ In order to setup the Postgres CDC connector, the following table provides depen com.ververica flink-connector-postgres-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -22,9 +22,9 @@ In order to setup the Postgres CDC connector, the following table provides depen ```Download link is available only for stable releases.``` -Download [flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3-SNAPSHOT/flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar) and put it under `/lib/`. +Download [flink-sql-connector-postgres-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3.0/flink-sql-connector-postgres-cdc-2.3.0.jar) and put it under `/lib/`. -**Note:** flink-sql-connector-postgres-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-postgres-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc), the released version will be available in the Maven central warehouse. +**Note:** flink-sql-connector-postgres-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-postgres-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc), the released version will be available in the Maven central warehouse. 
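For the Postgres connector, the DataStream counterpart of the SQL-jar setup above is roughly the following sketch, based on the builder shown in the project README; the host, database, schema, and credential values are placeholders.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class PostgresCdcSketch {
    public static void main(String[] args) throws Exception {
        SourceFunction<String> source =
                PostgreSQLSource.<String>builder()
                        .hostname("localhost")
                        .port(5432)
                        .database("postgres")             // database to monitor
                        .schemaList("inventory")          // schemas to monitor
                        .tableList("inventory.shipments") // schema-qualified tables
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(source).print();
        env.execute("postgres-cdc-sketch");
    }
}
```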
How to create a Postgres CDC table ---------------- diff --git a/docs/content/connectors/sqlserver-cdc.md b/docs/content/connectors/sqlserver-cdc.md index 24816fe180b..93c63d68601 100644 --- a/docs/content/connectors/sqlserver-cdc.md +++ b/docs/content/connectors/sqlserver-cdc.md @@ -14,7 +14,7 @@ In order to setup the SQLServer CDC connector, the following table provides depe com.ververica flink-connector-sqlserver-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -22,9 +22,9 @@ In order to setup the SQLServer CDC connector, the following table provides depe ```Download link is available only for stable releases.``` -Download [flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar) and put it under `/lib/`. +Download [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3.0/flink-sql-connector-sqlserver-cdc-2.3.0.jar) and put it under `/lib/`. -**Note:** flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-sqlserver-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc), the released version will be available in the Maven central warehouse. +**Note:** flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc), the released version will be available in the Maven central warehouse. Setup SQLServer Database ---------------- diff --git a/docs/content/connectors/tidb-cdc.md b/docs/content/connectors/tidb-cdc.md index 99a3769b4cf..567dd8b3e05 100644 --- a/docs/content/connectors/tidb-cdc.md +++ b/docs/content/connectors/tidb-cdc.md @@ -14,7 +14,7 @@ In order to setup the TiDB CDC connector, the following table provides dependenc com.ververica flink-connector-tidb-cdc - 2.3-SNAPSHOT + 2.3.0 ``` @@ -22,9 +22,9 @@ In order to setup the TiDB CDC connector, the following table provides dependenc ```Download link is available only for stable releases.``` -Download [flink-sql-connector-tidb-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3-SNAPSHOT/flink-sql-connector-tidb-cdc-2.3-SNAPSHOT.jar) and put it under `/lib/`. +Download [flink-sql-connector-tidb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar) and put it under `/lib/`. -**Note:** flink-sql-connector-tidb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-tidb-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-tidb-cdc), the released version will be available in the Maven central warehouse. +**Note:** flink-sql-connector-tidb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. 
Users should use the released version, such as [flink-sql-connector-tidb-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-tidb-cdc), the released version will be available in the Maven central warehouse. How to create a TiDB CDC table ---------------- diff --git a/docs/content/quickstart/build-real-time-data-lake-tutorial.md b/docs/content/quickstart/build-real-time-data-lake-tutorial.md index eb335c69420..5a78fcf9467 100644 --- a/docs/content/quickstart/build-real-time-data-lake-tutorial.md +++ b/docs/content/quickstart/build-real-time-data-lake-tutorial.md @@ -85,7 +85,7 @@ The Docker Compose environment consists of the following containers: If you want to run with your own Flink environment, remember to download the following packages and then put them to `FLINK_HOME/lib/`. **Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. ** - - [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) + - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) - [flink-shaded-hadoop-2-uber-2.7.5-10.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar) - [iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar](https://raw.githubusercontent.com/luoyuxia/flink-cdc-tutorial/main/flink-cdc-iceberg-demo/sql-client/lib/iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar) diff --git a/docs/content/quickstart/db2-tutorial.md b/docs/content/quickstart/db2-tutorial.md index e02c829b29f..b0ad73f7c8e 100644 --- a/docs/content/quickstart/db2-tutorial.md +++ b/docs/content/quickstart/db2-tutorial.md @@ -62,7 +62,7 @@ docker-compose down *Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. * - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-db2-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.3-SNAPSHOT/flink-sql-connector-db2-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-db2-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.3.0/flink-sql-connector-db2-cdc-2.3.0.jar) **3. Launch a Flink cluster and start a Flink SQL CLI** diff --git a/docs/content/quickstart/mongodb-tutorial.md b/docs/content/quickstart/mongodb-tutorial.md index a50d5f9a57d..56838ce68fc 100644 --- a/docs/content/quickstart/mongodb-tutorial.md +++ b/docs/content/quickstart/mongodb-tutorial.md @@ -110,7 +110,7 @@ db.customers.insertMany([ ```Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. 
``` - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) - - [flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3-SNAPSHOT/flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar) + - [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3.0/flink-sql-connector-mongodb-cdc-2.3.0.jar) 4. Launch a Flink cluster, then start a Flink SQL CLI and execute following SQL statements inside: diff --git a/docs/content/quickstart/mysql-postgres-tutorial.md b/docs/content/quickstart/mysql-postgres-tutorial.md index 0a8b3ffb51d..151ea929a98 100644 --- a/docs/content/quickstart/mysql-postgres-tutorial.md +++ b/docs/content/quickstart/mysql-postgres-tutorial.md @@ -78,8 +78,8 @@ We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kib **Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. ** - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) - - [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) - - [flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3-SNAPSHOT/flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar) + - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) + - [flink-sql-connector-postgres-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3.0/flink-sql-connector-postgres-cdc-2.3.0.jar) ### Preparing data in databases #### Preparing data in MySQL diff --git a/docs/content/quickstart/oceanbase-tutorial.md b/docs/content/quickstart/oceanbase-tutorial.md index c4d7e0055b9..245eefdbfbb 100644 --- a/docs/content/quickstart/oceanbase-tutorial.md +++ b/docs/content/quickstart/oceanbase-tutorial.md @@ -112,7 +112,7 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), ```Download links are only available for stable releases.``` - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-oceanbase-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3.0/flink-sql-connector-oceanbase-cdc-2.3.0.jar) ### Use Flink DDL to create dynamic table in Flink SQL CLI diff --git a/docs/content/quickstart/oracle-tutorial.md b/docs/content/quickstart/oracle-tutorial.md index 09adccec72c..1f708bee8a1 100644 --- a/docs/content/quickstart/oracle-tutorial.md +++ b/docs/content/quickstart/oracle-tutorial.md @@ -55,7 +55,7 @@ docker-compose down *Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. 
* - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-oracle-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3-SNAPSHOT/flink-sql-connector-oracle-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-oracle-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3.0/flink-sql-connector-oracle-cdc-2.3.0.jar) **3. Launch a Flink cluster and start a Flink SQL CLI** diff --git a/docs/content/quickstart/polardbx-tutorial.md b/docs/content/quickstart/polardbx-tutorial.md index d393dde4578..ff1d4ed1c9c 100644 --- a/docs/content/quickstart/polardbx-tutorial.md +++ b/docs/content/quickstart/polardbx-tutorial.md @@ -67,7 +67,7 @@ We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kib 2. Download following JAR package required and put them under `flink-1.16.0/lib/`: **Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. ** - - [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) + - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) ### Preparing data in databases diff --git a/docs/content/quickstart/sqlserver-tutorial.md b/docs/content/quickstart/sqlserver-tutorial.md index 13b4d7a97cb..20f3961d0a1 100644 --- a/docs/content/quickstart/sqlserver-tutorial.md +++ b/docs/content/quickstart/sqlserver-tutorial.md @@ -64,7 +64,7 @@ docker-compose down *Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. * - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3.0/flink-sql-connector-sqlserver-cdc-2.3.0.jar) **Preparing data in SqlServer database** diff --git a/docs/content/quickstart/tidb-tutorial.md b/docs/content/quickstart/tidb-tutorial.md index 1752f678df6..4e389522aec 100644 --- a/docs/content/quickstart/tidb-tutorial.md +++ b/docs/content/quickstart/tidb-tutorial.md @@ -117,7 +117,7 @@ docker-compose down *Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. 
* - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-tidb-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3-SNAPSHOT/flink-sql-connector-tidb-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-tidb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar) **Preparing data in TiDB database** diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md" index 665593a9d0c..d1dc8dbe5bc 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md" @@ -85,7 +85,7 @@ volumes: **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译** - - [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) + - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) - [flink-shaded-hadoop-2-uber-2.7.5-10.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar) - [iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar](https://raw.githubusercontent.com/luoyuxia/flink-cdc-tutorial/main/flink-cdc-iceberg-demo/sql-client/lib/iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar) diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md" index 58836d7f137..13cd3f21dbf 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md" @@ -110,7 +110,7 @@ db.customers.insertMany([ ```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译``` - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) - - [flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3-SNAPSHOT/flink-sql-connector-mongodb-cdc-2.3-SNAPSHOT.jar) + - [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3.0/flink-sql-connector-mongodb-cdc-2.3.0.jar) 4. 然后启动 Flink 集群,再启动 SQL CLI. 
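The quickstarts above drive everything through the SQL client and the SQL jars; for the MySQL-based tutorials that follow, the DataStream equivalent uses the incremental `MySqlSource` with `env.fromSource`. A minimal sketch, with placeholder connection values and the builder recalled from the 2.x documentation:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;

public class MySqlCdcSketch {
    public static void main(String[] args) throws Exception {
        MySqlSource<String> mySqlSource =
                MySqlSource.<String>builder()
                        .hostname("localhost")
                        .port(3306)
                        .databaseList("mydb")     // databases to capture
                        .tableList("mydb.orders") // database-qualified tables
                        .username("flinkuser")
                        .password("flinkpw")
                        .deserializer(new JsonDebeziumDeserializationSchema())
                        .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(3000); // required: the source commits progress on checkpoints
        env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                .print();
        env.execute("mysql-cdc-sketch");
    }
}
```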
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md" index e87b1b1099e..4392516ecbc 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md" @@ -74,8 +74,8 @@ docker-compose up -d **下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译** - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) - - [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) - - [flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3-SNAPSHOT/flink-sql-connector-postgres-cdc-2.3-SNAPSHOT.jar) + - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) + - [flink-sql-connector-postgres-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3.0/flink-sql-connector-postgres-cdc-2.3.0.jar) ### 准备数据 #### 在 MySQL 数据库中准备数据 diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md" index b3ad98814b4..0c17b0a476b 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md" @@ -111,7 +111,7 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), ```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译``` - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-oceanbase-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3.0/flink-sql-connector-oceanbase-cdc-2.3.0.jar) ### 在 Flink SQL CLI 中使用 Flink DDL 创建表 diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md" index 0a0903d4afd..7b63420f366 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md" @@ -55,7 +55,7 @@ docker-compose down *下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译* - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-oracle-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3-SNAPSHOT/flink-sql-connector-oracle-cdc-2.3-SNAPSHOT.jar) +- 
[flink-sql-connector-oracle-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3.0/flink-sql-connector-oracle-cdc-2.3.0.jar) **3. 然后启动 Flink 集群,再启动 SQL CLI:** diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md" index c6b780bb1bc..41ee0314093 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md" @@ -109,7 +109,7 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), 2. 下载下面列出的依赖包,并将它们放到目录 `flink-1.16.0/lib/` 下 ```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译``` -- 用于订阅PolarDB-X Binlog: [flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3-SNAPSHOT/flink-sql-connector-mysql-cdc-2.3-SNAPSHOT.jar) +- 用于订阅PolarDB-X Binlog: [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar) - 用于写入Elasticsearch: [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) 3. 启动flink服务: ```shell diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md" index 7bdbf517857..4b74f80e06d 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md" @@ -64,7 +64,7 @@ docker-compose down ```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译``` - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3.0/flink-sql-connector-sqlserver-cdc-2.3.0.jar) **在 SqlServer 数据库中准备数据** diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md" index 7ec4ad91ad2..71cb47ad2af 100644 --- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md" +++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md" @@ -117,7 +117,7 @@ docker-compose down ```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译``` - [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar) -- [flink-sql-connector-tidb-cdc-2.3-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3-SNAPSHOT/flink-sql-connector-tidb-cdc-2.3-SNAPSHOT.jar) +- [flink-sql-connector-tidb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar) **在 TiDB 
数据库中准备数据** From 84384b332b239d4ba6187290ab5e3bdefd6f7076 Mon Sep 17 00:00:00 2001 From: rookiegao Date: Mon, 7 Nov 2022 20:15:28 +0800 Subject: [PATCH 2/8] [docs] Repair Connector Data Type links --- docs/content/connectors/mysql-cdc(ZH).md | 2 +- docs/content/connectors/mysql-cdc.md | 2 +- docs/content/connectors/oceanbase-cdc.md | 2 +- docs/content/connectors/oracle-cdc.md | 2 +- docs/content/connectors/postgres-cdc.md | 2 +- docs/content/connectors/sqlserver-cdc.md | 2 +- docs/content/connectors/tidb-cdc.md | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/content/connectors/mysql-cdc(ZH).md b/docs/content/connectors/mysql-cdc(ZH).md index bcbf5c3a7f4..62e6ad93f98 100644 --- a/docs/content/connectors/mysql-cdc(ZH).md +++ b/docs/content/connectors/mysql-cdc(ZH).md @@ -695,7 +695,7 @@ $ ./bin/flink run \
[data-type table header — "MySQL type" | "Flink SQL type" | "NOTE"; the -/+ pair repairs the link on the first column title (the <th>/<a> markup was stripped from this extract)]
diff --git a/docs/content/connectors/mysql-cdc.md b/docs/content/connectors/mysql-cdc.md
index 83f5e5574bd..4da3f5a721e 100644
--- a/docs/content/connectors/mysql-cdc.md
+++ b/docs/content/connectors/mysql-cdc.md
@@ -702,7 +702,7 @@ Data Type Mapping
[data-type table header — "MySQL type" | "Flink SQL type" | "NOTE"; same link repair]
diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md
index 50f5b1aee92..57652f1e4af 100644
--- a/docs/content/connectors/oceanbase-cdc.md
+++ b/docs/content/connectors/oceanbase-cdc.md
@@ -425,7 +425,7 @@ Data Type Mapping
[data-type table header — "OceanBase type" | "Flink SQL type" | "NOTE"; same link repair]
diff --git a/docs/content/connectors/oracle-cdc.md b/docs/content/connectors/oracle-cdc.md
index 6fdf0f7dfc6..cd230943a2f 100644
--- a/docs/content/connectors/oracle-cdc.md
+++ b/docs/content/connectors/oracle-cdc.md
@@ -455,7 +455,7 @@ Data Type Mapping
[data-type table header — "Oracle type" | "Flink SQL type"; same link repair]
diff --git a/docs/content/connectors/postgres-cdc.md b/docs/content/connectors/postgres-cdc.md
index 9d754dba489..66d31378029 100644
--- a/docs/content/connectors/postgres-cdc.md
+++ b/docs/content/connectors/postgres-cdc.md
@@ -315,7 +315,7 @@ Data Type Mapping
[data-type table header — "PostgreSQL type" | "Flink SQL type"; same link repair]
diff --git a/docs/content/connectors/sqlserver-cdc.md b/docs/content/connectors/sqlserver-cdc.md
index 93c63d68601..c928136da58 100644
--- a/docs/content/connectors/sqlserver-cdc.md
+++ b/docs/content/connectors/sqlserver-cdc.md
@@ -315,7 +315,7 @@ Data Type Mapping
[data-type table header — "SQLServer type" | "Flink SQL type"; same link repair]
diff --git a/docs/content/connectors/tidb-cdc.md b/docs/content/connectors/tidb-cdc.md
index 567dd8b3e05..d35c239971a 100644
--- a/docs/content/connectors/tidb-cdc.md
+++ b/docs/content/connectors/tidb-cdc.md
@@ -290,7 +290,7 @@ Data Type Mapping
[data-type table header — "TiDB type" | "Flink SQL type"; same link repair — this hunk's header text was lost in extraction]
- + From f649cfa91f3e5577873b35ef1b73c3f2992ec065 Mon Sep 17 00:00:00 2001 From: molsion Date: Tue, 24 May 2022 17:14:13 +0800 Subject: [PATCH 3/8] [oracle][fix]Close the connection to prevent the session from exceeding the upper limit --- .../oracle/source/reader/fetch/OracleScanFetchTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index edc0924d89f..9547f45eb23 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -280,6 +280,7 @@ protected SnapshotResult doExecute( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); + jdbcConnection.close(); ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark); dispatcher.dispatchWatermarkEvent( offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH); From 88522498a3d6c385d1a9875ede85c638521931a5 Mon Sep 17 00:00:00 2001 From: molsion Date: Tue, 24 May 2022 17:14:13 +0800 Subject: [PATCH 4/8] [oracle][fix]Close the connection to prevent the session from exceeding the upper limit --- .../oracle/source/reader/fetch/OracleScanFetchTask.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index edc0924d89f..9547f45eb23 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -280,6 +280,7 @@ protected SnapshotResult doExecute( "Snapshot step 3 - Determining high watermark {} for split {}", highWatermark, snapshotSplit); + jdbcConnection.close(); ((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark); dispatcher.dispatchWatermarkEvent( offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH); From be3c334e596c332d6506e8f84fbae37a5bb28a04 Mon Sep 17 00:00:00 2001 From: Enoch Date: Tue, 14 Feb 2023 10:08:40 +0800 Subject: [PATCH 5/8] [oracle][fix] 1. Copy the OracleConnection code in debezium and modify it to use hikari thread pool. 2. 
Close the connection to prevent the session from exceeding the upper limit --- .../oracle/source/OracleDialect.java | 16 +- .../oracle/source/OracleSourceBuilder.java | 2 +- .../reader/fetch/OracleScanFetchTask.java | 2 +- .../fetch/OracleSourceFetchTaskContext.java | 2 +- .../source/utils/OracleConnectionUtils.java | 19 +- .../oracle/source/utils/OracleUtils.java | 54 +- .../connector/oracle/OracleConnection.java | 473 ++++++++++++++++++ 7 files changed, 508 insertions(+), 60 deletions(-) create mode 100644 flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java index ff41ea2f459..78b67d5ed66 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java @@ -28,7 +28,6 @@ import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter; import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig; -import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory; import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask; import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext; import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask; @@ -52,15 +51,8 @@ public class OracleDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; - private final OracleSourceConfigFactory configFactory; - private final OracleSourceConfig sourceConfig; private transient OracleSchema oracleSchema; - public OracleDialect(OracleSourceConfigFactory configFactory) { - this.configFactory = configFactory; - this.sourceConfig = configFactory.create(0); - } - @Override public String getName() { return "Oracle"; @@ -87,8 +79,7 @@ public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { @Override public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) { - return OracleConnectionUtils.createOracleConnection( - sourceConfig.getDbzConnectorConfig().getJdbcConfig()); + return OracleConnectionUtils.createOracleConnection(sourceConfig); } @Override @@ -116,7 +107,7 @@ public List discoverDataCollections(JdbcSourceConfig sourceConfig) { public Map discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig) { final List capturedTableIds = discoverDataCollections(sourceConfig); - try (OracleConnection jdbc = createOracleConnection(sourceConfig.getDbzConfiguration())) { + try (OracleConnection jdbc = createOracleConnection(sourceConfig)) { // fetch table schemas Map tableSchemas = new HashMap<>(); for (TableId tableId : capturedTableIds) { @@ -141,8 +132,7 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { @Override public OracleSourceFetchTaskContext createFetchTaskContext( SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { - final OracleConnection jdbcConnection = - createOracleConnection(taskSourceConfig.getDbzConfiguration()); + final OracleConnection jdbcConnection = createOracleConnection(taskSourceConfig); return new OracleSourceFetchTaskContext(taskSourceConfig, this, 
jdbcConnection); } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java index 4d7b38156fd..ac49aab615c 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java @@ -219,7 +219,7 @@ public OracleSourceBuilder deserializer(DebeziumDeserializationSchema dese */ public OracleIncrementalSource build() { this.offsetFactory = new RedoLogOffsetFactory(); - this.dialect = new OracleDialect(configFactory); + this.dialect = new OracleDialect(); return new OracleIncrementalSource( configFactory, checkNotNull(deserializer), offsetFactory, dialect); } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 9547f45eb23..2ff9d9f82d5 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -172,7 +172,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask( // task to read binlog and backfill for current split return new RedoLogSplitReadTask( new OracleConnectorConfig(dezConf), - createOracleConnection(context.getSourceConfig().getDbzConfiguration()), + createOracleConnection(context.getSourceConfig()), context.getDispatcher(), context.getErrorHandler(), context.getDatabaseSchema(), diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java index 0051f3c2121..fc38475f8a9 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java @@ -97,7 +97,7 @@ public void configure(SourceSplitBase sourceSplitBase) { .getDbzConfiguration() .getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), sourceSplitBase.getTableSchemas().values()); - this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig); + this.databaseSchema = OracleUtils.createOracleDatabaseSchema(sourceConfig, connectorConfig); // todo logMiner or xStream this.offsetContext = loadStartingOffsetState( diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java index 7e926e9922c..d520ababfff 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java @@ -18,6 +18,9 @@ import org.apache.flink.util.FlinkRuntimeException; +import 
com.ververica.cdc.connectors.base.config.JdbcSourceConfig; +import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory; +import com.ververica.cdc.connectors.oracle.source.OraclePooledDataSourceFactory; import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; import io.debezium.config.Configuration; import io.debezium.connector.oracle.OracleConnection; @@ -34,8 +37,6 @@ import java.util.List; import java.util.Set; -import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX; - /** Oracle connection Utilities. */ public class OracleConnectionUtils { @@ -44,15 +45,17 @@ public class OracleConnectionUtils { /** Returned by column metadata in Oracle if no scale is set. */ private static final int ORACLE_UNSET_SCALE = -127; + private static final OraclePooledDataSourceFactory FACTORY = + new OraclePooledDataSourceFactory(); + /** show current scn sql in oracle. */ private static final String SHOW_CURRENT_SCN = "SELECT CURRENT_SCN FROM V$DATABASE"; - /** Creates a new {@link OracleConnection}, but not open the connection. */ - public static OracleConnection createOracleConnection(Configuration dbzConfiguration) { - Configuration configuration = dbzConfiguration.subset(DATABASE_CONFIG_PREFIX, true); - return new OracleConnection( - configuration.isEmpty() ? dbzConfiguration : configuration, - OracleConnectionUtils.class::getClassLoader); + public static OracleConnection createOracleConnection(JdbcSourceConfig sourceConfig) { + Configuration dbzConfig = sourceConfig.getDbzConnectorConfig().getJdbcConfig(); + JdbcConnectionFactory jdbcConnectionFactory = + new JdbcConnectionFactory(sourceConfig, FACTORY); + return new OracleConnection(dbzConfig, jdbcConnectionFactory); } /** Fetch current redoLog offsets in Oracle Server. */ diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java index 58fee6746cc..0f23fe4561f 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java @@ -18,9 +18,10 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.FlinkRuntimeException; +import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; -import io.debezium.config.Configuration; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; @@ -261,43 +262,24 @@ public static RowType getSplitType(Table table) { /** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. 
From c18d478424d3e64930ecc33c8f2667b3562fc7c4 Mon Sep 17 00:00:00 2001
From: Enoch
Date: Tue, 14 Feb 2023 10:08:40 +0800
Subject: [PATCH 6/8] [oracle][fix] 1. Copy the OracleConnection code from
 Debezium and modify it to use the Hikari connection pool. 2.
Close the connection to prevent the session from exceeding the upper limit --- .../oracle/source/OracleDialect.java | 16 +- .../oracle/source/OracleSourceBuilder.java | 2 +- .../reader/fetch/OracleScanFetchTask.java | 2 +- .../fetch/OracleSourceFetchTaskContext.java | 2 +- .../source/utils/OracleConnectionUtils.java | 19 +- .../oracle/source/utils/OracleUtils.java | 54 +- .../connector/oracle/OracleConnection.java | 473 ++++++++++++++++++ 7 files changed, 508 insertions(+), 60 deletions(-) create mode 100644 flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java index ff41ea2f459..78b67d5ed66 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java @@ -28,7 +28,6 @@ import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter; import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig; -import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory; import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask; import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext; import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask; @@ -52,15 +51,8 @@ public class OracleDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; - private final OracleSourceConfigFactory configFactory; - private final OracleSourceConfig sourceConfig; private transient OracleSchema oracleSchema; - public OracleDialect(OracleSourceConfigFactory configFactory) { - this.configFactory = configFactory; - this.sourceConfig = configFactory.create(0); - } - @Override public String getName() { return "Oracle"; @@ -87,8 +79,7 @@ public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) { @Override public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) { - return OracleConnectionUtils.createOracleConnection( - sourceConfig.getDbzConnectorConfig().getJdbcConfig()); + return OracleConnectionUtils.createOracleConnection(sourceConfig); } @Override @@ -116,7 +107,7 @@ public List discoverDataCollections(JdbcSourceConfig sourceConfig) { public Map discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig) { final List capturedTableIds = discoverDataCollections(sourceConfig); - try (OracleConnection jdbc = createOracleConnection(sourceConfig.getDbzConfiguration())) { + try (OracleConnection jdbc = createOracleConnection(sourceConfig)) { // fetch table schemas Map tableSchemas = new HashMap<>(); for (TableId tableId : capturedTableIds) { @@ -141,8 +132,7 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) { @Override public OracleSourceFetchTaskContext createFetchTaskContext( SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) { - final OracleConnection jdbcConnection = - createOracleConnection(taskSourceConfig.getDbzConfiguration()); + final OracleConnection jdbcConnection = createOracleConnection(taskSourceConfig); return new OracleSourceFetchTaskContext(taskSourceConfig, this, 
jdbcConnection); } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java index 4d7b38156fd..ac49aab615c 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java @@ -219,7 +219,7 @@ public OracleSourceBuilder deserializer(DebeziumDeserializationSchema dese */ public OracleIncrementalSource build() { this.offsetFactory = new RedoLogOffsetFactory(); - this.dialect = new OracleDialect(configFactory); + this.dialect = new OracleDialect(); return new OracleIncrementalSource( configFactory, checkNotNull(deserializer), offsetFactory, dialect); } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 9547f45eb23..2ff9d9f82d5 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -172,7 +172,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask( // task to read binlog and backfill for current split return new RedoLogSplitReadTask( new OracleConnectorConfig(dezConf), - createOracleConnection(context.getSourceConfig().getDbzConfiguration()), + createOracleConnection(context.getSourceConfig()), context.getDispatcher(), context.getErrorHandler(), context.getDatabaseSchema(), diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java index 0051f3c2121..fc38475f8a9 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java @@ -97,7 +97,7 @@ public void configure(SourceSplitBase sourceSplitBase) { .getDbzConfiguration() .getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME), sourceSplitBase.getTableSchemas().values()); - this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig); + this.databaseSchema = OracleUtils.createOracleDatabaseSchema(sourceConfig, connectorConfig); // todo logMiner or xStream this.offsetContext = loadStartingOffsetState( diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java index 7e926e9922c..d520ababfff 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java @@ -18,6 +18,9 @@ import org.apache.flink.util.FlinkRuntimeException; +import 
com.ververica.cdc.connectors.base.config.JdbcSourceConfig; +import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory; +import com.ververica.cdc.connectors.oracle.source.OraclePooledDataSourceFactory; import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; import io.debezium.config.Configuration; import io.debezium.connector.oracle.OracleConnection; @@ -34,8 +37,6 @@ import java.util.List; import java.util.Set; -import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX; - /** Oracle connection Utilities. */ public class OracleConnectionUtils { @@ -44,15 +45,17 @@ public class OracleConnectionUtils { /** Returned by column metadata in Oracle if no scale is set. */ private static final int ORACLE_UNSET_SCALE = -127; + private static final OraclePooledDataSourceFactory FACTORY = + new OraclePooledDataSourceFactory(); + /** show current scn sql in oracle. */ private static final String SHOW_CURRENT_SCN = "SELECT CURRENT_SCN FROM V$DATABASE"; - /** Creates a new {@link OracleConnection}, but not open the connection. */ - public static OracleConnection createOracleConnection(Configuration dbzConfiguration) { - Configuration configuration = dbzConfiguration.subset(DATABASE_CONFIG_PREFIX, true); - return new OracleConnection( - configuration.isEmpty() ? dbzConfiguration : configuration, - OracleConnectionUtils.class::getClassLoader); + public static OracleConnection createOracleConnection(JdbcSourceConfig sourceConfig) { + Configuration dbzConfig = sourceConfig.getDbzConnectorConfig().getJdbcConfig(); + JdbcConnectionFactory jdbcConnectionFactory = + new JdbcConnectionFactory(sourceConfig, FACTORY); + return new OracleConnection(dbzConfig, jdbcConnectionFactory); } /** Fetch current redoLog offsets in Oracle Server. */ diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java index 58fee6746cc..0f23fe4561f 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java @@ -18,9 +18,10 @@ import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.types.logical.RowType; +import org.apache.flink.util.FlinkRuntimeException; +import com.ververica.cdc.connectors.base.config.JdbcSourceConfig; import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; -import io.debezium.config.Configuration; import io.debezium.connector.oracle.OracleConnection; import io.debezium.connector.oracle.OracleConnectorConfig; import io.debezium.connector.oracle.OracleDatabaseSchema; @@ -261,43 +262,24 @@ public static RowType getSplitType(Table table) { /** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. 
*/ public static OracleDatabaseSchema createOracleDatabaseSchema( - OracleConnectorConfig dbzOracleConfig) { + JdbcSourceConfig sourceConfig, OracleConnectorConfig dbzOracleConfig) { TopicSelector topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig); SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); - OracleConnection oracleConnection = - OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig()); - // OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig); - OracleValueConverters oracleValueConverters = - new OracleValueConverters(dbzOracleConfig, oracleConnection); - StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity = - dbzOracleConfig.getAdapter().getTableNameCaseSensitivity(oracleConnection); - return new OracleDatabaseSchema( - dbzOracleConfig, - oracleValueConverters, - schemaNameAdjuster, - topicSelector, - tableNameCaseSensitivity); - } - - /** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */ - public static OracleDatabaseSchema createOracleDatabaseSchema( - OracleConnectorConfig dbzOracleConfig, boolean tableIdCaseInsensitive) { - TopicSelector topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig); - SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(); - OracleConnection oracleConnection = - OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig); - OracleValueConverters oracleValueConverters = - new OracleValueConverters(dbzOracleConfig, oracleConnection); - StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity = - tableIdCaseInsensitive - ? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE - : StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE; - return new OracleDatabaseSchema( - dbzOracleConfig, - oracleValueConverters, - schemaNameAdjuster, - topicSelector, - tableNameCaseSensitivity); + try (OracleConnection oracleConnection = + OracleConnectionUtils.createOracleConnection(sourceConfig)) { + OracleValueConverters oracleValueConverters = + new OracleValueConverters(dbzOracleConfig, oracleConnection); + StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity = + dbzOracleConfig.getAdapter().getTableNameCaseSensitivity(oracleConnection); + return new OracleDatabaseSchema( + dbzOracleConfig, + oracleValueConverters, + schemaNameAdjuster, + topicSelector, + tableNameCaseSensitivity); + } catch (SQLException e) { + throw new FlinkRuntimeException(e); + } } public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) { diff --git a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java new file mode 100644 index 00000000000..42039620627 --- /dev/null +++ b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java @@ -0,0 +1,473 @@ +/* + * Copyright Debezium Authors. 
+ * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.oracle; + +import io.debezium.DebeziumException; +import io.debezium.config.Configuration; +import io.debezium.config.Field; +import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter; +import io.debezium.jdbc.JdbcConnection; +import io.debezium.relational.Column; +import io.debezium.relational.TableEditor; +import io.debezium.relational.TableId; +import io.debezium.relational.Tables; +import io.debezium.relational.Tables.ColumnNameFilter; +import io.debezium.relational.Tables.TableFilter; +import io.debezium.util.Strings; +import oracle.jdbc.OracleTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Clob; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.function.Supplier; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** copy from debezium to fix connection pool problem. */ +public class OracleConnection extends JdbcConnection { + + private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnection.class); + + /** Returned by column metadata in Oracle if no scale is set. */ + private static final int ORACLE_UNSET_SCALE = -127; + + /** Pattern to identify system generated indices and column names. */ + private static final Pattern SYS_NC_PATTERN = + Pattern.compile("^SYS_NC(?:_OID|_ROWINFO|[0-9][0-9][0-9][0-9][0-9])\\$$"); + + /** A field for the raw jdbc url. This field has no default value. */ + private static final Field URL = Field.create("url", "Raw JDBC url"); + + /** The database version. 
*/ + private final OracleDatabaseVersion databaseVersion; + + public OracleConnection(Configuration config, Supplier classLoaderSupplier) { + super(config, resolveConnectionFactory(config), classLoaderSupplier); + + this.databaseVersion = resolveOracleDatabaseVersion(); + LOGGER.info("Database Version: {}", databaseVersion.getBanner()); + } + + public OracleConnection(Configuration config, ConnectionFactory factory) { + super(config, factory); + this.databaseVersion = resolveOracleDatabaseVersion(); + LOGGER.info("Database Version: {}", databaseVersion.getBanner()); + } + + public void setSessionToPdb(String pdbName) { + Statement statement = null; + + try { + statement = connection().createStatement(); + statement.execute("alter session set container=" + pdbName); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + LOGGER.error("Couldn't close statement", e); + } + } + } + } + + public void resetSessionToCdb() { + Statement statement = null; + + try { + statement = connection().createStatement(); + statement.execute("alter session set container=cdb$root"); + } catch (SQLException e) { + throw new RuntimeException(e); + } finally { + if (statement != null) { + try { + statement.close(); + } catch (SQLException e) { + LOGGER.error("Couldn't close statement", e); + } + } + } + } + + public OracleDatabaseVersion getOracleVersion() { + return databaseVersion; + } + + private OracleDatabaseVersion resolveOracleDatabaseVersion() { + String versionStr; + try { + try { + // Oracle 18.1 introduced BANNER_FULL as the new column rather than BANNER + // This column uses a different format than the legacy BANNER. + versionStr = + queryAndMap( + "SELECT BANNER_FULL FROM V$VERSION WHERE BANNER_FULL LIKE 'Oracle Database%'", + (rs) -> { + if (rs.next()) { + return rs.getString(1); + } + return null; + }); + } catch (SQLException e) { + // exception ignored + if (e.getMessage().contains("ORA-00904: \"BANNER_FULL\"")) { + LOGGER.debug( + "BANNER_FULL column not in V$VERSION, using BANNER column as fallback"); + versionStr = null; + } else { + throw e; + } + } + + // For databases prior to 18.1, a SQLException will be thrown due to BANNER_FULL not + // being a column and + // this will cause versionStr to remain null, use fallback column BANNER for versions + // prior to 18.1. + if (versionStr == null) { + versionStr = + queryAndMap( + "SELECT BANNER FROM V$VERSION WHERE BANNER LIKE 'Oracle Database%'", + (rs) -> { + if (rs.next()) { + return rs.getString(1); + } + return null; + }); + } + } catch (SQLException e) { + throw new RuntimeException("Failed to resolve Oracle database version", e); + } + + if (versionStr == null) { + throw new RuntimeException("Failed to resolve Oracle database version"); + } + + return OracleDatabaseVersion.parse(versionStr); + } + + @Override + public Set readTableNames( + String databaseCatalog, + String schemaNamePattern, + String tableNamePattern, + String[] tableTypes) + throws SQLException { + + Set tableIds = + super.readTableNames(null, schemaNamePattern, tableNamePattern, tableTypes); + + return tableIds.stream() + .map(t -> new TableId(databaseCatalog, t.schema(), t.table())) + .collect(Collectors.toSet()); + } + + /** + * Retrieves all {@code TableId} in a given database catalog, filtering certain ids that should + * be omitted from the returned set such as special spatial tables and index-organized tables. 
+ * + * @param catalogName the catalog/database name + * @return set of all table ids for existing table objects + * @throws SQLException if a database exception occurred + */ + protected Set getAllTableIds(String catalogName) throws SQLException { + final String query = + "select owner, table_name from all_tables " + + + // filter special spatial tables + "where table_name NOT LIKE 'MDRT_%' " + + "and table_name NOT LIKE 'MDRS_%' " + + "and table_name NOT LIKE 'MDXT_%' " + + + // filter index-organized-tables + "and (table_name NOT LIKE 'SYS_IOT_OVER_%' and IOT_NAME IS NULL) "; + + Set tableIds = new HashSet<>(); + query( + query, + (rs) -> { + while (rs.next()) { + tableIds.add(new TableId(catalogName, rs.getString(1), rs.getString(2))); + } + LOGGER.trace("TableIds are: {}", tableIds); + }); + + return tableIds; + } + + // todo replace metadata with something like this + private ResultSet getTableColumnsInfo(String schemaNamePattern, String tableName) + throws SQLException { + String columnQuery = + "select column_name, data_type, data_length, data_precision, data_scale, default_length, density, char_length from " + + "all_tab_columns where owner like '" + + schemaNamePattern + + "' and table_name='" + + tableName + + "'"; + + PreparedStatement statement = connection().prepareStatement(columnQuery); + return statement.executeQuery(); + } + + // this is much faster, we will use it until full replacement of the metadata usage TODO + public void readSchemaForCapturedTables( + Tables tables, + String databaseCatalog, + String schemaNamePattern, + ColumnNameFilter columnFilter, + boolean removeTablesNotFoundInJdbc, + Set capturedTables) + throws SQLException { + + Set tableIdsBefore = new HashSet<>(tables.tableIds()); + + DatabaseMetaData metadata = connection().getMetaData(); + Map> columnsByTable = new HashMap<>(); + + for (TableId tableId : capturedTables) { + try (ResultSet columnMetadata = + metadata.getColumns( + databaseCatalog, schemaNamePattern, tableId.table(), null)) { + while (columnMetadata.next()) { + // add all whitelisted columns + readTableColumn(columnMetadata, tableId, columnFilter) + .ifPresent( + column -> { + columnsByTable + .computeIfAbsent(tableId, t -> new ArrayList<>()) + .add(column.create()); + }); + } + } + } + + // Read the metadata for the primary keys ... + for (Map.Entry> tableEntry : columnsByTable.entrySet()) { + // First get the primary key information, which must be done for *each* table ... + List pkColumnNames = readPrimaryKeyNames(metadata, tableEntry.getKey()); + + // Then define the table ... + List columns = tableEntry.getValue(); + Collections.sort(columns); + tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, null); + } + + if (removeTablesNotFoundInJdbc) { + // Remove any definitions for tables that were not found in the database metadata ... 
+ tableIdsBefore.removeAll(columnsByTable.keySet()); + tableIdsBefore.forEach(tables::removeTable); + } + + for (TableId tableId : capturedTables) { + overrideOracleSpecificColumnTypes(tables, tableId, tableId); + } + } + + @Override + public void readSchema( + Tables tables, + String databaseCatalog, + String schemaNamePattern, + TableFilter tableFilter, + ColumnNameFilter columnFilter, + boolean removeTablesNotFoundInJdbc) + throws SQLException { + + super.readSchema( + tables, + null, + schemaNamePattern, + tableFilter, + columnFilter, + removeTablesNotFoundInJdbc); + + Set tableIds = + tables.tableIds().stream() + .filter(x -> schemaNamePattern.equals(x.schema())) + .collect(Collectors.toSet()); + + for (TableId tableId : tableIds) { + // super.readSchema() populates ids without the catalog; hence we apply the filtering + // only + // here and if a table is included, overwrite it with a new id including the catalog + TableId tableIdWithCatalog = + new TableId(databaseCatalog, tableId.schema(), tableId.table()); + + if (tableFilter.isIncluded(tableIdWithCatalog)) { + overrideOracleSpecificColumnTypes(tables, tableId, tableIdWithCatalog); + } + + tables.removeTable(tableId); + } + } + + @Override + public List readTableUniqueIndices(DatabaseMetaData metadata, TableId id) + throws SQLException { + return super.readTableUniqueIndices(metadata, id.toDoubleQuoted()); + } + + @Override + protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) { + if (columnName != null) { + return !SYS_NC_PATTERN.matcher(columnName).matches(); + } + return false; + } + + private void overrideOracleSpecificColumnTypes( + Tables tables, TableId tableId, TableId tableIdWithCatalog) { + TableEditor editor = tables.editTable(tableId); + editor.tableId(tableIdWithCatalog); + + List columnNames = new ArrayList<>(editor.columnNames()); + for (String columnName : columnNames) { + Column column = editor.columnWithName(columnName); + if (column.jdbcType() == Types.TIMESTAMP) { + editor.addColumn( + column.edit() + .length(column.scale().orElse(Column.UNSET_INT_VALUE)) + .scale(null) + .create()); + } + // NUMBER columns without scale value have it set to -127 instead of null; + // let's rectify that + else if (column.jdbcType() == OracleTypes.NUMBER) { + column.scale() + .filter(s -> s == ORACLE_UNSET_SCALE) + .ifPresent( + s -> { + editor.addColumn(column.edit().scale(null).create()); + }); + } + } + tables.overwriteTable(editor.create()); + } + + /** + * Get the current, most recent system change number. + * + * @return the current system change number + * @throws SQLException if an exception occurred + * @throws IllegalStateException if the query does not return at least one row + */ + public Scn getCurrentScn() throws SQLException { + return queryAndMap( + "SELECT CURRENT_SCN FROM V$DATABASE", + (rs) -> { + if (rs.next()) { + return Scn.valueOf(rs.getString(1)); + } + throw new IllegalStateException("Could not get SCN"); + }); + } + + /** + * Get the maximum system change number in the archive logs. + * + * @param archiveLogDestinationName the archive log destination name to be queried, can be + * {@code null}. 
+ * @return the maximum system change number in the archive logs + * @throws SQLException if a database exception occurred + * @throws DebeziumException if the maximum archive log system change number could not be found + */ + public Scn getMaxArchiveLogScn(String archiveLogDestinationName) throws SQLException { + String query = + "SELECT MAX(NEXT_CHANGE#) FROM V$ARCHIVED_LOG " + + "WHERE NAME IS NOT NULL " + + "AND ARCHIVED = 'YES' " + + "AND STATUS = 'A' " + + "AND DEST_ID IN (" + + "SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS " + + "WHERE STATUS = 'VALID' " + + "AND TYPE = 'LOCAL' "; + + if (Strings.isNullOrEmpty(archiveLogDestinationName)) { + query += "AND ROWNUM = 1"; + } else { + query += "AND UPPER(DEST_NAME) = '" + archiveLogDestinationName + "'"; + } + + query += ")"; + + return queryAndMap( + query, + (rs) -> { + if (rs.next()) { + return Scn.valueOf(rs.getString(1)).subtract(Scn.valueOf(1)); + } + throw new DebeziumException("Could not obtain maximum archive log scn."); + }); + } + + /** + * Generate a given table's DDL metadata. + * + * @param tableId table identifier, should never be {@code null} + * @return generated DDL + * @throws SQLException if an exception occurred obtaining the DDL metadata + */ + public String getTableMetadataDdl(TableId tableId) throws SQLException { + try { + // The storage and segment attributes aren't necessary + executeWithoutCommitting( + "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;"); + executeWithoutCommitting( + "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;"); + // In case DDL is returned as multiple DDL statements, this allows the parser to parse + // each separately. + // This is only critical during streaming as during snapshot the table structure is + // built from JDBC driver queries. + executeWithoutCommitting( + "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SQLTERMINATOR', true); end;"); + return queryAndMap( + "SELECT dbms_metadata.get_ddl('TABLE','" + + tableId.table() + + "','" + + tableId.schema() + + "') FROM DUAL", + rs -> { + if (!rs.next()) { + throw new DebeziumException( + "Could not get DDL metadata for table: " + tableId); + } + + Object res = rs.getObject(1); + return ((Clob) res).getSubString(1, (int) ((Clob) res).length()); + }); + } finally { + executeWithoutCommitting( + "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'DEFAULT'); end;"); + } + } + + public static String connectionString(Configuration config) { + return config.getString(URL) != null + ? 
config.getString(URL) + : ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl(); + } + + private static ConnectionFactory resolveConnectionFactory(Configuration config) { + return JdbcConnection.patternBasedFactory(connectionString(config)); + } +} From 3652cb5ef9e79eaf19655f3e3c5857cda38496c1 Mon Sep 17 00:00:00 2001 From: Enoch Date: Tue, 14 Mar 2023 17:44:33 +0800 Subject: [PATCH 7/8] [base][feat] Expose metrics to context [oracle][feat] add currentScn metrics to flink metricGroup --- .../base/dialect/DataSourceDialect.java | 3 +++ .../base/source/IncrementalSource.java | 5 ++++- .../source/metrics/SourceReaderMetrics.java | 4 ++++ .../reader/IncrementalSourceSplitReader.java | 16 ++++++++++++++ .../base/experimental/MySqlDialect.java | 6 +++++ .../source/dialect/MongoDBDialect.java | 6 +++++ .../oracle/source/OracleDialect.java | 14 ++++++++++-- .../reader/fetch/OracleScanFetchTask.java | 11 ++++++++++ .../reader/fetch/OracleStreamFetchTask.java | 22 +++++++++++++++++++ 9 files changed, 84 insertions(+), 3 deletions(-) diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java index ab6ba1d89f1..ebb482a66b2 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java @@ -22,6 +22,7 @@ import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; @@ -68,4 +69,6 @@ public interface DataSourceDialect extends Serializable /** The task context used for fetch task to fetch data from external systems. 
*/ FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase, C sourceConfig); + + void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics); } diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java index d7b37517fcd..7782ee155fc 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java @@ -116,7 +116,10 @@ public IncrementalSourceReader createReader(SourceReaderContext readerCont Supplier> splitReaderSupplier = () -> new IncrementalSourceSplitReader<>( - readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig); + sourceReaderMetrics, + readerContext.getIndexOfSubtask(), + dataSourceDialect, + sourceConfig); return new IncrementalSourceReader<>( elementsQueue, splitReaderSupplier, diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java index 72dee083038..e0fad7d6d04 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java @@ -49,6 +49,10 @@ public SourceReaderMetrics(MetricGroup metricGroup) { this.metricGroup = metricGroup; } + public MetricGroup getMetricGroup() { + return metricGroup; + } + public void registerMetrics() { metricGroup.gauge( SourceReaderMetricConstants.CURRENT_FETCH_EVENT_TIME_LAG, diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java index 3aff776865a..1004cb5bac0 100644 --- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java +++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java @@ -28,6 +28,7 @@ import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.base.source.reader.external.Fetcher; import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher; @@ -56,6 +57,19 @@ public class IncrementalSourceSplitReader @Nullable private String currentSplitId; private final DataSourceDialect dataSourceDialect; private final C sourceConfig; + private SourceReaderMetrics sourceReaderMetrics; + + public IncrementalSourceSplitReader( + SourceReaderMetrics sourceReaderMetrics, + int subtaskId, + DataSourceDialect dataSourceDialect, + C sourceConfig) { + this.sourceReaderMetrics = sourceReaderMetrics; + this.subtaskId = subtaskId; + this.splits = new ArrayDeque<>(); + this.dataSourceDialect = dataSourceDialect; + this.sourceConfig = sourceConfig; + } public IncrementalSourceSplitReader( int subtaskId, DataSourceDialect dataSourceDialect, C 
sourceConfig) { @@ -122,6 +136,7 @@ protected void checkSplitOrStartNext() throws IOException { if (currentFetcher == null) { final FetchTask.Context taskContext = dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig); + dataSourceDialect.setSourceReaderMetrics(sourceReaderMetrics); currentFetcher = new IncrementalSourceScanFetcher(taskContext, subtaskId); } } else { @@ -132,6 +147,7 @@ protected void checkSplitOrStartNext() throws IOException { } final FetchTask.Context taskContext = dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig); + dataSourceDialect.setSourceReaderMetrics(sourceReaderMetrics); currentFetcher = new IncrementalSourceStreamFetcher(taskContext, subtaskId); LOG.info("Stream fetcher is created."); } diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java index ff4d4bd516a..aff46bf616d 100644 --- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java +++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java @@ -33,6 +33,7 @@ import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.jdbc.JdbcConnection; @@ -154,4 +155,9 @@ public FetchTask createFetchTask(SourceSplitBase sourceSplitBas return new MySqlStreamFetchTask(sourceSplitBase.asStreamSplit()); } } + + @Override + public void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics) { + // TODO + } } diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java index 34c7c03d2aa..20a9e5e1c25 100644 --- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java +++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java @@ -22,6 +22,7 @@ import com.ververica.cdc.connectors.base.dialect.DataSourceDialect; import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.MongoDBChunkSplitter; import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig; @@ -168,4 +169,9 @@ public MongoDBFetchTaskContext createFetchTaskContext( discoveryInfo.getDiscoveredCollections()); return new MongoDBFetchTaskContext(this, sourceConfig, changeStreamDescriptor); } + + @Override + public void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics) { + // TODO + } } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java 
b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java index 78b67d5ed66..dc520dbd4c4 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java @@ -25,6 +25,7 @@ import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter; import com.ververica.cdc.connectors.base.source.meta.offset.Offset; import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter; import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig; @@ -53,6 +54,8 @@ public class OracleDialect implements JdbcDataSourceDialect { private static final long serialVersionUID = 1L; private transient OracleSchema oracleSchema; + private SourceReaderMetrics sourceReaderMetrics; + @Override public String getName() { return "Oracle"; @@ -136,12 +139,19 @@ public OracleSourceFetchTaskContext createFetchTaskContext( return new OracleSourceFetchTaskContext(taskSourceConfig, this, jdbcConnection); } + @Override + public void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics) { + this.sourceReaderMetrics = sourceReaderMetrics; + } + @Override public FetchTask createFetchTask(SourceSplitBase sourceSplitBase) { if (sourceSplitBase.isSnapshotSplit()) { - return new OracleScanFetchTask(sourceSplitBase.asSnapshotSplit()); + return new OracleScanFetchTask( + sourceSplitBase.asSnapshotSplit(), this.sourceReaderMetrics); } else { - return new OracleStreamFetchTask(sourceSplitBase.asStreamSplit()); + return new OracleStreamFetchTask( + sourceSplitBase.asStreamSplit(), this.sourceReaderMetrics); } } } diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java index 2ff9d9f82d5..9a7df8c4a3f 100644 --- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java +++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java @@ -21,6 +21,7 @@ import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase; import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit; import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind; +import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics; import com.ververica.cdc.connectors.base.source.reader.external.FetchTask; import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset; import io.debezium.DebeziumException; @@ -71,16 +72,24 @@ /** The task to work for fetching data of Oracle table snapshot split. 
 */
 public class OracleScanFetchTask implements FetchTask {
 
+    private static final Logger LOG = LoggerFactory.getLogger(OracleScanFetchTask.class);
     private final SnapshotSplit split;
     private volatile boolean taskRunning = false;
 
     private OracleSnapshotSplitReadTask snapshotSplitReadTask;
 
+    private SourceReaderMetrics sourceReaderMetrics;
+
     public OracleScanFetchTask(SnapshotSplit split) {
         this.split = split;
     }
 
+    public OracleScanFetchTask(SnapshotSplit split, SourceReaderMetrics sourceReaderMetrics) {
+        this.split = split;
+        this.sourceReaderMetrics = sourceReaderMetrics;
+    }
+
     @Override
     public SnapshotSplit getSplit() {
         return split;
@@ -104,6 +113,8 @@ public void execute(Context context) throws Exception {
                         sourceFetchContext.getConnection(),
                         sourceFetchContext.getDispatcher(),
                         split);
+        LOG.info("register SnapshotChangeEventSourceMetrics");
+        sourceFetchContext.getSnapshotChangeEventSourceMetrics().register(LOG);
         SnapshotSplitChangeEventSourceContext changeEventSourceContext =
                 new SnapshotSplitChangeEventSourceContext();
         SnapshotResult snapshotResult =
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
index 973690888a4..fb23171240b 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
@@ -16,10 +16,13 @@
 package com.ververica.cdc.connectors.oracle.source.reader.fetch;
 
+import org.apache.flink.metrics.Gauge;
+
 import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
 import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
 import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
 import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
 import io.debezium.DebeziumException;
@@ -43,15 +46,23 @@
 /** The task to work for fetching data of Oracle table stream split. */
 public class OracleStreamFetchTask implements FetchTask {
 
+    private static final Logger LOG = LoggerFactory.getLogger(OracleStreamFetchTask.class);
     private final StreamSplit split;
     private volatile boolean taskRunning = false;
 
     private RedoLogSplitReadTask redoLogSplitReadTask;
 
+    private SourceReaderMetrics sourceReaderMetrics;
+
     public OracleStreamFetchTask(StreamSplit split) {
         this.split = split;
     }
 
+    public OracleStreamFetchTask(StreamSplit split, SourceReaderMetrics sourceReaderMetrics) {
+        this.split = split;
+        this.sourceReaderMetrics = sourceReaderMetrics;
+    }
+
     @Override
     public void execute(Context context) throws Exception {
         OracleSourceFetchTaskContext sourceFetchContext = (OracleSourceFetchTaskContext) context;
@@ -66,6 +77,17 @@ public void execute(Context context) throws Exception {
                         sourceFetchContext.getSourceConfig().getOriginDbzConnectorConfig(),
                         sourceFetchContext.getStreamingChangeEventSourceMetrics(),
                         split);
+        LOG.info("register StreamingChangeEventSourceMetrics");
+        sourceFetchContext.getStreamingChangeEventSourceMetrics().register(LOG);
+
+        LOG.info("register currentScn");
+        sourceReaderMetrics
+                .getMetricGroup()
+                .gauge(
+                        "currentScn",
+                        (Gauge)
+                                sourceFetchContext.getStreamingChangeEventSourceMetrics()
+                                        ::getCurrentScn);
         RedoLogSplitChangeEventSourceContext changeEventSourceContext =
                 new RedoLogSplitChangeEventSourceContext();
         redoLogSplitReadTask.execute(

From cb4d1678a1132a42064e9889c3307a35e6eeb70e Mon Sep 17 00:00:00 2001
From: Enoch
Date: Tue, 4 Apr 2023 11:31:17 +0800
Subject: [PATCH 8/8] [oracle][fix] fix ClassCastException caused by
 OracleValueConverters while dealing with the timestampltz field
---
 .../debezium/connector/oracle/OracleConnection.java | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java
index 42039620627..f4dbdd99a9d 100644
--- a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java
+++ b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java
@@ -6,6 +6,7 @@
 
 package io.debezium.connector.oracle;
 
+import com.zaxxer.hikari.pool.HikariProxyConnection;
 import io.debezium.DebeziumException;
 import io.debezium.config.Configuration;
 import io.debezium.config.Field;
@@ -23,6 +24,7 @@ import org.slf4j.LoggerFactory;
 
 import java.sql.Clob;
+import java.sql.Connection;
 import java.sql.DatabaseMetaData;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
@@ -470,4 +472,13 @@ public static String connectionString(Configuration config) {
 
     private static ConnectionFactory resolveConnectionFactory(Configuration config) {
         return JdbcConnection.patternBasedFactory(connectionString(config));
     }
+
+    @Override
+    public synchronized Connection connection() throws SQLException {
+        Connection connection = super.connection();
+        if (connection instanceof HikariProxyConnection) {
+            connection = connection.unwrap(oracle.jdbc.OracleConnection.class);
+        }
+        return connection;
+    }
 }
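
For readers of PATCH 8 above: HikariCP hands out `HikariProxyConnection` wrappers, and the copied Debezium code expects the vendor connection when `OracleValueConverters` converts `TIMESTAMP WITH LOCAL TIME ZONE` (timestampltz) values, so casting the pooled proxy throws a `ClassCastException`. The `connection()` override unwraps the proxy through the standard `java.sql.Wrapper` contract. Below is a minimal standalone sketch of that unwrap pattern, assuming HikariCP and the Oracle JDBC driver are on the classpath; the class name, JDBC URL, and credentials are placeholders, not values from the patch:

```java
import com.zaxxer.hikari.HikariDataSource;

import java.sql.Connection;
import java.sql.SQLException;

public class UnwrapSketch {
    public static void main(String[] args) throws SQLException {
        // Placeholder pool configuration; URL and credentials are illustrative only.
        try (HikariDataSource pool = new HikariDataSource()) {
            pool.setJdbcUrl("jdbc:oracle:thin:@//localhost:1521/ORCLCDB");
            pool.setUsername("dbzuser");
            pool.setPassword("dbz");

            try (Connection pooled = pool.getConnection()) {
                // The pool returns a proxy; unwrap() exposes the native Oracle
                // connection that vendor-specific code can cast safely.
                if (pooled.isWrapperFor(oracle.jdbc.OracleConnection.class)) {
                    oracle.jdbc.OracleConnection raw =
                            pooled.unwrap(oracle.jdbc.OracleConnection.class);
                    System.out.println("Unwrapped: " + raw.getClass().getName());
                }
            }
        }
    }
}
```

The same contract is what keeps the override in the patch safe for unpooled use: when the connection is not a Hikari proxy (for example, one created through `patternBasedFactory`), the `instanceof` check fails and the connection is returned unchanged.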