refactor: msg pool to make more structured part 2 #7006
akaladarshi wants to merge 4 commits into main
Conversation
Walkthrough: This PR consolidates message pool cache and state management. Changes: Message Pool State Consolidation.

Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
dd74a63 to a86d8c4
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/message_pool/msgpool/local_store.rs`:
- Around line 26-28: The add() method currently unconditionally appends
resolved_from to self.local_addrs causing duplicates; change add() to avoid
duplicates by inserting only if the address is not already present (e.g., check
self.local_addrs.read().contains(&resolved_from) or convert local_addrs to a
HashSet and insert), so known_local_addrs() no longer grows per-message and
republish_pending_messages() won't re-resolve the same sender repeatedly; update
any code that assumes a Vec to handle the new container if you switch to
HashSet.
In `@src/message_pool/msgpool/mod.rs`:
- Around line 275-284: The republish trigger is using
RepublishState::mark_republished (which inserts) causing the logic to wake on
new CIDs instead of on CIDs already republished; change the check to a read-only
membership test by calling a new or existing RepublishState::was_republished
(implement it to return republished.contains(cid) without mutating state) and
use that in both loops (the branches around mpool_ctx.remove_from_selected_msgs
and the repub flag) so you only set repub = true when the CID was already in the
republished set.
In `@src/message_pool/msgpool/msg_pool.rs`:
- Around line 485-493: The load_local() implementation iterates
LocalStore::snapshot_msgs() (a HashSet) in non-deterministic order which causes
add() to fail with sequencing errors (SequenceTooLow, NonceGap,
DuplicateSequence) and may silently drop messages; fix by collecting
snapshot_msgs() into a vector, sort it deterministically by sender and
message().sequence before iterating, then call self.add(...) for each; update
the add() error handling in the closure used in load_local() so SequenceTooLow
still triggers local.remove_msg(&k) but other errors are either logged/warned
(including error kind) and left in local_msgs (or retried) rather than silently
ignored, referencing load_local, LocalStore::snapshot_msgs, add,
local.remove_msg, and the Error variants
SequenceTooLow/NonceGap/DuplicateSequence.
In `@src/message_pool/msgpool/republish.rs`:
- Around line 39-44: The trigger() method currently uses
self.trigger.send_async(()).await which can await and block head_change() when
the 4-slot wakeup buffer is full; replace the await send with a non-blocking
self.trigger.try_send(()) and treat a Full error as a no-op (return Ok(()))
because a full buffer already indicates a pending wake, while mapping other
errors into Error::Other with the error details. Keep the function signature and
callers (head_change(), republish_pending_messages()) unchanged; only change
send_async() -> try_send() and handle TrySendError::Full by dropping the signal
and returning Ok(()) while converting other TrySendError variants into the
existing Error::Other format.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 36a9a2fb-5de4-4a0e-85d3-0e9d2a8e6759
📒 Files selected for processing (5)
- src/message_pool/msgpool/local_store.rs
- src/message_pool/msgpool/mod.rs
- src/message_pool/msgpool/msg_pool.rs
- src/message_pool/msgpool/republish.rs
- src/message_pool/msgpool/selection.rs
```rust
pub(in crate::message_pool) fn add(&self, msg: SignedMessage, resolved_from: Address) {
    self.local_addrs.write().push(resolved_from);
    self.local_msgs.write().insert(msg);
}
```
Deduplicate local senders on insert.
Line 27 appends every resolved_from, so known_local_addrs() grows with message count rather than sender count. republish_pending_messages() walks this list on every cycle, so one busy local account turns into unbounded duplicate address resolution and snapshot work.
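The suggested deduplication can be sketched as below. This is a minimal, hypothetical sketch: `String` stands in for the real `Address`/`SignedMessage` types, and `local_addrs` is switched to a `HashSet` so inserts are idempotent.

```rust
use std::collections::HashSet;
use std::sync::RwLock;

// Hypothetical simplification of LocalStore for illustration only.
struct LocalStore {
    // HashSet instead of Vec: insert() is a no-op for a known sender,
    // so the address set grows with sender count, not message count.
    local_addrs: RwLock<HashSet<String>>,
    local_msgs: RwLock<HashSet<String>>,
}

impl LocalStore {
    fn add(&self, msg: String, resolved_from: String) {
        self.local_addrs.write().unwrap().insert(resolved_from);
        self.local_msgs.write().unwrap().insert(msg);
    }

    fn known_local_addrs(&self) -> Vec<String> {
        self.local_addrs.read().unwrap().iter().cloned().collect()
    }
}
```

With this shape, two messages from the same sender leave `known_local_addrs()` at length 1, so `republish_pending_messages()` resolves each sender once per cycle.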
```diff
 for msg in smsgs {
     mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?;
-    if !repub && republished.write().insert(msg.cid()) {
+    if !repub && republish.mark_republished(msg.cid()) {
         repub = true;
     }
 }
 for msg in msgs {
     mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?;
-    if !repub && republished.write().insert(msg.cid()) {
+    if !repub && republish.mark_republished(msg.cid()) {
         repub = true;
```
The republish trigger is checking the wrong condition.
mark_republished() returns true when the CID was not in the current-cycle set yet. Using that as the trigger condition flips the behavior: any previously unseen block message wakes the republisher, while a block that actually includes one of this cycle's republished messages does not. This needs a read-only membership check instead of an inserting check.
Suggested direction

```diff
-            if !repub && republish.mark_republished(msg.cid()) {
+            if !repub && republish.was_republished(&msg.cid()) {
                 repub = true;
             }
```

`RepublishState::was_republished` should read `republished.contains(cid)` without mutating the set.
```diff
 pub fn load_local(&self) -> Result<(), Error> {
     for k in self.local.snapshot_msgs() {
         self.add(k.clone()).unwrap_or_else(|err| {
             if err == Error::SequenceTooLow {
                 warn!("error adding message: {:?}", err);
-                local_msgs.remove(&k);
+                self.local.remove_msg(&k);
             }
         })
     }
```
Sort local messages by sender and sequence before replaying on startup.
LocalStore::snapshot_msgs() returns messages from a HashSet, which iterates in non-deterministic order. Since add() enforces strict sequencing (if sequence > msg.message().sequence { return Err(SequenceTooLow) }), replaying out-of-sequence messages can fail with errors like NonceGap or DuplicateSequence. The closure only handles SequenceTooLow, so other errors are silently ignored—the message is not added to the pending store but remains in local_msgs, resulting in silent data loss on restart.
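The deterministic ordering the review asks for can be sketched like this, with a hypothetical `Msg` stand-in that keeps only the two fields the sort needs:

```rust
// Hypothetical stand-in for SignedMessage, illustration only.
#[derive(Clone, Debug, PartialEq)]
struct Msg {
    from: String,
    sequence: u64,
}

// Collect the snapshot into a Vec and sort it by sender, then by
// ascending sequence, so add()'s strict sequencing check sees each
// sender's messages in order regardless of HashSet iteration order.
fn sorted_for_replay(snapshot: Vec<Msg>) -> Vec<Msg> {
    let mut msgs = snapshot;
    msgs.sort_by(|a, b| a.from.cmp(&b.from).then(a.sequence.cmp(&b.sequence)));
    msgs
}
```

Replaying the sorted vector avoids the artificial `NonceGap`/`DuplicateSequence` failures that a random iteration order can produce.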
```rust
pub(in crate::message_pool) async fn trigger(&self) -> Result<(), Error> {
    self.trigger
        .send_async(())
        .await
        .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))
}
```
Use try_send() instead of send_async() to avoid blocking head-change processing on a full wakeup buffer.
trigger() is called from the head_change() loop to signal the republish task. Since this channel is purely a wakeup mechanism with a 4-slot buffer, using send_async() can unnecessarily block head-change processing if the buffer fills while the republish task is executing republish_pending_messages(). A full buffer already indicates the republish loop is scheduled to wake, so the signal should be dropped rather than block.
Proposed change

```diff
 pub(in crate::message_pool) async fn trigger(&self) -> Result<(), Error> {
-    self.trigger
-        .send_async(())
-        .await
-        .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))
+    match self.trigger.try_send(()) {
+        Ok(()) | Err(flume::TrySendError::Full(())) => Ok(()),
+        Err(flume::TrySendError::Disconnected(())) => {
+            Err(Error::Other("Republish receiver dropped".to_owned()))
+        }
+    }
 }
```
Codecov Report ❌ Patch coverage is …
… and 5 files with indirect coverage changes. Continue to review the full report in Codecov by Sentry.
Summary of changes
Changes introduced in this pull request:
This PR is part 2 of the msg pool restructuring. It contains:
This change should be applied on top of the refactor: msg pool to make more structured #6965
Next part will have major changes:
- move state into `MessagePool` itself rather than each individual field; this will allow us to:
  - make the `head_change` trigger part of the `MessagePool`, instead of being a free function with unlimited params
  - make … part of the `MessagePool`, instead of being a free function with unlimited params
Part of #7010
Other information and links
Change checklist
Outside contributions