
fix command forwarding to standby node#240

Merged
MrGuin merged 7 commits into main from fix_standby_forward
Nov 28, 2025

Conversation

Collaborator

@MrGuin MrGuin commented Nov 25, 2025

  • call set_has_overwrite if the cce is in Deleted status when forwarding the command to the standby node;
  • fix the bug where a data-sync scan is wrongly marked successful when cces that have a BufferedCommandList are skipped.
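The first fix can be sketched minimally as follows. PayloadStatus and ForwardRequest here are illustrative stand-ins, not the real tx_service types (the actual code uses the protobuf-generated KeyObjectStandbyForwardRequest setter):

```cpp
#include <cassert>

// Stand-in for the cc entry's payload status enum.
enum class PayloadStatus { Normal, Deleted };

// Stand-in for the protobuf forward request; only the flag relevant here.
struct ForwardRequest
{
    bool has_overwrite = false;
    void set_has_overwrite(bool v) { has_overwrite = v; }
};

// When forwarding a command for a Deleted cce, flag the forward as an
// overwrite so the standby replaces its stale copy instead of merging.
void BuildStandbyForward(PayloadStatus status, ForwardRequest &req)
{
    if (status == PayloadStatus::Deleted)
    {
        req.set_has_overwrite(true);
    }
}
```

Without the flag, a standby that still holds an older live version would not learn that the key was deleted on the leader.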

Related PR:
eloqdata/eloqkv#305

Summary by CodeRabbit

  • Bug Fixes

    • Deleted objects are now flagged so standby propagation treats them as overwrites.
    • Checkpoint success now requires no skipped entries; standby sync retry timeout increased for more reliable synchronization.
  • Enhancements

    • Data-sync now continues when buffered commands exist, improving replay coverage and reducing missed updates.
    • New status tracking for skipped entries prevents unintended log truncation and surfaces warnings.


coderabbitai bot commented Nov 25, 2025

Walkthrough

  • Marks standby forwards as overwrites for Deleted payloads.
  • Stops skipping buffered-command cases during RangePartition data-sync and instead flags replay commands.
  • Increases the SnapshotManager OnSnapshotSynced RPC retry timeout and requires no skipped entries for checkpoint success.
  • Adds DataSyncStatus tracking and a setter for skipped entries.

Changes

Cohort / File(s) — Summary

  • Standby overwrite flag (include/cc/object_cc_map.h): When building a StandbyForwardEntry, if object.payload_status == Deleted, call forward_req->set_has_overwrite(true) to mark the forward as an overwrite.
  • Data-sync buffered-command handling (include/cc/template_cc_map.h): Remove the early continue that skipped processing when a buffered command matched the data-sync timestamp; log the buffered-command condition and set replay_cmds_notnull so the existing fetch/replay flow proceeds.
  • Snapshot RPC timeout & checkpoint success (src/store/snapshot_manager.cpp): Increase the OnSnapshotSynced RPC retry timeout from 1 ms to 1000 ms; require a checkpoint result of NO_ERROR and has_skipped_entries == false for final success; add a warning when skipped entries are detected.
  • DataSync status API and usages (include/data_sync_task.h, src/cc/local_cc_shards.cpp): Add bool has_skipped_entries and void SetEntriesSkippedAndNoTruncateLog() to DataSyncStatus; replace calls to SetNoTruncateLog() with SetEntriesSkippedAndNoTruncateLog() when encountering LOG_NOT_TRUNCATABLE.
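The DataSyncStatus change in the last cohort can be sketched as below. Field and method names follow the summary above; the struct layout around them is an assumption for illustration:

```cpp
#include <cassert>

// Sketch of the extended DataSyncStatus: skipping entries now records a
// dedicated flag in addition to suppressing WAL truncation.
struct DataSyncStatus
{
    bool has_skipped_entries{false};
    bool truncate_log{true};

    void SetNoTruncateLog()
    {
        truncate_log = false;
    }

    // New combined setter used at LOG_NOT_TRUNCATABLE sites: remember that
    // entries were skipped so checkpoint success can be denied later.
    void SetEntriesSkippedAndNoTruncateLog()
    {
        has_skipped_entries = true;
        truncate_log = false;
    }
};
```

The point of the combined setter is that the two facts always travel together: skipped entries must both block truncation and be visible to the final success check.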

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Range as RangePartitionDataSyncScanCc
  participant Buffer as BufferedCmds
  participant Replay as Fetch/ReplayPath

  Range->>Buffer: inspect buffered_cmds.txn_cmd_list_.back().new_version_
  alt Old flow: matched -> early skip
    Note over Range,Buffer: Old flow performed an early continue (skipped processing)
  else New flow: matched -> allow replay
    Note over Range,Buffer: New flow logs buffered-cmds and sets replay_cmds_notnull
    Range->>Replay: proceed with fetch/replay (with replay_cmds_notnull)
  end
sequenceDiagram
  autonumber
  participant CC as object_cc_map
  participant Remote as remote::KeyObjectStandbyForwardRequest
  participant Standby as StandbyReceiver

  CC->>CC: inspect object.payload_status
  alt payload_status == Deleted
    CC->>Remote: forward_req->set_has_overwrite(true)
  else
    CC->>Remote: (no overwrite flag)
  end
  CC->>Standby: send StandbyForwardEntry(forward_req)
  Standby-->>CC: ack / process
sequenceDiagram
  autonumber
  participant Snapshot as SnapshotManager
  participant RemoteSvc as StandbyService

  Snapshot->>RemoteSvc: OnSnapshotSynced RPC (retry timeout 1000ms)
  RemoteSvc-->>Snapshot: response (NO_ERROR / error)
  Snapshot->>Snapshot: check has_skipped_entries == false
  alt NO_ERROR and no skipped entries
    Snapshot->>Snapshot: mark checkpoint success
  else
    Snapshot->>Snapshot: mark checkpoint incomplete/failure
  end

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

  • Verify the protobuf-generated set_has_overwrite(bool) exists and matches the usage site.
  • Review buffered-command path for potential double-replay, ordering, or duplication risks.
  • Confirm interactions between has_skipped_entries and truncate-log behavior; ensure setters/clears happen in all relevant paths.
  • Check increased RPC timeout for alignment with retry/backoff expectations.

Suggested reviewers

  • liunyl

Poem

🐰 I hop through patches, nibble the bits,
Deleted flags now wear tiny wits.
No more skips — replays may run,
Timeouts stretch, checkpoints weigh a ton.
I twitch my nose and bound — change done. 🥕

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)

  • Docstring Coverage ⚠️ Warning: Docstring coverage is 33.33%, below the required threshold of 80.00%. Run @coderabbitai generate docstrings to improve coverage.

✅ Passed checks (2 passed)

  • Title check ✅ Passed: The title 'fix command forwarding to standby node' directly addresses the primary objective of the PR, which focuses on fixing command forwarding behavior to standby nodes.
  • Description check ✅ Passed: The PR description clearly explains the two main fixes, though it omits template items such as test additions, issue/RFC references, and test-suite confirmation.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3e079d2 and 3d3665c.

📒 Files selected for processing (2)
  • include/data_sync_task.h (2 hunks)
  • src/store/snapshot_manager.cpp (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/store/snapshot_manager.cpp
  • include/data_sync_task.h


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
src/store/snapshot_manager.cpp (1)

248-274: Check overall impact of raising OnSnapshotSynced timeout to 1000 ms

Bumping the per‑call timeout from 1 ms to 1000 ms in the 5‑retry loop makes this RPC actually viable and should significantly reduce spurious failures while the standby is still spinning up. Note that this also raises worst‑case wait time to roughly 10 seconds per standby (timeout + 1s sleep per retry, times 5) inside the snapshot sync worker and for each entry in tasks. Please confirm this additional potential blocking is acceptable for your deployment characteristics and doesn’t starve other pending sync requests more than intended.

If you want more flexibility later, consider making this timeout (and possibly retry count) configurable rather than hard‑coding 1000 here.
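The worst-case math in the comment above can be made concrete with a small model of the retry loop. The constants and the rpc_once callback are assumptions based on the review summary; the real code issues a brpc call:

```cpp
#include <cassert>
#include <functional>

constexpr int kMaxRetries = 5;
constexpr int kRpcTimeoutMs = 1000;  // raised from 1 ms by this PR
constexpr int kRetrySleepMs = 1000;  // 1 s sleep between attempts

// Accumulates the worst-case blocking budget: each failed attempt costs
// the RPC timeout plus the retry sleep. Returns true on first success.
bool RetryOnSnapshotSynced(const std::function<bool()> &rpc_once,
                           int &worst_case_ms)
{
    worst_case_ms = 0;
    for (int i = 0; i < kMaxRetries; ++i)
    {
        worst_case_ms += kRpcTimeoutMs;
        if (rpc_once())
        {
            return true;
        }
        worst_case_ms += kRetrySleepMs;
    }
    return false;
}
```

With every attempt timing out, the budget is (1000 + 1000) * 5 = 10000 ms per standby, matching the roughly 10 seconds noted above.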

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c3d1ad4 and f10b273.

📒 Files selected for processing (2)
  • include/cc/template_cc_map.h (2 hunks)
  • src/store/snapshot_manager.cpp (1 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • include/cc/template_cc_map.h
🔇 Additional comments (2)
include/cc/template_cc_map.h (2)

5893-5901: Buffered-command DLOG looks correct and non-intrusive

The added DLOG with key, payload status, buffered-command contents, and data_sync_ts is consistent with existing patterns and does not affect control flow; using replay_cmds_notnull here is also appropriate for later log-truncation decisions.


5940-5941: Error-path logging of buffered commands is reasonable

Extending the LOG(ERROR) to stream the full BufferedCommandList (with a newline) in the invariant-violation branch is fine given this path immediately asserts and is only for diagnosing unexpected leader-state; no functional impact on the data-sync logic.

<< ", cce status: " << int(cce->PayloadStatus())
<< ", BufferCommand: " << cce->BufferedCommandList()
<< ", data_sync_ts_: " << req.data_sync_ts_;
replay_cmds_notnull = true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we're sending this snapshot to the standby for sync, we need to wait for all buffered cmds to be cleared. Otherwise the unapplied cmds will be missed on the standby node.

@MrGuin MrGuin force-pushed the fix_standby_forward branch from 0120eef to d386eb9 on November 26, 2025 at 11:06

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
include/cc/template_cc_map.h (1)

5893-5900: Buffered-commands handling now correctly inhibits WAL truncation; logging looks fine

Setting replay_cmds_notnull = true whenever cce->HasBufferedCommandList() and then calling req.SetNotTruncateLog() after the loop ensures we don’t truncate WAL in scans that had to skip CCEs with buffered commands, which addresses the standby data-loss risk described in the PR. The control flow still skips exporting those CCEs for this batch but preserves correctness via replayable log, and the assert(!VersionedRecord) guard matches the surrounding non-versioned branch.

Only minor note: DLOG(INFO) << cce->BufferedCommandList() in a scan loop could become chatty if many entries have buffered commands; if this shows up in practice, consider dialing it back (e.g., logging only counts or sampling). Otherwise the change looks solid.

Also applies to: 5936-5941
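The control-flow change described in this comment reduces to the following sketch. Entry and the scan function are illustrative stand-ins for the CCE scan in template_cc_map.h, not the real signatures:

```cpp
#include <cassert>
#include <vector>

// Minimal stand-in for a cc entry during a data-sync scan.
struct Entry
{
    bool has_buffered_cmds;
};

// Returns whether the WAL may be truncated after the scan: any entry with
// buffered commands sets replay_cmds_notnull instead of being silently
// passed over, and truncation is suppressed so those commands stay
// replayable from the log.
bool ScanAllowsTruncation(const std::vector<Entry> &entries)
{
    bool replay_cmds_notnull = false;
    for (const Entry &e : entries)
    {
        if (e.has_buffered_cmds)
        {
            // New flow: record the condition rather than `continue` past it.
            replay_cmds_notnull = true;
        }
        // ... export/flush logic elided ...
    }
    return !replay_cmds_notnull;  // req.SetNotTruncateLog() equivalent
}
```

The old flow returned "truncatable" even when entries were skipped, which is exactly the standby data-loss risk the PR closes.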

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 0120eef and d386eb9.

📒 Files selected for processing (3)
  • include/cc/object_cc_map.h (1 hunks)
  • include/cc/template_cc_map.h (2 hunks)
  • src/store/snapshot_manager.cpp (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • include/cc/object_cc_map.h
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-10-20T04:30:07.884Z
Learnt from: liunyl
Repo: eloqdata/tx_service PR: 149
File: include/cc/cc_request.h:1876-1927
Timestamp: 2025-10-20T04:30:07.884Z
Learning: ScanNextBatchCc in include/cc/cc_request.h is used only for hash-partition scans; range-partition scans are handled by ScanSliceCc.

Applied to files:

  • include/cc/template_cc_map.h
🧬 Code graph analysis (1)
src/store/snapshot_manager.cpp (1)
src/checkpointer.cpp (1)
  • ckpt_req (89-89)
🔇 Additional comments (2)
src/store/snapshot_manager.cpp (2)

252-252: LGTM! Timeout fix for standby RPC.

The timeout increase from 1 ms to 1000 ms is appropriate. The original 1 ms timeout was far too short for network RPCs and would cause nearly immediate timeouts. The new 1000 ms timeout is reasonable for network operations and aligns with the 1-second retry sleep interval.


391-393: Stricter checkpoint success condition fixes the datasync bug.

The updated success condition now requires both NO_ERROR and need_truncate_log_ to be true. This prevents false positives where the checkpoint was incorrectly marked successful when CCEs with buffered commands were skipped without processing.

The stricter condition ensures that a checkpoint only succeeds when actual work is performed (indicated by need_truncate_log_), which aligns with the PR's bug fix objectives.

Please ensure that all legitimate checkpoint scenarios still succeed with this stricter condition. Verify through integration tests that:

  1. Checkpoints with processed commands succeed correctly
  2. Checkpoints with no work to do behave as expected
  3. The need_truncate_log_ flag is set correctly in all data sync code paths
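As a hedged sketch, the final form of the success predicate (per the walkthrough, which phrases it in terms of has_skipped_entries) amounts to the following; the enum is a stand-in containing only the values mentioned in this review:

```cpp
#include <cassert>

// Stand-in for the tx_service error code enum; only the values used here.
enum class CcErrorCode { NO_ERROR, LOG_NOT_TRUNCATABLE };

// A checkpoint is reported successful only when the data-sync finished
// with NO_ERROR and no cc entries were skipped along the way.
bool CheckpointSucceeded(CcErrorCode err, bool has_skipped_entries)
{
    return err == CcErrorCode::NO_ERROR && !has_skipped_entries;
}
```

This is the condition that prevents the false positive described in the PR: a scan that skipped buffered-command entries can no longer report a clean checkpoint.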

Comment on lines +329 to +334
uint64_t ckpt_ts = ckpt_req.GetCkptTs();

// Get table names in this node group, checkpointer should be TableName
// string owner.
std::unordered_map<TableName, bool> tables =
local_shards.GetCatalogTableNameSnapshot(node_group, ckpt_ts);

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

Script executed:

#!/bin/bash
# Find the snapshot_manager.cpp file and inspect the relevant section
find . -name "snapshot_manager.cpp" -type f

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

./src/store/snapshot_manager.cpp

</shell_result>


Script executed:

#!/bin/bash
# Read the context around lines 329-334
sed -n '320,350p' ./src/store/snapshot_manager.cpp

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

    auto &local_shards = *Sharder::Instance().GetLocalCcShards();

    std::shared_ptr<DataSyncStatus> data_sync_status =
        std::make_shared<DataSyncStatus>(node_group, ng_leader_term, true);

    bool can_be_skipped = false;
    uint64_t last_ckpt_ts = Sharder::Instance().GetNodeGroupCkptTs(node_group);
    size_t core_cnt = local_shards.Count();
    CkptTsCc ckpt_req(core_cnt, node_group);
    uint64_t ckpt_ts = ckpt_req.GetCkptTs();

    // Get table names in this node group, checkpointer should be TableName
    // string owner.
    std::unordered_map<TableName, bool> tables =
        local_shards.GetCatalogTableNameSnapshot(node_group, ckpt_ts);

    // Find minimum ckpt_ts from all the ccshards in parallel. ckpt_ts is
    // the minimum timestamp minus 1 among all the active transactions, thus
    // it's safe to flush all the entries smaller than or equal to ckpt_ts.
    for (size_t i = 0; i < local_shards.Count(); i++)
    {
        local_shards.EnqueueCcRequest(i, &ckpt_req);
    }
    ckpt_req.Wait();

    // Iterate all the tables and execute CkptScanCc requests on this node
    // group's ccmaps on each ccshard. The result of CkptScanCc is stored in
    // ckpt_vec.
    for (auto it = tables.begin(); it != tables.end(); ++it)
    {
        const TableName &table_name = it->first;

</shell_result>


Script executed:

#!/bin/bash
# Search for CkptTsCc class definition
rg -n "class CkptTsCc" --type=cpp -A 30

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 1

</shell_result>


Script executed:

#!/bin/bash
# Search for CkptTsCc more broadly
rg -n "CkptTsCc" --type=cpp --type=h -B 2 -A 15 | head -100

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

src/tx_trace.cpp-1850-    return outs;
src/tx_trace.cpp-1851-};
src/tx_trace.cpp:1852:std::ostream &operator<<(std::ostream &outs, txservice::CkptTsCc *r)
src/tx_trace.cpp-1853-{
src/tx_trace.cpp-1854-    if (!r)
src/tx_trace.cpp-1855-    {
src/tx_trace.cpp-1856-        return outs << "{}";
src/tx_trace.cpp-1857-    }
src/tx_trace.cpp-1858-    outs << "{\"tx_number_\":" << r->Txn() << "\"isolation_level\":\""
src/tx_trace.cpp-1859-         << r->Isolation() << "\""
src/tx_trace.cpp-1860-         << ",\"proto_\":\"" << r->Protocol() << "\""
src/tx_trace.cpp-1861-         << ",\"ckpt_ts_\":" << r->GetCkptTs() << "}";
src/tx_trace.cpp-1862-    return outs;
src/tx_trace.cpp-1863-};
src/tx_trace.cpp-1864-std::ostream &operator<<(std::ostream &outs, txservice::CkptScanCc *r)
src/tx_trace.cpp-1865-{
src/tx_trace.cpp-1866-    if (!r)
src/tx_trace.cpp-1867-    {
--
include/data_sync_task.h-86-    absl::flat_hash_map<
include/data_sync_task.h-87-        size_t,
include/data_sync_task.h:88:        std::vector<std::vector<UpdateCceCkptTsCc::CkptTsEntry>>>
include/data_sync_task.h-89-        cce_entries_;
include/data_sync_task.h-90-    std::unordered_set<std::string> dedup_kv_table_names_;
include/data_sync_task.h-91-
include/data_sync_task.h-92-    std::mutex mux_;
include/data_sync_task.h-93-    std::condition_variable cv_;
include/data_sync_task.h-94-};
include/data_sync_task.h-95-
include/data_sync_task.h-96-struct TableRangeEntry;
include/data_sync_task.h-97-
include/data_sync_task.h-98-// On HashPartition, we handle DataSyncTask separatelly by core.
include/data_sync_task.h-99-// On RangePartition, we handle DataSyncTask separatelly by range.
include/data_sync_task.h-100-struct DataSyncTask
include/data_sync_task.h-101-{
include/data_sync_task.h-102-public:
include/data_sync_task.h-103-    DataSyncTask(
--
include/data_sync_task.h-224-    bthread::Mutex update_cce_mux_;
include/data_sync_task.h-225-    std::string kv_table_name_;
include/data_sync_task.h:226:    absl::flat_hash_map<size_t, std::vector<UpdateCceCkptTsCc::CkptTsEntry>>
include/data_sync_task.h-227-        cce_entries_;
include/data_sync_task.h-228-
include/data_sync_task.h-229-    bool need_update_ckpt_ts_{true};
include/data_sync_task.h-230-};
include/data_sync_task.h-231-
include/data_sync_task.h-232-struct FlushTaskEntry
include/data_sync_task.h-233-{
include/data_sync_task.h-234-public:
include/data_sync_task.h-235-    FlushTaskEntry(
include/data_sync_task.h-236-        std::unique_ptr<std::vector<FlushRecord>> &&data_sync_vec,
include/data_sync_task.h-237-        std::unique_ptr<std::vector<FlushRecord>> &&archive_vec,
include/data_sync_task.h-238-        std::unique_ptr<std::vector<std::pair<TxKey, int32_t>>> &&mv_base_vec,
include/data_sync_task.h-239-        TransactionExecution *data_sync_txm,
include/data_sync_task.h-240-        std::shared_ptr<DataSyncTask> data_sync_task,
include/data_sync_task.h-241-        std::shared_ptr<const TableSchema> table_schema,
--
include/cc/cc_req_misc.h-886-    CcErrorCode error_code_;
include/cc/cc_req_misc.h-887-};
include/cc/cc_req_misc.h:888:struct UpdateCceCkptTsCc : public CcRequestBase
include/cc/cc_req_misc.h-889-{
include/cc/cc_req_misc.h-890-public:
include/cc/cc_req_misc.h-891-    static constexpr size_t SCAN_BATCH_SIZE = 128;
include/cc/cc_req_misc.h-892-
include/cc/cc_req_misc.h-893-    struct CkptTsEntry
include/cc/cc_req_misc.h-894-    {
include/cc/cc_req_misc.h-895-        CkptTsEntry() = default;
include/cc/cc_req_misc.h-896-        CkptTsEntry(LruEntry *cce, uint64_t commit_ts, size_t post_flush_size)
include/cc/cc_req_misc.h-897-            : cce_(cce),
include/cc/cc_req_misc.h-898-              commit_ts_(commit_ts),
include/cc/cc_req_misc.h-899-              post_flush_size_(post_flush_size)
include/cc/cc_req_misc.h-900-
include/cc/cc_req_misc.h-901-        {
include/cc/cc_req_misc.h-902-        }
include/cc/cc_req_misc.h-903-
--
include/cc/cc_req_misc.h-907-    };
include/cc/cc_req_misc.h-908-
include/cc/cc_req_misc.h:909:    UpdateCceCkptTsCc(
include/cc/cc_req_misc.h-910-        NodeGroupId node_group_id,
include/cc/cc_req_misc.h-911-        int64_t term,
include/cc/cc_req_misc.h-912-        absl::flat_hash_map<size_t, std::vector<CkptTsEntry>> &cce_entries,
include/cc/cc_req_misc.h-913-        bool range_partitioned,
include/cc/cc_req_misc.h-914-        bool versioned_payload)
include/cc/cc_req_misc.h-915-        : cce_entries_(cce_entries),
include/cc/cc_req_misc.h-916-          node_group_id_(node_group_id),
include/cc/cc_req_misc.h-917-          term_(term),
include/cc/cc_req_misc.h-918-          range_partitioned_(range_partitioned),
include/cc/cc_req_misc.h-919-          versioned_payload_(versioned_payload)
include/cc/cc_req_misc.h-920-    {
include/cc/cc_req_misc.h-921-        unfinished_core_cnt_ = cce_entries_.size();
include/cc/cc_req_misc.h-922-        assert(unfinished_core_cnt_ > 0);
include/cc/cc_req_misc.h-923-
include/cc/cc_req_misc.h-924-        for (const auto &entry : cce_entries_)
--
include/cc/cc_req_misc.h-928-    }
include/cc/cc_req_misc.h-929-
include/cc/cc_req_misc.h:930:    UpdateCceCkptTsCc(const UpdateCceCkptTsCc &) = delete;
include/cc/cc_req_misc.h:931:    UpdateCceCkptTsCc &operator=(const UpdateCceCkptTsCc &) = delete;
include/cc/cc_req_misc.h-932-

</shell_result>


Script executed:

#!/bin/bash
# Search more specifically for CkptTsCc structure/class definition
rg -n "struct CkptTsCc|class CkptTsCc" --type=h --type=cpp -A 20

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

include/cc/cc_request.h:3036:struct CkptTsCc : public CcRequestBase
include/cc/cc_request.h-3037-{
include/cc/cc_request.h-3038-public:
include/cc/cc_request.h-3039-    CkptTsCc(size_t shard_cnt, NodeGroupId ng_id)
include/cc/cc_request.h-3040-        : ckpt_ts_(UINT64_MAX),
include/cc/cc_request.h-3041-          mux_(),
include/cc/cc_request.h-3042-          cv_(),
include/cc/cc_request.h-3043-          unfinish_cnt_(shard_cnt),
include/cc/cc_request.h-3044-          cc_ng_id_(ng_id)
include/cc/cc_request.h-3045-    {
include/cc/cc_request.h-3046-        for (size_t i = 0; i < unfinish_cnt_; i++)
include/cc/cc_request.h-3047-        {
include/cc/cc_request.h-3048-            memory_allocated_vec_.emplace_back(0);
include/cc/cc_request.h-3049-            memory_committed_vec_.emplace_back(0);
include/cc/cc_request.h-3050-            heap_full_vec_.emplace_back(false);
include/cc/cc_request.h-3051-            standby_msg_seq_id_vec_.emplace_back(0);
include/cc/cc_request.h-3052-        }
include/cc/cc_request.h-3053-    }
include/cc/cc_request.h-3054-
include/cc/cc_request.h-3055-    CkptTsCc() = delete;
include/cc/cc_request.h-3056-    CkptTsCc(const CkptTsCc &) = delete;

</shell_result>


Script executed:

#!/bin/bash
# Find GetCkptTs() method in CkptTsCc
rg -n "GetCkptTs" include/cc/cc_request.h -A 3 -B 1

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

3111-
3112:    uint64_t GetCkptTs() const
3113-    {
3114-        return ckpt_ts_.load(std::memory_order_relaxed);
3115-    }

</shell_result>


Script executed:

#!/bin/bash
# Find where ckpt_ts_ is modified in CkptTsCc
sed -n '3036,3200p' include/cc/cc_request.h | grep -n "ckpt_ts_"

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

5:        : ckpt_ts_(UINT64_MAX),
45:        uint64_t old_val = ckpt_ts_.load(std::memory_order_relaxed);
48:            while (!ckpt_ts_.compare_exchange_weak(
79:        return ckpt_ts_.load(std::memory_order_relaxed);
143:        req.set_consistent_ts(ckpt_ts_);
165:    std::atomic<uint64_t> ckpt_ts_;

</shell_result>


Script executed:

#!/bin/bash
# Get full context of CkptTsCc structure to understand UpdateCkptTs logic
sed -n '3036,3200p' include/cc/cc_request.h

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

struct CkptTsCc : public CcRequestBase
{
public:
    CkptTsCc(size_t shard_cnt, NodeGroupId ng_id)
        : ckpt_ts_(UINT64_MAX),
          mux_(),
          cv_(),
          unfinish_cnt_(shard_cnt),
          cc_ng_id_(ng_id)
    {
        for (size_t i = 0; i < unfinish_cnt_; i++)
        {
            memory_allocated_vec_.emplace_back(0);
            memory_committed_vec_.emplace_back(0);
            heap_full_vec_.emplace_back(false);
            standby_msg_seq_id_vec_.emplace_back(0);
        }
    }

    CkptTsCc() = delete;
    CkptTsCc(const CkptTsCc &) = delete;
    CkptTsCc(CkptTsCc &&) = delete;

    bool Execute(CcShard &ccs) override
    {
        uint64_t tx_min_ts = 0;
        if (Sharder::Instance().StandbyNodeTerm() > 0)
        {
            tx_min_ts = ccs.MinLastStandbyConsistentTs();
        }
        else
        {
            tx_min_ts = ccs.ActiveTxMinTs(cc_ng_id_);
            standby_msg_seq_id_vec_[ccs.core_id_] =
                ccs.NextStandbyMessageSequence() - 1;
            if (ccs.core_id_ == 0)
            {
                subscribed_node_ids_ = ccs.GetSubscribedStandbys();
            }
        }

        int64_t allocated, committed;
        bool full = ccs.GetShardHeap()->Full(&allocated, &committed);

        uint64_t old_val = ckpt_ts_.load(std::memory_order_relaxed);
        if (old_val > tx_min_ts)
        {
            while (!ckpt_ts_.compare_exchange_weak(
                       old_val, tx_min_ts, std::memory_order_acq_rel) &&
                   old_val > tx_min_ts)
                ;
        }
        memory_allocated_vec_[ccs.LocalCoreId()] = allocated;
        memory_committed_vec_[ccs.LocalCoreId()] = committed;
        heap_full_vec_[ccs.LocalCoreId()] = full;

        std::unique_lock lk(mux_);
        if (--unfinish_cnt_ == 0)
        {
            cv_.notify_one();
        }

        // return false since CkptTsCc is not reused and does not need to call
        // CcRequestBase::Free
        return false;
    }

    void Wait()
    {
        std::unique_lock lk(mux_);
        while (unfinish_cnt_ > 0)
        {
            cv_.wait(lk);
        }
    }

    uint64_t GetCkptTs() const
    {
        return ckpt_ts_.load(std::memory_order_relaxed);
    }

    uint64_t GetMemUsage() const
    {
        uint64_t total_usage = 0;
        for (uint64_t shard_usage : memory_allocated_vec_)
        {
            total_usage += shard_usage;
        }
        // return in kb
        return total_usage / 1024;
    }

    uint64_t GetMemCommited() const
    {
        uint64_t total_cmt = 0;
        for (uint64_t shard_cmt : memory_committed_vec_)
        {
            total_cmt += shard_cmt;
        }
        // return in kb
        return total_cmt / 1024;
    }

    std::vector<uint64_t> GetStandbySequenceIds() const
    {
        return standby_msg_seq_id_vec_;
    }

    void ShardMemoryUsageReport()
    {
        for (uint16_t core_id = 0; core_id < memory_allocated_vec_.size();
             core_id++)
        {
            uint64_t &allocated = memory_allocated_vec_[core_id];
            uint64_t &committed = memory_committed_vec_[core_id];
            bool heap_full = heap_full_vec_[core_id];

            LOG(INFO) << "ccs " << core_id << " memory usage report, committed "
                      << committed << ", allocated " << allocated
                      << ", frag ratio " << std::setprecision(2)
                      << 100 * (static_cast<float>(committed - allocated) /
                                committed)
                      << " , heap full: " << heap_full;
        }
    }

    void UpdateStandbyConsistentTs()
    {
        if (subscribed_node_ids_.empty())
        {
            return;
        }
        int64_t ng_term = Sharder::Instance().LeaderTerm(cc_ng_id_);
        if (ng_term < 0)
        {
            return;
        }
        brpc::Controller cntl;
        remote::UpdateStandbyConsistentTsRequest req;
        remote::UpdateStandbyConsistentTsResponse resp;
        req.set_ng_term(ng_term);
        req.set_node_group_id(cc_ng_id_);
        req.set_consistent_ts(ckpt_ts_);
        for (uint64_t seq_id : standby_msg_seq_id_vec_)
        {
            req.add_seq_ids(seq_id);
        }
        for (uint32_t node_id : subscribed_node_ids_)
        {
            auto channel = Sharder::Instance().GetCcNodeServiceChannel(node_id);
            if (channel)
            {
                remote::CcRpcService_Stub stub(channel.get());
                cntl.Reset();
                cntl.set_timeout_ms(500);
                resp.Clear();

                // We don't care about response
                stub.UpdateStandbyConsistentTs(&cntl, &req, &resp, nullptr);
            }
        }
    }

private:
    std::atomic<uint64_t> ckpt_ts_;

</shell_result>


Script executed:

#!/bin/bash
# Check how GetCatalogTableNameSnapshot uses the timestamp parameter
rg -n "GetCatalogTableNameSnapshot" --type=cpp --type=h -A 5 -B 2 | head -50

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

src/tx_operation.cpp-7502-
src/tx_operation.cpp-7503-        hash_partitioned_tables_snapshot_.clear();
src/tx_operation.cpp:7504:        auto all_catalog_snapshot = local_shards->GetCatalogTableNameSnapshot(
src/tx_operation.cpp-7505-            txm->TxCcNodeId(), txm->CommitTs());
src/tx_operation.cpp-7506-        for (auto &[tbl, is_dirty] : all_catalog_snapshot)
src/tx_operation.cpp-7507-        {
src/tx_operation.cpp-7508-            if (tbl.IsHashPartitioned())
src/tx_operation.cpp-7509-            {
--
src/tx_operation.cpp-7722-        //  data in this bucket.
src/tx_operation.cpp-7723-        hash_partitioned_tables_snapshot_.clear();
src/tx_operation.cpp:7724:        auto all_catalog_snapshot = local_shards->GetCatalogTableNameSnapshot(
src/tx_operation.cpp-7725-            txm->TxCcNodeId(), txm->CommitTs());
src/tx_operation.cpp-7726-        for (auto &[tbl, is_dirty] : all_catalog_snapshot)
src/tx_operation.cpp-7727-        {
src/tx_operation.cpp-7728-            if (tbl.IsHashPartitioned())
src/tx_operation.cpp-7729-            {
--
src/checkpointer.cpp-233-        // string owner.
src/checkpointer.cpp-234-        std::unordered_map<TableName, bool> tables =
src/checkpointer.cpp:235:            local_shards_.GetCatalogTableNameSnapshot(node_group, ckpt_ts);
src/checkpointer.cpp-236-
src/checkpointer.cpp-237-        std::shared_ptr<DataSyncStatus> status =
src/checkpointer.cpp-238-            std::make_shared<DataSyncStatus>(
src/checkpointer.cpp-239-                node_group,
src/checkpointer.cpp-240-                is_standby_node ? standby_node_term : leader_term,
--
src/store/snapshot_manager.cpp-332-    // string owner.
src/store/snapshot_manager.cpp-333-    std::unordered_map<TableName, bool> tables =
src/store/snapshot_manager.cpp:334:        local_shards.GetCatalogTableNameSnapshot(node_group, ckpt_ts);
src/store/snapshot_manager.cpp-335-
src/store/snapshot_manager.cpp-336-    // Find minimum ckpt_ts from all the ccshards in parallel. ckpt_ts is
src/store/snapshot_manager.cpp-337-    // the minimum timestamp minus 1 among all the active transactions, thus
src/store/snapshot_manager.cpp-338-    // it's safe to flush all the entries smaller than or equal to ckpt_ts.
src/store/snapshot_manager.cpp-339-    for (size_t i = 0; i < local_shards.Count(); i++)
--
src/cc/cc_shard.cpp-2529-}
src/cc/cc_shard.cpp-2530-
src/cc/cc_shard.cpp:2531:std::unordered_map<TableName, bool> CcShard::GetCatalogTableNameSnapshot(
src/cc/cc_shard.cpp-2532-    NodeGroupId cc_ng_id)
src/cc/cc_shard.cpp-2533-{
src/cc/cc_shard.cpp-2534-    uint64_t ckpt_ts = Now();
src/cc/cc_shard.cpp:2535:    return local_shards_.GetCatalogTableNameSnapshot(cc_ng_id, ckpt_ts);
src/cc/cc_shard.cpp-2536-}
src/cc/cc_shard.cpp-2537-
src/cc/cc_shard.cpp-2538-void CcShard::AddCandidateStandby(uint32_t node_id, uint64_t start_seq_id)
src/cc/cc_shard.cpp-2539-{
src/cc/cc_shard.cpp-2540-    candidate_standby_nodes_[node_id] = start_seq_id;
--
src/cc/local_cc_shards.cpp-567-}

</shell_result>


Script executed:

#!/bin/bash
# Find GetCatalogTableNameSnapshot implementation
rg -n "GetCatalogTableNameSnapshot" src/cc/local_cc_shards.cpp -A 20

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

569:std::unordered_map<TableName, bool> LocalCcShards::GetCatalogTableNameSnapshot(
570-    NodeGroupId cc_ng_id, uint64_t snapshot_ts)
571-{
572-    std::unordered_map<TableName, bool> tables;
573-    std::shared_lock<std::shared_mutex> lk(meta_data_mux_);
574-    for (const auto &[base_table_name, ng_catalog_map] : table_catalogs_)
575-    {
576-        auto catalog_it = ng_catalog_map.find(cc_ng_id);
577-        if (catalog_it != ng_catalog_map.end())
578-        {
579-            CatalogEntry &catalog_entry =
580-                const_cast<CatalogEntry &>(catalog_it->second);
581-            std::shared_lock<std::shared_mutex> lk(catalog_entry.s_mux_);
582-            if (catalog_entry.schema_ != nullptr)
583-            {
584-                auto ins_it = tables.emplace(
585-                    std::piecewise_construct,
586-                    std::forward_as_tuple(base_table_name.StringView().data(),
587-                                          base_table_name.StringView().size(),
588-                                          base_table_name.Type(),
589-                                          base_table_name.Engine()),
--
5694:                GetCatalogTableNameSnapshot(node_group, sync_ts);
5695-
5696-            // Loop over all tables and sync stats.
5697-            for (auto it = tables.begin(); it != tables.end(); ++it)
5698-            {
5699-                if (Sharder::Instance().LeaderTerm(node_group) != leader_term)
5700-                {
5701-                    // Skip the node groups that are no longer on this node.
5702-                    break;
5703-                }
5704-
5705-                const TableName &table_name = it->first;
5706-                bool is_dirty = it->second;
5707-                if (!table_name.IsMeta() && table_name != sequence_table_name)
5708-                {
5709-                    // Set isolation level to RepeatableRead to ensure the
5710-                    // readlock will be set during the execution of the
5711-                    // following ReadTxRequest.
5712-                    TransactionExecution *txm =
5713-                        NewTxInit(tx_service_,
5714-                                  IsolationLevel::RepeatableRead,
--
6059:                GetCatalogTableNameSnapshot(ng_id_, ClockTs());
6060-
6061-            std::vector<TableName> table_names;
6062-            for (auto &table : tables)
6063-            {
6064-                if (table.first.IsMeta())
6065-                {
6066-                    continue;
6067-                }
6068-                table_names.push_back(table.first);
6069-            }
6070-
6071-            for (auto &table_name : table_names)
6072-            {
6073-                res.Reset();
6074-                {
6075-                    std::unique_lock<std::mutex> lk(mux);
6076-                    done = false;
6077-                }
6078-                purge_cc.Reset(table_name,
6079-                               node_group,
--
6147:                GetCatalogTableNameSnapshot(ng_id_, ClockTs());
6148-
6149-            std::vector<TableName> table_names;
6150-            for (auto &table : tables)
6151-            {
6152-                if (table.first.IsMeta())
6153-                {
6154-                    continue;
6155-                }
6156-                table_names.push_back(table.first);
6157-            }
6158-
6159-            for (auto &table_name : table_names)
6160-            {
6161-                res.Reset();
6162-                {
6163-                    std::unique_lock<std::mutex> lk(mux);
6164-                    done = false;
6165-                }
6166-
6167-                kickout_cc.Reset(table_name,

</shell_result>


Script executed:

#!/bin/bash
# Get full GetCatalogTableNameSnapshot implementation
sed -n '569,610p' src/cc/local_cc_shards.cpp

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

std::unordered_map<TableName, bool> LocalCcShards::GetCatalogTableNameSnapshot(
    NodeGroupId cc_ng_id, uint64_t snapshot_ts)
{
    std::unordered_map<TableName, bool> tables;
    std::shared_lock<std::shared_mutex> lk(meta_data_mux_);
    for (const auto &[base_table_name, ng_catalog_map] : table_catalogs_)
    {
        auto catalog_it = ng_catalog_map.find(cc_ng_id);
        if (catalog_it != ng_catalog_map.end())
        {
            CatalogEntry &catalog_entry =
                const_cast<CatalogEntry &>(catalog_it->second);
            std::shared_lock<std::shared_mutex> lk(catalog_entry.s_mux_);
            if (catalog_entry.schema_ != nullptr)
            {
                auto ins_it = tables.emplace(
                    std::piecewise_construct,
                    std::forward_as_tuple(base_table_name.StringView().data(),
                                          base_table_name.StringView().size(),
                                          base_table_name.Type(),
                                          base_table_name.Engine()),
                    std::forward_as_tuple(false));
                assert(ins_it.second);
                (void) ins_it;
                for (const txservice::TableName &index_table_name :
                     catalog_entry.schema_->IndexNames())
                {
                    auto ins_it =
                        tables.emplace(std::piecewise_construct,
                                       std::forward_as_tuple(
                                           index_table_name.StringView().data(),
                                           index_table_name.StringView().size(),
                                           index_table_name.Type(),
                                           index_table_name.Engine()),
                                       std::forward_as_tuple(false));
                    assert(ins_it.second);
                    // This silences the -Wunused-but-set-variable warning
                    // without any runtime overhead.
                    (void) ins_it;
                }

                // For alter table, should include new index tables.

</shell_result>


Script executed:

#!/bin/bash
# Search for snapshot_ts usage in the file
sed -n '569,620p' src/cc/local_cc_shards.cpp | grep -n "snapshot_ts"

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

2:    NodeGroupId cc_ng_id, uint64_t snapshot_ts)
44:                // than the `snapshot_ts`, it means there is no data in the
45:                // dirty index table before the `snapshot_ts`, then, there is no
48:                    catalog_entry.dirty_schema_version_ <= snapshot_ts)

</shell_result>


Script executed:

#!/bin/bash
# Get the context around dirty_schema_version check
sed -n '610,630p' src/cc/local_cc_shards.cpp

Repository: eloqdata/tx_service


<shell_result>
Repository: eloqdata/tx_service
Exit code: 0

stdout:

                // For alter table, should include new index tables.
                // For dirty index tables, if the dirty schema version is larger
                // than the `snapshot_ts`, it means there is no data in the
                // dirty index table before the `snapshot_ts`, then, there is no
                // need to do checkpoint or table stats sync.
                if (catalog_entry.dirty_schema_ != nullptr &&
                    catalog_entry.dirty_schema_version_ <= snapshot_ts)
                {
                    // Only search new index table name, because the base table
                    // and the old index have been obtained via above.
                    for (const txservice::TableName &index_table_name :
                         catalog_entry.dirty_schema_->IndexNames())
                    {
                        auto iter = tables.find(index_table_name);
                        if (iter == tables.end())
                        {
                            auto ins_it = tables.emplace(
                                std::piecewise_construct,
                                std::forward_as_tuple(
                                    index_table_name.StringView().data(),
                                    index_table_name.StringView().size(),

</shell_result>


Perfect! I now have all the information needed. The review comment is correct - there is indeed a real issue with the ordering.

Summary of Findings

The checkpoint timestamp is passed to GetCatalogTableNameSnapshot() as snapshot_ts and determines which dirty-schema index tables are included (the function filters on catalog_entry.dirty_schema_version_ <= snapshot_ts).

The bug:

  1. Line 329 calls GetCkptTs() immediately after construction, returning UINT64_MAX (initialized in the constructor)
  2. Line 334 uses this UINT64_MAX value to get the catalog snapshot, potentially including incorrect dirty schema tables
  3. Lines 339-342 enqueue the parallel requests that would compute the actual minimum timestamp
  4. Line 343 calls Wait() to complete the parallel scan
  5. Only after Wait() would ckpt_ts_ be updated to the correct minimum value

This is a critical ordering bug: the code consumes the placeholder checkpoint timestamp (UINT64_MAX) before the parallel computation that produces the correct minimum has completed.


Verify that the checkpoint timestamp computation happens before usage.

The checkpoint timestamp is obtained from ckpt_req.GetCkptTs() on line 329, which returns the initial value UINT64_MAX. However, the parallel scan that computes the actual minimum checkpoint timestamp (lines 339-342) happens after this call and requires Wait() on line 343 to complete.

The issue is that GetCatalogTableNameSnapshot() uses this timestamp to filter dirty schema tables (line 616 in local_cc_shards.cpp), but it receives UINT64_MAX instead of the computed minimum. This could result in including incorrect dirty schema versions in the catalog snapshot.

Recommend: Call ckpt_req.Wait() before calling GetCkptTs() and using the result, or compute the minimum timestamp through a different mechanism before querying the catalog.

🤖 Prompt for AI Agents
In src/store/snapshot_manager.cpp around lines 329-334, the code calls
ckpt_req.GetCkptTs() (which returns UINT64_MAX initially) before the parallel
tasks that compute the real minimum checkpoint timestamp have completed; this
leads to GetCatalogTableNameSnapshot() being passed an incorrect timestamp and
possibly including wrong dirty-schema tables—fix by ensuring the checkpoint
timestamp is computed before use: call ckpt_req.Wait() (or otherwise complete
the parallel minimum-timestamp computation) before calling GetCkptTs() and
passing the result to GetCatalogTableNameSnapshot(), or reorder the code so the
parallel enqueue/Wait happen prior to retrieving and using ckpt_ts.
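The ordering hazard above can be made concrete with a minimal, self-contained sketch. Only GetCkptTs()/Wait() and the dirty_schema_version_ <= snapshot_ts filter are taken from the trace; the struct, the sentinel reset inside Wait(), and the helper function are hypothetical stand-ins for the real request plumbing.

```cpp
#include <cassert>
#include <cstdint>
#include <limits>

// Minimal model of the checkpoint request: ckpt_ts_ starts at UINT64_MAX and
// only holds the real minimum after the parallel scan completes in Wait().
struct CkptRequest
{
    uint64_t ckpt_ts_ = std::numeric_limits<uint64_t>::max();

    void Wait()
    {
        // Stand-in for joining the per-shard scans; 100 is an arbitrary
        // "computed minimum" for this sketch.
        ckpt_ts_ = 100;
    }

    uint64_t GetCkptTs() const
    {
        return ckpt_ts_;
    }
};

// A dirty index table is included only when its schema version is <= the
// snapshot timestamp, mirroring the check in GetCatalogTableNameSnapshot().
bool IncludeDirtyIndex(uint64_t dirty_schema_version, uint64_t snapshot_ts)
{
    return dirty_schema_version <= snapshot_ts;
}
```

With the buggy order, IncludeDirtyIndex(v, req.GetCkptTs()) is trivially true for every version because the sentinel UINT64_MAX dominates the comparison; calling Wait() first restores the intended filtering.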

@coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
include/data_sync_task.h (1)

62-67: Skip tracking helper and flag look correct; minor naming nit

SetEntriesSkippedAndNoTruncateLog() correctly takes the same lock as SetNoTruncateLog(), atomically marks has_skipped_entries and disables need_truncate_log_, which matches the intended checkpoint semantics when DataSyncScan skips entries. As a minor style nit, consider renaming has_skipped_entries to has_skipped_entries_ to align with the other DataSyncStatus data members.

Also applies to: 86-89
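The locking discipline the nitpick describes can be sketched as follows. Only the member and method names quoted in the review (need_truncate_log_, has_skipped_entries, SetNoTruncateLog, SetEntriesSkippedAndNoTruncateLog) are taken from the PR; the mutex name and the struct layout are assumptions for illustration.

```cpp
#include <mutex>

// Hedged reconstruction of the DataSyncStatus members discussed above.
struct DataSyncStatus
{
    std::mutex mux_;
    bool need_truncate_log_{true};
    bool has_skipped_entries{false};  // naming nit: peers carry a trailing '_'

    void SetNoTruncateLog()
    {
        std::lock_guard<std::mutex> lk(mux_);
        need_truncate_log_ = false;
    }

    // One critical section flips both flags, so a concurrent reader can never
    // observe "entries skipped" while log truncation is still enabled.
    void SetEntriesSkippedAndNoTruncateLog()
    {
        std::lock_guard<std::mutex> lk(mux_);
        has_skipped_entries = true;
        need_truncate_log_ = false;
    }
};
```

Taking the same mux_ in both setters is what makes the combined update atomic with respect to SetNoTruncateLog() callers.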

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d386eb9 and e1654c7.

📒 Files selected for processing (3)
  • include/data_sync_task.h (2 hunks)
  • src/cc/local_cc_shards.cpp (1 hunks)
  • src/store/snapshot_manager.cpp (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/store/snapshot_manager.cpp
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • src/cc/local_cc_shards.cpp
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • src/cc/local_cc_shards.cpp
🔇 Additional comments (1)
src/cc/local_cc_shards.cpp (1)

4449-4465: Hash-partition LOG_NOT_TRUNCATABLE handling correctly records skipped entries

Special-casing CcErrorCode::LOG_NOT_TRUNCATABLE to call data_sync_task->status_->SetEntriesSkippedAndNoTruncateLog() ensures the checkpoint status both disables log truncation and flags that entries were skipped, without treating the scan as a hard failure. The helper’s internal mutex makes this safe even if invoked multiple times.

@MrGuin force-pushed the fix_standby_forward branch from e1654c7 to 3e079d2 on November 28, 2025 07:35
@MrGuin merged commit 00dfba4 into main on Nov 28, 2025
4 checks passed
@MrGuin deleted the fix_standby_forward branch on November 28, 2025 07:52