From c2a6456ad099ad6f6db2e22966e027b0aa7824ab Mon Sep 17 00:00:00 2001 From: He Wang Date: Mon, 5 Jun 2023 16:40:54 +0800 Subject: [PATCH] [oceanbase] support libobcdc 4.x and fix restore timestamp config (#2161) --- README.md | 2 +- docs/content/about.md | 2 +- docs/content/connectors/oceanbase-cdc(ZH).md | 443 +++++++++++++++--- docs/content/connectors/oceanbase-cdc.md | 26 +- flink-connector-oceanbase-cdc/pom.xml | 10 +- .../connectors/oceanbase/OceanBaseSource.java | 6 + .../source/OceanBaseRichSourceFunction.java | 89 ++-- flink-sql-connector-oceanbase-cdc/pom.xml | 4 +- pom.xml | 2 +- 9 files changed, 441 insertions(+), 143 deletions(-) diff --git a/README.md b/README.md index 33a0d21897..b2c20d2c8d 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ This README is meant as a brief walkthrough on the core features of CDC Connecto |------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------| | [mongodb-cdc](docs/content/connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [mysql-cdc](docs/content/connectors/mysql-cdc.md) |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x
  • [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x
  • [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x
  • [MariaDB](https://mariadb.org): 10.x
  • [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 | -| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x | JDBC Driver: 5.1.4x | +| [oceanbase-cdc](/docs/content/connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x | | [oracle-cdc](docs/content/connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [postgres-cdc](docs/content/connectors/postgres-cdc.md) |
  • [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.27 | | [sqlserver-cdc](docs/content/connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 | diff --git a/docs/content/about.md b/docs/content/about.md index 5814abd632..2b96785c84 100644 --- a/docs/content/about.md +++ b/docs/content/about.md @@ -11,7 +11,7 @@ The CDC Connectors for Apache Flink® integrate Debezium as the engin |----------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------| | [mongodb-cdc](connectors/mongodb-cdc.md) |
  • [MongoDB](https://www.mongodb.com): 3.6, 4.x, 5.0 | MongoDB Driver: 4.3.1 | | [mysql-cdc](connectors/mysql-cdc.md) |
  • [MySQL](https://dev.mysql.com/doc): 5.6, 5.7, 8.0.x
  • [RDS MySQL](https://www.aliyun.com/product/rds/mysql): 5.6, 5.7, 8.0.x
  • [PolarDB MySQL](https://www.aliyun.com/product/polardb): 5.6, 5.7, 8.0.x
  • [Aurora MySQL](https://aws.amazon.com/cn/rds/aurora): 5.6, 5.7, 8.0.x
  • [MariaDB](https://mariadb.org): 10.x
  • [PolarDB X](https://github.com/ApsaraDB/galaxysql): 2.0.1 | JDBC Driver: 8.0.27 | -| [oceanbase-cdc](connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x | JDBC Driver: 5.1.4x | +| [oceanbase-cdc](connectors/oceanbase-cdc.md) |
  • [OceanBase CE](https://open.oceanbase.com): 3.1.x
  • [OceanBase EE](https://www.oceanbase.com/product/oceanbase) (MySQL mode): 2.x, 3.x, 4.x | JDBC Driver: 5.1.4x | | [oracle-cdc](connectors/oracle-cdc.md) |
  • [Oracle](https://www.oracle.com/index.html): 11, 12, 19 | Oracle Driver: 19.3.0.0 | | [postgres-cdc](connectors/postgres-cdc.md) |
  • [PostgreSQL](https://www.postgresql.org): 9.6, 10, 11, 12 | JDBC Driver: 42.2.12 | | [sqlserver-cdc](connectors/sqlserver-cdc.md) |
  • [Sqlserver](https://www.microsoft.com/sql-server): 2012, 2014, 2016, 2017, 2019 | JDBC Driver: 7.2.2.jre8 | diff --git a/docs/content/connectors/oceanbase-cdc(ZH).md b/docs/content/connectors/oceanbase-cdc(ZH).md index 7cb220f647..54e8d92fc5 100644 --- a/docs/content/connectors/oceanbase-cdc(ZH).md +++ b/docs/content/connectors/oceanbase-cdc(ZH).md @@ -25,8 +25,8 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。 ### 配置 OceanBase 数据库和 oblogproxy 服务 -1. 按照 [部署文档](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/deploy-the-distributed-oceanbase-cluster) 配置 OceanBase 集群。 -2. 在 sys 租户中,为 oblogproxy 创建一个带密码的用户。更多信息,参考 [用户管理文档](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/create-user-3)。 +1. 按照 [文档](https://github.com/oceanbase/oceanbase#quick-start) 配置 OceanBase 集群。 +2. 在 sys 租户中,为 oblogproxy 创建一个带密码的用户。 ```bash mysql -h${host} -P${port} -uroot @@ -47,7 +47,7 @@ OceanBase CDC 连接器允许从 OceanBase 读取快照数据和增量数据。 mysql> show parameters like 'obconfig_url'; ``` -5. 按照 [快速入门 文档](https://github.com/oceanbase/oblogproxy#quick-start) 配置 oblogproxy。 +5. 按照 [文档](https://github.com/oceanbase/oblogproxy#getting-started) 配置 oblogproxy。 ## 创建 OceanBase CDC 表 @@ -72,8 +72,8 @@ Flink SQL> CREATE TABLE orders ( 'username' = 'user@test_tenant', 'password' = 'pswd', 'tenant-name' = 'test_tenant', - 'database-name' = 'test_db', - 'table-name' = 'orders', + 'database-name' = '^test_db$', + 'table-name' = '^orders$', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', @@ -96,39 +96,197 @@ OceanBase CDC 连接器包括用于 SQL 和 DataStream API 的选项,如下表 1. 使用 `database-name` 和 `table-name` 匹配正则表达式中的数据库和表名。 由于`obcdc`(以前的`liboblog`)现在只支持`fnmatch`匹配,我们不能直接使用正则过滤 changelog 事件,所以通过两个选项去匹配去指定监听表只能在`initial`启动模式下使用。 2. 
使用 `table-list` 去匹配数据库名和表名的准确列表。 - -配置项 | 是否必选 | 默认值 | 类型 | 描述 ----- | ----- | ------ | ----- | ---- -connector | 是 | 无 | String | 指定要使用的连接器。此处为 `oceanbase-cdc`。 -scan.startup.mode | 是 | 无 | String | 指定 OceanBase CDC 消费者的启动模式。可取值为 `initial`、`latest-offset` 或`timestamp`。 -scan.startup.timestamp | 否 | 无 | Long | 起始点的时间戳,单位为秒。仅在 `scan.startup.mode` 的值为 `timestamp` 时适用。 -username | 是 | 无 | String | 连接 OceanBase 数据库的用户的名称。 -password | 是 | 无 | String | 连接 OceanBase 数据库时使用的密码。 -tenant-name | 是 | 无 | String | 待监控 OceanBase 数据库的租户名,应该填入精确值。 -database-name | 是 | 无 | String | 待监控 OceanBase 数据库的数据库名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。 -table-name | 否 | 无 | String | 待监控 OceanBase 数据库的表名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。 -table-list | 否 | 无 | String | 待监控 OceanBase 数据库的全路径的表名列表,逗号分隔,如:"db1.table1, db2.table2"。 -hostname | 否 | 无 | String | OceanBase 数据库或 OceanBbase 代理 ODP 的 IP 地址或主机名。 -port | 否 | 无 | Integer | OceanBase 数据库服务器的整数端口号。可以是 OceanBase 服务器的 SQL 端口号(默认值为 2881)
    或 OceanBase代理服务的端口号(默认值为 2883)。 -connect.timeout | 否 | 30s | Duration | 连接器在尝试连接到 OceanBase 数据库服务器超时前的最长时间。 -server-time-zone | 否 | +00:00 | String | 数据库服务器中的会话时区,用户控制 OceanBase 的时间类型如何转换为 STRING。
    合法的值可以是格式为"±hh:mm"的 UTC 时区偏移量,
    如果 mysql 数据库中的时区信息表已创建,合法的值则可以是创建的时区。 -logproxy.host | 是 | 无 | String | OceanBase 日志代理服务 的 IP 地址或主机名。 -logproxy.port | 是 | 无 | Integer | OceanBase 日志代理服务 的端口号。 -logproxy.client.id | 否 | 按规则生成 | String | OceanBase日志代理服务的客户端链接,默认值的生成规则是 {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}。 -rootserver-list | 是 | 无 | String | OceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,
    多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。 -config-url | 否 | 无 | String | 从配置服务器获取服务器信息的 url, OceanBase 企业版本必填。 -working-mode | 否 | storage | String | 日志代理中 `obcdc` 的工作模式 , 可以是 `storage` 或 `memory`。 +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    配置项是否必选默认值类型描述
    connectorString指定要使用的连接器,此处为 'oceanbase-cdc'
    scan.startup.modeString指定 OceanBase CDC 消费者的启动模式。可取值为'initial','latest-offset' or + 'timestamp'
    scan.startup.timestampLong起始点的时间戳,单位为秒。仅在启动模式为 'timestamp' 时可用。
    usernameString连接 OceanBase 数据库的用户的名称。
    passwordString连接 OceanBase 数据库时使用的密码。
    tenant-nameString待监控 OceanBase 数据库的租户名,应该填入精确值。
    database-nameString待监控 OceanBase 数据库的数据库名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。
    table-nameString待监控 OceanBase 数据库的表名,应该是正则表达式,该选项只支持和 'initial' 模式一起使用。
    table-listString待监控 OceanBase 数据库的全路径的表名列表,逗号分隔,如:"db1.table1, db2.table2"。
    hostnameStringOceanBase 数据库或 OceanBase 代理 ODP 的 IP 地址或主机名。
    portInteger + OceanBase 数据库服务器的整数端口号。可以是 OceanBase 服务器的 SQL 端口号(默认值为 2881)
    + 或 OceanBase代理服务的端口号(默认值为 2883)
    connect.timeout30sDuration连接器在尝试连接到 OceanBase 数据库服务器超时前的最长时间。
    server-time-zone+00:00String + 数据库服务器中的会话时区,用于控制 OceanBase 的时间类型如何转换为 STRING。
    + 合法的值可以是格式为"±hh:mm"的 UTC 时区偏移量,
    + 如果 mysql 数据库中的时区信息表已创建,合法的值则可以是创建的时区。 +
    logproxy.hostStringOceanBase 日志代理服务 的 IP 地址或主机名。
    logproxy.portIntegerOceanBase 日志代理服务 的端口号。
    logproxy.client.id规则生成StringOceanBase 日志代理服务的客户端连接 ID,默认值的生成规则是 {flink_ip}_{process_id}_{timestamp}_{thread_id}_{tenant}。
    rootserver-listStringOceanBase root 服务器列表,服务器格式为 `ip:rpc_port:sql_port`,
    多个服务器地址使用英文分号 `;` 隔开,OceanBase 社区版本必填。
    config-urlString从配置服务器获取服务器信息的 url, OceanBase 企业版本必填。
    working-modestorageString日志代理中 `libobcdc` 的工作模式,可以是 `storage` 或 `memory`。
    +
    ## 支持的元数据 在创建表时,您可以使用以下格式的元数据作为只读列(VIRTUAL)。 -列名 | 数据类型 | 描述 ------ | ----- | ----- -tenant_name | STRING NOT NULL | 当前记录所属的租户名称。 -database_name | STRING NOT NULL | 当前记录所属的数据库名称。 -table_name | STRING NOT NULL | 当前记录所属的表名称。 -op_ts | TIMESTAMP_LTZ(3) NOT NULL | 该值表示此修改在数据库中发生的时间。如果这条记录是该表在快照阶段读取的记录,则该值返回 0。 + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    列名数据类型描述
    tenant_nameSTRING NOT NULL当前记录所属的租户名称。
    database_nameSTRING NOT NULL当前记录所属的数据库名称。
    table_nameSTRING NOT NULL当前记录所属的表名称。
    op_tsTIMESTAMP_LTZ(3) NOT NULL该值表示此修改在数据库中发生的时间。如果这条记录是该表在快照阶段读取的记录,则该值返回 0。
    如下 SQL 展示了如何在表中使用这些元数据列: @@ -151,8 +309,8 @@ CREATE TABLE products ( 'username' = 'user@test_tenant', 'password' = 'pswd', 'tenant-name' = 'test_tenant', - 'database-name' = 'test_db', - 'table-name' = 'orders', + 'database-name' = '^test_db$', + 'table-name' = '^orders$', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', @@ -234,13 +392,13 @@ public class OceanBaseSourceExample { .username("user@test_tenant") .password("pswd") .tenantName("test_tenant") - .databaseName("test_db") - .tableName("test_table") + .databaseName("^test_db$") + .tableName("^test_table$") .hostname("127.0.0.1") .port(2881) .logProxyHost("127.0.0.1") .logProxyPort(2983) - .serverTimeZone(serverTimezone) + .serverTimeZone(serverTimeZone) .deserializer(deserializer) .build(); @@ -257,30 +415,185 @@ public class OceanBaseSourceExample { ## 数据类型映射 -OceanBase 数据类型 | Flink SQL 类型 | 描述 ------ | ----- | ------ -BOOLEAN
    TINYINT(1)
    BIT(1)    | BOOLEAN | -TINYINT | TINYINT | -SMALLINT
    TINYINT UNSIGNED | SMALLINT | -INT
    MEDIUMINT
    SMALLINT UNSIGNED | INT | -BIGINT
    INT UNSIGNED | BIGINT | -BIGINT UNSIGNED | DECIMAL(20, 0) | -REAL
    FLOAT | FLOAT | -DOUBLE | DOUBLE | -NUMERIC(p, s)
    DECIMAL(p, s)
    where p <= 38 | DECIMAL(p, s) | -NUMERIC(p, s)
    DECIMAL(p, s)
    where 38 < p <=65 | STRING | DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。
    但在 Flink 中,DECIMAL 的最高精度为 38。因此,
    如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。 | -DATE | DATE | -TIME [(p)] | TIME [(p)] | -DATETIME [(p)] | TIMESTAMP [(p)] | -TIMESTAMP [(p)] | TIMESTAMP_LTZ [(p)] | -CHAR(n) | CHAR(n) | -VARCHAR(n) | VARCHAR(n) | -BIT(n) | BINARY(⌈n/8⌉) | -BINARY(n) | BINARY(n) | -VARBINARY(N) | VARBINARY(N) | -TINYTEXT
    TEXT
    MEDIUMTEXT
    LONGTEXT | STRING | -TINYBLOB
    BLOB
    MEDIUMBLOB
    LONGBLOB | BYTES | -YEAR | INT | -ENUM | STRING | -SET | ARRAY<STRING> | 因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,
    所以映射到 Flink 时是一个字符串数组 -JSON | STRING | JSON 类型的数据在 Flink 中会转化为 JSON 格式的字符串 +### Mysql 模式 + +
    + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +
    OceanBase 数据类型Flink SQL 类型描述
    + BOOLEAN
    + TINYINT(1)
    + BIT(1) +
    BOOLEAN
    TINYINTTINYINT
    + SMALLINT
    + TINYINT UNSIGNED +
    SMALLINT
    + INT
    + MEDIUMINT
    + SMALLINT UNSIGNED +
    INT
    + BIGINT
    + INT UNSIGNED +
    BIGINT
    BIGINT UNSIGNEDDECIMAL(20, 0)
    + REAL
    + FLOAT +
    FLOAT
    + DOUBLE + DOUBLE
    + NUMERIC(p, s)
    + DECIMAL(p, s)
    + where p <= 38
    DECIMAL(p, s)
    + NUMERIC(p, s)
    + DECIMAL(p, s)
    + where 38 < p <=65
    STRING + DECIMAL 等同于 NUMERIC。在 OceanBase 数据库中,DECIMAL 数据类型的精度最高为 65。
    + 但在 Flink 中,DECIMAL 的最高精度为 38。因此,
    + 如果你定义了一个精度大于 38 的 DECIMAL 列,你应当将其映射为 STRING,以避免精度损失。 +
    DATEDATE
    TIME [(p)]TIME [(p)]
    DATETIME [(p)]TIMESTAMP [(p)]
    TIMESTAMP [(p)]TIMESTAMP_LTZ [(p)]
    CHAR(n)CHAR(n)
    VARCHAR(n)VARCHAR(n)
    BIT(n)BINARY(⌈n/8⌉)
    BINARY(n)BINARY(n)
    VARBINARY(N)VARBINARY(N)
    + TINYTEXT
    + TEXT
    + MEDIUMTEXT
    + LONGTEXT +
    STRING
    + TINYBLOB
    + BLOB
    + MEDIUMBLOB
    + LONGBLOB +
    BYTES
    YEARINT
    ENUMSTRING
    SETARRAY<STRING> + 因为 OceanBase 的 SET 类型是用包含一个或多个值的字符串对象表示,
    + 所以映射到 Flink 时是一个字符串数组 +
    JSONSTRINGJSON 类型的数据在 Flink 中会转化为 JSON 格式的字符串
    +
    diff --git a/docs/content/connectors/oceanbase-cdc.md b/docs/content/connectors/oceanbase-cdc.md index 12af31ac25..60405fd5bc 100644 --- a/docs/content/connectors/oceanbase-cdc.md +++ b/docs/content/connectors/oceanbase-cdc.md @@ -1,11 +1,11 @@ # OceanBase CDC Connector -The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to setup the OceanBase CDC connector to run SQL queries against OceanBase. +The OceanBase CDC connector allows for reading snapshot data and incremental data from OceanBase. This document describes how to set up the OceanBase CDC connector to run SQL queries against OceanBase. Dependencies ------------ -In order to setup the OceanBase CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. +In order to set up the OceanBase CDC connector, the following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. ```xml @@ -27,9 +27,9 @@ Download [flink-sql-connector-oceanbase-cdc-2.4-SNAPSHOT.jar](https://repo1.mave Setup OceanBase and LogProxy Server ---------------------- -1. Setup the OceanBase cluster following the [deployment doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/deploy-the-distributed-oceanbase-cluster). +1. Set up the OceanBase cluster following the [doc](https://github.com/oceanbase/oceanbase#quick-start). -2. Create a user with password in `sys` tenant, this user is used in OceanBase LogProxy. See [user management doc](https://open.oceanbase.com/docs/community/oceanbase-database/V3.1.1/create-user-3). +2. Create a user with password in `sys` tenant, this user is used in OceanBase LogProxy. 
```shell mysql -h${host} -P${port} -uroot @@ -53,7 +53,7 @@ Setup OceanBase and LogProxy Server mysql> show parameters like 'obconfig_url'; ``` -5. Setup OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [quick start](https://github.com/oceanbase/oblogproxy#quick-start). +5. Setup OceanBase LogProxy. For users of OceanBase Community Edition, you can follow the [quick start](https://github.com/oceanbase/oblogproxy#getting-started). How to create a OceanBase CDC table ---------------- @@ -79,8 +79,8 @@ Flink SQL> CREATE TABLE orders ( 'username' = 'user@test_tenant', 'password' = 'pswd', 'tenant-name' = 'test_tenant', - 'database-name' = 'test_db', - 'table-name' = 'orders', + 'database-name' = '^test_db$', + 'table-name' = '^orders$', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', @@ -314,8 +314,8 @@ CREATE TABLE products ( 'username' = 'user@test_tenant', 'password' = 'pswd', 'tenant-name' = 'test_tenant', - 'database-name' = 'test_db', - 'table-name' = 'orders', + 'database-name' = '^test_db$', + 'table-name' = '^orders$', 'hostname' = '127.0.0.1', 'port' = '2881', 'rootserver-list' = '127.0.0.1:2882:2881', @@ -400,13 +400,13 @@ public class OceanBaseSourceExample { .username("user@test_tenant") .password("pswd") .tenantName("test_tenant") - .databaseName("test_db") - .tableName("test_table") + .databaseName("^test_db$") + .tableName("^test_table$") .hostname("127.0.0.1") .port(2881) .logProxyHost("127.0.0.1") .logProxyPort(2983) - .serverTimeZone(serverTimezone) + .serverTimeZone(serverTimeZone) .deserializer(deserializer) .build(); @@ -423,6 +423,8 @@ public class OceanBaseSourceExample { Data Type Mapping ---------------- +### Mysql Mode +
    diff --git a/flink-connector-oceanbase-cdc/pom.xml b/flink-connector-oceanbase-cdc/pom.xml index 98bdd3035e..d7a40891cc 100644 --- a/flink-connector-oceanbase-cdc/pom.xml +++ b/flink-connector-oceanbase-cdc/pom.xml @@ -43,15 +43,9 @@ under the License. - com.oceanbase.logclient - logproxy-client + com.oceanbase + oblogclient-logproxy ${oblogclient.version} - - - logback-classic - ch.qos.logback - - diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java index dcf28431a2..3f985dbfe6 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/OceanBaseSource.java @@ -183,6 +183,12 @@ public SourceFunction build() { startupMode + " mode is not supported."); } + if (!startupMode.equals(StartupMode.INITIAL) + && (StringUtils.isNotEmpty(databaseName) + || StringUtils.isNotEmpty(tableName))) { + throw new IllegalArgumentException( + "If startup mode is not 'INITIAL', 'database-name' and 'table-name' must not be configured"); + } if (StringUtils.isNotEmpty(databaseName) || StringUtils.isNotEmpty(tableName)) { if (StringUtils.isEmpty(databaseName) || StringUtils.isEmpty(tableName)) { throw new IllegalArgumentException( diff --git a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java index 39d4a09ee2..64df710c4d 100644 --- a/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java +++ b/flink-connector-oceanbase-cdc/src/main/java/com/ververica/cdc/connectors/oceanbase/source/OceanBaseRichSourceFunction.java 
@@ -35,7 +35,6 @@ import com.oceanbase.clogproxy.client.config.ObReaderConfig; import com.oceanbase.clogproxy.client.exception.LogProxyClientException; import com.oceanbase.clogproxy.client.listener.RecordListener; -import com.oceanbase.oms.logmessage.DataMessage; import com.oceanbase.oms.logmessage.LogMessage; import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema; import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord; @@ -46,6 +45,7 @@ import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.time.Duration; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; @@ -56,7 +56,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.stream.Collectors; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -90,10 +89,11 @@ public class OceanBaseRichSourceFunction extends RichSourceFunction private final OceanBaseDeserializationSchema deserializer; private final AtomicBoolean snapshotCompleted = new AtomicBoolean(false); - private final List logMessageBuffer = new LinkedList<>(); + private final List changeRecordBuffer = new LinkedList<>(); private transient Set tableSet; private transient volatile long resolvedTimestamp; + private transient volatile long startTimestamp; private transient volatile OceanBaseConnection snapshotConnection; private transient LogProxyClient logProxyClient; private transient ListState offsetState; @@ -136,7 +136,6 @@ public OceanBaseRichSourceFunction( public void open(final Configuration config) throws Exception { super.open(config); this.outputCollector = new OutputCollector<>(); - this.resolvedTimestamp = -1; } @Override @@ -150,23 +149,18 @@ public void run(SourceContext ctx) throws Exception { readChangeRecords(); if (shouldReadSnapshot()) { - synchronized (ctx.getCheckpointLock()) { - try { - 
readSnapshotRecords(); - } finally { - closeSnapshotConnection(); - } - LOG.info("Snapshot reading finished"); - } + LOG.info("Snapshot reading started"); + readSnapshotRecords(); + LOG.info("Snapshot reading finished"); } else { - LOG.info("Skip snapshot reading"); + LOG.info("Snapshot reading skipped"); } logProxyClient.join(); } private boolean shouldReadSnapshot() { - return resolvedTimestamp == -1 && snapshot; + return resolvedTimestamp <= 0 && snapshot; } private OceanBaseConnection getSnapshotConnection() { @@ -210,27 +204,32 @@ private void initTableWhiteList() { } } - if (shouldReadSnapshot() - && StringUtils.isNotBlank(databaseName) - && StringUtils.isNotBlank(tableName)) { + if (StringUtils.isNotBlank(databaseName) && StringUtils.isNotBlank(tableName)) { try { String sql = String.format( "SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES " + "WHERE TABLE_TYPE='BASE TABLE' and TABLE_SCHEMA REGEXP '%s' and TABLE_NAME REGEXP '%s'", databaseName, tableName); + final List matchedTables = new ArrayList<>(); getSnapshotConnection() .query( sql, rs -> { while (rs.next()) { - localTableSet.add( + matchedTables.add( String.format( "%s.%s", rs.getString(1), rs.getString(2))); } }); + LOG.info("Pattern matched tables: {}", matchedTables); + localTableSet.addAll(matchedTables); } catch (SQLException e) { - LOG.error("Query database and table name failed", e); + LOG.error( + String.format( + "Query table list by 'database-name' %s and 'table-name' %s failed.", + databaseName, tableName), + e); throw new FlinkRuntimeException(e); } } @@ -239,11 +238,9 @@ private void initTableWhiteList() { throw new FlinkRuntimeException("No valid table found"); } + LOG.info("Table list: {}", localTableSet); this.tableSet = localTableSet; - this.obReaderConfig.setTableWhiteList( - localTableSet.stream() - .map(table -> String.format("%s.%s", tenantName, table)) - .collect(Collectors.joining("|"))); + this.obReaderConfig.setTableWhiteList(String.format("%s.*.*", 
tenantName)); } protected void readSnapshotRecords() { @@ -253,6 +250,7 @@ protected void readSnapshotRecords() { readSnapshotRecordsByTable(schema[0], schema[1]); }); snapshotCompleted.set(true); + resolvedTimestamp = startTimestamp; } private void readSnapshotRecordsByTable(String databaseName, String tableName) { @@ -315,6 +313,7 @@ public void notify(LogMessage message) { case BEGIN: if (!started) { started = true; + startTimestamp = Long.parseLong(message.getSafeTimestamp()); latch.countDown(); } break; @@ -324,22 +323,24 @@ public void notify(LogMessage message) { if (!started) { break; } - logMessageBuffer.add(message); + OceanBaseRecord record = getChangeRecord(message); + if (record != null) { + changeRecordBuffer.add(record); + } break; case COMMIT: // flush buffer after snapshot completed if (!shouldReadSnapshot() || snapshotCompleted.get()) { - logMessageBuffer.forEach( - msg -> { + changeRecordBuffer.forEach( + r -> { try { - deserializer.deserialize( - getChangeRecord(msg), outputCollector); + deserializer.deserialize(r, outputCollector); } catch (Exception e) { throw new FlinkRuntimeException(e); } }); - logMessageBuffer.clear(); - long timestamp = getCheckpointTimestamp(message); + changeRecordBuffer.clear(); + long timestamp = Long.parseLong(message.getSafeTimestamp()); if (timestamp > resolvedTimestamp) { resolvedTimestamp = timestamp; } @@ -369,41 +370,23 @@ public void onException(LogProxyClientException e) { if (!latch.await(connectTimeout.getSeconds(), TimeUnit.SECONDS)) { throw new TimeoutException("Timeout to receive messages in RecordListener"); } - LOG.info("LogProxyClient packet processing started"); + LOG.info("LogProxyClient packet processing started from timestamp {}", startTimestamp); } private OceanBaseRecord getChangeRecord(LogMessage message) { String databaseName = message.getDbName().replace(tenantName + ".", ""); + if (!tableSet.contains(String.format("%s.%s", databaseName, message.getTableName()))) { + return null; + } 
OceanBaseRecord.SourceInfo sourceInfo = new OceanBaseRecord.SourceInfo( tenantName, databaseName, message.getTableName(), - getCheckpointTimestamp(message)); + Long.parseLong(message.getSafeTimestamp())); return new OceanBaseRecord(sourceInfo, message.getOpt(), message.getFieldList()); } - /** - * Get log message checkpoint timestamp in seconds. Refer to 'globalSafeTimestamp' in {@link - * LogMessage}. - * - * @param message Log message. - * @return Timestamp in seconds. - */ - private long getCheckpointTimestamp(LogMessage message) { - long timestamp = -1; - try { - if (DataMessage.Record.Type.HEARTBEAT.equals(message.getOpt())) { - timestamp = Long.parseLong(message.getTimestamp()); - } else { - timestamp = message.getFileNameOffset(); - } - } catch (Throwable t) { - LOG.error("Failed to get checkpoint from log message", t); - } - return timestamp; - } - @Override public void notifyCheckpointComplete(long l) { // do nothing diff --git a/flink-sql-connector-oceanbase-cdc/pom.xml b/flink-sql-connector-oceanbase-cdc/pom.xml index ba7f80aa0c..0ce8eed34f 100644 --- a/flink-sql-connector-oceanbase-cdc/pom.xml +++ b/flink-sql-connector-oceanbase-cdc/pom.xml @@ -56,8 +56,8 @@ under the License. com.ververica:flink-connector-debezium com.ververica:flink-connector-oceanbase-cdc mysql:mysql-connector-java - com.oceanbase.logclient:* - io.netty:netty-all + com.oceanbase:* + io.netty:* com.google.protobuf:protobuf-java commons-codec:commons-codec org.lz4:lz4-java diff --git a/pom.xml b/pom.xml index 4cefd70f1a..137602b441 100644 --- a/pom.xml +++ b/pom.xml @@ -91,7 +91,7 @@ under the License. 1.7.15 2.17.1 2.4.2 - 1.0.6 + 1.1.0 1 true