diff --git a/src/message_pool/msgpool/local_store.rs b/src/message_pool/msgpool/local_store.rs
new file mode 100644
index 00000000000..11f2a4c621e
--- /dev/null
+++ b/src/message_pool/msgpool/local_store.rs
@@ -0,0 +1,102 @@
+// Copyright 2019-2026 ChainSafe Systems
+// SPDX-License-Identifier: Apache-2.0, MIT
+
+//! Tracks local-wallet senders and the messages they have published.
+//!
+//! Only messages from these senders are eligible for republishing, and only
+//! these messages are replayed into the pending store on `load_local`.
+
+use ahash::HashSet;
+use parking_lot::RwLock as SyncRwLock;
+
+use crate::message::SignedMessage;
+use crate::shim::address::Address;
+
+#[derive(Default)]
+pub(in crate::message_pool) struct LocalStore {
+    addrs: SyncRwLock<HashSet<Address>>,
+    msgs: SyncRwLock<HashSet<SignedMessage>>,
+}
+
+impl LocalStore {
+    pub(in crate::message_pool) fn new() -> Self {
+        Self::default()
+    }
+
+    pub(in crate::message_pool) fn add(&self, msg: SignedMessage, resolved_from: Address) {
+        self.addrs.write().insert(resolved_from);
+        self.msgs.write().insert(msg);
+    }
+
+    pub(in crate::message_pool) fn known_local_addrs(&self) -> HashSet<Address> {
+        self.addrs.read().clone()
+    }
+
+    pub(in crate::message_pool) fn snapshot_msgs(&self) -> Vec<SignedMessage> {
+        self.msgs.read().iter().cloned().collect()
+    }
+
+    pub(in crate::message_pool) fn remove_msg(&self, msg: &SignedMessage) {
+        self.msgs.write().remove(msg);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::message::MessageRead as _;
+    use crate::shim::econ::TokenAmount;
+    use crate::shim::message::Message as ShimMessage;
+
+    fn make_smsg(from: Address, seq: u64) -> SignedMessage {
+        SignedMessage::mock_bls_signed_message(ShimMessage {
+            from,
+            sequence: seq,
+            gas_premium: TokenAmount::from_atto(100u64),
+            gas_limit: 1_000_000,
+            ..ShimMessage::default()
+        })
+    }
+
+    #[test]
+    fn add_records_address_and_message() {
+        let store = LocalStore::new();
+        let addr = Address::new_id(1);
+        let msg = make_smsg(addr, 0);
+
+        store.add(msg.clone(), addr);
+
+        assert_eq!(store.known_local_addrs(), HashSet::from_iter([addr]));
+        let msgs = store.snapshot_msgs();
+        assert_eq!(msgs.len(), 1);
+        assert_eq!(msgs[0].sequence(), 0);
+    }
+
+    #[test]
+    fn add_records_every_distinct_address() {
+        let store = LocalStore::new();
+        let a1 = Address::new_id(1);
+        let a2 = Address::new_id(2);
+
+        store.add(make_smsg(a1, 0), a1);
+        store.add(make_smsg(a2, 0), a2);
+
+        assert_eq!(store.known_local_addrs(), HashSet::from_iter([a1, a2]));
+    }
+
+    #[test]
+    fn remove_msg_drops_only_the_named_message() {
+        let store = LocalStore::new();
+        let addr = Address::new_id(1);
+        let m0 = make_smsg(addr, 0);
+        let m1 = make_smsg(addr, 1);
+
+        store.add(m0.clone(), addr);
+        store.add(m1.clone(), addr);
+        store.remove_msg(&m0);
+
+        let remaining = store.snapshot_msgs();
+        assert_eq!(remaining.len(), 1);
+        assert_eq!(remaining[0].sequence(), 1);
+    }
+}
diff --git a/src/message_pool/msgpool/mod.rs b/src/message_pool/msgpool/mod.rs
index 98a52b2ae02..f45b33b22eb 100644
--- a/src/message_pool/msgpool/mod.rs
+++ b/src/message_pool/msgpool/mod.rs
@@ -2,11 +2,13 @@
 // SPDX-License-Identifier: Apache-2.0, MIT
 
 pub(in crate::message_pool)
mod events; +pub(in crate::message_pool) mod local_store; pub(in crate::message_pool) mod metrics; pub(in crate::message_pool) mod msg_pool; pub(in crate::message_pool) mod msg_set; pub(in crate::message_pool) mod pending_store; pub(in crate::message_pool) mod provider; +pub(in crate::message_pool) mod republish; pub mod selection; #[cfg(test)] pub mod test_provider; @@ -26,18 +28,18 @@ use crate::state_manager::IdToAddressCache; use crate::utils::ShallowClone as _; use crate::utils::cache::SizeTrackingLruCache; use crate::utils::get_size::CidWrapper; -use ahash::{HashMap, HashMapExt, HashSet, HashSetExt}; -use cid::Cid; +use ahash::{HashMap, HashMapExt}; use fvm_ipld_encoding::to_vec; use parking_lot::RwLock as SyncRwLock; use tracing::error; use utils::{get_base_fee_lower_bound, recover_sig}; use super::errors::Error; +use crate::message_pool::msgpool::msg_pool::StateNonceCacheKey; use crate::message_pool::{ msg_chain::{Chains, create_message_chains}, - msg_pool::{StateNonceCacheKey, StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key}, - msgpool::pending_store::PendingStore, + msg_pool::{StrictnessPolicy, TrustPolicy, add_helper, resolve_to_key}, + msgpool::{local_store::LocalStore, pending_store::PendingStore, republish::RepublishState}, provider::Provider, }; @@ -55,8 +57,8 @@ async fn republish_pending_messages( network_sender: &flume::Sender, pending_store: &PendingStore, cur_tipset: &SyncRwLock, - republished: &SyncRwLock>, - local_addrs: &SyncRwLock>, + republish: &RepublishState, + local: &LocalStore, key_cache: &IdToAddressCache, chain_config: &ChainConfig, ) -> Result<(), Error> @@ -66,12 +68,10 @@ where let ts = cur_tipset.read().shallow_clone(); let mut pending_map: HashMap> = HashMap::new(); - republished.write().clear(); - // Only republish messages from local addresses, ie. transactions which were // sent to this node directly. 
- for actor in local_addrs.read().iter() { - let Ok(resolved) = resolve_to_key(api, key_cache, actor, &ts).inspect_err(|e| { + for actor in local.known_local_addrs() { + let Ok(resolved) = resolve_to_key(api, key_cache, &actor, &ts).inspect_err(|e| { tracing::debug!(%actor, "republish: failed to resolve address: {e:#}"); }) else { continue; @@ -98,11 +98,8 @@ where .map_err(|_| Error::Other("Network receiver dropped".to_string()))?; } - let mut republished_t = HashSet::new(); - for m in msgs.iter() { - republished_t.insert(m.cid()); - } - *republished.write() = republished_t; + let republished_cids: Vec<_> = msgs.iter().map(|m| m.cid()).collect(); + republish.replace_with(republished_cids); Ok(()) } @@ -217,11 +214,10 @@ where /// The state nonce cache is naturally invalidated when the tipset changes, since /// it is keyed by [`TipsetKey`](crate::blocks::TipsetKey). #[allow(clippy::too_many_arguments)] -pub(in crate::message_pool) async fn head_change( +pub(in crate::message_pool) fn head_change( api: &T, bls_sig_cache: &SizeTrackingLruCache, - repub_trigger: flume::Sender<()>, - republished: &SyncRwLock>, + republish: &RepublishState, pending_store: &PendingStore, cur_tipset: &SyncRwLock, key_cache: &IdToAddressCache, @@ -278,13 +274,13 @@ where for msg in smsgs { mpool_ctx.remove_from_selected_msgs(&msg.from(), msg.sequence(), &mut rmsgs)?; - if !repub && republished.write().insert(msg.cid()) { + if !repub && republish.was_republished(&msg.cid()) { repub = true; } } for msg in msgs { mpool_ctx.remove_from_selected_msgs(&msg.from, msg.sequence, &mut rmsgs)?; - if !repub && republished.write().insert(msg.cid()) { + if !repub && republish.was_republished(&msg.cid()) { repub = true; } } @@ -292,10 +288,7 @@ where *cur_tipset.write() = ts; } if repub { - repub_trigger - .send_async(()) - .await - .map_err(|e| Error::Other(format!("Republish receiver dropped: {e}")))?; + republish.trigger()?; } let cur_ts = cur_tipset.read().shallow_clone(); let mpool_ctx = MpoolCtx { 
@@ -517,7 +510,7 @@ pub mod tests { let sig = Signature::new_secp256k1(vec![]); let signed = SignedMessage::new_unchecked(umsg, sig); let cid = signed.cid(); - pool.sig_val_cache.push(cid.into(), ()); + pool.caches.sig_val.push(cid.into(), ()); signed } diff --git a/src/message_pool/msgpool/msg_pool.rs b/src/message_pool/msgpool/msg_pool.rs index 8d27bbd0d02..ee07c872680 100644 --- a/src/message_pool/msgpool/msg_pool.rs +++ b/src/message_pool/msgpool/msg_pool.rs @@ -6,8 +6,6 @@ // inclusion in the chain. Messages are added either directly for locally // published messages or through pubsub propagation. -use std::{num::NonZeroUsize, sync::Arc, time::Duration}; - use crate::blocks::{CachingBlockHeader, Tipset, TipsetKey}; use crate::chain::{HeadChanges, MINIMUM_BASE_FEE}; #[cfg(test)] @@ -27,30 +25,31 @@ use crate::state_manager::IdToAddressCache; use crate::state_manager::utils::is_valid_for_sending; use crate::utils::ShallowClone as _; use crate::utils::cache::SizeTrackingLruCache; -use crate::utils::get_size::CidWrapper; -use ahash::{HashSet, HashSetExt}; +use crate::utils::get_size::{CidWrapper, GetSize}; use anyhow::Context as _; use cid::Cid; use futures::StreamExt; use fvm_ipld_encoding::to_vec; -use get_size2::GetSize; use itertools::Itertools; use nonzero_ext::nonzero; use parking_lot::RwLock as SyncRwLock; +use std::num::NonZeroUsize; +use std::{sync::Arc, time::Duration}; use tokio::{ sync::broadcast::{self, error::RecvError}, task::JoinSet, time::interval, }; -use tracing::warn; +use tracing::{error, warn}; use crate::message_pool::{ config::MpoolConfig, errors::Error, head_change, msgpool::{ - BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, pending_store::PendingStore, - recover_sig, republish_pending_messages, + BASE_FEE_LOWER_BOUND_FACTOR_CONSERVATIVE, events::MpoolUpdate, local_store::LocalStore, + pending_store::PendingStore, recover_sig, republish::RepublishState, + republish_pending_messages, }, provider::Provider, 
utils::get_base_fee_lower_bound, @@ -84,35 +83,46 @@ pub enum TrustPolicy { pub use super::msg_set::{MsgSetLimits, StrictnessPolicy}; +/// LRU caches owned by [`MessagePool`]. +pub(in crate::message_pool) struct Caches { + pub bls_sig: SizeTrackingLruCache, + pub sig_val: SizeTrackingLruCache, + pub key: IdToAddressCache, + pub state_nonce: SizeTrackingLruCache, +} + +impl Caches { + pub(in crate::message_pool) fn new() -> Self { + Self { + bls_sig: SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE), + sig_val: SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE), + key: SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE), + state_nonce: SizeTrackingLruCache::new_with_metrics( + "state_nonce".into(), + STATE_NONCE_CACHE_SIZE, + ), + } + } +} + /// This contains all necessary information needed for the message pool. /// Keeps track of messages to apply, as well as context needed for verifying /// transactions. pub struct MessagePool { - /// The local address of the client - local_addrs: Arc>>, /// Pending messages, keyed by resolved-key address, together with the /// broadcast channel for [`MpoolUpdate`] events. See [`PendingStore`]. pub(in crate::message_pool) pending_store: PendingStore, + pub(in crate::message_pool) caches: Caches, + /// Local-wallet sender store + pub(in crate::message_pool) local: Arc, /// The current tipset (a set of blocks) pub cur_tipset: Arc>, /// The underlying provider pub api: Arc, /// Sender half to send messages to other components pub network_sender: flume::Sender, - /// A cache for BLS signature keyed by Cid - pub bls_sig_cache: SizeTrackingLruCache, - /// A cache for BLS signature keyed by Cid - pub sig_val_cache: SizeTrackingLruCache, - /// Cache for ID address ID to key address resolution. - pub key_cache: IdToAddressCache, - /// Cache for state nonce lookups keyed by (`TipsetKey`, `Address`). 
- pub state_nonce_cache: SizeTrackingLruCache, - /// A set of republished messages identified by their Cid - pub republished: Arc>>, - /// Acts as a signal to republish messages from the republished set of - /// messages - pub repub_trigger: flume::Sender<()>, - local_msgs: Arc>>, + /// Republish coordination state + pub(in crate::message_pool) republish: Arc, /// Configurable parameters of the message pool pub config: MpoolConfig, /// Chain configuration @@ -193,15 +203,14 @@ where } pub fn resolve_to_key(&self, addr: &Address, cur_ts: &Tipset) -> Result { - resolve_to_key(self.api.as_ref(), &self.key_cache, addr, cur_ts) + resolve_to_key(self.api.as_ref(), &self.caches.key, addr, cur_ts) } /// Add a signed message to the pool and its address. fn add_local(&self, m: SignedMessage) -> Result<(), Error> { let cur_ts = self.current_tipset(); let resolved = self.resolve_to_key(&m.from(), &cur_ts)?; - self.local_addrs.write().push(resolved); - self.local_msgs.write().insert(m); + self.local.add(m, resolved); Ok(()) } @@ -276,14 +285,14 @@ where fn verify_msg_sig(&self, msg: &SignedMessage) -> Result<(), Error> { let cid = msg.cid(); - if let Some(()) = self.sig_val_cache.get_cloned(&(cid).into()) { + if let Some(()) = self.caches.sig_val.get_cloned(&(cid).into()) { return Ok(()); } msg.verify(self.chain_config.eth_chain_id) .map_err(|e| Error::Other(e.to_string()))?; - self.sig_val_cache.push(cid.into(), ()); + self.caches.sig_val.push(cid.into(), ()); Ok(()) } @@ -353,9 +362,9 @@ where let cur_ts = self.current_tipset(); add_helper( self.api.as_ref(), - &self.bls_sig_cache, + &self.caches.bls_sig, &self.pending_store, - &self.key_cache, + &self.caches.key, &cur_ts, msg, self.get_state_sequence(&from, &cur_ts)?, @@ -390,8 +399,8 @@ where fn get_state_sequence(&self, addr: &Address, cur_ts: &Tipset) -> Result { get_state_sequence( self.api.as_ref(), - &self.key_cache, - &self.state_nonce_cache, + &self.caches.key, + &self.caches.state_nonce, addr, cur_ts, ) @@ -465,7 
+474,7 @@ where msg_vec.append(smsgs.as_mut()); for msg in umsg { - let smsg = recover_sig(&self.bls_sig_cache, msg)?; + let smsg = recover_sig(&self.caches.bls_sig, msg)?; msg_vec.push(smsg) } } @@ -473,13 +482,14 @@ where } /// Loads local messages to the message pool to be applied. - pub fn load_local(&mut self) -> Result<(), Error> { - let mut local_msgs = self.local_msgs.write(); - for k in local_msgs.iter().cloned().collect_vec() { + pub fn load_local(&self) -> Result<(), Error> { + for k in self.local.snapshot_msgs() { self.add(k.clone()).unwrap_or_else(|err| { if err == Error::SequenceTooLow { warn!("error adding message: {:?}", err); - local_msgs.remove(&k); + self.local.remove_msg(&k); + } else { + error!("error adding local message: {:?}", err); } }) } @@ -515,17 +525,15 @@ where { head_change( self.api.as_ref(), - &self.bls_sig_cache, - self.repub_trigger.clone(), - self.republished.as_ref(), + &self.caches.bls_sig, + self.republish.as_ref(), &self.pending_store, self.cur_tipset.as_ref(), - &self.key_cache, - &self.state_nonce_cache, + &self.caches.key, + &self.caches.state_nonce, revert, apply, ) - .await } } @@ -544,7 +552,6 @@ where where T: Provider, { - let local_addrs = Arc::new(SyncRwLock::new(Vec::new())); // Per-actor limits are constant for the lifetime of this pool; capture // them once here rather than re-reading on every insert. 
let pending_store = PendingStore::new(MsgSetLimits::new( @@ -552,32 +559,18 @@ where api.max_untrusted_actor_pending_messages(), )); let tipset = Arc::new(SyncRwLock::new(api.get_heaviest_tipset())); - let bls_sig_cache = - SizeTrackingLruCache::new_with_metrics("bls_sig".into(), BLS_SIG_CACHE_SIZE); - let sig_val_cache = - SizeTrackingLruCache::new_with_metrics("sig_val".into(), SIG_VAL_CACHE_SIZE); - let key_cache = SizeTrackingLruCache::new_with_metrics("mpool_key".into(), KEY_CACHE_SIZE); - let state_nonce_cache = - SizeTrackingLruCache::new_with_metrics("state_nonce".into(), STATE_NONCE_CACHE_SIZE); - let local_msgs = Arc::new(SyncRwLock::new(HashSet::new())); - let republished = Arc::new(SyncRwLock::new(HashSet::new())); let block_delay = chain_config.block_delay_secs; - let (repub_trigger, repub_trigger_rx) = flume::bounded::<()>(4); - let mut mp = MessagePool { - local_addrs, + let (republish, repub_trigger_rx) = RepublishState::new(); + let mp = MessagePool { pending_store, + caches: Caches::new(), + local: Arc::new(LocalStore::new()), cur_tipset: tipset, api: Arc::new(api), - bls_sig_cache, - sig_val_cache, - key_cache, - state_nonce_cache, - local_msgs, - republished, config, network_sender, - repub_trigger, + republish: Arc::new(republish), chain_config: Arc::clone(&chain_config), }; @@ -586,14 +579,13 @@ where let mut head_changes_rx = mp.api.subscribe_head_changes(); let api = mp.api.clone(); - let bls_sig_cache = mp.bls_sig_cache.shallow_clone(); + let bls_sig_cache = mp.caches.bls_sig.shallow_clone(); let pending_store = mp.pending_store.shallow_clone(); - let republished = mp.republished.clone(); - let key_cache = mp.key_cache.shallow_clone(); - let state_nonce_cache = mp.state_nonce_cache.shallow_clone(); + let republish = mp.republish.clone(); + let key_cache = mp.caches.key.shallow_clone(); + let state_nonce_cache = mp.caches.state_nonce.shallow_clone(); let current_ts = mp.cur_tipset.clone(); - let repub_trigger = mp.repub_trigger.clone(); // 
Reacts to new HeadChanges services.spawn(async move { @@ -603,17 +595,14 @@ where if let Err(e) = head_change( api.as_ref(), &bls_sig_cache, - repub_trigger.clone(), - republished.as_ref(), + republish.as_ref(), &pending_store, ¤t_ts, &key_cache, &state_nonce_cache, reverts, applies, - ) - .await - { + ) { tracing::warn!("Error changing head: {e}"); } } @@ -630,9 +619,9 @@ where let api = mp.api.clone(); let pending_store = mp.pending_store.shallow_clone(); let cur_tipset = mp.cur_tipset.clone(); - let republished = mp.republished.clone(); - let local_addrs = mp.local_addrs.clone(); - let key_cache = mp.key_cache.shallow_clone(); + let republish = mp.republish.clone(); + let local = mp.local.clone(); + let key_cache = mp.caches.key.shallow_clone(); let network_sender = Arc::new(mp.network_sender.clone()); let republish_interval = u64::from(10 * block_delay + chain_config.propagation_delay_secs); // Reacts to republishing requests @@ -649,8 +638,8 @@ where network_sender.as_ref(), &pending_store, cur_tipset.as_ref(), - republished.as_ref(), - local_addrs.as_ref(), + republish.as_ref(), + local.as_ref(), &key_cache, &chain_config, ) diff --git a/src/message_pool/msgpool/republish.rs b/src/message_pool/msgpool/republish.rs new file mode 100644 index 00000000000..595ff1705d3 --- /dev/null +++ b/src/message_pool/msgpool/republish.rs @@ -0,0 +1,85 @@ +// Copyright 2019-2026 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +//! Tracks which CIDs were already broadcast in the current republish cycle +//! and exposes a trigger to wake the republish task early. 
+
+use ahash::HashSet;
+use cid::Cid;
+use parking_lot::RwLock as SyncRwLock;
+
+use crate::message_pool::Error;
+
+const REPUB_TRIGGER_CAPACITY: usize = 1;
+
+pub(in crate::message_pool) struct RepublishState {
+    republished: SyncRwLock<HashSet<Cid>>,
+    trigger: flume::Sender<()>,
+}
+
+impl RepublishState {
+    pub(in crate::message_pool) fn new() -> (Self, flume::Receiver<()>) {
+        let (trigger, rx) = flume::bounded(REPUB_TRIGGER_CAPACITY);
+        (
+            Self {
+                republished: SyncRwLock::default(),
+                trigger,
+            },
+            rx,
+        )
+    }
+
+    /// Returns `true` if `cid` was seen by the republished state.
+    pub(in crate::message_pool) fn was_republished(&self, cid: &Cid) -> bool {
+        self.republished.read().contains(cid)
+    }
+
+    /// Wake the republish task early; a `Full` channel already holds a wake-up.
+    pub(in crate::message_pool) fn trigger(&self) -> Result<(), Error> {
+        match self.trigger.try_send(()) {
+            Ok(()) | Err(flume::TrySendError::Full(())) => Ok(()),
+            Err(e) => Err(Error::Other(format!("Republish receiver dropped: {e}"))),
+        }
+    }
+
+    pub(in crate::message_pool) fn replace_with<I: IntoIterator<Item = Cid>>(&self, cids: I) {
+        let mut set = self.republished.write();
+        set.clear();
+        set.extend(cids);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    #[test]
+    fn was_republished_reflects_replace_with() {
+        let (state, _rx) = RepublishState::new();
+        let cid = Cid::default();
+
+        assert!(
+            !state.was_republished(&cid),
+            "fresh state should not contain any CIDs",
+        );
+
+        state.replace_with([cid]);
+        assert!(
+            state.was_republished(&cid),
+            "replace_with should populate the set",
+        );
+
+        state.replace_with(std::iter::empty());
+        assert!(
+            !state.was_republished(&cid),
+            "replace_with with empty iter should clear the set",
+        );
+    }
+
+    #[test]
+    fn trigger_succeeds_when_receiver_is_alive() {
+        let (state, rx) = RepublishState::new();
+        state.trigger().expect("send should succeed");
+        rx.try_recv()
+            .expect("trigger should be observable on the receiver");
+    }
+}
diff --git a/src/message_pool/msgpool/selection.rs b/src/message_pool/msgpool/selection.rs
index f7cc9817464..b6d386aa314 100644
--- a/src/message_pool/msgpool/selection.rs
+++ b/src/message_pool/msgpool/selection.rs
@@ -652,9 +652,9 @@ where
         // Run head change to do reorg detection
         run_head_change(
             self.api.as_ref(),
-            &self.bls_sig_cache,
+            &self.caches.bls_sig,
             &self.pending_store,
-            &self.key_cache,
+            &self.caches.key,
             cur_ts.clone(),
             ts.clone(),
             &mut result,