Replace fixed-size standby message buffer with memory-bounded queue #211

Merged: liunyl merged 6 commits into main from memory_bound on Nov 13, 2025

Conversation

liunyl (Contributor) commented Nov 12, 2025

Replace the fixed-size vector-based standby message buffer with a memory-bounded queue that tracks memory usage and evicts oldest messages when the limit is reached. This change improves memory efficiency by allocating entries on-demand and freeing them immediately when no longer needed.
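To make the idea concrete, here is a minimal sketch of a memory-bounded FIFO history. It is not the code from this PR: `Entry`, `BoundedHistory`, `Push`, and `Find` are hypothetical names, the payload is a plain string instead of a protobuf message, and the eviction policy (drop from the front while usage exceeds the limit) is an assumption based on the description above.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical stand-in for StandbyForwardEntry; the real entry wraps a
// protobuf message rather than a string.
struct Entry
{
    uint64_t seq_id = UINT64_MAX;  // unassigned until enqueued
    std::string payload;

    size_t MemorySize() const
    {
        return payload.size() + sizeof(seq_id);
    }
};

class BoundedHistory
{
public:
    explicit BoundedHistory(size_t limit_bytes) : limit_(limit_bytes) {}

    // Takes ownership of the entry, assigns the next sequence id, then evicts
    // from the front while usage exceeds the limit. Returns the number of
    // entries evicted.
    size_t Push(std::unique_ptr<Entry> entry)
    {
        assert(entry != nullptr);
        entry->seq_id = next_seq_id_++;
        usage_ += entry->MemorySize();
        index_[entry->seq_id] = entry.get();
        history_.push_back(std::move(entry));

        size_t evicted = 0;
        while (usage_ > limit_ && !history_.empty())
        {
            const Entry &oldest = *history_.front();
            usage_ -= oldest.MemorySize();
            index_.erase(oldest.seq_id);
            history_.pop_front();  // the unique_ptr frees the entry here
            ++evicted;
        }
        return evicted;
    }

    // Non-owning lookup; nullptr means never stored or already evicted.
    const Entry *Find(uint64_t seq_id) const
    {
        auto it = index_.find(seq_id);
        return it == index_.end() ? nullptr : it->second;
    }

    size_t Usage() const { return usage_; }

private:
    std::deque<std::unique_ptr<Entry>> history_;
    std::unordered_map<uint64_t, Entry *> index_;
    size_t usage_ = 0;
    size_t limit_;
    uint64_t next_seq_id_ = 0;
};
```

Entries are allocated only when pushed and freed as soon as they leave the deque, which is the on-demand behavior the description refers to.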

Fixes bug in #206

Summary by CodeRabbit

  • Refactor

    • Standby message buffering now uses a memory‑bounded history with per‑entry size accounting, automatic eviction, and deterministic lifecycles to prevent leaks and double‑frees.
    • Forwarding and retry paths now tolerate evicted/missing entries and perform targeted cleanup to maintain consistency.
  • New Features

    • Tracking of candidate standby subscriptions with coordinated rollback across shards to ensure reliable subscription changes and resends.

coderabbitai bot commented Nov 12, 2025

Walkthrough

Convert raw StandbyForwardEntry* ownership to std::unique_ptr, make the allocation and hand-off paths explicit ownership transfers, replace the fixed-size standby buffers with a memory-bounded deque+map history with eviction, and add candidate-standby tracking, lifecycle APIs, and rollback integration.

Changes

| Cohort / File(s) | Summary |
|---|---|
| Ownership & API changes: include/cc/cc_entry.h, include/cc/non_blocking_lock.h, src/cc/non_blocking_lock.cpp, include/cc/object_cc_map.h, include/cc/catalog_cc_map.h | Replace raw StandbyForwardEntry* with std::unique_ptr<StandbyForwardEntry> in storage and transfer APIs. SetForwardEntry() now takes std::unique_ptr; ReleaseForwardEntry() added. Call sites updated to create entries with std::make_unique(...) and move/release ownership when forwarding; PostCommit path in catalog_cc_map.h now releases a unique_ptr into ForwardStandbyMessage. |
| Standby buffer → deque + accounting: include/cc/cc_shard.h, src/cc/cc_shard.cpp | Remove legacy fixed-size buffers and GetNextStandbyForwardEntry; add history_standby_msg_ (std::deque<std::unique_ptr<StandbyForwardEntry>>), seq_id_to_entry_map_, memory accounting fields (total_standby_buffer_memory_usage_, standby_buffer_memory_limit_), eviction and CheckAndFreeUnneededEntries(), destructor and per-entry memory sizing. ForwardStandbyMessage and retry paths updated for ownership and accounting. |
| Candidate standby tracking & lifecycle: include/cc/cc_shard.h, src/cc/cc_shard.cpp, src/remote/cc_node_service.cpp | Add candidate_standby_nodes_ map and methods AddCandidateStandby() / RemoveCandidateStandby() / CheckAndFreeUnneededEntries(). Integrate candidate tracking into StandbyStartFollowing and ResetStandbySequenceId; record successful per-shard additions and roll back candidate additions on later errors by removing previously-added candidates. |
| StandbyForwardEntry simplification: include/standby.h, include/cc/catalog_cc_map.h, include/cc/object_cc_map.h | Remove Free(), IsFree(), Reset() and in_use_ from StandbyForwardEntry. Constructor now sets out_of_sync=false and sequence_id_ = UINT64_MAX. Add MemorySize(); forward-entry creation path populates additional request metadata and serialized key. |
| Forwarding, retry & eviction flow: src/cc/cc_shard.cpp, include/cc/..., src/remote/cc_node_service.cpp | ForwardStandbyMessage and resend logic now accept ownership via unique_ptr, assign sequence IDs, enqueue in deque/map, perform eviction when memory limit exceeded, mark out-of-sync on eviction, and use seq_id_to_entry_map_ for retries. Resubscribe/rollback flows updated when entries missing/evicted. |
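The ownership hand-off summarized above can be sketched roughly as follows. This is an illustration only: `ForwardEntry`, `LockSlot`, `Shard`, and `ForwardEntryPtr` are hypothetical stand-ins, and the exact signatures of SetForwardEntry(), ReleaseForwardEntry(), and ForwardStandbyMessage() in the PR may differ (in particular, returning a `std::unique_ptr` from the release call is an assumption).

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <utility>

// Hypothetical simplified entry; sequence_id starts at a sentinel value.
struct ForwardEntry
{
    uint64_t sequence_id = UINT64_MAX;
};

// Hypothetical stand-in for the lock/extra-data object that stores the entry.
class LockSlot
{
public:
    // The slot becomes the sole owner of the entry.
    void SetForwardEntry(std::unique_ptr<ForwardEntry> entry)
    {
        forward_entry_ = std::move(entry);
    }

    // Non-owning view for code that only inspects the entry.
    ForwardEntry *ForwardEntryPtr() const { return forward_entry_.get(); }

    // Gives ownership to the caller; the slot is left holding nullptr.
    std::unique_ptr<ForwardEntry> ReleaseForwardEntry()
    {
        return std::move(forward_entry_);
    }

private:
    std::unique_ptr<ForwardEntry> forward_entry_;
};

// Hypothetical stand-in for the shard that keeps the message history.
class Shard
{
public:
    // Taking the parameter by value makes the transfer visible at call sites.
    uint64_t ForwardStandbyMessage(std::unique_ptr<ForwardEntry> entry)
    {
        assert(entry != nullptr);
        entry->sequence_id = next_seq_id_++;
        uint64_t id = entry->sequence_id;
        history_.push_back(std::move(entry));
        return id;
    }

    size_t HistorySize() const { return history_.size(); }

private:
    std::deque<std::unique_ptr<ForwardEntry>> history_;
    uint64_t next_seq_id_ = 0;
};
```

A call such as `shard.ForwardStandbyMessage(slot.ReleaseForwardEntry())` leaves exactly one owner at every step, which is what rules out leaks and double-frees by construction.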

Sequence Diagram(s)

sequenceDiagram
  participant Op as Operation (commit/abort)
  participant CCE as CcEntry / LruEntry
  participant NB as KeyGapLockAndExtraData
  participant CcShard as CcShard
  participant Stream as StreamSender

  Note over Op: Commit/Truncate triggers standby forward
  Op->>CCE: request standby forward
  CCE->>NB: SetForwardEntry(std::unique_ptr) — ownership stored
  CCE->>CcShard: ForwardStandbyMessage(std::move(unique_ptr)) — transfer ownership
  CcShard->>CcShard: assign sequence_id, enqueue in history_standby_msg_ and seq_id_to_entry_map_
  alt memory limit exceeded
    CcShard->>CcShard: CheckAndFreeUnneededEntries() → evict oldest entries, set out_of_sync
  end
  CcShard->>Stream: send message (uses raw ptr from unique_ptr)
  Stream-->>CcShard: send result (success/fail)
  alt send failed
    CcShard->>CcShard: schedule retry (lookup by seq_id in map) or trigger resubscribe if evicted
  end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes

  • Files/areas to scrutinize:
    • Ownership transfers (SetForwardEntry / ReleaseForwardEntry / ForwardStandbyMessage) to avoid use-after-free or leaks.
    • Memory accounting and eviction correctness in CcShard (MemorySize usage, updates to total_standby_buffer_memory_usage_).
    • Retry/resend behavior for entries present vs evicted (seq_id_to_entry_map_ handling).
    • Candidate add/remove and rollback sequencing in src/remote/cc_node_service.cpp.

Poem

🐰 I wrapped each pointer snug and small,
A deque to catch the hops and all.
Candidates noted, old bytes set free,
Sequence IDs hop — queues keep the key.
Forwarding rolls on, neat and merry!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)

| Check name | Status | Explanation | Resolution |
|---|---|---|---|
| Docstring Coverage | ⚠️ Warning | Docstring coverage is 10.53% which is insufficient. The required threshold is 80.00%. | You can run @coderabbitai generate docstrings to improve docstring coverage. |
| Description check | ❓ Inconclusive | The description covers the main change and issue reference but is missing completion of several template checklist items (tests, documentation, test suite pass confirmation). | Complete the checklist by confirming test addition, documentation updates, and successful test suite execution (./mtr --suite=mono_main,mono_multi,mono_basic). |

✅ Passed checks (1 passed)

| Check name | Status | Explanation |
|---|---|---|
| Title check | ✅ Passed | The title clearly and concisely summarizes the main architectural change: replacing a fixed-size vector-based buffer with a memory-bounded queue approach, which aligns with the core objectives and file changes. |

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4fad0a0 and a20b7ce.

📒 Files selected for processing (4)
  • .cursor_knowledge/standby-message-buffer-memory-bounded.md (1 hunks)
  • include/cc/cc_shard.h (5 hunks)
  • include/standby.h (2 hunks)
  • src/cc/cc_shard.cpp (10 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • src/cc/cc_shard.cpp
  • include/cc/cc_shard.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • src/cc/cc_shard.cpp
  • .cursor_knowledge/standby-message-buffer-memory-bounded.md
  • include/cc/cc_shard.h
🧬 Code graph analysis (2)
src/cc/cc_shard.cpp (1)
include/cc/cc_shard.h (4)
  • standby_buffer_memory_limit_ (1167-1167)
  • CcShard (233-926)
  • total_standby_buffer_memory_usage_ (1165-1165)
  • next_forward_sequence_id_ (1168-1168)
include/cc/cc_shard.h (1)
src/cc/cc_shard.cpp (8)
  • CcShard (58-200)
  • CcShard (202-210)
  • AddCandidateStandby (2532-2538)
  • AddCandidateStandby (2532-2532)
  • RemoveCandidateStandby (2540-2549)
  • RemoveCandidateStandby (2540-2540)
  • CheckAndFreeUnneededEntries (2551-2633)
  • CheckAndFreeUnneededEntries (2551-2551)
🪛 markdownlint-cli2 (0.18.1)
.cursor_knowledge/standby-message-buffer-memory-bounded.md

52-52: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (11)
include/standby.h (2)

42-43: Good defensive initialization.

Initializing out_of_sync to false and sequence_id_ to UINT64_MAX provides clear sentinel values for uninitialized entries.


70-77: Well-encapsulated memory calculation.

Moving memory size calculation into the entry class improves encapsulation. The method correctly accounts for the protobuf field and sequence ID overhead.

include/cc/cc_shard.h (3)

31-31: LGTM: Necessary include for new data structure.

The <deque> include is required for the new history_standby_msg_ member.


955-957: Clean API for candidate standby lifecycle.

The three new methods provide a clear interface for managing candidate standbys and memory cleanup.


1156-1167: Well-structured memory-bounded storage.

The combination of deque for FIFO ordering, unordered_map for O(1) lookup, and explicit memory tracking fields provides efficient memory-bounded buffering.

src/cc/cc_shard.cpp (6)

111-113: Memory limit calculation is correct.

The standby buffer gets 10% of node memory per shard. Combined with the main memory limit (90%), this totals 100% per shard, which is correct partitioning.


202-210: Proper cleanup in destructor.

The destructor correctly clears all memory-bounded structures. The unique_ptr containers will automatically free entries.


2551-2633: Correct logic for freeing unneeded entries.

The method properly:

  • Finds minimum needed seq_id from both subscribed and candidate followers
  • Removes only entries with seq_id < min_needed_seq_id
  • Updates memory accounting correctly
  • Leverages FIFO ordering for efficient front removal
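The four points above can be sketched as a small free-standing function. This is a simplified illustration, not the body of CheckAndFreeUnneededEntries(): `Entry`, `History`, `FollowerMap`, and `FreeUnneeded` are hypothetical names, and the maps are assumed to hold, per node, the lowest sequence id that node still needs.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <unordered_map>

struct Entry
{
    uint64_t seq_id;
    size_t bytes;
};

struct History
{
    std::deque<std::unique_ptr<Entry>> entries;    // ascending seq_id (FIFO)
    std::unordered_map<uint64_t, Entry *> index;   // non-owning lookup
    size_t usage = 0;

    void Append(uint64_t seq_id, size_t bytes)
    {
        entries.push_back(std::unique_ptr<Entry>(new Entry{seq_id, bytes}));
        index[seq_id] = entries.back().get();
        usage += bytes;
    }
};

// node id -> lowest sequence id the node still needs (assumed meaning)
using FollowerMap = std::unordered_map<uint32_t, uint64_t>;

// Frees every entry that no subscribed or candidate follower still needs.
// Returns the number of entries freed.
size_t FreeUnneeded(History &h,
                    const FollowerMap &subscribed,
                    const FollowerMap &candidates)
{
    // With no followers at all, min_needed stays at the maximum value and
    // the loop below drains the whole history.
    uint64_t min_needed = UINT64_MAX;
    for (const auto &kv : subscribed)
    {
        min_needed = std::min(min_needed, kv.second);
    }
    for (const auto &kv : candidates)
    {
        min_needed = std::min(min_needed, kv.second);
    }

    size_t freed = 0;
    while (!h.entries.empty() && h.entries.front()->seq_id < min_needed)
    {
        const Entry &oldest = *h.entries.front();
        assert(h.usage >= oldest.bytes);
        h.usage -= oldest.bytes;
        h.index.erase(oldest.seq_id);
        h.entries.pop_front();
        ++freed;
    }
    return freed;
}
```

Because the deque is ordered by sequence id, the loop can stop at the first entry that is still needed instead of scanning the whole history.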

2635-2785: Solid ownership transfer and memory management.

The refactored ForwardStandbyMessage correctly:

  • Takes ownership via unique_ptr
  • Tracks send failures to determine if entry is needed
  • Considers candidate followers in need-check
  • Adds to history only when needed
  • Evicts oldest entries when memory limit exceeded
  • Removes candidates when their needed messages are evicted (line 2756: >= candidate_it->second is correct)
  • Frees entry automatically when not needed

2787-2898: Efficient retry with O(1) map lookup.

The retry logic now uses map lookup instead of scanning a vector. The out-of-sync notification when messages are evicted ensures standbys can recover.
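As a rough sketch of that retry path, the function below plans a resend batch by looking up each sequence id in a map and reports out-of-sync when an id is missing. `Msg`, `ResendPlan`, and `PlanResend` are hypothetical names, and treating a missing id as "evicted" is an assumption drawn from the comment above rather than from the PR's code.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct Msg
{
    uint64_t seq_id;
    std::string body;
};

struct ResendPlan
{
    std::vector<const Msg *> to_send;  // messages to resend, in order
    bool out_of_sync = false;          // true if a needed message is gone
};

// Plans a resend of [first_needed, next_unassigned), capped at max_batch.
// Each lookup is O(1) on average; a miss means the message was evicted, so
// the standby must be told it is out of sync.
ResendPlan PlanResend(const std::unordered_map<uint64_t, const Msg *> &index,
                      uint64_t first_needed,
                      uint64_t next_unassigned,
                      size_t max_batch)
{
    assert(first_needed <= next_unassigned);
    ResendPlan plan;
    for (uint64_t seq = first_needed;
         seq < next_unassigned && plan.to_send.size() < max_batch;
         ++seq)
    {
        auto it = index.find(seq);
        if (it == index.end())
        {
            plan.out_of_sync = true;
            break;
        }
        plan.to_send.push_back(it->second);
    }
    return plan;
}
```

Since eviction removes the oldest entries first, a miss would normally occur at the first id requested, so the plan is either a contiguous batch or an immediate out-of-sync signal.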


2999-3015: Complete reset including candidates.

The method now properly clears candidate_standby_nodes_ (line 3007) along with the history queue and other state.


@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/remote/cc_node_service.cpp (1)

1622-1637: Roll back candidate tracking when start-following fails
If CheckLeaderTerm fails on any shard after earlier shards have already executed AddCandidateStandby, we return early with response->set_error(true) but never remove the entries that were inserted on those earlier shards. The new candidate bookkeeping therefore leaves the standby marked as “in progress” even though the start-following attempt failed, which can skew memory-bounded eviction logic downstream. Please add a rollback path (e.g., enqueue a WaitableCc over all shards that calls RemoveCandidateStandby(node_id)) before returning in the error branch, or defer calling AddCandidateStandby until after the loop succeeds for every shard. Otherwise the candidate set can permanently accumulate stale nodes.
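One way to picture the rollback being requested is the synchronous sketch below. It is only an illustration under simplifying assumptions: `Shard`, `leader_term_ok`, and `StartFollowing` are hypothetical, and the real handler dispatches requests to shards rather than looping over them directly.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical simplified shard: a term-check flag plus the candidate map.
struct Shard
{
    bool leader_term_ok = true;
    uint64_t next_seq_id = 0;
    std::unordered_map<uint32_t, uint64_t> candidates;  // node -> start seq id
};

// Registers node_id as a candidate on every shard. If any shard fails its
// term check, the candidates added so far are removed again, so a failed
// attempt leaves no stale bookkeeping behind.
bool StartFollowing(std::vector<Shard> &shards, uint32_t node_id)
{
    std::vector<size_t> added;  // shards that already recorded the candidate
    for (size_t i = 0; i < shards.size(); ++i)
    {
        if (!shards[i].leader_term_ok)
        {
            for (size_t j : added)
            {
                shards[j].candidates.erase(node_id);
            }
            return false;
        }
        shards[i].candidates[node_id] = shards[i].next_seq_id;
        added.push_back(i);
    }
    return true;
}
```

The alternative mentioned in the comment, deferring the additions until every shard has passed its check, avoids the rollback loop at the cost of a second pass over the shards.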

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5eaf657 and a31f942.

📒 Files selected for processing (9)
  • include/cc/catalog_cc_map.h (2 hunks)
  • include/cc/cc_entry.h (1 hunks)
  • include/cc/cc_shard.h (5 hunks)
  • include/cc/non_blocking_lock.h (4 hunks)
  • include/cc/object_cc_map.h (9 hunks)
  • include/standby.h (1 hunks)
  • src/cc/cc_shard.cpp (10 hunks)
  • src/cc/non_blocking_lock.cpp (1 hunks)
  • src/remote/cc_node_service.cpp (2 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T07:10:40.324Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.324Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • include/cc/cc_shard.h
  • src/cc/cc_shard.cpp
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • include/cc/cc_shard.h
  • src/cc/cc_shard.cpp
  • src/cc/non_blocking_lock.cpp
🧬 Code graph analysis (5)
include/cc/cc_entry.h (1)
src/cc/non_blocking_lock.cpp (4)
  • SetForwardEntry (753-757)
  • SetForwardEntry (753-754)
  • ReleaseForwardEntry (764-768)
  • ReleaseForwardEntry (765-765)
include/cc/non_blocking_lock.h (2)
include/cc/cc_entry.h (1)
  • SetForwardEntry (383-387)
src/cc/non_blocking_lock.cpp (4)
  • SetForwardEntry (753-757)
  • SetForwardEntry (753-754)
  • ReleaseForwardEntry (764-768)
  • ReleaseForwardEntry (765-765)
include/cc/cc_shard.h (1)
src/cc/cc_shard.cpp (10)
  • CcShard (58-200)
  • CcShard (202-210)
  • AddCandidateStandby (2541-2547)
  • AddCandidateStandby (2541-2541)
  • RemoveCandidateStandby (2549-2558)
  • RemoveCandidateStandby (2549-2549)
  • CheckAndFreeUnneededEntries (2560-2642)
  • CheckAndFreeUnneededEntries (2560-2560)
  • CalculateEntryMemorySize (2532-2539)
  • CalculateEntryMemorySize (2532-2533)
src/cc/cc_shard.cpp (1)
include/cc/cc_shard.h (4)
  • standby_buffer_memory_limit_ (1170-1170)
  • CcShard (233-926)
  • total_standby_buffer_memory_usage_ (1168-1168)
  • next_forward_sequence_id_ (1171-1171)
src/cc/non_blocking_lock.cpp (1)
include/cc/cc_entry.h (2)
  • SetForwardEntry (383-387)
  • ForwardEntry (377-381)

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
src/cc/cc_shard.cpp (1)

2757-2777: CRITICAL: Fix backwards candidate eviction condition.

The condition if (oldest_seq_id <= candidate_it->second) incorrectly evicts candidates. This matches the issue flagged in the past review comment.

Problem: When we evict oldest_seq_id=10 and a candidate needs start_seq_id=15, the condition 10 <= 15 evaluates to true, causing premature removal even though the candidate doesn't need that message yet. Conversely, when oldest_seq_id=15 and candidate needs start_seq_id=10, the condition 15 <= 10 evaluates to false, so we don't remove the candidate even though we just evicted an entry they need.

Fix: Reverse the comparison:

-                if (oldest_seq_id <= candidate_it->second)
+                if (oldest_seq_id >= candidate_it->second)

This ensures candidates are only removed when we evict an entry at or after their required starting sequence.
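The two cases in the problem statement can be checked with a one-line predicate. This is a sketch under the assumption, taken from the comment above, that a candidate needs every message whose sequence id is at or after its start sequence id; `CandidateNeedsEvicted` is a hypothetical helper, not a function in the PR.

```cpp
#include <cassert>
#include <cstdint>

// True when the evicted message is one the candidate still needs, meaning
// the candidate can no longer catch up from history and should be removed.
bool CandidateNeedsEvicted(uint64_t evicted_seq_id,
                           uint64_t candidate_start_seq_id)
{
    return evicted_seq_id >= candidate_start_seq_id;
}
```

With the reviewer's numbers, evicting 10 for a candidate starting at 15 leaves the candidate in place, while evicting 15 for a candidate starting at 10 removes it.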

🧹 Nitpick comments (2)
include/cc/object_cc_map.h (1)

953-955: Null out the locals after moving the entry.

After ReleaseForwardEntry() succeeds, the local forward_entry (and the adjacent forward_req) still look non-null even though ownership has moved away. Later guards such as if (forward_entry) will keep triggering despite the entry being gone, which is confusing and makes accidental reuse easier. Please set both locals to nullptr immediately after the move so their state mirrors reality.

src/cc/cc_shard.cpp (1)

2648-2653: Consider accepting unique_ptr parameter for clearer ownership semantics.

Taking a raw pointer and immediately wrapping it in unique_ptr obscures ownership transfer at call sites. Consider changing the signature to:

void ForwardStandbyMessage(std::unique_ptr<StandbyForwardEntry> entry)

This makes ownership transfer explicit in the function signature.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a31f942 and a42af12.

📒 Files selected for processing (9)
  • include/cc/catalog_cc_map.h (2 hunks)
  • include/cc/cc_entry.h (1 hunks)
  • include/cc/cc_shard.h (5 hunks)
  • include/cc/non_blocking_lock.h (4 hunks)
  • include/cc/object_cc_map.h (9 hunks)
  • include/standby.h (1 hunks)
  • src/cc/cc_shard.cpp (10 hunks)
  • src/cc/non_blocking_lock.cpp (1 hunks)
  • src/remote/cc_node_service.cpp (2 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/cc/cc_entry.h
  • src/remote/cc_node_service.cpp
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T07:10:40.324Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.324Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • src/cc/cc_shard.cpp
  • include/cc/cc_shard.h
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • src/cc/cc_shard.cpp
  • include/cc/cc_shard.h
  • include/cc/object_cc_map.h
  • src/cc/non_blocking_lock.cpp
🧬 Code graph analysis (4)
src/cc/cc_shard.cpp (1)
include/cc/cc_shard.h (4)
  • standby_buffer_memory_limit_ (1170-1170)
  • CcShard (233-926)
  • total_standby_buffer_memory_usage_ (1168-1168)
  • next_forward_sequence_id_ (1171-1171)
include/cc/cc_shard.h (1)
src/cc/cc_shard.cpp (10)
  • CcShard (58-200)
  • CcShard (202-210)
  • AddCandidateStandby (2541-2547)
  • AddCandidateStandby (2541-2541)
  • RemoveCandidateStandby (2549-2558)
  • RemoveCandidateStandby (2549-2549)
  • CheckAndFreeUnneededEntries (2560-2642)
  • CheckAndFreeUnneededEntries (2560-2560)
  • CalculateEntryMemorySize (2532-2539)
  • CalculateEntryMemorySize (2532-2533)
include/cc/non_blocking_lock.h (2)
include/cc/cc_entry.h (1)
  • SetForwardEntry (383-387)
src/cc/non_blocking_lock.cpp (4)
  • SetForwardEntry (753-757)
  • SetForwardEntry (753-754)
  • ReleaseForwardEntry (764-768)
  • ReleaseForwardEntry (765-765)
src/cc/non_blocking_lock.cpp (1)
include/cc/cc_entry.h (2)
  • SetForwardEntry (383-387)
  • ForwardEntry (377-381)
🔇 Additional comments (13)
include/cc/cc_shard.h (5)

31-31: LGTM: Include added for new data structure.

The <deque> include supports the new history_standby_msg_ member.


238-238: LGTM: Destructor declaration added.

The destructor is needed to properly clean up the new memory-bounded standby buffer structures.


955-957: LGTM: Public API additions for candidate standby tracking.

The new methods provide clean lifecycle management for candidate standby nodes transitioning to subscribed state.


1074-1076: LGTM: Memory accounting helper added.

The private helper encapsulates entry size calculation for memory tracking.


1159-1170: LGTM: Memory-bounded buffer replaces fixed-size approach.

The new design uses:

  • history_standby_msg_ (deque of unique_ptr) for ownership and FIFO eviction
  • seq_id_to_entry_map_ for O(1) lookups during resend
  • candidate_standby_nodes_ to track nodes transitioning from candidate to subscribed
  • Memory tracking fields to enforce the configured limit

This addresses the PR objectives to improve memory efficiency.

src/cc/cc_shard.cpp (8)

111-113: LGTM: Standby buffer memory limit configured.

Allocating 10% of node memory per shard for the standby buffer provides a reasonable bound while allowing flexibility for replication lag tolerance.


202-210: LGTM: Destructor cleanup order is correct.

Clearing history_standby_msg_ first ensures entries are freed before clearing seq_id_to_entry_map_, which contains non-owning pointers.


221-225: LGTM: Initialization is explicit and safe.

While the structures are already empty after construction, explicit clearing makes the initialization intent clear and safe for potential reinitialization paths.


2541-2547: LGTM: Candidate tracking added with logging.

The implementation correctly tracks candidate standby nodes. The map insertion will overwrite if a node is added twice, which is acceptable for this use case.


2549-2558: LGTM: Safe candidate removal with logging.

The implementation safely checks for existence before removal and logs the operation for observability.


2560-2642: LGTM: Entry garbage collection logic is sound.

The function correctly:

  1. Computes the minimum sequence ID needed across all subscribed and candidate standbys
  2. Removes entries from the front of the deque (FIFO) that are no longer needed
  3. Updates memory accounting as entries are freed

The logic properly handles the case where no standbys exist (clears everything).


2796-2907: LGTM: Resend logic correctly handles memory-bounded buffer.

The refactored resend logic properly:

  1. Uses seq_id_to_entry_map_ for O(1) lookups instead of linear scan
  2. Handles evicted entries gracefully by sending out-of-sync notifications to lagging standbys
  3. Calls CheckAndFreeUnneededEntries() after retries to reclaim memory from entries no longer needed

The batch limit of 500 messages per standby prevents excessive blocking.


3007-3022: LGTM: Complete reset of standby state.

The function properly clears all memory-bounded buffer structures and resets memory accounting when standby sequence is reset.

liunyl force-pushed the memory_bound branch 3 times, most recently from 1ed633d to c2da4ad, on November 12, 2025 10:59

liunyl (Contributor, Author) commented Nov 12, 2025

@CodeRabbit review

coderabbitai bot commented Nov 12, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
.cursor_knowledge/standby-message-buffer-memory-bounded.md (1)

216-228: Reinforce memory calculation accuracy concerns and add validation guidance.

The "Open Questions" section raises valid concerns about memory calculation accuracy and eviction churn. Consider adding:

  1. Memory calculation accuracy: Protobuf heap allocation patterns vary. Suggest caching per-entry size at allocation time rather than recalculating during eviction.
  2. Test coverage: Add guidance for test scenarios validating eviction behavior, candidate tracking lifecycle, and out-of-sync notification handling.
  3. Monitoring: Recommend observability points for tracking memory pressure events and eviction frequency to tune the 10% memory limit in production.

Consider expanding the "Open Questions & Known Risks" section with concrete mitigation strategies and test case patterns for validation.
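The first suggestion, caching the per-entry size at allocation time, might look like the sketch below. `SizedEntry` and its members are hypothetical, and `ComputeSize()` merely stands in for an expensive traversal such as ByteSizeLong(); the counter exists only to show how often that traversal runs.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <utility>

class SizedEntry
{
public:
    explicit SizedEntry(std::string payload) : payload_(std::move(payload)) {}

    // Stand-in for a costly size computation over the whole message.
    size_t ComputeSize() const
    {
        ++compute_calls_;
        return payload_.size() + sizeof(uint64_t);
    }

    // Called once, when the entry is enqueued.
    void CacheSize() { cached_size_ = ComputeSize(); }

    // Used for both the add and the evict side of the accounting, so the
    // two always agree and no recomputation is needed.
    size_t CachedSize() const { return cached_size_; }

    int ComputeCalls() const { return compute_calls_; }

private:
    std::string payload_;
    size_t cached_size_ = 0;
    mutable int compute_calls_ = 0;  // instrumentation for this sketch only
};
```

The cached value is only trustworthy if the message is not modified after it is enqueued; that invariant is assumed here and would need to hold in the real entry type as well.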

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between c2da4ad and 9476630.

📒 Files selected for processing (1)
  • .cursor_knowledge/standby-message-buffer-memory-bounded.md (1 hunks)
🧰 Additional context used
🪛 markdownlint-cli2 (0.18.1)
.cursor_knowledge/standby-message-buffer-memory-bounded.md

50-50: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

Comment on lines +50 to +68
```
tx_service/include/cc/
- cc_shard.h (data structures, method declarations)
- cc_entry.h (SetForwardEntry, ReleaseForwardEntry)
- non_blocking_lock.h (forward_entry_ as unique_ptr)
- object_cc_map.h (allocation point, ownership transfer)
- catalog_cc_map.h (catalog allocation point)

tx_service/src/cc/
- cc_shard.cpp (ForwardStandbyMessage, ResendFailedForwardMessages, memory tracking, eviction)
- cc_entry.cpp (SetForwardEntry, ReleaseForwardEntry implementations)
- non_blocking_lock.cpp (SetForwardEntry, ReleaseForwardEntry implementations)

tx_service/src/remote/
- cc_node_service.cpp (StandbyStartFollowing, ResetStandbySequenceId handlers)

tx_service/include/
- standby.h (StandbyForwardEntry structure, Remove Reset() method)
```

⚠️ Potential issue | 🟡 Minor

Add language identifier to code fence.

The file structure code block lacks a language specification. Use text or plaintext for the fenced code block.

-**File Structure:**
-```
+**File Structure:**
+```text
 tx_service/include/cc/
   - cc_shard.h (data structures, method declarations)
   - cc_entry.h (SetForwardEntry, ReleaseForwardEntry)
   - non_blocking_lock.h (forward_entry_ as unique_ptr)
   - object_cc_map.h (allocation point, ownership transfer)
   - catalog_cc_map.h (catalog allocation point)

 tx_service/src/cc/
   - cc_shard.cpp (ForwardStandbyMessage, ResendFailedForwardMessages, memory tracking, eviction)
   - cc_entry.cpp (SetForwardEntry, ReleaseForwardEntry implementations)
   - non_blocking_lock.cpp (SetForwardEntry, ReleaseForwardEntry implementations)

 tx_service/src/remote/
   - cc_node_service.cpp (StandbyStartFollowing, ResetStandbySequenceId handlers)

 tx_service/include/
   - standby.h (StandbyForwardEntry structure, Remove Reset() method)
-```
+```
🧰 Tools
🪛 markdownlint-cli2 (0.18.1)

50-50: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🤖 Prompt for AI Agents
In .cursor_knowledge/standby-message-buffer-memory-bounded.md around lines 50 to
68 the fenced file-structure block is missing a language identifier; update the
opening fence from ``` to ```text (or ```plaintext) so the block reads ```text
and keep the closing ``` as-is, ensuring the diffs reflect this change.

liunyl (Contributor, Author) commented Nov 13, 2025

@CodeRabbit review

coderabbitai bot commented Nov 13, 2025

✅ Actions performed

Review triggered.

Note: CodeRabbit is an incremental review system and does not re-review already reviewed commits. This command is applicable only when automatic reviews are paused.

githubzilla (Collaborator) left a comment

+1

MrGuin (Collaborator) left a comment

We should call CheckAndFreeUnneededEntries whenever entries in history_standby_msg_ are no longer needed, for example in RemoveSubscribedStandby and ResetStandbySequenceId.

assert(entry != nullptr);
// Memory size = protobuf message size + overhead
// Overhead: bool in_use_ + uint64_t sequence_id_ = 9 bytes
return entry->Message().ByteSizeLong() + sizeof(bool) + sizeof(uint64_t);
Collaborator

From ChatGPT, it seems that ByteSizeLong iterates over all fields of the protobuf message (here, the CcMessage). If that takes significant time, maybe we should only calculate the size of the necessary fields.
ByteSizeLong also might not be consistent with the memory actually allocated.

// In ResetStandbySequenceId handler:
ccs.RemoveCandidateStandby(node_id);
ccs.AddSubscribedStandby(node_id, start_seq_id, standby_node_term);
ccs.CheckAndFreeUnneededEntries(); // Free entries no longer needed by this candidate
Collaborator

From the code, ResetStandbySequenceId handler does not call ccs.CheckAndFreeUnneededEntries().

Contributor Author

Yes, I have removed this line, since the candidate standby is not removed here; it is just moved to an official standby, and the referenced message has not changed.

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (3)
.cursor_knowledge/standby-message-buffer-memory-bounded.md (2)

50-68: Add language identifier to fenced code block.

Use "text" to satisfy MD040.

Apply:

-**File Structure:**
-```
+**File Structure:**
+```text
 ...

88-94: Fix stale overhead comment and formula (no in_use_ field).

Remove sizeof(bool) and describe actual members.

Apply:

-uint64_t CalculateEntryMemorySize(const StandbyForwardEntry *entry) const {
-    // protobuf size + overhead (bool in_use_ + uint64_t sequence_id_)
-    return entry->Message().ByteSizeLong() + sizeof(bool) + sizeof(uint64_t);
-}
+uint64_t CalculateEntryMemorySize(const StandbyForwardEntry *entry) const {
+    // protobuf size + overhead (uint64_t sequence_id_)
+    return entry->Message().ByteSizeLong() + sizeof(uint64_t);
+}
src/cc/cc_shard.cpp (1)

2532-2539: Fix memory accounting: remove nonexistent bool and stale comment.

Incorrect overhead skews eviction decisions.

Apply:

-uint64_t CcShard::CalculateEntryMemorySize(
-    const StandbyForwardEntry *entry) const
-{
-    assert(entry != nullptr);
-    // Memory size = protobuf message size + overhead
-    // Overhead: bool in_use_ + uint64_t sequence_id_ = 9 bytes
-    return entry->Message().ByteSizeLong() + sizeof(bool) + sizeof(uint64_t);
-}
+uint64_t CcShard::CalculateEntryMemorySize(
+    const StandbyForwardEntry *entry) const
+{
+    assert(entry != nullptr);
+    // Memory size = protobuf message size + overhead
+    // Overhead: uint64_t sequence_id_
+    return entry->Message().ByteSizeLong() + sizeof(uint64_t);
+}
🧹 Nitpick comments (2)
src/cc/cc_shard.cpp (2)

2549-2558: Free memory promptly after removing a candidate.

Call CheckAndFreeUnneededEntries() to drop entries that are no longer needed once a candidate is removed.

Apply:

 void CcShard::RemoveCandidateStandby(uint32_t node_id)
 {
     auto it = candidate_standby_nodes_.find(node_id);
     if (it != candidate_standby_nodes_.end())
     {
         candidate_standby_nodes_.erase(it);
         LOG(INFO) << "Removed candidate standby node " << node_id
                   << " from shard " << core_id_;
+        CheckAndFreeUnneededEntries();
     }
 }

2738-2756: Avoid repeated ByteSizeLong() calls in hot path.

Cache entry size once at enqueue time (store in StandbyForwardEntry) to reduce protobuf traversal during add/evict loops.

I can draft a small change adding a cached_size_ field in StandbyForwardEntry and using it here; say the word.

Also applies to: 2628-2634
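
A minimal sketch of the caching idea, not the change that was merged. It assumes a hypothetical `cached_size_` field and uses a stand-in `FakeMessage` type in place of the real protobuf `CcMessage`; `CachedSizeEntry` and its accessors are illustrative names only. The size is computed once when the entry is built and reused afterwards. Since protobuf's `ByteSizeLong()` reports the serialized size rather than the heap footprint, the cached value remains an approximation of real memory use.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>

// Stand-in for the protobuf message (assumption); counts size computations.
struct FakeMessage
{
    explicit FakeMessage(std::string p) : payload(std::move(p)) {}

    uint64_t ByteSizeLong() const
    {
        ++size_calls;
        return payload.size();
    }

    std::string payload;
    mutable int size_calls = 0;
};

// Hypothetical entry that remembers its accounted size.
class CachedSizeEntry
{
public:
    CachedSizeEntry(uint64_t sequence_id, FakeMessage msg)
        : sequence_id_(sequence_id), msg_(std::move(msg))
    {
        // Traverse the message once, at enqueue time.
        cached_size_ = msg_.ByteSizeLong() + sizeof(uint64_t);
    }

    uint64_t SequenceId() const { return sequence_id_; }
    // Cheap accessor for add/evict loops; no message traversal.
    uint64_t CachedSize() const { return cached_size_; }
    const FakeMessage &Message() const { return msg_; }

private:
    uint64_t sequence_id_;
    FakeMessage msg_;
    uint64_t cached_size_{0};
};
```

With this shape, both the enqueue path and the eviction loop would read `CachedSize()` instead of calling `ByteSizeLong()` again.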

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 9476630 and eb178a8.

📒 Files selected for processing (10)
  • .cursor_knowledge/standby-message-buffer-memory-bounded.md (1 hunks)
  • include/cc/catalog_cc_map.h (2 hunks)
  • include/cc/cc_entry.h (1 hunks)
  • include/cc/cc_shard.h (6 hunks)
  • include/cc/non_blocking_lock.h (4 hunks)
  • include/cc/object_cc_map.h (9 hunks)
  • include/standby.h (1 hunks)
  • src/cc/cc_shard.cpp (10 hunks)
  • src/cc/non_blocking_lock.cpp (1 hunks)
  • src/remote/cc_node_service.cpp (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/cc/catalog_cc_map.h
  • src/cc/non_blocking_lock.cpp
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • include/cc/cc_shard.h
  • src/cc/cc_shard.cpp
  • src/remote/cc_node_service.cpp
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • include/cc/cc_shard.h
  • include/cc/object_cc_map.h
  • include/cc/non_blocking_lock.h
  • src/cc/cc_shard.cpp
🧬 Code graph analysis (4)
include/cc/cc_entry.h (1)
src/cc/non_blocking_lock.cpp (4)
  • SetForwardEntry (753-757)
  • SetForwardEntry (753-754)
  • ReleaseForwardEntry (764-768)
  • ReleaseForwardEntry (765-765)
include/cc/cc_shard.h (1)
src/cc/cc_shard.cpp (10)
  • CcShard (58-200)
  • CcShard (202-210)
  • AddCandidateStandby (2541-2547)
  • AddCandidateStandby (2541-2541)
  • RemoveCandidateStandby (2549-2558)
  • RemoveCandidateStandby (2549-2549)
  • CheckAndFreeUnneededEntries (2560-2642)
  • CheckAndFreeUnneededEntries (2560-2560)
  • CalculateEntryMemorySize (2532-2539)
  • CalculateEntryMemorySize (2532-2533)
include/cc/non_blocking_lock.h (2)
include/cc/cc_entry.h (1)
  • SetForwardEntry (383-387)
src/cc/non_blocking_lock.cpp (4)
  • SetForwardEntry (753-757)
  • SetForwardEntry (753-754)
  • ReleaseForwardEntry (764-768)
  • ReleaseForwardEntry (765-765)
src/cc/cc_shard.cpp (1)
include/cc/cc_shard.h (3)
  • standby_buffer_memory_limit_ (1171-1171)
  • total_standby_buffer_memory_usage_ (1169-1169)
  • next_forward_sequence_id_ (1172-1172)
🪛 markdownlint-cli2 (0.18.1)
.cursor_knowledge/standby-message-buffer-memory-bounded.md

50-50: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (4)
include/cc/object_cc_map.h (1)

873-877: Ownership transfer of StandbyForwardEntry looks correct.

Make-unique + SetForwardEntry, then ReleaseForwardEntry to either forward or free is clean and leak-safe. LGTM.

Also applies to: 953-956, 1116-1119, 1135-1137, 1187-1195, 1393-1396, 1436-1438, 2432-2435

src/cc/cc_shard.cpp (3)

3012-3017: Reset path LGTM.

History cleared and counters reset; consistent with new model.


2648-2650: Forwarding ownership flow LGTM.

Taking ownership, sending, and conditional enqueue/evict are consistent and safe.

Also applies to: 2715-2796


2560-2642: Entry reclamation triggers integrated well.

Min-needed logic and post-retry/post-unsubscribe cleanup look correct.

Also applies to: 2899-2907, 2934-2938

Comment on lines +1075 to +1078
// Calculate memory size of a StandbyForwardEntry
// Returns: msg.ByteSizeLong() + sizeof(bool) + sizeof(uint64_t)
uint64_t CalculateEntryMemorySize(const StandbyForwardEntry *entry) const;


⚠️ Potential issue | 🟡 Minor

Correct the memory-size comment for StandbyForwardEntry.

The comment still mentions a removed bool; reflect only sequence_id_.

Apply:

-// Calculate memory size of a StandbyForwardEntry
-// Returns: msg.ByteSizeLong() + sizeof(bool) + sizeof(uint64_t)
+// Calculate memory size of a StandbyForwardEntry
+// Returns: msg.ByteSizeLong() + sizeof(uint64_t)
📝 Committable suggestion

Suggested change
-// Calculate memory size of a StandbyForwardEntry
-// Returns: msg.ByteSizeLong() + sizeof(bool) + sizeof(uint64_t)
-uint64_t CalculateEntryMemorySize(const StandbyForwardEntry *entry) const;
+// Calculate memory size of a StandbyForwardEntry
+// Returns: msg.ByteSizeLong() + sizeof(uint64_t)
+uint64_t CalculateEntryMemorySize(const StandbyForwardEntry *entry) const;
🤖 Prompt for AI Agents
In include/cc/cc_shard.h around lines 1075 to 1078, the comment for
CalculateEntryMemorySize incorrectly mentions a removed bool; update the comment
to reflect that StandbyForwardEntry memory size is msg.ByteSizeLong() plus only
the size of sequence_id_ (uint64_t). Replace the Returns line with something
like "Returns: msg.ByteSizeLong() + sizeof(uint64_t)" or "Returns:
msg.ByteSizeLong() + sizeof(sequence_id_)" to accurately describe the
calculation.

Replace the fixed-size vector-based standby message buffer with a
memory-bounded queue that tracks memory usage and evicts oldest messages
when the limit is reached. This change improves memory efficiency by
allocating entries on-demand and freeing them immediately when no longer
needed.
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (4)
.cursor_knowledge/standby-message-buffer-memory-bounded.md (2)

50-68: Add language identifier to code fence.

The file structure code block lacks a language specification.

Apply this diff:

 **File Structure:**
-```
+```text
 tx_service/include/cc/
   - cc_shard.h (data structures, method declarations)
   ...

88-94: Update memory calculation comment to reflect actual struct members.

The comment references in_use_, which was removed from StandbyForwardEntry. The struct only contains sequence_id_ and msg.

Apply this diff:

 uint64_t CalculateEntryMemorySize(const StandbyForwardEntry *entry) const {
-    // protobuf size + overhead (bool in_use_ + uint64_t sequence_id_)
-    return entry->Message().ByteSizeLong() + sizeof(bool) + sizeof(uint64_t);
+    // protobuf size + overhead (uint64_t sequence_id_)
+    return entry->Message().ByteSizeLong() + sizeof(uint64_t);
 }
include/cc/cc_shard.h (1)

1075-1078: Correct the memory-size comment to match actual struct.

The comment incorrectly references a removed bool field. StandbyForwardEntry contains only sequence_id_ and msg.

Apply:

 // Calculate memory size of a StandbyForwardEntry
-// Returns: msg.ByteSizeLong() + sizeof(bool) + sizeof(uint64_t)
+// Returns: msg.ByteSizeLong() + sizeof(uint64_t)
src/cc/cc_shard.cpp (1)

2532-2539: Fix stale comment and size calculation

The comment references a removed in_use_ field. Also, accounting should exclude it.

-  // Overhead: bool in_use_ + uint64_t sequence_id_ = 9 bytes
-  return entry->Message().ByteSizeLong() + sizeof(bool) + sizeof(uint64_t);
+  // Overhead: uint64_t sequence_id_ (8 bytes)
+  return entry->Message().ByteSizeLong() + sizeof(uint64_t);
#!/bin/bash
# Ensure StandbyForwardEntry fields match the comment
rg -nU "struct\s+StandbyForwardEntry\b" -A40 include | sed -n '1,80p'
🧹 Nitpick comments (3)
include/cc/object_cc_map.h (1)

953-956: Pass unique_ptr directly to ForwardStandbyMessage (optional)

You release the ptr here and re-wrap inside ForwardStandbyMessage. Consider adding an overload that takes std::unique_ptr to avoid raw-pointer hops.

src/cc/cc_shard.cpp (2)

111-114: Avoid float math in limit calculation

Use integer math to prevent truncation surprises.

- standby_buffer_memory_limit_ =
-     (uint64_t) MB(node_memory_limit_mb) * 0.1 / core_cnt_;
+ standby_buffer_memory_limit_ =
+     (uint64_t)(MB(node_memory_limit_mb) / core_cnt_ / 10);
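
For illustration, the integer-only variant can be isolated in a small helper. This is a sketch under assumptions: `MegabytesToBytes` stands in for the project's `MB` macro and `StandbyBufferLimit` is a hypothetical name. It gives each core 10% of the node memory budget divided by the core count, with no floating-point step.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the MB(...) macro: megabytes to bytes.
constexpr uint64_t MegabytesToBytes(uint64_t mb)
{
    return mb * 1024ULL * 1024ULL;
}

// Per-core standby buffer budget: 10% of node memory split across cores.
// Only integer division is used, so the result is a well-defined floor.
inline uint64_t StandbyBufferLimit(uint64_t node_memory_limit_mb,
                                   uint64_t core_cnt)
{
    assert(core_cnt > 0);
    return MegabytesToBytes(node_memory_limit_mb) / core_cnt / 10;
}
```

For positive integers, dividing by `core_cnt` and then by 10 yields the same floor as dividing once by `core_cnt * 10`, so the order of the two divisions does not change the result.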

2648-2653: Take unique_ptr directly (optional)

Accept a std::unique_ptr<StandbyForwardEntry> parameter to avoid raw→unique_ptr round-trip here. Keep current overload for compatibility.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between eb178a8 and 4fad0a0.

📒 Files selected for processing (10)
  • .cursor_knowledge/standby-message-buffer-memory-bounded.md (1 hunks)
  • include/cc/catalog_cc_map.h (2 hunks)
  • include/cc/cc_entry.h (1 hunks)
  • include/cc/cc_shard.h (6 hunks)
  • include/cc/non_blocking_lock.h (4 hunks)
  • include/cc/object_cc_map.h (9 hunks)
  • include/standby.h (1 hunks)
  • src/cc/cc_shard.cpp (10 hunks)
  • src/cc/non_blocking_lock.cpp (1 hunks)
  • src/remote/cc_node_service.cpp (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • include/cc/catalog_cc_map.h
  • include/cc/cc_entry.h
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-11-11T07:10:40.346Z
Learnt from: lzxddz
Repo: eloqdata/tx_service PR: 199
File: include/cc/local_cc_shards.h:233-234
Timestamp: 2025-11-11T07:10:40.346Z
Learning: In the LocalCcShards class in include/cc/local_cc_shards.h, the EnqueueCcRequest methods use `shard_code & 0x3FF` followed by `% cc_shards_.size()` to distribute work across processor cores for load balancing. This is intentional and separate from partition ID calculation. The 0x3FF mask creates a consistent distribution range (0-1023) before modulo by actual core count.

Applied to files:

  • src/remote/cc_node_service.cpp
  • include/cc/cc_shard.h
  • src/cc/cc_shard.cpp
📚 Learning: 2025-10-09T03:56:58.811Z
Learnt from: thweetkomputer
Repo: eloqdata/tx_service PR: 150
File: include/cc/local_cc_shards.h:626-631
Timestamp: 2025-10-09T03:56:58.811Z
Learning: For the LocalCcShards class in include/cc/local_cc_shards.h: Writer locks (unique_lock) should continue using the original meta_data_mux_ (std::shared_mutex) rather than fast_meta_data_mux_ (FastMetaDataMutex) at this stage. Only reader locks may use the FastMetaDataMutex wrapper.

Applied to files:

  • include/cc/cc_shard.h
  • include/cc/object_cc_map.h
  • src/cc/cc_shard.cpp
  • include/cc/non_blocking_lock.h
  • src/cc/non_blocking_lock.cpp
  • .cursor_knowledge/standby-message-buffer-memory-bounded.md
🧬 Code graph analysis (4)
include/cc/cc_shard.h (1)
src/cc/cc_shard.cpp (10)
  • CcShard (58-200)
  • CcShard (202-210)
  • AddCandidateStandby (2541-2547)
  • AddCandidateStandby (2541-2541)
  • RemoveCandidateStandby (2549-2558)
  • RemoveCandidateStandby (2549-2549)
  • CheckAndFreeUnneededEntries (2560-2642)
  • CheckAndFreeUnneededEntries (2560-2560)
  • CalculateEntryMemorySize (2532-2539)
  • CalculateEntryMemorySize (2532-2533)
src/cc/cc_shard.cpp (1)
include/cc/cc_shard.h (3)
  • standby_buffer_memory_limit_ (1171-1171)
  • total_standby_buffer_memory_usage_ (1169-1169)
  • next_forward_sequence_id_ (1172-1172)
include/cc/non_blocking_lock.h (2)
include/cc/cc_entry.h (1)
  • SetForwardEntry (383-387)
src/cc/non_blocking_lock.cpp (4)
  • SetForwardEntry (753-757)
  • SetForwardEntry (753-754)
  • ReleaseForwardEntry (764-768)
  • ReleaseForwardEntry (765-765)
src/cc/non_blocking_lock.cpp (1)
include/cc/cc_entry.h (2)
  • SetForwardEntry (383-387)
  • ForwardEntry (377-381)
🪛 markdownlint-cli2 (0.18.1)
.cursor_knowledge/standby-message-buffer-memory-bounded.md

50-50: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🔇 Additional comments (26)
include/cc/cc_shard.h (3)

238-238: LGTM: Destructor properly manages new data structures.

The destructor appropriately cleans up the memory-bounded queue, lookup map, and candidate tracking structures introduced in this PR.


955-957: LGTM: New candidate standby lifecycle APIs are well-designed.

The public methods provide clear lifecycle management for candidate followers, preventing premature eviction of messages needed during the two-phase subscription process.


1160-1171: LGTM: Memory-bounded data structures are well-designed.

The ownership model is clear (deque owns via unique_ptr, map holds raw pointers), and the memory tracking fields enable proper budgeting and eviction.
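
The ownership model described above can be sketched as follows. This is an illustration under assumptions, not the PR's code: `Entry`, `BoundedHistory`, and the member names are hypothetical, a `std::string` stands in for the protobuf message, and the policy of always keeping the newest entry is a choice made for the sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

struct Entry
{
    uint64_t sequence_id;
    std::string payload;  // stand-in for the protobuf message (assumption)
};

class BoundedHistory
{
public:
    explicit BoundedHistory(uint64_t memory_limit) : limit_(memory_limit) {}

    // Takes ownership, then evicts oldest entries until usage fits the limit.
    void Append(std::unique_ptr<Entry> entry)
    {
        assert(entry != nullptr);
        usage_ += SizeOf(*entry);
        index_[entry->sequence_id] = entry.get();
        history_.push_back(std::move(entry));
        while (usage_ > limit_ && history_.size() > 1)
        {
            EvictOldest();
        }
    }

    // Non-owning lookup; returns nullptr if the entry was evicted.
    const Entry *Find(uint64_t sequence_id) const
    {
        auto it = index_.find(sequence_id);
        return it == index_.end() ? nullptr : it->second;
    }

    uint64_t Usage() const { return usage_; }
    size_t Size() const { return history_.size(); }

private:
    static uint64_t SizeOf(const Entry &e)
    {
        return e.payload.size() + sizeof(uint64_t);
    }

    void EvictOldest()
    {
        Entry *oldest = history_.front().get();
        usage_ -= SizeOf(*oldest);
        index_.erase(oldest->sequence_id);  // drop the raw pointer first
        history_.pop_front();               // then free the entry
    }

    uint64_t limit_;
    uint64_t usage_{0};
    std::deque<std::unique_ptr<Entry>> history_;   // owns the entries
    std::unordered_map<uint64_t, Entry *> index_;  // non-owning lookup
};
```

Erasing the map slot before popping the deque keeps the raw pointer from ever outliving the entry it refers to.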

include/standby.h (1)

42-43: LGTM: Proper initialization for new ownership model.

Setting out_of_sync to false and sequence_id_ to UINT64_MAX provides appropriate defaults for the exclusive-ownership lifecycle.

src/cc/non_blocking_lock.cpp (3)

753-757: LGTM: Proper ownership transfer into member.

The method correctly accepts a unique_ptr and transfers ownership via std::move.


759-762: LGTM: Non-owning accessor maintains compatibility.

Returns a raw pointer for read access without affecting ownership, maintaining the existing public API surface.


764-768: LGTM: Proper ownership transfer to caller.

The method correctly transfers ownership via std::move, allowing the caller to take exclusive ownership of the entry.
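
The three accessors reviewed above fit together roughly like this. The sketch assumes `ReleaseForwardEntry()` returns a `std::unique_ptr`; `LockSketch` and `ForwardEntrySketch` are hypothetical stand-ins for the real lock and entry types.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>

// Stand-in for StandbyForwardEntry (assumption).
struct ForwardEntrySketch
{
    uint64_t sequence_id = UINT64_MAX;
};

class LockSketch
{
public:
    // Takes ownership of the entry.
    void SetForwardEntry(std::unique_ptr<ForwardEntrySketch> entry)
    {
        forward_entry_ = std::move(entry);
    }

    // Non-owning observer; ownership stays with the lock.
    ForwardEntrySketch *ForwardEntry() const
    {
        return forward_entry_.get();
    }

    // Hands ownership to the caller; the member is left empty.
    std::unique_ptr<ForwardEntrySketch> ReleaseForwardEntry()
    {
        return std::move(forward_entry_);
    }

private:
    std::unique_ptr<ForwardEntrySketch> forward_entry_;
};
```

If the caller discards the returned pointer, the entry is destroyed at the end of the statement, which matches the "forward or free" behaviour described for the Block, Unlock, and abort paths.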

src/remote/cc_node_service.cpp (2)

1601-1656: LGTM: Robust rollback ensures atomic candidate tracking.

The rollback logic correctly removes candidate standbys from all previously successful shards when a later shard errors, preventing inconsistent partial-subscription state.
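
A rough sketch of the rollback pattern, under assumptions: `ShardSketch`, its `fail_on_add` hook, and `AddCandidateOnAllShards` are hypothetical, and the real handler dispatches requests to shards rather than calling them in a loop. The point is only the shape: when shard `i` fails, the candidate is removed from shards `0..i-1`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <unordered_set>
#include <vector>

// Stand-in for a CcShard that tracks candidate standby nodes (assumption).
struct ShardSketch
{
    bool fail_on_add = false;  // hook that simulates a shard error
    std::unordered_set<uint32_t> candidates;

    bool AddCandidateStandby(uint32_t node_id)
    {
        if (fail_on_add)
        {
            return false;
        }
        candidates.insert(node_id);
        return true;
    }

    void RemoveCandidateStandby(uint32_t node_id)
    {
        candidates.erase(node_id);
    }
};

// Registers node_id on every shard, or on none if any shard fails.
inline bool AddCandidateOnAllShards(std::vector<ShardSketch> &shards,
                                    uint32_t node_id)
{
    for (size_t i = 0; i < shards.size(); ++i)
    {
        if (!shards[i].AddCandidateStandby(node_id))
        {
            // Undo the shards that already succeeded.
            for (size_t j = 0; j < i; ++j)
            {
                shards[j].RemoveCandidateStandby(node_id);
            }
            return false;
        }
    }
    return true;
}
```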


1863-1866: LGTM: Proper candidate-to-subscriber transition with cleanup.

The code correctly removes the candidate before subscribing. Note that AddSubscribedStandby internally calls CheckAndFreeUnneededEntries() (line 973 of cc_shard.h), ensuring entries no longer needed by this candidate are freed.

include/cc/object_cc_map.h (4)

873-877: Ownership transfer LGTM

Using std::unique_ptr for StandbyForwardEntry and moving into SetForwardEntry prevents leaks/double-frees.


1116-1118: Correct: release on Block/Unlock paths

Releasing and letting RAII free the forward entry on Block/Unlock avoids dangling buffers.
Please verify no downstream code relies on forward_entry after these branches.

Also applies to: 1135-1137


1192-1195: Commit path forwarding LGTM

ReleaseForwardEntry + forward call correctly hands ownership to shard sender.


1434-1438: Abort path cleanup LGTM

Releasing the forward entry on abort avoids stale per-key forwarding state.

include/cc/non_blocking_lock.h (4)

421-423: Reset uses unique_ptr correctly

forward_entry_.reset(); is correct and consistent with the new ownership model.


566-568: New API surface for ownership transfer is good

SetForwardEntry(std::unique_ptr<...>) + ReleaseForwardEntry() provide clear ownership semantics.


594-596: Member type change LGTM

Storing std::unique_ptr<StandbyForwardEntry> aligns storage with ownership; ForwardEntry() exposes a raw observer.


39-39: Forward-declaration approach requires additional refactoring for full feasibility

The suggestion to forward-declare struct StandbyForwardEntry; and move the destructor out-of-line is sound in principle for reducing rebuild surface. However, the Reset() method at lines 410-423 is inline and calls forward_entry_.reset() (line 421), which requires StandbyForwardEntry to be a complete type. Simply implementing the suggested changes would fail to compile.

To make this refactoring work, Reset() would also need to be moved out-of-line into the .cpp file alongside the destructor. This adds complexity beyond the suggested scope. The current state (with #include "standby.h") works correctly. Evaluate whether the rebuild reduction justifies the refactoring effort of moving multiple methods out-of-line.

src/cc/cc_shard.cpp (9)

202-211: Destructor is safe and clear

Properly clears history and maps to release memory on shutdown.


221-226: Init path reset LGTM

Resets standby history state and counters; good for clean startup.


2541-2548: Candidate add API LGTM

Lightweight map update and trace; fine.


2549-2558: Candidate remove API LGTM

Erases safely with logging.


2560-2642: Free only unneeded entries: logic looks correct

Computes min needed seq across subscribers and candidates; prunes from front accordingly. Good guard for empty state.
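
The min-needed pruning can be sketched on bare sequence ids. This is a simplified illustration, not the code in `CheckAndFreeUnneededEntries`: the function names are hypothetical, each follower is reduced to the next sequence id it still needs, and freeing everything when there are no followers is a choice made for the sketch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <unordered_map>

// Smallest sequence id any follower still needs; UINT64_MAX if none.
inline uint64_t MinNeededSeq(
    const std::unordered_map<uint32_t, uint64_t> &subscribed,
    const std::unordered_map<uint32_t, uint64_t> &candidates)
{
    uint64_t min_needed = UINT64_MAX;
    for (const auto &kv : subscribed)
    {
        min_needed = std::min(min_needed, kv.second);
    }
    for (const auto &kv : candidates)
    {
        min_needed = std::min(min_needed, kv.second);
    }
    return min_needed;
}

// Pops entries from the front whose id is below min_needed.
// Returns how many entries were freed.
inline size_t FreeUnneeded(std::deque<uint64_t> &history_seq_ids,
                           uint64_t min_needed)
{
    size_t freed = 0;
    while (!history_seq_ids.empty() && history_seq_ids.front() < min_needed)
    {
        history_seq_ids.pop_front();
        ++freed;
    }
    return freed;
}
```

Because candidates take part in the minimum, a node that is still in the middle of subscribing holds back pruning just like a subscribed standby does.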


2696-2705: any_send_failed handling is correct

Properly tracks send failures to decide retention.


2738-2784: Eviction + candidate check LGTM

Uses >= comparison to drop only candidates that truly fell behind; fixes earlier bug.
Please confirm this replaced the prior <= condition everywhere.


2801-2805: Initialize stream sender before retry

Good defensive init.


2906-2907: Post-retry trimming is good

Runs CheckAndFreeUnneededEntries after retries to reclaim memory.

@liunyl liunyl merged commit d7e04da into main Nov 13, 2025
3 of 4 checks passed
@liunyl liunyl deleted the memory_bound branch November 13, 2025 13:27
@coderabbitai coderabbitai bot mentioned this pull request Dec 18, 2025
5 tasks

Development

Successfully merging this pull request may close these issues.

[Bug]: make standby forward msg buffer bounded by memory instead of number of records

3 participants