@@ -307,47 +302,7 @@ Connector Options
Optional startup mode for Oracle CDC consumer, valid enumerations are "initial"
and "latest-offset".
Please see Startup Reading Position section for more detailed information. |
-
-
- scan.incremental.snapshot.enabled |
- optional |
- true |
- Boolean |
- Incremental snapshot is a new mechanism to read snapshot of a table. Compared to the old snapshot mechanism,
- the incremental snapshot has many advantages, including:
- (1) source can be parallel during snapshot reading,
- (2) source can perform checkpoints in the chunk granularity during snapshot reading,
- (3) source doesn't need to acquire ROW SHARE MODE lock before snapshot reading.
- |
-
-
- scan.incremental.snapshot.chunk.size |
- optional |
- 8096 |
- Integer |
- The chunk size (number of rows) of table snapshot, captured tables are split into multiple chunks when read the snapshot of table. |
-
-
- scan.snapshot.fetch.size |
- optional |
- 1024 |
- Integer |
- The maximum fetch size for per poll when read table snapshot. |
-
-
- connect.max-retries |
- optional |
- 3 |
- Integer |
- The max retry times that the connector should retry to build Oracle database server connection. |
-
-
- connection.pool.size |
- optional |
- 20 |
- Integer |
- The connection pool size. |
-
+
debezium.* |
optional |
@@ -431,13 +386,11 @@ CREATE TABLE products (
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'inventory',
- 'table-name' = 'products',
- 'debezium.log.mining.strategy' = 'online_catalog',
- 'debezium.log.mining.continuous.mine' = 'true'
+ 'table-name' = 'products'
);
```
-**Note** : The Oracle dialect is case-sensitive, it converts field name to uppercase if the field name is not quoted, Flink SQL doesn't convert the field name. Thus for physical columns from oracle database, we should use its converted field name in Oracle when define an `oracle-cdc` table in Flink SQL.
+**Note**: The Oracle dialect is case-sensitive: it converts an unquoted field name to uppercase, whereas Flink SQL does not convert field names. Thus, for physical columns from the Oracle database, you should use the field name as converted by Oracle when defining an `oracle-cdc` table in Flink SQL.
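For illustration, here is a minimal sketch of that rule (the Oracle column definitions, hostname and port below are assumptions; the remaining options mirror the example above):

```sql
-- Hypothetical Oracle table created with unquoted column names:
--   CREATE TABLE inventory.products (id NUMBER(9), description VARCHAR2(512));
-- Oracle stores the names as ID and DESCRIPTION, so the Flink DDL declares
-- the physical columns with those uppercase names:
CREATE TABLE products (
    ID INT NOT NULL,
    DESCRIPTION STRING,
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'XE',
    'schema-name' = 'inventory',
    'table-name' = 'products'
);
```

A column that was created with quotes in Oracle (for example `"id"`) keeps its original case and must be declared with that exact case instead.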
Features
--------
@@ -584,8 +537,7 @@ Data Type Mapping
VARCHAR2(n)
CLOB
NCLOB
- XMLType
- SYS.XMLTYPE
+ XMLType
STRING |
diff --git a/docs/content/connectors/postgres-cdc.md b/docs/content/connectors/postgres-cdc.md
index 73a0e2cac7d..66d31378029 100644
--- a/docs/content/connectors/postgres-cdc.md
+++ b/docs/content/connectors/postgres-cdc.md
@@ -14,7 +14,7 @@ In order to setup the Postgres CDC connector, the following table provides depen
com.ververica
flink-connector-postgres-cdc
-    2.4-SNAPSHOT
+    2.3.0
```
@@ -22,9 +22,9 @@ In order to setup the Postgres CDC connector, the following table provides depen
```Download link is available only for stable releases.```
-Download [flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4-SNAPSHOT/flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar) and put it under `/lib/`.
+Download [flink-sql-connector-postgres-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3.0/flink-sql-connector-postgres-cdc-2.3.0.jar) and put it under `/lib/`.
-**Note:** flink-sql-connector-postgres-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-postgres-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc), the released version will be available in the Maven central warehouse.
+**Note:** flink-sql-connector-postgres-cdc-XXX-SNAPSHOT versions correspond to the development branch; users need to download the source code and build the jar themselves. Users should instead use a released version, such as [flink-sql-connector-postgres-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-postgres-cdc), which is available from the Maven Central repository.
How to create a Postgres CDC table
----------------
@@ -315,7 +315,7 @@ Data Type Mapping
- PostgreSQL type |
+ PostgreSQL type |
Flink SQL type |
diff --git a/docs/content/connectors/sqlserver-cdc.md b/docs/content/connectors/sqlserver-cdc.md
index a500f57ea92..c928136da58 100644
--- a/docs/content/connectors/sqlserver-cdc.md
+++ b/docs/content/connectors/sqlserver-cdc.md
@@ -14,7 +14,7 @@ In order to setup the SQLServer CDC connector, the following table provides depe
com.ververica
flink-connector-sqlserver-cdc
- 2.4-SNAPSHOT
+ 2.3.0
```
@@ -22,9 +22,9 @@ In order to setup the SQLServer CDC connector, the following table provides depe
```Download link is available only for stable releases.```
-Download [flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.4-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar) and put it under `/lib/`.
+Download [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3.0/flink-sql-connector-sqlserver-cdc-2.3.0.jar) and put it under `/lib/`.
-**Note:** flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-sqlserver-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc), the released version will be available in the Maven central warehouse.
+**Note:** flink-sql-connector-sqlserver-cdc-XXX-SNAPSHOT versions correspond to the development branch; users need to download the source code and build the jar themselves. Users should instead use a released version, such as [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-sqlserver-cdc), which is available from the Maven Central repository.
Setup SQLServer Database
----------------
@@ -315,7 +315,7 @@ Data Type Mapping
- SQLServer type |
+ SQLServer type |
Flink SQL type |
@@ -357,7 +357,7 @@ Data Type Mapping
float
real
- DOUBLE |
+ FLOAT |
bit |
@@ -369,7 +369,7 @@ Data Type Mapping
tinyint |
- SMALLINT |
+ TINYINT |
smallint |
diff --git a/docs/content/connectors/tidb-cdc.md b/docs/content/connectors/tidb-cdc.md
index 6250c5400c1..d35c239971a 100644
--- a/docs/content/connectors/tidb-cdc.md
+++ b/docs/content/connectors/tidb-cdc.md
@@ -14,7 +14,7 @@ In order to setup the TiDB CDC connector, the following table provides dependenc
com.ververica
flink-connector-tidb-cdc
- 2.4-SNAPSHOT
+ 2.3.0
```
@@ -22,9 +22,9 @@ In order to setup the TiDB CDC connector, the following table provides dependenc
```Download link is available only for stable releases.```
-Download [flink-sql-connector-tidb-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.4-SNAPSHOT/flink-sql-connector-tidb-cdc-2.4-SNAPSHOT.jar) and put it under `/lib/`.
+Download [flink-sql-connector-tidb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar) and put it under `/lib/`.
-**Note:** flink-sql-connector-tidb-cdc-XXX-SNAPSHOT version is the code corresponding to the development branch. Users need to download the source code and compile the corresponding jar. Users should use the released version, such as [flink-sql-connector-tidb-cdc-2.2.1.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-tidb-cdc), the released version will be available in the Maven central warehouse.
+**Note:** flink-sql-connector-tidb-cdc-XXX-SNAPSHOT versions correspond to the development branch; users need to download the source code and build the jar themselves. Users should instead use a released version, such as [flink-sql-connector-tidb-cdc-2.3.0.jar](https://mvnrepository.com/artifact/com.ververica/flink-sql-connector-tidb-cdc), which is available from the Maven Central repository.
How to create a TiDB CDC table
----------------
@@ -290,7 +290,7 @@ Data Type Mapping
- TiDB type |
+ TiDB type |
Flink SQL type |
NOTE |
diff --git a/docs/content/quickstart/build-real-time-data-lake-tutorial.md b/docs/content/quickstart/build-real-time-data-lake-tutorial.md
index 70eb112dddd..5a78fcf9467 100644
--- a/docs/content/quickstart/build-real-time-data-lake-tutorial.md
+++ b/docs/content/quickstart/build-real-time-data-lake-tutorial.md
@@ -85,7 +85,7 @@ The Docker Compose environment consists of the following containers:
If you want to run with your own Flink environment, remember to download the following packages and then put them to `FLINK_HOME/lib/`.
**Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. **
- - [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar)
+ - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar)
- [flink-shaded-hadoop-2-uber-2.7.5-10.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar)
- [iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar](https://raw.githubusercontent.com/luoyuxia/flink-cdc-tutorial/main/flink-cdc-iceberg-demo/sql-client/lib/iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar)
diff --git a/docs/content/quickstart/db2-tutorial.md b/docs/content/quickstart/db2-tutorial.md
index 5be533c2015..b0ad73f7c8e 100644
--- a/docs/content/quickstart/db2-tutorial.md
+++ b/docs/content/quickstart/db2-tutorial.md
@@ -62,7 +62,7 @@ docker-compose down
*Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. *
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-db2-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.4-SNAPSHOT/flink-sql-connector-db2-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-db2-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-db2-cdc/2.3.0/flink-sql-connector-db2-cdc-2.3.0.jar)
**3. Launch a Flink cluster and start a Flink SQL CLI**
diff --git a/docs/content/quickstart/mongodb-tutorial.md b/docs/content/quickstart/mongodb-tutorial.md
index b7628299526..56838ce68fc 100644
--- a/docs/content/quickstart/mongodb-tutorial.md
+++ b/docs/content/quickstart/mongodb-tutorial.md
@@ -110,7 +110,7 @@ db.customers.insertMany([
```Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. ```
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
- - [flink-sql-connector-mongodb-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.4-SNAPSHOT/flink-sql-connector-mongodb-cdc-2.4-SNAPSHOT.jar)
+ - [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3.0/flink-sql-connector-mongodb-cdc-2.3.0.jar)
4. Launch a Flink cluster, then start a Flink SQL CLI and execute following SQL statements inside:
diff --git a/docs/content/quickstart/mysql-postgres-tutorial.md b/docs/content/quickstart/mysql-postgres-tutorial.md
index 20e5930d2bc..151ea929a98 100644
--- a/docs/content/quickstart/mysql-postgres-tutorial.md
+++ b/docs/content/quickstart/mysql-postgres-tutorial.md
@@ -78,8 +78,8 @@ We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kib
**Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. **
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
- - [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar)
- - [flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4-SNAPSHOT/flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar)
+ - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar)
+ - [flink-sql-connector-postgres-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3.0/flink-sql-connector-postgres-cdc-2.3.0.jar)
### Preparing data in databases
#### Preparing data in MySQL
diff --git a/docs/content/quickstart/oceanbase-tutorial.md b/docs/content/quickstart/oceanbase-tutorial.md
index c02ef34b37d..245eefdbfbb 100644
--- a/docs/content/quickstart/oceanbase-tutorial.md
+++ b/docs/content/quickstart/oceanbase-tutorial.md
@@ -112,7 +112,7 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
```Download links are only available for stable releases.```
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.4-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3.0/flink-sql-connector-oceanbase-cdc-2.3.0.jar)
### Use Flink DDL to create dynamic table in Flink SQL CLI
diff --git a/docs/content/quickstart/oracle-tutorial.md b/docs/content/quickstart/oracle-tutorial.md
index 636d3e5ee6d..1f708bee8a1 100644
--- a/docs/content/quickstart/oracle-tutorial.md
+++ b/docs/content/quickstart/oracle-tutorial.md
@@ -55,7 +55,7 @@ docker-compose down
*Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. *
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-oracle-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.4-SNAPSHOT/flink-sql-connector-oracle-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-oracle-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3.0/flink-sql-connector-oracle-cdc-2.3.0.jar)
**3. Launch a Flink cluster and start a Flink SQL CLI**
diff --git a/docs/content/quickstart/polardbx-tutorial.md b/docs/content/quickstart/polardbx-tutorial.md
index 6bf3eab02e1..ff1d4ed1c9c 100644
--- a/docs/content/quickstart/polardbx-tutorial.md
+++ b/docs/content/quickstart/polardbx-tutorial.md
@@ -67,7 +67,7 @@ We can also visit [http://localhost:5601/](http://localhost:5601/) to see if Kib
2. Download following JAR package required and put them under `flink-1.16.0/lib/`:
**Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. **
- - [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar)
+ - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar)
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
### Preparing data in databases
diff --git a/docs/content/quickstart/sqlserver-tutorial.md b/docs/content/quickstart/sqlserver-tutorial.md
index 0a9b1d2c706..20f3961d0a1 100644
--- a/docs/content/quickstart/sqlserver-tutorial.md
+++ b/docs/content/quickstart/sqlserver-tutorial.md
@@ -64,7 +64,7 @@ docker-compose down
*Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. *
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.4-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3.0/flink-sql-connector-sqlserver-cdc-2.3.0.jar)
**Preparing data in SqlServer database**
diff --git a/docs/content/quickstart/tidb-tutorial.md b/docs/content/quickstart/tidb-tutorial.md
index e9e51aea328..4e389522aec 100644
--- a/docs/content/quickstart/tidb-tutorial.md
+++ b/docs/content/quickstart/tidb-tutorial.md
@@ -117,7 +117,7 @@ docker-compose down
*Download links are available only for stable releases, SNAPSHOT dependency need build by yourself. *
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-tidb-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.4-SNAPSHOT/flink-sql-connector-tidb-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-tidb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar)
**Preparing data in TiDB database**
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md"
index 13efb8c0070..d1dc8dbe5bc 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/build-real-time-data-lake-tutorial-zh.md"
@@ -85,7 +85,7 @@ volumes:
**下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译**
- - [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar)
+ - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar)
- [flink-shaded-hadoop-2-uber-2.7.5-10.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-shaded-hadoop-2-uber/2.7.5-10.0/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar)
- [iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar](https://raw.githubusercontent.com/luoyuxia/flink-cdc-tutorial/main/flink-cdc-iceberg-demo/sql-client/lib/iceberg-flink-1.13-runtime-0.13.0-SNAPSHOT.jar)
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md"
index 6e472fddf57..13cd3f21dbf 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mongodb-tutorial-zh.md"
@@ -110,7 +110,7 @@ db.customers.insertMany([
```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译```
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
- - [flink-sql-connector-mongodb-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.4-SNAPSHOT/flink-sql-connector-mongodb-cdc-2.4-SNAPSHOT.jar)
+ - [flink-sql-connector-mongodb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mongodb-cdc/2.3.0/flink-sql-connector-mongodb-cdc-2.3.0.jar)
4. 然后启动 Flink 集群,再启动 SQL CLI.
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md"
index a699ed37936..4392516ecbc 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/mysql-postgres-tutorial-zh.md"
@@ -74,8 +74,8 @@ docker-compose up -d
**下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译**
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
- - [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar)
- - [flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.4-SNAPSHOT/flink-sql-connector-postgres-cdc-2.4-SNAPSHOT.jar)
+ - [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar)
+ - [flink-sql-connector-postgres-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-postgres-cdc/2.3.0/flink-sql-connector-postgres-cdc-2.3.0.jar)
### 准备数据
#### 在 MySQL 数据库中准备数据
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md"
index 358da349873..0c17b0a476b 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oceanbase-tutorial-zh.md"
@@ -111,7 +111,7 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译```
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.4-SNAPSHOT/flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-oceanbase-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oceanbase-cdc/2.3.0/flink-sql-connector-oceanbase-cdc-2.3.0.jar)
### 在 Flink SQL CLI 中使用 Flink DDL 创建表
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md"
index 23ead2aebc8..7b63420f366 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/oracle-tutorial-zh.md"
@@ -55,7 +55,7 @@ docker-compose down
*下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译*
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-oracle-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.4-SNAPSHOT/flink-sql-connector-oracle-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-oracle-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-oracle-cdc/2.3.0/flink-sql-connector-oracle-cdc-2.3.0.jar)
**3. 然后启动 Flink 集群,再启动 SQL CLI:**
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md"
index 0bdc617adf0..41ee0314093 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/polardbx-tutorial-zh.md"
@@ -109,7 +109,7 @@ VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
2. 下载下面列出的依赖包,并将它们放到目录 `flink-1.16.0/lib/` 下
```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译```
-- 用于订阅PolarDB-X Binlog: [flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.4-SNAPSHOT/flink-sql-connector-mysql-cdc-2.4-SNAPSHOT.jar)
+- 用于订阅PolarDB-X Binlog: [flink-sql-connector-mysql-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.3.0/flink-sql-connector-mysql-cdc-2.3.0.jar)
- 用于写入Elasticsearch: [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
3. 启动flink服务:
```shell
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md"
index f2b4d2afb66..4b74f80e06d 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/sqlserver-tutorial-zh.md"
@@ -64,7 +64,7 @@ docker-compose down
```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译```
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.4-SNAPSHOT/flink-sql-connector-sqlserver-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-sqlserver-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-sqlserver-cdc/2.3.0/flink-sql-connector-sqlserver-cdc-2.3.0.jar)
**在 SqlServer 数据库中准备数据**
diff --git "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md" "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md"
index a949d1e4fa6..71cb47ad2af 100644
--- "a/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md"
+++ "b/docs/content/\345\277\253\351\200\237\344\270\212\346\211\213/tidb-tutorial-zh.md"
@@ -117,7 +117,7 @@ docker-compose down
```下载链接只对已发布的版本有效, SNAPSHOT 版本需要本地编译```
- [flink-sql-connector-elasticsearch7-1.16.0.jar](https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-elasticsearch7/1.16.0/flink-sql-connector-elasticsearch7-1.16.0.jar)
-- [flink-sql-connector-tidb-cdc-2.4-SNAPSHOT.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.4-SNAPSHOT/flink-sql-connector-tidb-cdc-2.4-SNAPSHOT.jar)
+- [flink-sql-connector-tidb-cdc-2.3.0.jar](https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-tidb-cdc/2.3.0/flink-sql-connector-tidb-cdc-2.3.0.jar)
**在 TiDB 数据库中准备数据**
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java
index ab6ba1d89f1..ebb482a66b2 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java
@@ -22,6 +22,7 @@
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
@@ -68,4 +69,6 @@ public interface DataSourceDialect extends Serializable
/** The task context used for fetch task to fetch data from external systems. */
FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase, C sourceConfig);
+
+ void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics);
}
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java
index d7b37517fcd..7782ee155fc 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/IncrementalSource.java
@@ -116,7 +116,10 @@ public IncrementalSourceReader createReader(SourceReaderContext readerCont
Supplier> splitReaderSupplier =
() ->
new IncrementalSourceSplitReader<>(
- readerContext.getIndexOfSubtask(), dataSourceDialect, sourceConfig);
+ sourceReaderMetrics,
+ readerContext.getIndexOfSubtask(),
+ dataSourceDialect,
+ sourceConfig);
return new IncrementalSourceReader<>(
elementsQueue,
splitReaderSupplier,
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java
index 72dee083038..e0fad7d6d04 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/metrics/SourceReaderMetrics.java
@@ -49,6 +49,10 @@ public SourceReaderMetrics(MetricGroup metricGroup) {
this.metricGroup = metricGroup;
}
+ public MetricGroup getMetricGroup() {
+ return metricGroup;
+ }
+
public void registerMetrics() {
metricGroup.gauge(
SourceReaderMetricConstants.CURRENT_FETCH_EVENT_TIME_LAG,
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
index 3aff776865a..1004cb5bac0 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/source/reader/IncrementalSourceSplitReader.java
@@ -28,6 +28,7 @@
import com.ververica.cdc.connectors.base.source.meta.split.ChangeEventRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceRecords;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.base.source.reader.external.Fetcher;
import com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher;
@@ -56,6 +57,19 @@ public class IncrementalSourceSplitReader
@Nullable private String currentSplitId;
private final DataSourceDialect dataSourceDialect;
private final C sourceConfig;
+ private SourceReaderMetrics sourceReaderMetrics;
+
+ public IncrementalSourceSplitReader(
+ SourceReaderMetrics sourceReaderMetrics,
+ int subtaskId,
+ DataSourceDialect dataSourceDialect,
+ C sourceConfig) {
+ this.sourceReaderMetrics = sourceReaderMetrics;
+ this.subtaskId = subtaskId;
+ this.splits = new ArrayDeque<>();
+ this.dataSourceDialect = dataSourceDialect;
+ this.sourceConfig = sourceConfig;
+ }
public IncrementalSourceSplitReader(
int subtaskId, DataSourceDialect dataSourceDialect, C sourceConfig) {
@@ -122,6 +136,7 @@ protected void checkSplitOrStartNext() throws IOException {
if (currentFetcher == null) {
final FetchTask.Context taskContext =
dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig);
+ dataSourceDialect.setSourceReaderMetrics(sourceReaderMetrics);
currentFetcher = new IncrementalSourceScanFetcher(taskContext, subtaskId);
}
} else {
@@ -132,6 +147,7 @@ protected void checkSplitOrStartNext() throws IOException {
}
final FetchTask.Context taskContext =
dataSourceDialect.createFetchTaskContext(nextSplit, sourceConfig);
+ dataSourceDialect.setSourceReaderMetrics(sourceReaderMetrics);
currentFetcher = new IncrementalSourceStreamFetcher(taskContext, subtaskId);
LOG.info("Stream fetcher is created.");
}
diff --git a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java
index ff4d4bd516a..aff46bf616d 100644
--- a/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java
+++ b/flink-cdc-base/src/test/java/com/ververica/cdc/connectors/base/experimental/MySqlDialect.java
@@ -33,6 +33,7 @@
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
@@ -154,4 +155,9 @@ public FetchTask createFetchTask(SourceSplitBase sourceSplitBas
return new MySqlStreamFetchTask(sourceSplitBase.asStreamSplit());
}
}
+
+ @Override
+ public void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics) {
+ // TODO
+ }
}
diff --git a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
index 34c7c03d2aa..20a9e5e1c25 100644
--- a/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
+++ b/flink-connector-mongodb-cdc/src/main/java/com/ververica/cdc/connectors/mongodb/source/dialect/MongoDBDialect.java
@@ -22,6 +22,7 @@
import com.ververica.cdc.connectors.base.dialect.DataSourceDialect;
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.mongodb.source.assigners.splitters.MongoDBChunkSplitter;
import com.ververica.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
@@ -168,4 +169,9 @@ public MongoDBFetchTaskContext createFetchTaskContext(
discoveryInfo.getDiscoveredCollections());
return new MongoDBFetchTaskContext(this, sourceConfig, changeStreamDescriptor);
}
+
+ @Override
+ public void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics) {
+ // TODO
+ }
}
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java
index ff41ea2f459..dc520dbd4c4 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleDialect.java
@@ -25,10 +25,10 @@
import com.ververica.cdc.connectors.base.source.assigner.splitter.ChunkSplitter;
import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.assigner.splitter.OracleChunkSplitter;
import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfig;
-import com.ververica.cdc.connectors.oracle.source.config.OracleSourceConfigFactory;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleScanFetchTask;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleSourceFetchTaskContext;
import com.ververica.cdc.connectors.oracle.source.reader.fetch.OracleStreamFetchTask;
@@ -52,14 +52,9 @@
public class OracleDialect implements JdbcDataSourceDialect {
private static final long serialVersionUID = 1L;
- private final OracleSourceConfigFactory configFactory;
- private final OracleSourceConfig sourceConfig;
private transient OracleSchema oracleSchema;
- public OracleDialect(OracleSourceConfigFactory configFactory) {
- this.configFactory = configFactory;
- this.sourceConfig = configFactory.create(0);
- }
+ private SourceReaderMetrics sourceReaderMetrics;
@Override
public String getName() {
@@ -87,8 +82,7 @@ public boolean isDataCollectionIdCaseSensitive(JdbcSourceConfig sourceConfig) {
@Override
public JdbcConnection openJdbcConnection(JdbcSourceConfig sourceConfig) {
- return OracleConnectionUtils.createOracleConnection(
- sourceConfig.getDbzConnectorConfig().getJdbcConfig());
+ return OracleConnectionUtils.createOracleConnection(sourceConfig);
}
@Override
@@ -116,7 +110,7 @@ public List discoverDataCollections(JdbcSourceConfig sourceConfig) {
public Map discoverDataCollectionSchemas(JdbcSourceConfig sourceConfig) {
final List capturedTableIds = discoverDataCollections(sourceConfig);
- try (OracleConnection jdbc = createOracleConnection(sourceConfig.getDbzConfiguration())) {
+ try (OracleConnection jdbc = createOracleConnection(sourceConfig)) {
// fetch table schemas
Map tableSchemas = new HashMap<>();
for (TableId tableId : capturedTableIds) {
@@ -141,17 +135,23 @@ public TableChange queryTableSchema(JdbcConnection jdbc, TableId tableId) {
@Override
public OracleSourceFetchTaskContext createFetchTaskContext(
SourceSplitBase sourceSplitBase, JdbcSourceConfig taskSourceConfig) {
- final OracleConnection jdbcConnection =
- createOracleConnection(taskSourceConfig.getDbzConfiguration());
+ final OracleConnection jdbcConnection = createOracleConnection(taskSourceConfig);
return new OracleSourceFetchTaskContext(taskSourceConfig, this, jdbcConnection);
}
+ @Override
+ public void setSourceReaderMetrics(SourceReaderMetrics sourceReaderMetrics) {
+ this.sourceReaderMetrics = sourceReaderMetrics;
+ }
+
@Override
public FetchTask createFetchTask(SourceSplitBase sourceSplitBase) {
if (sourceSplitBase.isSnapshotSplit()) {
- return new OracleScanFetchTask(sourceSplitBase.asSnapshotSplit());
+ return new OracleScanFetchTask(
+ sourceSplitBase.asSnapshotSplit(), this.sourceReaderMetrics);
} else {
- return new OracleStreamFetchTask(sourceSplitBase.asStreamSplit());
+ return new OracleStreamFetchTask(
+ sourceSplitBase.asStreamSplit(), this.sourceReaderMetrics);
}
}
}
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java
index 4d7b38156fd..ac49aab615c 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/OracleSourceBuilder.java
@@ -219,7 +219,7 @@ public OracleSourceBuilder deserializer(DebeziumDeserializationSchema dese
*/
public OracleIncrementalSource build() {
this.offsetFactory = new RedoLogOffsetFactory();
- this.dialect = new OracleDialect(configFactory);
+ this.dialect = new OracleDialect();
return new OracleIncrementalSource(
configFactory, checkNotNull(deserializer), offsetFactory, dialect);
}
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java
index edc0924d89f..9a7df8c4a3f 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleScanFetchTask.java
@@ -21,6 +21,7 @@
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.DebeziumException;
@@ -71,16 +72,24 @@
/** The task to work for fetching data of Oracle table snapshot split. */
public class OracleScanFetchTask implements FetchTask {
+ private static final Logger LOG = LoggerFactory.getLogger(OracleScanFetchTask.class);
private final SnapshotSplit split;
private volatile boolean taskRunning = false;
private OracleSnapshotSplitReadTask snapshotSplitReadTask;
+ private SourceReaderMetrics sourceReaderMetrics;
+
public OracleScanFetchTask(SnapshotSplit split) {
this.split = split;
}
+ public OracleScanFetchTask(SnapshotSplit split, SourceReaderMetrics sourceReaderMetrics) {
+ this.split = split;
+ this.sourceReaderMetrics = sourceReaderMetrics;
+ }
+
@Override
public SnapshotSplit getSplit() {
return split;
@@ -104,6 +113,8 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getConnection(),
sourceFetchContext.getDispatcher(),
split);
+ LOG.info("regist SnapshotChangeEventSourceMetrics");
+ sourceFetchContext.getSnapshotChangeEventSourceMetrics().register(LOG);
SnapshotSplitChangeEventSourceContext changeEventSourceContext =
new SnapshotSplitChangeEventSourceContext();
SnapshotResult snapshotResult =
@@ -172,7 +183,7 @@ private RedoLogSplitReadTask createBackfillRedoLogReadTask(
// task to read binlog and backfill for current split
return new RedoLogSplitReadTask(
new OracleConnectorConfig(dezConf),
- createOracleConnection(context.getSourceConfig().getDbzConfiguration()),
+ createOracleConnection(context.getSourceConfig()),
context.getDispatcher(),
context.getErrorHandler(),
context.getDatabaseSchema(),
@@ -280,6 +291,7 @@ protected SnapshotResult doExecute(
"Snapshot step 3 - Determining high watermark {} for split {}",
highWatermark,
snapshotSplit);
+ jdbcConnection.close();
((SnapshotSplitChangeEventSourceContext) (context)).setHighWatermark(lowWatermark);
dispatcher.dispatchWatermarkEvent(
offsetContext.getPartition(), snapshotSplit, highWatermark, WatermarkKind.HIGH);
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
index 0051f3c2121..fc38475f8a9 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleSourceFetchTaskContext.java
@@ -97,7 +97,7 @@ public void configure(SourceSplitBase sourceSplitBase) {
.getDbzConfiguration()
.getString(EmbeddedFlinkDatabaseHistory.DATABASE_HISTORY_INSTANCE_NAME),
sourceSplitBase.getTableSchemas().values());
- this.databaseSchema = OracleUtils.createOracleDatabaseSchema(connectorConfig);
+ this.databaseSchema = OracleUtils.createOracleDatabaseSchema(sourceConfig, connectorConfig);
// todo logMiner or xStream
this.offsetContext =
loadStartingOffsetState(
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
index 973690888a4..fb23171240b 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/reader/fetch/OracleStreamFetchTask.java
@@ -16,10 +16,13 @@
package com.ververica.cdc.connectors.oracle.source.reader.fetch;
+import org.apache.flink.metrics.Gauge;
+
import com.ververica.cdc.connectors.base.relational.JdbcSourceEventDispatcher;
import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
import com.ververica.cdc.connectors.base.source.meta.split.StreamSplit;
import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkKind;
+import com.ververica.cdc.connectors.base.source.metrics.SourceReaderMetrics;
import com.ververica.cdc.connectors.base.source.reader.external.FetchTask;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.DebeziumException;
@@ -43,15 +46,23 @@
/** The task to work for fetching data of Oracle table stream split. */
public class OracleStreamFetchTask implements FetchTask {
+ private static final Logger LOG = LoggerFactory.getLogger(OracleStreamFetchTask.class);
private final StreamSplit split;
private volatile boolean taskRunning = false;
private RedoLogSplitReadTask redoLogSplitReadTask;
+ private SourceReaderMetrics sourceReaderMetrics;
+
public OracleStreamFetchTask(StreamSplit split) {
this.split = split;
}
+ public OracleStreamFetchTask(StreamSplit split, SourceReaderMetrics sourceReaderMetrics) {
+ this.split = split;
+ this.sourceReaderMetrics = sourceReaderMetrics;
+ }
+
@Override
public void execute(Context context) throws Exception {
OracleSourceFetchTaskContext sourceFetchContext = (OracleSourceFetchTaskContext) context;
@@ -66,6 +77,17 @@ public void execute(Context context) throws Exception {
sourceFetchContext.getSourceConfig().getOriginDbzConnectorConfig(),
sourceFetchContext.getStreamingChangeEventSourceMetrics(),
split);
+ LOG.info("regist StreamingChangeEventSourceMetrics");
+ sourceFetchContext.getStreamingChangeEventSourceMetrics().register(LOG);
+
+ LOG.info("regist currentScn");
+ sourceReaderMetrics
+ .getMetricGroup()
+ .gauge(
+ "currentScn",
+ (Gauge)
+ sourceFetchContext.getStreamingChangeEventSourceMetrics()
+ ::getCurrentScn);
RedoLogSplitChangeEventSourceContext changeEventSourceContext =
new RedoLogSplitChangeEventSourceContext();
redoLogSplitReadTask.execute(
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java
index 7e926e9922c..d520ababfff 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleConnectionUtils.java
@@ -18,6 +18,9 @@
import org.apache.flink.util.FlinkRuntimeException;
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
+import com.ververica.cdc.connectors.base.relational.connection.JdbcConnectionFactory;
+import com.ververica.cdc.connectors.oracle.source.OraclePooledDataSourceFactory;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
@@ -34,8 +37,6 @@
import java.util.List;
import java.util.Set;
-import static io.debezium.config.CommonConnectorConfig.DATABASE_CONFIG_PREFIX;
-
/** Oracle connection Utilities. */
public class OracleConnectionUtils {
@@ -44,15 +45,17 @@ public class OracleConnectionUtils {
/** Returned by column metadata in Oracle if no scale is set. */
private static final int ORACLE_UNSET_SCALE = -127;
+ private static final OraclePooledDataSourceFactory FACTORY =
+ new OraclePooledDataSourceFactory();
+
/** show current scn sql in oracle. */
private static final String SHOW_CURRENT_SCN = "SELECT CURRENT_SCN FROM V$DATABASE";
- /** Creates a new {@link OracleConnection}, but not open the connection. */
- public static OracleConnection createOracleConnection(Configuration dbzConfiguration) {
- Configuration configuration = dbzConfiguration.subset(DATABASE_CONFIG_PREFIX, true);
- return new OracleConnection(
- configuration.isEmpty() ? dbzConfiguration : configuration,
- OracleConnectionUtils.class::getClassLoader);
+ public static OracleConnection createOracleConnection(JdbcSourceConfig sourceConfig) {
+ Configuration dbzConfig = sourceConfig.getDbzConnectorConfig().getJdbcConfig();
+ JdbcConnectionFactory jdbcConnectionFactory =
+ new JdbcConnectionFactory(sourceConfig, FACTORY);
+ return new OracleConnection(dbzConfig, jdbcConnectionFactory);
}
/** Fetch current redoLog offsets in Oracle Server. */
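As a usage sketch of the pooled path introduced here (the helper class, method name and query are illustrative only; the `JdbcSourceConfig` is assumed to come from the connector's existing config factory):

```java
import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import io.debezium.connector.oracle.OracleConnection;

/** Hypothetical probe that reads the current SCN through a pooled connection. */
public final class PooledScnProbe {

    public static String currentScn(JdbcSourceConfig sourceConfig) throws Exception {
        // createOracleConnection now builds the connection via JdbcConnectionFactory backed
        // by OraclePooledDataSourceFactory, so the physical JDBC connection is taken from
        // the shared connection pool instead of being opened from scratch each time.
        try (OracleConnection connection =
                OracleConnectionUtils.createOracleConnection(sourceConfig)) {
            return connection.queryAndMap(
                    "SELECT CURRENT_SCN FROM V$DATABASE",
                    rs -> rs.next() ? rs.getString(1) : null);
        }
    }
}
```

Since the connection is produced by a pooled data source (note the `HikariProxyConnection` import in the copied `OracleConnection` class below), closing it should return the physical JDBC connection to the pool rather than tear it down.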
diff --git a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java
index 58fee6746cc..0f23fe4561f 100644
--- a/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java
+++ b/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/source/utils/OracleUtils.java
@@ -18,9 +18,10 @@
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import com.ververica.cdc.connectors.base.config.JdbcSourceConfig;
import com.ververica.cdc.connectors.oracle.source.meta.offset.RedoLogOffset;
-import io.debezium.config.Configuration;
import io.debezium.connector.oracle.OracleConnection;
import io.debezium.connector.oracle.OracleConnectorConfig;
import io.debezium.connector.oracle.OracleDatabaseSchema;
@@ -261,43 +262,24 @@ public static RowType getSplitType(Table table) {
/** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
public static OracleDatabaseSchema createOracleDatabaseSchema(
- OracleConnectorConfig dbzOracleConfig) {
+ JdbcSourceConfig sourceConfig, OracleConnectorConfig dbzOracleConfig) {
TopicSelector topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
- OracleConnection oracleConnection =
- OracleConnectionUtils.createOracleConnection(dbzOracleConfig.getJdbcConfig());
- // OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig);
- OracleValueConverters oracleValueConverters =
- new OracleValueConverters(dbzOracleConfig, oracleConnection);
- StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
- dbzOracleConfig.getAdapter().getTableNameCaseSensitivity(oracleConnection);
- return new OracleDatabaseSchema(
- dbzOracleConfig,
- oracleValueConverters,
- schemaNameAdjuster,
- topicSelector,
- tableNameCaseSensitivity);
- }
-
- /** Creates a new {@link OracleDatabaseSchema} to monitor the latest oracle database schemas. */
- public static OracleDatabaseSchema createOracleDatabaseSchema(
- OracleConnectorConfig dbzOracleConfig, boolean tableIdCaseInsensitive) {
- TopicSelector topicSelector = OracleTopicSelector.defaultSelector(dbzOracleConfig);
- SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create();
- OracleConnection oracleConnection =
- OracleConnectionUtils.createOracleConnection((Configuration) dbzOracleConfig);
- OracleValueConverters oracleValueConverters =
- new OracleValueConverters(dbzOracleConfig, oracleConnection);
- StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
- tableIdCaseInsensitive
- ? StreamingAdapter.TableNameCaseSensitivity.SENSITIVE
- : StreamingAdapter.TableNameCaseSensitivity.INSENSITIVE;
- return new OracleDatabaseSchema(
- dbzOracleConfig,
- oracleValueConverters,
- schemaNameAdjuster,
- topicSelector,
- tableNameCaseSensitivity);
+ try (OracleConnection oracleConnection =
+ OracleConnectionUtils.createOracleConnection(sourceConfig)) {
+ OracleValueConverters oracleValueConverters =
+ new OracleValueConverters(dbzOracleConfig, oracleConnection);
+ StreamingAdapter.TableNameCaseSensitivity tableNameCaseSensitivity =
+ dbzOracleConfig.getAdapter().getTableNameCaseSensitivity(oracleConnection);
+ return new OracleDatabaseSchema(
+ dbzOracleConfig,
+ oracleValueConverters,
+ schemaNameAdjuster,
+ topicSelector,
+ tableNameCaseSensitivity);
+ } catch (SQLException e) {
+ throw new FlinkRuntimeException(e);
+ }
}
public static RedoLogOffset getRedoLogPosition(SourceRecord dataRecord) {
diff --git a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java
new file mode 100644
index 00000000000..f4dbdd99a9d
--- /dev/null
+++ b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/OracleConnection.java
@@ -0,0 +1,484 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.oracle;
+
+import com.zaxxer.hikari.pool.HikariProxyConnection;
+import io.debezium.DebeziumException;
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.connector.oracle.OracleConnectorConfig.ConnectorAdapter;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.Column;
+import io.debezium.relational.TableEditor;
+import io.debezium.relational.TableId;
+import io.debezium.relational.Tables;
+import io.debezium.relational.Tables.ColumnNameFilter;
+import io.debezium.relational.Tables.TableFilter;
+import io.debezium.util.Strings;
+import oracle.jdbc.OracleTypes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Clob;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** Copied from Debezium to fix a connection pool problem: adds a constructor that accepts a {@code ConnectionFactory} so connections can be created through the shared JDBC connection pool. */
+public class OracleConnection extends JdbcConnection {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(OracleConnection.class);
+
+ /** Returned by column metadata in Oracle if no scale is set. */
+ private static final int ORACLE_UNSET_SCALE = -127;
+
+ /** Pattern to identify system generated indices and column names. */
+ private static final Pattern SYS_NC_PATTERN =
+ Pattern.compile("^SYS_NC(?:_OID|_ROWINFO|[0-9][0-9][0-9][0-9][0-9])\\$$");
+
+ /** A field for the raw jdbc url. This field has no default value. */
+ private static final Field URL = Field.create("url", "Raw JDBC url");
+
+ /** The database version. */
+ private final OracleDatabaseVersion databaseVersion;
+
+ public OracleConnection(Configuration config, Supplier classLoaderSupplier) {
+ super(config, resolveConnectionFactory(config), classLoaderSupplier);
+
+ this.databaseVersion = resolveOracleDatabaseVersion();
+ LOGGER.info("Database Version: {}", databaseVersion.getBanner());
+ }
+
+ public OracleConnection(Configuration config, ConnectionFactory factory) {
+ super(config, factory);
+ this.databaseVersion = resolveOracleDatabaseVersion();
+ LOGGER.info("Database Version: {}", databaseVersion.getBanner());
+ }
+
+ public void setSessionToPdb(String pdbName) {
+ Statement statement = null;
+
+ try {
+ statement = connection().createStatement();
+ statement.execute("alter session set container=" + pdbName);
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (statement != null) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ LOGGER.error("Couldn't close statement", e);
+ }
+ }
+ }
+ }
+
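+    /** Resets the current session back to the root container ({@code CDB$ROOT}). */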
+ public void resetSessionToCdb() {
+ Statement statement = null;
+
+ try {
+ statement = connection().createStatement();
+ statement.execute("alter session set container=cdb$root");
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ if (statement != null) {
+ try {
+ statement.close();
+ } catch (SQLException e) {
+ LOGGER.error("Couldn't close statement", e);
+ }
+ }
+ }
+ }
+
+ public OracleDatabaseVersion getOracleVersion() {
+ return databaseVersion;
+ }
+
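+    /**
+     * Resolves the database version from {@code V$VERSION}, preferring the {@code BANNER_FULL}
+     * column introduced in Oracle 18.1 and falling back to {@code BANNER} on older releases.
+     */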
+ private OracleDatabaseVersion resolveOracleDatabaseVersion() {
+ String versionStr;
+ try {
+ try {
+ // Oracle 18.1 introduced BANNER_FULL as the new column rather than BANNER
+ // This column uses a different format than the legacy BANNER.
+ versionStr =
+ queryAndMap(
+ "SELECT BANNER_FULL FROM V$VERSION WHERE BANNER_FULL LIKE 'Oracle Database%'",
+ (rs) -> {
+ if (rs.next()) {
+ return rs.getString(1);
+ }
+ return null;
+ });
+ } catch (SQLException e) {
+                // only ORA-00904 (BANNER_FULL not available) is swallowed here;
+                // any other failure is rethrown
+ if (e.getMessage().contains("ORA-00904: \"BANNER_FULL\"")) {
+ LOGGER.debug(
+ "BANNER_FULL column not in V$VERSION, using BANNER column as fallback");
+ versionStr = null;
+ } else {
+ throw e;
+ }
+ }
+
+            // For databases prior to 18.1 the BANNER_FULL query fails and versionStr remains
+            // null, so fall back to the BANNER column in that case.
+ if (versionStr == null) {
+ versionStr =
+ queryAndMap(
+ "SELECT BANNER FROM V$VERSION WHERE BANNER LIKE 'Oracle Database%'",
+ (rs) -> {
+ if (rs.next()) {
+ return rs.getString(1);
+ }
+ return null;
+ });
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException("Failed to resolve Oracle database version", e);
+ }
+
+ if (versionStr == null) {
+ throw new RuntimeException("Failed to resolve Oracle database version");
+ }
+
+ return OracleDatabaseVersion.parse(versionStr);
+ }
+
+ @Override
+    public Set<TableId> readTableNames(
+ String databaseCatalog,
+ String schemaNamePattern,
+ String tableNamePattern,
+ String[] tableTypes)
+ throws SQLException {
+
+        Set<TableId> tableIds =
+ super.readTableNames(null, schemaNamePattern, tableNamePattern, tableTypes);
+
+ return tableIds.stream()
+ .map(t -> new TableId(databaseCatalog, t.schema(), t.table()))
+ .collect(Collectors.toSet());
+ }
+
+ /**
+ * Retrieves all {@code TableId} in a given database catalog, filtering certain ids that should
+ * be omitted from the returned set such as special spatial tables and index-organized tables.
+ *
+ * @param catalogName the catalog/database name
+ * @return set of all table ids for existing table objects
+ * @throws SQLException if a database exception occurred
+ */
+    protected Set<TableId> getAllTableIds(String catalogName) throws SQLException {
+ final String query =
+ "select owner, table_name from all_tables "
+ +
+ // filter special spatial tables
+ "where table_name NOT LIKE 'MDRT_%' "
+ + "and table_name NOT LIKE 'MDRS_%' "
+ + "and table_name NOT LIKE 'MDXT_%' "
+ +
+ // filter index-organized-tables
+ "and (table_name NOT LIKE 'SYS_IOT_OVER_%' and IOT_NAME IS NULL) ";
+
+        Set<TableId> tableIds = new HashSet<>();
+ query(
+ query,
+ (rs) -> {
+ while (rs.next()) {
+ tableIds.add(new TableId(catalogName, rs.getString(1), rs.getString(2)));
+ }
+ LOGGER.trace("TableIds are: {}", tableIds);
+ });
+
+ return tableIds;
+ }
+
+ // todo replace metadata with something like this
+ private ResultSet getTableColumnsInfo(String schemaNamePattern, String tableName)
+ throws SQLException {
+ String columnQuery =
+ "select column_name, data_type, data_length, data_precision, data_scale, default_length, density, char_length from "
+ + "all_tab_columns where owner like '"
+ + schemaNamePattern
+ + "' and table_name='"
+ + tableName
+ + "'";
+
+ PreparedStatement statement = connection().prepareStatement(columnQuery);
+ return statement.executeQuery();
+ }
+
+    // TODO: this approach is much faster; use it until the metadata usage is fully replaced
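+    /**
+     * Reads column and primary key metadata only for the captured tables rather than the whole
+     * schema, optionally dropping table definitions no longer present in the database, and then
+     * applies the Oracle-specific column type adjustments.
+     */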
+ public void readSchemaForCapturedTables(
+ Tables tables,
+ String databaseCatalog,
+ String schemaNamePattern,
+ ColumnNameFilter columnFilter,
+ boolean removeTablesNotFoundInJdbc,
+            Set<TableId> capturedTables)
+ throws SQLException {
+
+        Set<TableId> tableIdsBefore = new HashSet<>(tables.tableIds());
+
+ DatabaseMetaData metadata = connection().getMetaData();
+        Map<TableId, List<Column>> columnsByTable = new HashMap<>();
+
+ for (TableId tableId : capturedTables) {
+ try (ResultSet columnMetadata =
+ metadata.getColumns(
+ databaseCatalog, schemaNamePattern, tableId.table(), null)) {
+ while (columnMetadata.next()) {
+ // add all whitelisted columns
+ readTableColumn(columnMetadata, tableId, columnFilter)
+ .ifPresent(
+ column -> {
+ columnsByTable
+ .computeIfAbsent(tableId, t -> new ArrayList<>())
+ .add(column.create());
+ });
+ }
+ }
+ }
+
+ // Read the metadata for the primary keys ...
+        for (Map.Entry<TableId, List<Column>> tableEntry : columnsByTable.entrySet()) {
+ // First get the primary key information, which must be done for *each* table ...
+            List<String> pkColumnNames = readPrimaryKeyNames(metadata, tableEntry.getKey());
+
+ // Then define the table ...
+            List<Column> columns = tableEntry.getValue();
+ Collections.sort(columns);
+ tables.overwriteTable(tableEntry.getKey(), columns, pkColumnNames, null);
+ }
+
+ if (removeTablesNotFoundInJdbc) {
+ // Remove any definitions for tables that were not found in the database metadata ...
+ tableIdsBefore.removeAll(columnsByTable.keySet());
+ tableIdsBefore.forEach(tables::removeTable);
+ }
+
+ for (TableId tableId : capturedTables) {
+ overrideOracleSpecificColumnTypes(tables, tableId, tableId);
+ }
+ }
+
+ @Override
+ public void readSchema(
+ Tables tables,
+ String databaseCatalog,
+ String schemaNamePattern,
+ TableFilter tableFilter,
+ ColumnNameFilter columnFilter,
+ boolean removeTablesNotFoundInJdbc)
+ throws SQLException {
+
+ super.readSchema(
+ tables,
+ null,
+ schemaNamePattern,
+ tableFilter,
+ columnFilter,
+ removeTablesNotFoundInJdbc);
+
+        Set<TableId> tableIds =
+ tables.tableIds().stream()
+ .filter(x -> schemaNamePattern.equals(x.schema()))
+ .collect(Collectors.toSet());
+
+ for (TableId tableId : tableIds) {
+            // super.readSchema() populates ids without the catalog; hence we apply the
+            // filtering only here and, if a table is included, overwrite it with a new id
+            // including the catalog
+ TableId tableIdWithCatalog =
+ new TableId(databaseCatalog, tableId.schema(), tableId.table());
+
+ if (tableFilter.isIncluded(tableIdWithCatalog)) {
+ overrideOracleSpecificColumnTypes(tables, tableId, tableIdWithCatalog);
+ }
+
+ tables.removeTable(tableId);
+ }
+ }
+
+ @Override
+    public List<String> readTableUniqueIndices(DatabaseMetaData metadata, TableId id)
+ throws SQLException {
+ return super.readTableUniqueIndices(metadata, id.toDoubleQuoted());
+ }
+
+ @Override
+ protected boolean isTableUniqueIndexIncluded(String indexName, String columnName) {
+ if (columnName != null) {
+ return !SYS_NC_PATTERN.matcher(columnName).matches();
+ }
+ return false;
+ }
+
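+    /**
+     * Re-registers the table under {@code tableIdWithCatalog} with Oracle-specific column
+     * adjustments: the reported scale of TIMESTAMP columns is moved into the length, and the
+     * sentinel scale of -127 on NUMBER columns is cleared.
+     */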
+ private void overrideOracleSpecificColumnTypes(
+ Tables tables, TableId tableId, TableId tableIdWithCatalog) {
+ TableEditor editor = tables.editTable(tableId);
+ editor.tableId(tableIdWithCatalog);
+
+        List<String> columnNames = new ArrayList<>(editor.columnNames());
+ for (String columnName : columnNames) {
+ Column column = editor.columnWithName(columnName);
+ if (column.jdbcType() == Types.TIMESTAMP) {
+ editor.addColumn(
+ column.edit()
+ .length(column.scale().orElse(Column.UNSET_INT_VALUE))
+ .scale(null)
+ .create());
+ }
+ // NUMBER columns without scale value have it set to -127 instead of null;
+ // let's rectify that
+ else if (column.jdbcType() == OracleTypes.NUMBER) {
+ column.scale()
+ .filter(s -> s == ORACLE_UNSET_SCALE)
+ .ifPresent(
+ s -> {
+ editor.addColumn(column.edit().scale(null).create());
+ });
+ }
+ }
+ tables.overwriteTable(editor.create());
+ }
+
+ /**
+ * Get the current, most recent system change number.
+ *
+ * @return the current system change number
+ * @throws SQLException if an exception occurred
+ * @throws IllegalStateException if the query does not return at least one row
+ */
+ public Scn getCurrentScn() throws SQLException {
+ return queryAndMap(
+ "SELECT CURRENT_SCN FROM V$DATABASE",
+ (rs) -> {
+ if (rs.next()) {
+ return Scn.valueOf(rs.getString(1));
+ }
+ throw new IllegalStateException("Could not get SCN");
+ });
+ }
+
+ /**
+ * Get the maximum system change number in the archive logs.
+ *
+ * @param archiveLogDestinationName the archive log destination name to be queried, can be
+ * {@code null}.
+ * @return the maximum system change number in the archive logs
+ * @throws SQLException if a database exception occurred
+ * @throws DebeziumException if the maximum archive log system change number could not be found
+ */
+ public Scn getMaxArchiveLogScn(String archiveLogDestinationName) throws SQLException {
+ String query =
+ "SELECT MAX(NEXT_CHANGE#) FROM V$ARCHIVED_LOG "
+ + "WHERE NAME IS NOT NULL "
+ + "AND ARCHIVED = 'YES' "
+ + "AND STATUS = 'A' "
+ + "AND DEST_ID IN ("
+ + "SELECT DEST_ID FROM V$ARCHIVE_DEST_STATUS "
+ + "WHERE STATUS = 'VALID' "
+ + "AND TYPE = 'LOCAL' ";
+
+ if (Strings.isNullOrEmpty(archiveLogDestinationName)) {
+ query += "AND ROWNUM = 1";
+ } else {
+ query += "AND UPPER(DEST_NAME) = '" + archiveLogDestinationName + "'";
+ }
+
+ query += ")";
+
+ return queryAndMap(
+ query,
+ (rs) -> {
+ if (rs.next()) {
+ return Scn.valueOf(rs.getString(1)).subtract(Scn.valueOf(1));
+ }
+ throw new DebeziumException("Could not obtain maximum archive log scn.");
+ });
+ }
+
+ /**
+ * Generate a given table's DDL metadata.
+ *
+ * @param tableId table identifier, should never be {@code null}
+ * @return generated DDL
+ * @throws SQLException if an exception occurred obtaining the DDL metadata
+ */
+ public String getTableMetadataDdl(TableId tableId) throws SQLException {
+ try {
+ // The storage and segment attributes aren't necessary
+ executeWithoutCommitting(
+ "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'STORAGE', false); end;");
+ executeWithoutCommitting(
+ "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SEGMENT_ATTRIBUTES', false); end;");
+ // In case DDL is returned as multiple DDL statements, this allows the parser to parse
+ // each separately.
+ // This is only critical during streaming as during snapshot the table structure is
+ // built from JDBC driver queries.
+ executeWithoutCommitting(
+ "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'SQLTERMINATOR', true); end;");
+ return queryAndMap(
+ "SELECT dbms_metadata.get_ddl('TABLE','"
+ + tableId.table()
+ + "','"
+ + tableId.schema()
+ + "') FROM DUAL",
+ rs -> {
+ if (!rs.next()) {
+ throw new DebeziumException(
+ "Could not get DDL metadata for table: " + tableId);
+ }
+
+ Object res = rs.getObject(1);
+ return ((Clob) res).getSubString(1, (int) ((Clob) res).length());
+ });
+ } finally {
+ executeWithoutCommitting(
+ "begin dbms_metadata.set_transform_param(DBMS_METADATA.SESSION_TRANSFORM, 'DEFAULT'); end;");
+ }
+ }
+
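+    /**
+     * Determines the JDBC connection string: the raw {@code url} property when configured,
+     * otherwise the URL template of the configured connection adapter.
+     */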
+ public static String connectionString(Configuration config) {
+ return config.getString(URL) != null
+ ? config.getString(URL)
+ : ConnectorAdapter.parse(config.getString("connection.adapter")).getConnectionUrl();
+ }
+
+ private static ConnectionFactory resolveConnectionFactory(Configuration config) {
+ return JdbcConnection.patternBasedFactory(connectionString(config));
+ }
+
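+    /**
+     * Returns the underlying JDBC connection, unwrapping {@link HikariProxyConnection} to the
+     * native {@code oracle.jdbc.OracleConnection} when the connection comes from a Hikari pool,
+     * so Oracle-specific callers work against the vendor connection.
+     */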
+ @Override
+ public synchronized Connection connection() throws SQLException {
+ Connection connection = super.connection();
+ if (connection instanceof HikariProxyConnection) {
+ connection = connection.unwrap(oracle.jdbc.OracleConnection.class);
+ }
+ return connection;
+ }
+}