From a5a84fb29f0700f74e033cce337d36f414d31bed Mon Sep 17 00:00:00 2001
From: Aryan Tikarya
Date: Thu, 7 May 2026 12:43:03 +0530
Subject: [PATCH 1/4] introduce local store and cache structures to simplify
 the msg pool

---
 src/message_pool/msgpool/local_store.rs | 107 +++++++++++++++++
 src/message_pool/msgpool/mod.rs         |  14 ++-
 src/message_pool/msgpool/msg_pool.rs    | 152 +++++++++++++-----------
 src/message_pool/msgpool/selection.rs   |   4 +-
 4 files changed, 202 insertions(+), 75 deletions(-)
 create mode 100644 src/message_pool/msgpool/local_store.rs

diff --git a/src/message_pool/msgpool/local_store.rs b/src/message_pool/msgpool/local_store.rs
new file mode 100644
index 00000000000..d2ae087c80f
--- /dev/null
+++ b/src/message_pool/msgpool/local_store.rs
@@ -0,0 +1,107 @@
+// Copyright 2019-2026 ChainSafe Systems
+// SPDX-License-Identifier: Apache-2.0, MIT
+
+//! Tracks local-wallet senders and the messages they've published.
+//!
+//! "Local" means "originated from a wallet on this node" — these are the
+//! only senders whose pending messages get republished, and whose messages
+//! get persisted across restarts.
+
+use ahash::HashSet;
+use parking_lot::RwLock as SyncRwLock;
+
+use crate::message::SignedMessage;
+use crate::shim::address::Address;
+
+#[allow(dead_code)] // wired up for use in a follow-up PR.
+#[derive(Default)]
+pub(in crate::message_pool) struct LocalStore {
+    /// Resolved-key addresses for which this node owns the signing key.
+    local_addrs: SyncRwLock<Vec<Address>>,
+    /// Locally-published messages, persisted across restarts.
+    local_msgs: SyncRwLock<HashSet<SignedMessage>>,
+}
+
+#[allow(dead_code)] // wired up for use in a follow-up PR.
+impl LocalStore {
+    pub(in crate::message_pool) fn new() -> Self {
+        Self::default()
+    }
+
+    pub(in crate::message_pool) fn add(&self, msg: SignedMessage, resolved_from: Address) {
+        self.local_addrs.write().push(resolved_from);
+        self.local_msgs.write().insert(msg);
+    }
+
+    pub(in crate::message_pool) fn known_local_addrs(&self) -> Vec<Address>
{ + self.local_addrs.read().clone() + } + + pub(in crate::message_pool) fn snapshot_msgs(&self) -> Vec { + self.local_msgs.read().iter().cloned().collect() + } + + pub(in crate::message_pool) fn remove_msg(&self, msg: &SignedMessage) { + self.local_msgs.write().remove(msg); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::message::MessageRead as _; + use crate::shim::econ::TokenAmount; + use crate::shim::message::Message as ShimMessage; + + fn make_smsg(from: Address, seq: u64) -> SignedMessage { + SignedMessage::mock_bls_signed_message(ShimMessage { + from, + sequence: seq, + gas_premium: TokenAmount::from_atto(100u64), + gas_limit: 1_000_000, + ..ShimMessage::default() + }) + } + + #[test] + fn add_records_address_and_message() { + let store = LocalStore::new(); + let addr = Address::new_id(1); + let msg = make_smsg(addr, 0); + + store.add(msg.clone(), addr); + + assert_eq!(store.known_local_addrs(), vec![addr]); + let msgs = store.snapshot_msgs(); + assert_eq!(msgs.len(), 1); + assert_eq!(msgs[0].sequence(), 0); + } + + #[test] + fn add_appends_addresses_in_order() { + let store = LocalStore::new(); + let a1 = Address::new_id(1); + let a2 = Address::new_id(2); + + store.add(make_smsg(a1, 0), a1); + store.add(make_smsg(a2, 0), a2); + + assert_eq!(store.known_local_addrs(), vec![a1, a2]); + } + + #[test] + fn remove_msg_drops_only_the_named_message() { + let store = LocalStore::new(); + let addr = Address::new_id(1); + let m0 = make_smsg(addr, 0); + let m1 = make_smsg(addr, 1); + + store.add(m0.clone(), addr); + store.add(m1.clone(), addr); + store.remove_msg(&m0); + + let remaining = store.snapshot_msgs(); + assert_eq!(remaining.len(), 1); + assert_eq!(remaining[0].sequence(), 1); + } +} diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index 98a52b2ae02..f18a4ff4ba1 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0, MIT pub(in crate::message_pool) mod events; +pub(in crate::message_pool) mod local_store; pub(in crate::message_pool) mod metrics; pub(in crate::message_pool) mod msg_pool; pub(in crate::message_pool) mod msg_set; @@ -36,10 +37,11 @@ use utils::{get_base_fee_lower_bound, recover_sig}; use super::errors::Error; use crate::message_pool::{ msg_chain::{Chains, create_message_chains}, - msg_pool::{StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key}, - msgpool::pending_store::PendingStore, + msg_pool::{StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key}, + msgpool::{local_store::LocalStore, pending_store::PendingStore}, provider::Provider, }; +use crate::message_pool::msgpool::msg_pool::StateNonceCacheKey; const REPLACE_BY_FEE_RATIO: f32 = 1.25; const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * 256f32) as u64; @@ -56,7 +58,7 @@ async fn republish_pending_messages( pending_store: &PendingStore, cur_tipset: &SyncRwLock, republished: &SyncRwLock>, - local_addrs: &SyncRwLock>, + local: &LocalStore, key_cache: &IdToAddressCache, chain_config: &ChainConfig, ) -> Result<(), Error> @@ -70,8 +72,8 @@ where // Only republish messages from local addresses, ie. transactions which were // sent to this node directly. 
- for actor in local_addrs.read().iter() { - let Ok(resolved) = resolve_to_key(api, key_cache, actor, &ts).inspect_err(|e| { + for actor in local.known_local_addrs() { + let Ok(resolved) = resolve_to_key(api, key_cache, &actor, &ts).inspect_err(|e| { tracing::debug!(%actor, "republish: failed to resolve address: {e:#}"); }) else { continue; @@ -517,7 +519,7 @@ pub mod tests { let sig = Signature::new_secp256k1(vec![]); let signed = SignedMessage::new_unchecked(umsg, sig); let cid = signed.cid(); - pool.sig_val_cache.push(cid.into(), ()); + pool.caches.sig_val.push(cid.into(), ()); signed } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 8d27bbd0d02..fe48a996986 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -6,8 +6,8 @@ // inclusion in the chain. Messages are added either directly for locally // published messages or through pubsub propagation. -use std::{num::NonZeroUsize, sync::Arc, time::Duration}; - +use std::{sync::Arc, time::Duration}; +use std::num::NonZeroUsize; use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey}; use crate::chain::{HeadChanges, MINIMUM_BASE_FEE}; #[cfg(test)] @@ -27,13 +27,12 @@ use crate::state_manager::IdToAddressCache; use crate::state_manager::utils::is_valid_for_sending; use crate::utils::ShallowClone as _; use crate::utils::cache::SizeTrackingLruCache; -use crate::utils::get_size::CidWrapper; +use crate::utils::get_size::{CidWrapper, GetSize}; use ahash::{HashSet, HashSetExt}; use anyhow::Context as _; use cid::Cid; use futures::StreamExt; use fvm_ipld_encoding::to_vec; -use get_size2::GetSize; use itertools::Itertools; use nonzero_ext::nonzero; use parking_lot::RwLock as SyncRwLock; @@ -49,25 +48,16 @@ use crate::message_pool::{ errors::Error, head_change, msgpool::{ - BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore, + BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, + events::MpoolUpdate, + local_store::LocalStore, + pending_store::PendingStore, recover_sig, republish_pending_messages, }, provider::Provider, utils::get_base_fee_lower_bound, }; -// LruCache sizes have been taken from the lotus implementation -const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize); -const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize); -const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize); -const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize); - -#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)] -pub(crate) struct StateNonceCacheKey { - tipset_key: TipsetKey, - addr: Address, -} - pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000; pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10; /// Maximum size of a serialized message in bytes. This is an anti-DOS measure to prevent @@ -84,35 +74,69 @@ pub enum TrustPolicy { pub use super::msg_set::{MsgSetLimits, StrictnessPolicy}; +// LruCache sizes have been taken from the lotus implementation +const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize); +const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize); +const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize); +const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize); + +#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)] +pub(in crate::message_pool) struct StateNonceCacheKey { + pub tipset_key: TipsetKey, + pub addr: Address, +} + +/// The LRU caches owned by [`MessagePool`]. +#[allow(dead_code)] // wired up for use in a follow-up PR. 
+pub(in crate::message_pool) struct Caches { + /// BLS signatures keyed by message [`Cid`](cid::Cid). + pub bls_sig: SizeTrackingLruCache, + /// Already-verified signatures keyed by message [`Cid`](cid::Cid). + pub sig_val: SizeTrackingLruCache, + /// ID address → key address resolution cache. + pub key: IdToAddressCache, + /// State-nonce-after-tipset cache, keyed by `(TipsetKey, Address)`. + pub state_nonce: SizeTrackingLruCache, +} + +#[allow(dead_code)] // wired up for use in a follow-up PR. +impl Caches { + pub(in crate::message_pool) fn new() -> Self { + Self { + bls_sig: SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE), + sig_val: SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE), + key: SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE), + state_nonce: SizeTrackingLruCache::new_with_metrics( + "state_nonce".into(), + STATE_NONCE_CACHE_SIZE, + ), + } + } +} + /// This contains all necessary information needed for the message pool. /// Keeps track of messages to apply, as well as context needed for verifying /// transactions. pub struct MessagePool { - /// The local address of the client - local_addrs: Arc>>, /// Pending messages, keyed by resolved-key address, together with the /// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`]. pub(in crate::message_pool) pending_store: PendingStore, + /// Bundled LRU caches shared by the message pool's hot paths. + pub(in crate::message_pool) caches: Caches, + /// Local-wallet sender state (resolved addresses + messages persisted + /// across restarts). + pub(in crate::message_pool) local: Arc, /// The current tipset (a set of blocks) pub cur_tipset: Arc>, /// The underlying provider pub api: Arc, /// Sender half to send messages to other components pub network_sender: flume::Sender, - /// A cache for BLS signature keyed by Cid - pub bls_sig_cache: SizeTrackingLruCache, - /// A cache for BLS signature keyed by Cid - pub sig_val_cache: SizeTrackingLruCache, - /// Cache for ID address ID to key address resolution. - pub key_cache: IdToAddressCache, - /// Cache for state nonce lookups keyed by (`TipsetKey`, `Address`). - pub state_nonce_cache: SizeTrackingLruCache, /// A set of republished messages identified by their Cid pub republished: Arc>>, /// Acts as a signal to republish messages from the republished set of /// messages pub repub_trigger: flume::Sender<()>, - local_msgs: Arc>>, /// Configurable parameters of the message pool pub config: MpoolConfig, /// Chain configuration @@ -193,15 +217,14 @@ where } pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result { - resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts) + resolve_to_key(self.api.as_ref(), &self.caches.key, addr, cur_ts) } /// Add a signed message to the pool and its address. 
fn add_local(&self, m: SignedMessage) -> Result<(), Error> { let cur_ts = self.current_tipset(); let resolved = self.resolve_to_key(&m.from(), &cur_ts)?; - self.local_addrs.write().push(resolved); - self.local_msgs.write().insert(m); + self.local.add(m, resolved); Ok(()) } @@ -276,14 +299,14 @@ where fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> { let cid = msg.cid(); - if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) { + if let Some(()) = self.caches.sig_val.get_cloned(&(cid).into()) { return Ok(()); } msg.verify(self.chain_config.eth_chain_id) .map_err(|e| Error::Other(e.to_string()))?; - self.sig_val_cache.push(cid.into(), ()); + self.caches.sig_val.push(cid.into(), ()); Ok(()) } @@ -353,9 +376,9 @@ where let cur_ts = self.current_tipset(); add_helper( self.api.as_ref(), - &self.bls_sig_cache, + &self.caches.bls_sig, &self.pending_store, - &self.key_cache, + &self.caches.key, &cur_ts, msg, self.get_state_sequence(&from, &cur_ts)?, @@ -390,8 +413,8 @@ where fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result { get_state_sequence( self.api.as_ref(), - &self.key_cache, - &self.state_nonce_cache, + &self.caches.key, + &self.caches.state_nonce, addr, cur_ts, ) @@ -465,7 +488,7 @@ where msg_vec.append(smsgs.as_mut()); for msg in umsg { - let smsg = recover_sig(&self.bls_sig_cache, msg)?; + let smsg = recover_sig(&self.caches.bls_sig, msg)?; msg_vec.push(smsg) } } @@ -473,13 +496,12 @@ where } /// Loads local messages to the message pool to be applied. - pub fn load_local(&mut self) -> Result<(), Error> { - let mut local_msgs = self.local_msgs.write(); - for k in local_msgs.iter().cloned().collect_vec() { + pub fn load_local(&self) -> Result<(), Error> { + for k in self.local.snapshot_msgs() { self.add(k.clone()).unwrap_or_else(|err| { if err == Error::SequenceTooLow { warn!("error adding message: {:?}", err); - local_msgs.remove(&k); + self.local.remove_msg(&k); } }) } @@ -515,13 +537,13 @@ where { head_change( self.api.as_ref(), - &self.bls_sig_cache, + &self.caches.bls_sig, self.repub_trigger.clone(), self.republished.as_ref(), &self.pending_store, self.cur_tipset.as_ref(), - &self.key_cache, - &self.state_nonce_cache, + &self.caches.key, + &self.caches.state_nonce, revert, apply, ) @@ -544,7 +566,6 @@ where where T: Provider, { - let local_addrs = Arc::new(SyncRwLock::new(Vec::new())); // Per-actor limits are constant for the lifetime of this pool; capture // them once here rather than re-reading on every insert. 
let pending_store = PendingStore::new(MsgSetLimits::new( @@ -552,28 +573,16 @@ where api.max_untrusted_actor_pending_messages(), )); let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset())); - let bls_sig_cache = - SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE); - let sig_val_cache = - SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE); - let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE); - let state_nonce_cache = - SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE); - let local_msgs = Arc::new(SyncRwLock::new(HashSet::new())); let republished = Arc::new(SyncRwLock::new(HashSet::new())); let block_delay = chain_config.block_delay_secs; let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4); - let mut mp = MessagePool { - local_addrs, + let mp = MessagePool { pending_store, + caches: Caches::new(), + local: Arc::new(LocalStore::new()), cur_tipset: tipset, api: Arc::new(api), - bls_sig_cache, - sig_val_cache, - key_cache, - state_nonce_cache, - local_msgs, republished, config, network_sender, @@ -586,11 +595,11 @@ where let mut head_changes_rx = mp.api.subscribe_head_changes(); let api = mp.api.clone(); - let bls_sig_cache = mp.bls_sig_cache.shallow_clone(); + let bls_sig_cache = mp.caches.bls_sig.shallow_clone(); let pending_store = mp.pending_store.shallow_clone(); let republished = mp.republished.clone(); - let key_cache = mp.key_cache.shallow_clone(); - let state_nonce_cache = mp.state_nonce_cache.shallow_clone(); + let key_cache = mp.caches.key.shallow_clone(); + let state_nonce_cache = mp.caches.state_nonce.shallow_clone(); let current_ts = mp.cur_tipset.clone(); let repub_trigger = mp.repub_trigger.clone(); @@ -631,8 +640,8 @@ where let pending_store = mp.pending_store.shallow_clone(); let cur_tipset = mp.cur_tipset.clone(); let republished = mp.republished.clone(); - let local_addrs = mp.local_addrs.clone(); - let key_cache = mp.key_cache.shallow_clone(); + let local = mp.local.clone(); + let key_cache = mp.caches.key.shallow_clone(); let network_sender = Arc::new(mp.network_sender.clone()); let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs); // Reacts to republishing requests @@ -650,7 +659,7 @@ where &pending_store, cur_tipset.as_ref(), republished.as_ref(), - local_addrs.as_ref(), + local.as_ref(), &key_cache, &chain_config, ) @@ -762,6 +771,15 @@ mod tests { )) } + #[test] + fn caches_new_constructs_all_four_caches_empty() { + let caches = Caches::new(); + assert_eq!(caches.bls_sig.len(), 0); + assert_eq!(caches.sig_val.len(), 0); + assert_eq!(caches.key.len(), 0); + assert_eq!(caches.state_nonce.len(), 0); + } + // Regression test for https://github.com/ChainSafe/forest/pull/6118 which fixed a bogus 100M // gas limit. There are no limits on a single message. 
#[test] diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs index f7cc9817464..b6d386aa314 100644 --- a/src/message_pool/msgpool/selection.rs +++ b/src/message_pool/msgpool/selection.rs @@ -652,9 +652,9 @@ where // Run head change to do reorg detection run_head_change( self.api.as_ref(), - &self.bls_sig_cache, + &self.caches.bls_sig, &self.pending_store, - &self.key_cache, + &self.caches.key, cur_ts.clone(), ts.clone(), &mut result, From a86d8c4151c6b0c965e828d541f0b71ec69dff20 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Thu, 7 May 2026 12:44:52 +0530 Subject: [PATCH 2/4] introduce replublish structure to simplify the msg pool --- src/message_pool/msgpool/local_store.rs | 11 +-- src/message_pool/msgpool/mod.rs | 31 +++----- src/message_pool/msgpool/msg_pool.rs | 84 ++++++++-------------- src/message_pool/msgpool/republish.rs | 95 +++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 84 deletions(-) create mode 100644 src/message_pool/msgpool/republish.rs diff --git a/src/message_pool/msgpool/local_store.rs b/src/message_pool/msgpool/local_store.rs index d2ae087c80f..202a5dc0409 100644 --- a/src/message_pool/msgpool/local_store.rs +++ b/src/message_pool/msgpool/local_store.rs @@ -1,11 +1,10 @@ // Copyright 2019-2026 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT -//! Tracks local-wallet senders and the messages they've published. +//! Tracks local-wallet senders and the messages they have published. //! -//! "Local" means "originated from a wallet on this node" — these are the -//! only senders whose pending messages get republished, and whose messages -//! get persisted across restarts. +//! Only messages from these senders are eligible for republishing, and only +//! these messages are replayed into the pending store on `load_local`. use ahash::HashSet; use parking_lot::RwLock as SyncRwLock; @@ -13,16 +12,12 @@ use parking_lot::RwLock as SyncRwLock; use crate::message::SignedMessage; use crate::shim::address::Address; -#[allow(dead_code)] // wired up for use in a follow-up PR. #[derive(Default)] pub(in crate::message_pool) struct LocalStore { - /// Resolved-key addresses for which this node owns the signing key. local_addrs: SyncRwLock>, - /// Locally-published messages, persisted across restarts. local_msgs: SyncRwLock>, } -#[allow(dead_code)] // wired up for use in a follow-up PR. 
impl LocalStore { pub(in crate::message_pool) fn new() -> Self { Self::default() diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index f18a4ff4ba1..f2997542193 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -8,6 +8,7 @@ pub(in crate::message_pool) mod msg_pool; pub(in crate::message_pool) mod msg_set; pub(in crate::message_pool) mod pending_store; pub(in crate::message_pool) mod provider; +pub(in crate::message_pool) mod republish; pub mod selection; #[cfg(test)] pub mod test_provider; @@ -27,21 +28,20 @@ use crate::state_manager::IdToAddressCache; use crate::utils::ShallowClone as _; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::CidWrapper; -use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; -use cid::Cid; +use ahash::{HashMap, HashMapExt}; use fvm_ipld_encoding::to_vec; use parking_lot::RwLock as SyncRwLock; use tracing::error; use utils::{get_base_fee_lower_bound, recover_sig}; use super::errors::Error; +use crate::message_pool::msgpool::msg_pool::StateNonceCacheKey; use crate::message_pool::{ msg_chain::{Chains, create_message_chains}, msg_pool::{StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key}, - msgpool::{local_store::LocalStore, pending_store::PendingStore}, + msgpool::{local_store::LocalStore, pending_store::PendingStore, republish::RepublishState}, provider::Provider, }; -use crate::message_pool::msgpool::msg_pool::StateNonceCacheKey; const REPLACE_BY_FEE_RATIO: f32 = 1.25; const RBF_NUM: u64 = ((REPLACE_BY_FEE_RATIO - 1f32) * 256f32) as u64; @@ -57,7 +57,7 @@ async fn republish_pending_messages( network_sender: &flume::Sender, pending_store: &PendingStore, cur_tipset: &SyncRwLock, - republished: &SyncRwLock>, + republish: &RepublishState, local: &LocalStore, key_cache: &IdToAddressCache, chain_config: &ChainConfig, @@ -68,8 +68,6 @@ where let ts = cur_tipset.read().shallow_clone(); let mut pending_map: HashMap> = HashMap::new(); - republished.write().clear(); - // Only republish messages from local addresses, ie. transactions which were // sent to this node directly. 
for actor in local.known_local_addrs() { @@ -100,11 +98,8 @@ where .map_err(|_| Error::Other("Network receiver dropped".to_string()))?; } - let mut republished_t = HashSet::new(); - for m in msgs.iter() { - republished_t.insert(m.cid()); - } - *republished.write() = republished_t; + let republished_cids: Vec<_> = msgs.iter().map(|m| m.cid()).collect(); + republish.replace_with(republished_cids); Ok(()) } @@ -222,8 +217,7 @@ where pub(in crate::message_pool) async fn head_change( api: &T, bls_sig_cache: &SizeTrackingLruCache, - repub_trigger: flume::Sender<()>, - republished: &SyncRwLock>, + republish: &RepublishState, pending_store: &PendingStore, cur_tipset: &SyncRwLock, key_cache: &IdToAddressCache, @@ -280,13 +274,13 @@ where for msg in smsgs { mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?; - if !repub && republished.write().insert(msg.cid()) { + if !repub && republish.mark_republished(msg.cid()) { repub = true; } } for msg in msgs { mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?; - if !repub && republished.write().insert(msg.cid()) { + if !repub && republish.mark_republished(msg.cid()) { repub = true; } } @@ -294,10 +288,7 @@ where *cur_tipset.write() = ts; } if repub { - repub_trigger - .send_async(()) - .await - .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?; + republish.trigger().await?; } let cur_ts = cur_tipset.read().shallow_clone(); let mpool_ctx = MpoolCtx { diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index fe48a996986..1c531e8258f 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -6,8 +6,6 @@ // inclusion in the chain. Messages are added either directly for locally // published messages or through pubsub propagation. 
-use std::{sync::Arc, time::Duration}; -use std::num::NonZeroUsize; use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey}; use crate::chain::{HeadChanges, MINIMUM_BASE_FEE}; #[cfg(test)] @@ -28,7 +26,6 @@ use crate::state_manager::utils::is_valid_for_sending; use crate::utils::ShallowClone as _; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::{CidWrapper, GetSize}; -use ahash::{HashSet, HashSetExt}; use anyhow::Context as _; use cid::Cid; use futures::StreamExt; @@ -36,6 +33,8 @@ use fvm_ipld_encoding::to_vec; use itertools::Itertools; use nonzero_ext::nonzero; use parking_lot::RwLock as SyncRwLock; +use std::num::NonZeroUsize; +use std::{sync::Arc, time::Duration}; use tokio::{ sync::broadcast::{self, error::RecvError}, task::JoinSet, @@ -48,16 +47,26 @@ use crate::message_pool::{ errors::Error, head_change, msgpool::{ - BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, - events::MpoolUpdate, - local_store::LocalStore, - pending_store::PendingStore, - recover_sig, republish_pending_messages, + BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, local_store::LocalStore, + pending_store::PendingStore, recover_sig, republish::RepublishState, + republish_pending_messages, }, provider::Provider, utils::get_base_fee_lower_bound, }; +// LruCache sizes have been taken from the lotus implementation +const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize); +const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize); +const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize); +const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize); + +#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)] +pub(crate) struct StateNonceCacheKey { + tipset_key: TipsetKey, + addr: Address, +} + pub const MAX_ACTOR_PENDING_MESSAGES: u64 = 1000; pub const MAX_UNTRUSTED_ACTOR_PENDING_MESSAGES: u64 = 10; /// Maximum size of a serialized message in bytes. This is an anti-DOS measure to prevent @@ -74,32 +83,14 @@ pub enum TrustPolicy { pub use super::msg_set::{MsgSetLimits, StrictnessPolicy}; -// LruCache sizes have been taken from the lotus implementation -const BLS_SIG_CACHE_SIZE: NonZeroUsize = nonzero!(40000usize); -const SIG_VAL_CACHE_SIZE: NonZeroUsize = nonzero!(32000usize); -const KEY_CACHE_SIZE: NonZeroUsize = nonzero!(1_048_576usize); -const STATE_NONCE_CACHE_SIZE: NonZeroUsize = nonzero!(32768usize); - -#[derive(Clone, Debug, Hash, PartialEq, Eq, GetSize)] -pub(in crate::message_pool) struct StateNonceCacheKey { - pub tipset_key: TipsetKey, - pub addr: Address, -} - -/// The LRU caches owned by [`MessagePool`]. -#[allow(dead_code)] // wired up for use in a follow-up PR. +/// LRU caches owned by [`MessagePool`]. pub(in crate::message_pool) struct Caches { - /// BLS signatures keyed by message [`Cid`](cid::Cid). pub bls_sig: SizeTrackingLruCache, - /// Already-verified signatures keyed by message [`Cid`](cid::Cid). pub sig_val: SizeTrackingLruCache, - /// ID address → key address resolution cache. pub key: IdToAddressCache, - /// State-nonce-after-tipset cache, keyed by `(TipsetKey, Address)`. pub state_nonce: SizeTrackingLruCache, } -#[allow(dead_code)] // wired up for use in a follow-up PR. impl Caches { pub(in crate::message_pool) fn new() -> Self { Self { @@ -121,10 +112,8 @@ pub struct MessagePool { /// Pending messages, keyed by resolved-key address, together with the /// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`]. 
pub(in crate::message_pool) pending_store: PendingStore, - /// Bundled LRU caches shared by the message pool's hot paths. pub(in crate::message_pool) caches: Caches, - /// Local-wallet sender state (resolved addresses + messages persisted - /// across restarts). + /// Local-wallet sender store pub(in crate::message_pool) local: Arc, /// The current tipset (a set of blocks) pub cur_tipset: Arc>, @@ -132,11 +121,8 @@ pub struct MessagePool { pub api: Arc, /// Sender half to send messages to other components pub network_sender: flume::Sender, - /// A set of republished messages identified by their Cid - pub republished: Arc>>, - /// Acts as a signal to republish messages from the republished set of - /// messages - pub repub_trigger: flume::Sender<()>, + /// Republish coordination state + pub(in crate::message_pool) republish: Arc, /// Configurable parameters of the message pool pub config: MpoolConfig, /// Chain configuration @@ -538,8 +524,7 @@ where head_change( self.api.as_ref(), &self.caches.bls_sig, - self.repub_trigger.clone(), - self.republished.as_ref(), + self.republish.as_ref(), &self.pending_store, self.cur_tipset.as_ref(), &self.caches.key, @@ -573,20 +558,18 @@ where api.max_untrusted_actor_pending_messages(), )); let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset())); - let republished = Arc::new(SyncRwLock::new(HashSet::new())); let block_delay = chain_config.block_delay_secs; - let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4); + let (republish, repub_trigger_rx) = RepublishState::new(); let mp = MessagePool { pending_store, caches: Caches::new(), local: Arc::new(LocalStore::new()), cur_tipset: tipset, api: Arc::new(api), - republished, config, network_sender, - repub_trigger, + republish: Arc::new(republish), chain_config: Arc::clone(&chain_config), }; @@ -597,12 +580,11 @@ where let api = mp.api.clone(); let bls_sig_cache = mp.caches.bls_sig.shallow_clone(); let pending_store = mp.pending_store.shallow_clone(); - let republished = mp.republished.clone(); + let republish = mp.republish.clone(); let key_cache = mp.caches.key.shallow_clone(); let state_nonce_cache = mp.caches.state_nonce.shallow_clone(); let current_ts = mp.cur_tipset.clone(); - let repub_trigger = mp.repub_trigger.clone(); // Reacts to new HeadChanges services.spawn(async move { @@ -612,8 +594,7 @@ where if let Err(e) = head_change( api.as_ref(), &bls_sig_cache, - repub_trigger.clone(), - republished.as_ref(), + republish.as_ref(), &pending_store, ¤t_ts, &key_cache, @@ -639,7 +620,7 @@ where let api = mp.api.clone(); let pending_store = mp.pending_store.shallow_clone(); let cur_tipset = mp.cur_tipset.clone(); - let republished = mp.republished.clone(); + let republish = mp.republish.clone(); let local = mp.local.clone(); let key_cache = mp.caches.key.shallow_clone(); let network_sender = Arc::new(mp.network_sender.clone()); @@ -658,7 +639,7 @@ where network_sender.as_ref(), &pending_store, cur_tipset.as_ref(), - republished.as_ref(), + republish.as_ref(), local.as_ref(), &key_cache, &chain_config, @@ -771,15 +752,6 @@ mod tests { )) } - #[test] - fn caches_new_constructs_all_four_caches_empty() { - let caches = Caches::new(); - assert_eq!(caches.bls_sig.len(), 0); - assert_eq!(caches.sig_val.len(), 0); - assert_eq!(caches.key.len(), 0); - assert_eq!(caches.state_nonce.len(), 0); - } - // Regression test for https://github.com/ChainSafe/forest/pull/6118 which fixed a bogus 100M // gas limit. There are no limits on a single message. 
#[test] diff --git a/src/message_pool/msgpool/republish.rs b/src/message_pool/msgpool/republish.rs new file mode 100644 index 00000000000..526e2e801e5 --- /dev/null +++ b/src/message_pool/msgpool/republish.rs @@ -0,0 +1,95 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! Tracks which CIDs were already broadcast in the current republish cycle +//! and exposes a trigger to wake the republish task early. + +use ahash::HashSet; +use cid::Cid; +use parking_lot::RwLock as SyncRwLock; + +use crate::message_pool::Error; + +const REPUB_TRIGGER_CAPACITY: usize = 4; + +pub(in crate::message_pool) struct RepublishState { + republished: SyncRwLock>, + trigger: flume::Sender<()>, +} + +impl RepublishState { + pub(in crate::message_pool) fn new() -> (Self, flume::Receiver<()>) { + let (trigger, rx) = flume::bounded(REPUB_TRIGGER_CAPACITY); + ( + Self { + republished: SyncRwLock::default(), + trigger, + }, + rx, + ) + } + + /// Returns `true` if the CID was newly inserted — callers use this to + /// decide whether to wake the republish loop. + pub(in crate::message_pool) fn mark_republished(&self, cid: Cid) -> bool { + self.republished.write().insert(cid) + } + + /// Wake the republish task early. + pub(in crate::message_pool) async fn trigger(&self) -> Result<(), Error> { + self.trigger + .send_async(()) + .await + .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}"))) + } + + pub(in crate::message_pool) fn replace_with>(&self, cids: I) { + let mut set = self.republished.write(); + set.clear(); + set.extend(cids); + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn mark_republished_returns_true_only_on_first_insert() { + let (state, _rx) = RepublishState::new(); + let cid = Cid::default(); + + assert!(state.mark_republished(cid), "first insert should be new"); + assert!( + !state.mark_republished(cid), + "second insert should be a no-op", + ); + } + + #[tokio::test] + async fn trigger_succeeds_when_receiver_is_alive() { + let (state, rx) = RepublishState::new(); + state.trigger().await.expect("send should succeed"); + rx.try_recv() + .expect("trigger should be observable on the receiver"); + } + + #[test] + fn replace_with_clears_then_inserts() { + let (state, _rx) = RepublishState::new(); + let prior = Cid::default(); + state.mark_republished(prior); + + state.replace_with(std::iter::empty()); + assert!( + state.mark_republished(prior), + "set should be empty after clear-and-extend with empty iter", + ); + + state.replace_with([prior]); + assert!( + !state.mark_republished(prior), + "prior CID should be present after replace_with", + ); + } +} From 5760ba25af5a269d98cc1b6fa020b5dbf52bd266 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 8 May 2026 14:01:31 +0530 Subject: [PATCH 3/4] address ai comments --- src/message_pool/msgpool/mod.rs | 8 ++-- src/message_pool/msgpool/msg_pool.rs | 9 ++--- src/message_pool/msgpool/republish.rs | 56 +++++++++++---------------- 3 files changed, 31 insertions(+), 42 deletions(-) diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs index f2997542193..f45b33b22eb 100644 --- a/src/message_pool/msgpool/mod.rs +++ b/src/message_pool/msgpool/mod.rs @@ -214,7 +214,7 @@ where /// The state nonce cache is naturally invalidated when the tipset changes, since /// it is keyed by [`TipsetKey`](crate::blocks::TipsetKey). 
#[allow(clippy::too_many_arguments)] -pub(in crate::message_pool) async fn head_change( +pub(in crate::message_pool) fn head_change( api: &T, bls_sig_cache: &SizeTrackingLruCache, republish: &RepublishState, @@ -274,13 +274,13 @@ where for msg in smsgs { mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?; - if !repub && republish.mark_republished(msg.cid()) { + if !repub && republish.was_republished(&msg.cid()) { repub = true; } } for msg in msgs { mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?; - if !repub && republish.mark_republished(msg.cid()) { + if !repub && republish.was_republished(&msg.cid()) { repub = true; } } @@ -288,7 +288,7 @@ where *cur_tipset.write() = ts; } if repub { - republish.trigger().await?; + republish.trigger()?; } let cur_ts = cur_tipset.read().shallow_clone(); let mpool_ctx = MpoolCtx { diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 1c531e8258f..ee07c872680 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -40,7 +40,7 @@ use tokio::{ task::JoinSet, time::interval, }; -use tracing::warn; +use tracing::{error, warn}; use crate::message_pool::{ config::MpoolConfig, @@ -488,6 +488,8 @@ where if err == Error::SequenceTooLow { warn!("error adding message: {:?}", err); self.local.remove_msg(&k); + } else { + error!("error adding local message: {:?}", err); } }) } @@ -532,7 +534,6 @@ where revert, apply, ) - .await } } @@ -601,9 +602,7 @@ where &state_nonce_cache, reverts, applies, - ) - .await - { + ) { tracing::warn!("Error changing head: {e}"); } } diff --git a/src/message_pool/msgpool/republish.rs b/src/message_pool/msgpool/republish.rs index 526e2e801e5..595ff1705d3 100644 --- a/src/message_pool/msgpool/republish.rs +++ b/src/message_pool/msgpool/republish.rs @@ -10,7 +10,7 @@ use parking_lot::RwLock as SyncRwLock; use crate::message_pool::Error; -const REPUB_TRIGGER_CAPACITY: usize = 4; +const REPUB_TRIGGER_CAPACITY: usize = 1; pub(in crate::message_pool) struct RepublishState { republished: SyncRwLock>, @@ -29,17 +29,15 @@ impl RepublishState { ) } - /// Returns `true` if the CID was newly inserted — callers use this to - /// decide whether to wake the republish loop. - pub(in crate::message_pool) fn mark_republished(&self, cid: Cid) -> bool { - self.republished.write().insert(cid) + /// Returns `true` if `cid` was seen by the republished state. + pub(in crate::message_pool) fn was_republished(&self, cid: &Cid) -> bool { + self.republished.read().contains(cid) } /// Wake the republish task early. 
- pub(in crate::message_pool) async fn trigger(&self) -> Result<(), Error> { + pub(in crate::message_pool) fn trigger(&self) -> Result<(), Error> { self.trigger - .send_async(()) - .await + .try_send(()) .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}"))) } @@ -55,41 +53,33 @@ mod tests { use super::*; #[test] - fn mark_republished_returns_true_only_on_first_insert() { + fn was_republished_reflects_replace_with() { let (state, _rx) = RepublishState::new(); let cid = Cid::default(); - assert!(state.mark_republished(cid), "first insert should be new"); assert!( - !state.mark_republished(cid), - "second insert should be a no-op", + !state.was_republished(&cid), + "fresh state should not contain any CIDs", ); - } - - #[tokio::test] - async fn trigger_succeeds_when_receiver_is_alive() { - let (state, rx) = RepublishState::new(); - state.trigger().await.expect("send should succeed"); - rx.try_recv() - .expect("trigger should be observable on the receiver"); - } - - #[test] - fn replace_with_clears_then_inserts() { - let (state, _rx) = RepublishState::new(); - let prior = Cid::default(); - state.mark_republished(prior); - state.replace_with(std::iter::empty()); + state.replace_with([cid]); assert!( - state.mark_republished(prior), - "set should be empty after clear-and-extend with empty iter", + state.was_republished(&cid), + "replace_with should populate the set", ); - state.replace_with([prior]); + state.replace_with(std::iter::empty()); assert!( - !state.mark_republished(prior), - "prior CID should be present after replace_with", + !state.was_republished(&cid), + "replace_with with empty iter should clear the set", ); } + + #[test] + fn trigger_succeeds_when_receiver_is_alive() { + let (state, rx) = RepublishState::new(); + state.trigger().expect("send should succeed"); + rx.try_recv() + .expect("trigger should be observable on the receiver"); + } } From 4861a318f8e7907d0cb44c4e3691b0e9d8f1dba6 Mon Sep 17 00:00:00 2001 From: Aryan Tikarya Date: Fri, 8 May 2026 14:02:56 +0530 Subject: [PATCH 4/4] change local store addr to hashset so we don't have to republish the same message again --- src/message_pool/msgpool/local_store.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/message_pool/msgpool/local_store.rs b/src/message_pool/msgpool/local_store.rs index 202a5dc0409..11f2a4c621e 100644 --- a/src/message_pool/msgpool/local_store.rs +++ b/src/message_pool/msgpool/local_store.rs @@ -14,8 +14,8 @@ use crate::shim::address::Address; #[derive(Default)] pub(in crate::message_pool) struct LocalStore { - local_addrs: SyncRwLock>, - local_msgs: SyncRwLock>, + addrs: SyncRwLock>, + msgs: SyncRwLock>, } impl LocalStore { @@ -24,20 +24,20 @@ impl LocalStore { } pub(in crate::message_pool) fn add(&self, msg: SignedMessage, resolved_from: Address) { - self.local_addrs.write().push(resolved_from); - self.local_msgs.write().insert(msg); + self.addrs.write().insert(resolved_from); + self.msgs.write().insert(msg); } - pub(in crate::message_pool) fn known_local_addrs(&self) -> Vec
<Address> {
-        self.local_addrs.read().clone()
+    pub(in crate::message_pool) fn known_local_addrs(&self) -> HashSet<Address> {
+        self.addrs.read().clone()
     }
 
     pub(in crate::message_pool) fn snapshot_msgs(&self) -> Vec<SignedMessage> {
-        self.local_msgs.read().iter().cloned().collect()
+        self.msgs.read().iter().cloned().collect()
     }
 
     pub(in crate::message_pool) fn remove_msg(&self, msg: &SignedMessage) {
-        self.local_msgs.write().remove(msg);
+        self.msgs.write().remove(msg);
     }
 }
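
Taken together, the four patches factor the message pool's standalone fields into three helpers: LocalStore (the wallet addresses and messages owned by this node), Caches (the four LRU caches), and RepublishState (the republished-CID set plus the wake-up channel). Below is a minimal sketch, not part of any patch, of how those helpers compose; it assumes it lives inside src/message_pool so the pub(in crate::message_pool) items are visible, and sketch_flow, its arguments, and some_cid are illustrative placeholders rather than real pool code.

    use std::sync::Arc;

    use cid::Cid;

    use crate::message::SignedMessage;
    use crate::message_pool::Error;
    use crate::message_pool::msgpool::{local_store::LocalStore, republish::RepublishState};
    use crate::shim::address::Address;

    // Sketch only: strings together the methods introduced in this series.
    fn sketch_flow(
        local: Arc<LocalStore>,
        republish: Arc<RepublishState>,
        msg: SignedMessage,
        resolved_from: Address,
    ) -> Result<(), Error> {
        // MessagePool::add_local(): remember the resolved sender and keep the
        // message so load_local() can replay it after a restart.
        local.add(msg, resolved_from);

        // republish_pending_messages(): walk only local senders, publish their
        // pending chains (elided here), then record which CIDs went out
        // (approximated below by the locally stored messages).
        for _addr in local.known_local_addrs() {
            // ... resolve_to_key, pending-store lookup, pubsub publish ...
        }
        let sent: Vec<Cid> = local.snapshot_msgs().iter().map(|m| m.cid()).collect();
        republish.replace_with(sent);

        // head_change(): check a reverted message's CID against the set and,
        // if a republish is due, wake the republish task early.
        let some_cid = Cid::default(); // placeholder for a reverted message's CID
        if republish.was_republished(&some_cid) {
            republish.trigger()?;
        }
        Ok(())
    }

The sketch mirrors the call order of the real code paths touched above (add_local, republish_pending_messages, head_change) so the division of responsibility between the three structures is easy to see.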