feat: cdc checker #460

Merged
loomts merged 52 commits into main from feat/cdc-checker on Apr 14, 2026

@loomts loomts commented Jan 22, 2026

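Local test: MySQL / PostgreSQL under a 1,000,000-row mixed_write workload at two concurrency levels, 32 and 64.

  • MySQL rows use sysbench; PostgreSQL rows use pgbench
  • This data set uses a 1,000,000-row mixed_write workload with workload concurrency of 32 / 64
  • check off means the pure CDC path: [extractor] extract_type=cdc, [sinker] sink_type=write,
    [parallelizer] parallel_type=rdb_merge
  • check on means inline cdc check is enabled on the same CDC path:
    [checker] enable=true, [checker] batch_size=200,
    [resumer] resume_type=from_target
  • The task-side tuning parameters shared by all runs in this re-test are:
    [sinker] batch_size=200, [parallelizer] parallel_size=8,
    [pipeline] buffer_size=16000, [pipeline] checkpoint_interval_secs=10
  • Workload tx events is the transaction count reported by the benchmark tool itself, not the number of CDC rows.
  • Sinker slowdown vs. off compares check on against check off for the same engine and the same concurrency.
  • Final consistency means the final verification passed after the target caught up.

| Engine | Concurrency | Mode | Workload tx events | TPS | Workload duration | End-to-end catch-up | Sinker avg rate | Checker avg rate | Sinker slowdown vs. off | Pipeline fill rate | Checker fill rate | Queue drops | Checker diff total | Final consistency |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| MySQL | 32 | check off | 10364 | 687.30/s | 15.08s | caught up | 3662.41/s | - | baseline | 100% | - | 0 | - | yes |
| MySQL | 32 | check on | 7733 | 514.10/s | 15.03s | caught up | 1810.09/s | 1803.50/s | -50.6% | 100% | 0% | 0 | 560 | yes |
| MySQL | 64 | check off | 11857 | 787.87/s | 15.05s | caught up | 4298.00/s | - | baseline | 100% | - | 0 | - | yes |
| MySQL | 64 | check on | 11857 | 786.17/s | 15.08s | caught up | 2989.77/s | 3005.42/s | -30.4% | 100% | 2.25% | 0 | 654 | yes |
| PostgreSQL | 32 | check off | 96302 | 6420.84/s | 15s | caught up | 9547.28/s | - | baseline | 16.0% | - | 0 | - | yes |
| PostgreSQL | 32 | check on | 114086 | 7611.53/s | 15s | caught up | 2918.79/s | 5519.26/s | -69.4% | 100% | 2.8% | 0 | 18625 | yes |
| PostgreSQL | 64 | check off | 106898 | 7158.22/s | 15s | caught up | 10562.80/s | - | baseline | 100% | - | 0 | - | yes |
| PostgreSQL | 64 | check on | 156715 | 10169.59/s | 15s | caught up | 2869.77/s | 5433.13/s | -72.8% | 100% | 9.6% | 0 | 25058 | yes |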
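
For readers who want to reproduce the sinker slowdown column (check on relative to check off), here is a minimal sketch of the arithmetic. It assumes the column is simply the relative change of the average sinker rate against the check-off baseline for the same engine and concurrency; the function names `sinker_slowdown_pct` and `round1` are made up for this illustration and are not part of the PR.

```rust
/// Relative change of the check-on sinker rate versus the check-off
/// baseline, in percent. Negative values mean the sinker got slower.
/// (Hypothetical helper, not taken from the PR.)
fn sinker_slowdown_pct(check_off_rate: f64, check_on_rate: f64) -> f64 {
    (check_on_rate - check_off_rate) / check_off_rate * 100.0
}

/// Round to one decimal place, matching the precision used in the table.
fn round1(x: f64) -> f64 {
    (x * 10.0).round() / 10.0
}

fn main() {
    // (label, check-off sinker rate, check-on sinker rate), per second,
    // copied from the benchmark table.
    let rows = [
        ("MySQL 32", 3662.41, 1810.09),
        ("MySQL 64", 4298.00, 2989.77),
        ("PostgreSQL 32", 9547.28, 2918.79),
        ("PostgreSQL 64", 10562.80, 2869.77),
    ];
    for (label, off, on) in rows {
        println!("{label}: {:.1}%", sinker_slowdown_pct(off, on));
    }
}
```

Under that assumption the four pairs of rates give -50.6%, -30.4%, -69.4% and -72.8%, which agrees with the reported values.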

Summary

  • Supported matrix / constraints:
    • standalone snapshot check: mysql / pg / mongo
    • standalone struct check: mysql / pg only
    • inline snapshot check: write sink + mysql / pg / mongo
    • inline cdc check: write sink + mysql / pg only, requires parallel_type=rdb_check, pipeline_type=basic, and [resumer] resume_type=from_target|from_db
  • Breaking config changes:
    • inline check no longer accepts [checker].db_type/url/username/password; target now always comes from [sinker]
    • parallel_type=rdb_check now requires [checker]
    • legacy resumer keys are not just deprecated; they are rejected
  • Runtime semantics:
    • full checker queue applies backpressure instead of dropping check work
    • mismatches are logged only and do not stop the main write path
    • runtime checker errors are mode-dependent (inline fail-open vs standalone fail-close)
  • Operational impact:
    • inline CDC check now persists durable checker state in the resumer backend, including unresolved rows in apedts_unconsistent_rows
    • resume behavior is effectively “recheck unresolved rows first, then continue from the durable checkpoint”
    • this may require schema/table create privileges on the resumer store

Check flow


  source data / change stream
            |
            v
       [Pipeline]
            |
            +-----------------------------+-----------------------------+
            |                                                           |
            | standalone check                                          | inline check
            |                                                           |
            v                                                           v
      [DummySinker]                                      [RealSinker: WRITE first]
            |                                                           |
            +-----------------------------+-----------------------------+
                                          |
                                          v
                                   [CheckedSinker]
                                          |
                                          | enqueue checked rows
                                          | (bounded by queue_size)
                                          v
                              [DataCheckerHandle: queue/worker]
                                          |
                       +------------------+------------------+
                       |                                     |
                       | queue full                          | checker runtime error
                       v                                     v
                 BACKPRESSURE                       mode-dependent handling
            (main path waits here)                 - inline     => FAIL-OPEN
                                                   - standalone => FAIL-CLOSE
                                          |
                                          v
                                   [checker_engine]
                                          |
                      +-------------------+-------------------+-------------------+
                      |                                       |                   |
                      v                                       v                   v
               [MysqlChecker]                          [PgChecker]        [MongoChecker]
                      \                                       |                   /
                       \                                      |                  /
                        +-------------------------------------+-----------------+
                                                              |
                                                              v
                                              compare source vs final target state
                                                              |
                                                              +--> diff / miss
                                                                   |
                                                                   v
            [diff.log / miss.log / sql.log / summary.log / metrics / monitor]
            (mismatch is LOGGED; it does NOT stop the main path)
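
The two branches under the checker queue (backpressure on a full queue, mode-dependent handling of runtime errors) can be sketched with the standard library alone. This is not the code in `checked_sinker.rs` or `checker_engine.rs`; it assumes a bounded channel stands in for the checker queue, and the names `CheckMode`, `on_checker_error`, `run_with_backpressure` and `count_drops_without_backpressure` are invented for the example.

```rust
use std::sync::mpsc::{sync_channel, TrySendError};
use std::thread;

/// Hypothetical stand-in for the two check modes in the diagram.
#[derive(Clone, Copy, Debug, PartialEq)]
enum CheckMode {
    Inline,
    Standalone,
}

/// Mode-dependent handling of a checker runtime error.
fn on_checker_error(mode: CheckMode, err: &str) -> Result<(), String> {
    match mode {
        // Fail-open: report the error, let the main write path continue.
        CheckMode::Inline => {
            eprintln!("checker error ignored (fail-open): {err}");
            Ok(())
        }
        // Fail-close: the error is propagated and stops the task.
        CheckMode::Standalone => Err(format!("checker failed (fail-close): {err}")),
    }
}

/// Backpressure: the producer blocks on a full queue, so no row is lost.
/// Returns the rows the worker received, in order.
fn run_with_backpressure(n: u32, queue_size: usize) -> Vec<u32> {
    let (tx, rx) = sync_channel::<u32>(queue_size);
    let worker = thread::spawn(move || rx.iter().collect::<Vec<u32>>());
    for row in 0..n {
        // `send` on a bounded channel waits until the worker frees a slot.
        tx.send(row).expect("worker hung up");
    }
    drop(tx); // close the queue so the worker's iterator ends
    worker.join().expect("worker panicked")
}

/// Contrast: a non-blocking enqueue with nobody draining the queue
/// would have to drop every row that does not fit.
fn count_drops_without_backpressure(n: u32, queue_size: usize) -> usize {
    let (tx, _rx) = sync_channel::<u32>(queue_size); // `_rx` keeps the queue open
    (0..n)
        .filter(|row| matches!(tx.try_send(*row), Err(TrySendError::Full(_))))
        .count()
}

fn main() {
    println!("received with backpressure: {}", run_with_backpressure(100, 4).len());
    println!("dropped without backpressure: {}", count_drops_without_backpressure(5, 2));
    println!("{:?}", on_checker_error(CheckMode::Inline, "target unreachable"));
    println!("{:?}", on_checker_error(CheckMode::Standalone, "target unreachable"));
}
```

The blocking `send` is what trades sinker throughput for complete check coverage, which is in line with the zero queue drops reported in the benchmark.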


  CDC + CHECK PERSISTENCE / RECOVERY
  ==================================

                                [checker_engine]
                                       |
                                       v
                                    [cdc_state]
                                       |
                                       | checkpoint persist
                                       | - clean: checkpoint only
                                       | - dirty: checkpoint + unresolved rows (atomic)
                                       v
                             [CheckerStateStore]
                                       |
            +--------------------------+---------------------------+
            |                                                      |
            v                                                      v
  [last durable CDC checkpoint]                     [unresolved checker rows]
  [resumer].table_full_name                         <same schema>.apedts_unconsistent_rows
  e.g. apecloud_metadata.                           stores rows that still need recheck
       apedts_task_position

  stores the latest recoverable CDC checkpoint 
  aligned with durable checker state
            |                                                      |
            +--------------------------+---------------------------+
                                       |
                                       v
                                     RESUME
                                       |
               +-----------------------+------------------------+
               |                                                |
               v                                                v
  load CDC checkpoint                               load unresolved checker rows
  from apedts_task_position                         from apedts_unconsistent_rows
               \                                                /
                \                                              /
                 +--------------------------------------------+
                                      |
                                      v
                       if unresolved rows exist: RECHECK FIRST
                                      |
                                      v
                      then continue CDC from the durable checkpoint
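
The persistence and resume order in the diagram can be modeled in a few lines. The sketch below is an in-memory illustration under simplifying assumptions: the checkpoint is a plain `u64` (the real position format is not specified here), unresolved rows are opaque string keys, and a single assignment stands in for the atomic write of checkpoint plus unresolved rows. `DurableState`, `InMemoryStateStore`, `ResumeStep` and `plan_resume` are hypothetical names, not the types in `cdc_state.rs`.

```rust
/// Hypothetical durable checker state: the last recoverable CDC checkpoint
/// plus the rows that still need a recheck.
#[derive(Debug, Clone, PartialEq)]
struct DurableState {
    checkpoint: u64,              // placeholder for the real CDC position
    unresolved_rows: Vec<String>, // placeholder row keys
}

/// In-memory stand-in for the state store behind the resumer backend.
#[derive(Default)]
struct InMemoryStateStore {
    state: Option<DurableState>,
}

impl InMemoryStateStore {
    /// Clean state: pass an empty list (checkpoint only).
    /// Dirty state: checkpoint and unresolved rows are replaced together;
    /// the single assignment models one atomic write.
    fn persist(&mut self, checkpoint: u64, unresolved_rows: Vec<String>) {
        self.state = Some(DurableState { checkpoint, unresolved_rows });
    }

    fn load(&self) -> Option<&DurableState> {
        self.state.as_ref()
    }
}

#[derive(Debug, PartialEq)]
enum ResumeStep {
    Recheck(String),
    ContinueCdcFrom(u64),
}

/// Resume order from the diagram: recheck unresolved rows first,
/// then continue CDC from the durable checkpoint.
fn plan_resume(state: &DurableState) -> Vec<ResumeStep> {
    let mut steps: Vec<ResumeStep> = state
        .unresolved_rows
        .iter()
        .cloned()
        .map(ResumeStep::Recheck)
        .collect();
    steps.push(ResumeStep::ContinueCdcFrom(state.checkpoint));
    steps
}

fn main() {
    let mut store = InMemoryStateStore::default();
    store.persist(42, vec!["db.t:1".to_string(), "db.t:7".to_string()]);
    if let Some(state) = store.load() {
        for step in plan_resume(state) {
            println!("{step:?}");
        }
    }
}
```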

@loomts loomts marked this pull request as draft January 23, 2026 12:20
@loomts loomts force-pushed the feat/cdc-checker branch 2 times, most recently from fd2c9d2 to 5e85cf9 February 9, 2026 13:51
@loomts loomts force-pushed the feat/cdc-checker branch from 3a1c850 to 2f1f040 March 5, 2026 10:47
@loomts loomts force-pushed the feat/cdc-checker branch 3 times, most recently from ed7d1cf to 16425fc March 24, 2026 07:29
@loomts loomts marked this pull request as ready for review March 26, 2026 03:46
Comment thread dt-connector/src/sinker/checked_sinker.rs Outdated
Comment thread dt-connector/src/sinker/checked_sinker.rs Outdated
Comment thread dt-connector/src/checker/checker_engine.rs Outdated
Comment thread dt-pipeline/src/base_pipeline.rs Outdated
Comment thread dt-pipeline/src/base_pipeline.rs Outdated
Comment thread dt-connector/src/checker/cdc_state.rs Outdated
Comment thread dt-connector/src/checker/cdc_state.rs Outdated
Comment thread docs/en/cdc/sync.md Outdated
@loomts loomts force-pushed the feat/cdc-checker branch from 42f68c3 to a649969 April 13, 2026 07:42
@loomts loomts merged commit bc22816 into main Apr 14, 2026
2 checks passed
@loomts loomts deleted the feat/cdc-checker branch April 14, 2026 09:42
@loomts loomts linked an issue Apr 20, 2026 that may be closed by this pull request

Development

Successfully merging this pull request may close these issues.

[Feature] add incremental check
