Skip to content

Commit

Permalink
[FLINK-34753][cdc][docs] Improve and update outdated mongodb cdc FAQ
Browse files Browse the repository at this point in the history
  • Loading branch information
Shawn-Hx committed Mar 29, 2024
1 parent f7f248c commit 927a0ec
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 38 deletions.
26 changes: 11 additions & 15 deletions docs/content.zh/docs/faq/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -243,15 +243,17 @@ tableList选项要求表名使用架构名,而不是DataStream API中的表名

### Q1: MongoDB CDC 支持 全量+增量读 和 只读增量吗?

支持,默认为 全量+增量 读取;使用copy.existing=false参数设置为只读增量
支持,默认为 全量+增量 读取;使用 'scan.startup.mode' = 'latest-offset' 参数设置为只读增量

### Q2: MongoDB CDC 支持从 checkpoint 恢复吗? 原理是怎么样的呢?

支持,checkpoint 会记录 ChangeStream 的 resumeToken,恢复的时候可以通过resumeToken重新恢复ChangeStream。其中 resumeToken 对应 `oplog.rs` (MongoDB 变更日志collection) 的位置,`oplog.rs` 是一个固定容量的 collection。当 resumeToken 对应的记录在 `oplog.rs` 中不存在的时候,可能会出现 Invalid resumeToken 的异常。这种情况,在使用时可以设置合适`oplog.rs`的集合大小,避免`oplog.rs`保留时间过短,可以参考 https://docs.mongodb.com/manual/tutorial/change-oplog-size/ 另外,resumeToken 可以通过新到的变更记录和 heartbeat 记录来刷新。
支持,checkpoint 会记录 ChangeStream 的 resumeToken,恢复的时候可以通过resumeToken重新恢复ChangeStream。其中 resumeToken 对应 `oplog.rs` (MongoDB 变更日志collection) 的位置,`oplog.rs` 是一个固定容量的 collection。当 resumeToken 对应的记录在 `oplog.rs` 中不存在的时候,可能会出现 Invalid resumeToken 的异常。这种情况,在使用时可以设置合适`oplog.rs`的集合大小,避免`oplog.rs`保留时间过短,可以参考 https://docs.mongodb.com/manual/tutorial/change-oplog-size/ 另外,resumeToken 可以通过新到的变更记录和 heartbeat 记录来刷新。

### Q3: MongoDB CDC 支持输出 -U(update_before,更新前镜像值)消息吗?

MongoDB 原始的 `oplog.rs` 只有 INSERT, UPDATE, REPLACE, DELETE 这几种操作类型,没有保留更新前的信息,不能输出-U 消息,在 Flink 中只能实现 UPSERT 语义。在使用MongoDBTableSource 时,Flink planner 会自动进行 ChangelogNormalize 优化,补齐缺失的 -U 消息,输出完整的 +I, -U, +U, -D 四种消息, 代价是 ChangelogNormalize 优化的代价是该节点会保存之前所有 key 的状态。所以,如果是 DataStream 作业直接使用 MongoDBSource,如果没有 Flink planner 的优化,将不会自动进行 ChangelogNormalize,所以不能直接获取 —U 消息。想要获取更新前镜像值,需要自己管理状态,如果不希望自己管理状态,可以将 MongoDBTableSource 转换为 ChangelogStream 或者 RetractStream,借助 Flink planner 的优化能力补齐更新前镜像值,示例如下:
在 MongoDB 6.0 及以上版本,若数据库开启了[前像或后像功能](https://www.mongodb.com/docs/atlas/app-services/mongodb/preimages/),可以在SQL作业中配置参数 'scan.full-changelog' = 'true',使得数据源能够输出-U 消息,从而省去ChangelogNormalize。

在 MongoDB 6.0 版本前,MongoDB 原始的 `oplog.rs` 只有 INSERT, UPDATE, REPLACE, DELETE 这几种操作类型,没有保留更新前的信息,不能输出-U 消息,在 Flink 中只能实现 UPSERT 语义。在使用MongoDBTableSource 时,Flink planner 会自动进行 ChangelogNormalize 优化,补齐缺失的 -U 消息,输出完整的 +I, -U, +U, -D 四种消息, 代价是 ChangelogNormalize 优化的代价是该节点会保存之前所有 key 的状态。所以,如果是 DataStream 作业直接使用 MongoDBSource,如果没有 Flink planner 的优化,将不会自动进行 ChangelogNormalize,所以不能直接获取 —U 消息。想要获取更新前镜像值,需要自己管理状态,如果不希望自己管理状态,可以将 MongoDBTableSource 转换为 ChangelogStream 或者 RetractStream,借助 Flink planner 的优化能力补齐更新前镜像值,示例如下:

```
tEnv.executeSql("CREATE TABLE orders ( ... ) WITH ( 'connector'='mongodb-cdc',... )");
Expand All @@ -266,34 +268,28 @@ MongoDB 原始的 `oplog.rs` 只有 INSERT, UPDATE, REPLACE, DELETE 这几种操
env.execute();
```

### Q4: MongoDB CDC 支持订阅多个集合吗?

支持订阅整库的 collection,例如配置 database 为 'mgdb',并且配置 collection 为空字符串,则会订阅 'mgdb' 库下所有 collection。

### Q4: Does mongodb CDC support subscribing to multiple collections?

仅支持订阅整库的 collection,筛选部分 collection 功能还不支持,例如配置 database 为 'mgdb',collection 为空字符串,则会订阅 'mgdb' 库下所有 collection。

也支持通过正则表达式匹配 collection,如果要监控的集合名称中包含正则表达式特殊字符,则 collection 参数必须配置为完全限定的名字空间(数据库名称.集合名称),否则无法捕获对应 collection 的变更。

### Q5: MongoDB CDC 支持 MongoDB 的版本是哪些?

MongoDB CDC 基于 ChangeStream 特性实现,ChangeStream 是 MongoDB 3.6 推出的新特性。MongoDB CDC 理论上支持 3.6 以上版本,建议运行版本 >= 4.0, 在低于3.6版本执行时,会出现错误: Unrecognized pipeline stage name: '$changeStream' 。
MongoDB CDC 基于 ChangeStream 特性实现,ChangeStream 是 MongoDB 3.6 推出的新特性。MongoDB CDC 理论上支持 3.6 及以上版本,建议运行版本 >= 4.0, 在低于3.6版本执行时,会出现错误: Unrecognized pipeline stage name: '$changeStream' 。

### Q6: MongoDB CDC 支持 MongoDB 的运行模式是什么?

ChangeStream 需要 MongoDB 以副本集或者分片模式运行,本地测试可以使用单机版副本集 rs.initiate() 。在 standalone 模式下会出现错误:The $changestage is only supported on replica sets.

ChangeStream 需要 MongoDB 以副本集或者分片模式运行,本地测试可以使用单机版副本集 `rs.initiate()` 。在 standalone 模式下会出现错误:The $changestage is only supported on replica sets.

### Q7: MongoDB CDC 报错用户名密码错误, 但其他组件使用该用户名密码都能正常连接,这是什么原因?

If 如果用户是创建在需要连接的db 下,需要在with参数里加下 'connection.options' = 'authSource=用户所在的db'。
如果用户不是在默认的admin数据库下创建的,需要在with参数里加下 'connection.options' = 'authSource=用户所在的db'。

### Q8: MongoDB CDC 是否支持 debezium 相关的参数?

不支持的,因为 MongoDB CDC 连接器是在 Flink CDC 项目中独立开发,并不依赖Debezium项目,所以不支持。

### Q9: MongoDB CDC 全量读取阶段,作业失败后,可以从 checkpoint 继续读取吗?

MongoDB CDC 全量读取阶段是不做 checkpoint 的,直到全量阶段读取完后才开始作 checkpoint,如果在全量读取阶段失败,MongoDB CDC 会重新读取存量数据。

## Oracle CDC FAQ

### Q1: Oracle CDC 的归档日志增长很快,且读取 log 慢?
Expand Down
42 changes: 19 additions & 23 deletions docs/content/docs/faq/faq.md
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,19 @@ The `tableList` option requires table name with schema name rather than table na

## MongoDB CDC FAQ

### Q1: Does mongodb CDC support full + incremental read and read-only incremental?
### Q1: Does MongoDB CDC support full + incremental read and read-only incremental?

Yes, the default is full + incremental reading; Use copy The existing = false parameter is set to read-only increment.
Yes, the default is full + incremental reading; Using 'scan.startup.mode' = 'latest-offset' parameter can set to read-only incremental.

### Q2: Does mongodb CDC support recovery from checkpoint? What is the principle?
### Q2: Does MongoDB CDC support recovery from checkpoint? What is the principle?

Yes, the checkpoint will record the resumetoken of the changestream. During recovery, the changestream can be restored through the resumetoken. Where resumetoken corresponds to oplog RS (mongodb change log collection), oplog RS is a fixed capacity collection. When the corresponding record of resumetoken is in oplog When RS does not exist, an exception of invalid resumetoken may occur. In this case, you can set the appropriate oplog Set size of RS to avoid oplog RS retention time is too short, you can refer to https://docs.mongodb.com/manual/tutorial/change-oplog-size/ In addition, the resumetoken can be refreshed through the newly arrived change record and heartbeat record.
Yes, the checkpoint will record the resumeToken of the changeStream. During recovery, the changeStream can be restored through the resumeToken. Where resumeToken corresponds to `oplog.rs` (Change log collection in MongoDB), `oplog.rs` is a fixed capacity collection. When the corresponding record of resumeToken does not exist in `oplog.rs`, an Invalid resumeToken Exception may occur. In this case, you can set the appropriate size of `oplog.rs` to avoid retention time of `oplog.rs` is too short, you can refer to https://docs.mongodb.com/manual/tutorial/change-oplog-size/. In addition, the resumeToken can be refreshed through the newly arrived change record and heartbeat record.

### Q3: Does mongodb CDC support outputting - U (update_before) messages?
### Q3: Does MongoDB CDC support outputting - U (update_before) messages?

Mongodb original oplog RS has only insert, update, replace and delete operation types. It does not retain the information before update. It cannot output - U messages. It can only realize the update semantics in Flink. When using mongodbtablesource, Flink planner will automatically perform changelognormalize optimization, fill in the missing - U messages, and output complete + I, - u, + U, and - D messages. The cost of changelognormalize optimization is that the node will save the status of all previous keys. Therefore, if the DataStream job directly uses mongodbsource, without the optimization of Flink planner, changelognormalize will not be performed automatically, so - U messages cannot be obtained directly. To obtain the pre update image value, you need to manage the status yourself. If you don't want to manage the status yourself, you can convert mongodbtablesource to changelogstream or retractstream and supplement the pre update image value with the optimization ability of Flink planner. An example is as follows:
In MongoDB versions >= 6.0, if MongoDB enable [document preimages](https://www.mongodb.com/docs/atlas/app-services/mongodb/preimages/), setting 'scan.full-changelog' = 'true' in Flink SQL can make source output -U messages, so ChangelogNormalize operator can be removed.

In MongoDB versions < 6.0, the original `oplog.rs` in MongoDB only has operation types including insert, update, replace and delete. It does not save the information before update, so it cannot output - U messages. It can only realize the UPSERT semantics in Flink. When using MongoDBTableSource, Flink planner will automatically perform ChangelogNormalize optimization, fill in the missing - U messages, and output complete + I, - U, + U, and - D messages. The cost of ChangelogNormalize optimization is that the operator will save the states of all previous keys. Therefore, if the DataStream job directly uses MongoDBSource, without the optimization of Flink planner, ChangelogNormalize will not be performed automatically, so - U messages cannot be obtained directly. To obtain the pre update image value, you need to manage the status yourself. If you don't want to manage the status yourself, you can convert MongodbTableSource to changelogstream or retractstream and supplement the pre update image value with the optimization ability of Flink planner. An example is as follows:

```
tEnv.executeSql("CREATE TABLE orders ( ... ) WITH ( 'connector'='mongodb-cdc',... )");
Expand All @@ -271,33 +273,27 @@ Mongodb original oplog RS has only insert, update, replace and delete operation
env.execute();
```

### Q4: Does MongoDB CDC support subscribing multiple collections?

All collections in database can be subscribed. For example, if database is configured as ' mgdb' and collection is configured as an empty string, all collections under 'mgdb' database will be subscribed.

### Q4: Does mongodb CDC support subscribing to multiple collections?

Only the collection of the whole database can be subscribed, but some collection filtering functions are not supported. For example, if the database is configured as' mgdb 'and the collection is an empty string, all collections under the' mgdb 'database will be subscribed.

### Q5: What versions of mongodb are supported by mongodb CDC?

Mongodb CDC is implemented based on the changestream feature, which is a new feature launched by mongodb 3.6. Mongodb CDC theoretically supports versions above 3.6. It is recommended to run version > = 4.0. When executing versions lower than 3.6, an error will occur: unrecognized pipeline stage name: '$changestream'.

### Q6: What is the operation mode of mongodb supported by mongodb CDC?
It also supports subscribing collections using regular expressions. If the name of the collections to be monitored contains special characters used in regular expressions, then the collection parameter must be configured as a fully qualified namespace ("database-name.collection-name"), otherwise the changes to the corresponding collections cannot be captured.

Changestream requires mongodb to run in replica set or fragment mode. Local tests can use stand-alone replica set rs.initiate().
### Q5: Which versions of MongoDB are supported by MongoDB CDC?

Errors occur in standalone mode : The $changestage is only supported on replica sets.
MongoDB CDC is implemented based on the ChangeStream feature, which is a new feature introduced in MongoDB 3.6. Mongodb CDC theoretically supports versions >= 3.6. It is recommended to run on version >= 4.0. When executed on versions < 3.6, an error will occur: Unrecognized pipeline stage name: '$changeStream'.

### Q7: Mongodb CDC reports an error. The user name and password are incorrect, but other components can connect normally with this user name and password. What is the reason?
### Q6: Which operational modes of MongoDB are supported by MongoDB CDC?

If the user is creating a DB that needs to be connected, add 'connection' to the with parameter Options' ='authsource = DB where the user is located '.
ChangeStream requires MongoDB to run in replica set or sharded cluster mode. For local test, a single-node replica set can be initialized with `rs.initiate()`. An error will occur in standalone mode: The $changeStream stage is only supported on replica sets.

### Q8: Does mongodb CDC support debezium related parameters?
### Q7: MongoDB CDC reports an error. The username and password are incorrect, but other components can connect normally with this username and password. What is the reason?

The mongodb CDC connector is not supported because it is independently developed in the Flink CDC project and does not rely on the debezium project.
If the user is not created in the default admin database, you need to add parameter 'connection.options' = 'authSource={{ database where the user is created }}'.

### Q9: In the mongodb CDC full reading phase, can I continue reading from the checkpoint after the job fails?
### Q8: Does MongoDB CDC support debezium related parameters?

In the full reading phase, mongodb CDC does not do checkpoint until the full reading phase is completed. If it fails in the full reading phase, mongodb CDC will read the stock data again.
It is not supported, because MongoDB CDC connector is developed independently in the Flink CDC project and does not rely on the debezium project.

## Oracle CDC FAQ

Expand Down

0 comments on commit 927a0ec

Please sign in to comment.