Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions src/message_pool/msgpool/local_store.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2019-2026 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

//! Tracks local-wallet senders and the messages they have published.
//!
//! Only messages from these senders are eligible for republishing, and only
//! these messages are replayed into the pending store on `load_local`.

use ahash::HashSet;
use parking_lot::RwLock as SyncRwLock;

use crate::message::SignedMessage;
use crate::shim::address::Address;

#[derive(Default)]
pub(in crate::message_pool) struct LocalStore {
addrs: SyncRwLock<HashSet<Address>>,
msgs: SyncRwLock<HashSet<SignedMessage>>,
}

impl LocalStore {
pub(in crate::message_pool) fn new() -> Self {
Self::default()
}

pub(in crate::message_pool) fn add(&self, msg: SignedMessage, resolved_from: Address) {
self.addrs.write().insert(resolved_from);
self.msgs.write().insert(msg);
}

pub(in crate::message_pool) fn known_local_addrs(&self) -> HashSet<Address> {
self.addrs.read().clone()
}

pub(in crate::message_pool) fn snapshot_msgs(&self) -> Vec<SignedMessage> {
self.msgs.read().iter().cloned().collect()
}

pub(in crate::message_pool) fn remove_msg(&self, msg: &SignedMessage) {
self.msgs.write().remove(msg);
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::message::MessageRead as _;
use crate::shim::econ::TokenAmount;
use crate::shim::message::Message as ShimMessage;

fn make_smsg(from: Address, seq: u64) -> SignedMessage {
SignedMessage::mock_bls_signed_message(ShimMessage {
from,
sequence: seq,
gas_premium: TokenAmount::from_atto(100u64),
gas_limit: 1_000_000,
..ShimMessage::default()
})
}

#[test]
fn add_records_address_and_message() {
let store = LocalStore::new();
let addr = Address::new_id(1);
let msg = make_smsg(addr, 0);

store.add(msg.clone(), addr);

assert_eq!(store.known_local_addrs(), vec![addr]);
let msgs = store.snapshot_msgs();
assert_eq!(msgs.len(), 1);
assert_eq!(msgs[0].sequence(), 0);
}

#[test]
fn add_appends_addresses_in_order() {
let store = LocalStore::new();
let a1 = Address::new_id(1);
let a2 = Address::new_id(2);

store.add(make_smsg(a1, 0), a1);
store.add(make_smsg(a2, 0), a2);

assert_eq!(store.known_local_addrs(), vec![a1, a2]);
}

#[test]
fn remove_msg_drops_only_the_named_message() {
let store = LocalStore::new();
let addr = Address::new_id(1);
let m0 = make_smsg(addr, 0);
let m1 = make_smsg(addr, 1);

store.add(m0.clone(), addr);
store.add(m1.clone(), addr);
store.remove_msg(&m0);

let remaining = store.snapshot_msgs();
assert_eq!(remaining.len(), 1);
assert_eq!(remaining[0].sequence(), 1);
}
}
43 changes: 18 additions & 25 deletions src/message_pool/msgpool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@
// SPDX-License-Identifier: Apache-2.0, MIT

pub(in crate::message_pool) mod events;
pub(in crate::message_pool) mod local_store;
pub(in crate::message_pool) mod metrics;
pub(in crate::message_pool) mod msg_pool;
pub(in crate::message_pool) mod msg_set;
pub(in crate::message_pool) mod pending_store;
pub(in crate::message_pool) mod provider;
pub(in crate::message_pool) mod republish;
pub mod selection;
#[cfg(test)]
pub mod test_provider;
Expand All @@ -26,18 +28,18 @@ use crate::state_manager::IdToAddressCache;
use crate::utils::ShallowClone as _;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::CidWrapper;
use ahash::{HashMap, HashMapExt, HashSet, HashSetExt};
use cid::Cid;
use ahash::{HashMap, HashMapExt};
use fvm_ipld_encoding::to_vec;
use parking_lot::RwLock as SyncRwLock;
use tracing::error;
use utils::{get_base_fee_lower_bound, recover_sig};

use super::errors::Error;
use crate::message_pool::msgpool::msg_pool::StateNonceCacheKey;
use crate::message_pool::{
msg_chain::{Chains, create_message_chains},
msg_pool::{StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key},
msgpool::pending_store::PendingStore,
msg_pool::{StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key},
msgpool::{local_store::LocalStore, pending_store::PendingStore, republish::RepublishState},
provider::Provider,
};

Expand All @@ -55,8 +57,8 @@ async fn republish_pending_messages<T>(
network_sender: &flume::Sender<NetworkMessage>,
pending_store: &PendingStore,
cur_tipset: &SyncRwLock<Tipset>,
republished: &SyncRwLock<HashSet<Cid>>,
local_addrs: &SyncRwLock<Vec<Address>>,
republish: &RepublishState,
local: &LocalStore,
key_cache: &IdToAddressCache,
chain_config: &ChainConfig,
) -> Result<(), Error>
Expand All @@ -66,12 +68,10 @@ where
let ts = cur_tipset.read().shallow_clone();
let mut pending_map: HashMap<Address, HashMap<u64, SignedMessage>> = HashMap::new();

republished.write().clear();

// Only republish messages from local addresses, ie. transactions which were
// sent to this node directly.
for actor in local_addrs.read().iter() {
let Ok(resolved) = resolve_to_key(api, key_cache, actor, &ts).inspect_err(|e| {
for actor in local.known_local_addrs() {
let Ok(resolved) = resolve_to_key(api, key_cache, &actor, &ts).inspect_err(|e| {
tracing::debug!(%actor, "republish: failed to resolve address: {e:#}");
}) else {
continue;
Expand All @@ -98,11 +98,8 @@ where
.map_err(|_| Error::Other("Network receiver dropped".to_string()))?;
}

let mut republished_t = HashSet::new();
for m in msgs.iter() {
republished_t.insert(m.cid());
}
*republished.write() = republished_t;
let republished_cids: Vec<_> = msgs.iter().map(|m| m.cid()).collect();
republish.replace_with(republished_cids);

Ok(())
}
Expand Down Expand Up @@ -217,11 +214,10 @@ where
/// The state nonce cache is naturally invalidated when the tipset changes, since
/// it is keyed by [`TipsetKey`](crate::blocks::TipsetKey).
#[allow(clippy::too_many_arguments)]
pub(in crate::message_pool) async fn head_change<T>(
pub(in crate::message_pool) fn head_change<T>(
api: &T,
bls_sig_cache: &SizeTrackingLruCache<CidWrapper, Signature>,
repub_trigger: flume::Sender<()>,
republished: &SyncRwLock<HashSet<Cid>>,
republish: &RepublishState,
pending_store: &PendingStore,
cur_tipset: &SyncRwLock<Tipset>,
key_cache: &IdToAddressCache,
Expand Down Expand Up @@ -278,24 +274,21 @@ where

for msg in smsgs {
mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?;
if !repub && republished.write().insert(msg.cid()) {
if !repub && republish.was_republished(&msg.cid()) {
repub = true;
}
}
for msg in msgs {
mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?;
if !repub && republished.write().insert(msg.cid()) {
if !repub && republish.was_republished(&msg.cid()) {
repub = true;
Comment on lines 275 to 284
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

The republish trigger is checking the wrong condition.

mark_republished() returns true when the CID was not in the current-cycle set yet. Using that as the trigger condition flips the behavior: any previously unseen block message wakes the republisher, while a block that actually includes one of this cycle's republished messages does not. This needs a read-only membership check instead of an inserting check.

Suggested direction
- if !repub && republish.mark_republished(msg.cid()) {
+ if !repub && republish.was_republished(&msg.cid()) {
     repub = true;
 }

RepublishState::was_republished should read republished.contains(cid) without mutating the set.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/message_pool/msgpool/mod.rs` around lines 275 - 284, The republish
trigger is using RepublishState::mark_republished (which inserts) causing the
logic to wake on new CIDs instead of on CIDs already republished; change the
check to a read-only membership test by calling a new or existing
RepublishState::was_republished (implement it to return
republished.contains(cid) without mutating state) and use that in both loops
(the branches around mpool_ctx.remove_from_selected_msgs and the repub flag) so
you only set repub = true when the CID was already in the republished set.

}
}
}
*cur_tipset.write() = ts;
}
if repub {
repub_trigger
.send_async(())
.await
.map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?;
republish.trigger()?;
}
let cur_ts = cur_tipset.read().shallow_clone();
let mpool_ctx = MpoolCtx {
Expand Down Expand Up @@ -517,7 +510,7 @@ pub mod tests {
let sig = Signature::new_secp256k1(vec![]);
let signed = SignedMessage::new_unchecked(umsg, sig);
let cid = signed.cid();
pool.sig_val_cache.push(cid.into(), ());
pool.caches.sig_val.push(cid.into(), ());
signed
}

Expand Down
Loading
Loading