Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions src/rpc/methods/eth/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ impl EthEventHandler {
let ExecutedTipset {
executed_messages, ..
} = ctx.state_manager.load_executed_tipset(tipset).await?;
let mut resolved_id_addrs = HashMap::default();
let mut event_count = 0;
for (
msg_idx,
Expand All @@ -315,37 +316,36 @@ impl EthEventHandler {
let event_idx_base = u64::try_from(event_count)?;
event_count += events.len();
for (event_idx, event) in (event_idx_base..).zip(events.iter()) {
let id_addr = Address::new_id(event.emitter());
let result = ctx
.state_manager
.resolve_to_deterministic_address(id_addr, tipset)
.await
.with_context(|| {
format!(
"resolving address {} failed (EPOCH = {})",
id_addr,
tipset.epoch()
)
});
let resolved = if let Ok(resolved) = result {
let emitter = event.emitter();
let id_addr = Address::new_id(emitter);
let resolved_opt = if let Some(r) = resolved_id_addrs.get(&emitter) {
*r
} else {
let r = ctx
.state_manager
.resolve_to_deterministic_address(id_addr, tipset)
.await
.ok();
resolved_id_addrs.insert(emitter, r);
r
};
let resolved = if let Some(resolved) = resolved_opt {
resolved
} else if matches!(skip_event, SkipEvent::OnUnresolvedAddress) {
// Skip event
continue;
} else {
if let SkipEvent::OnUnresolvedAddress = skip_event {
// Skip event
continue;
} else {
id_addr
}
id_addr
};

let entries: Vec<crate::shim::executor::Entry> = event.entries();
let entries = event.entries();
let matched = if let Some(spec) = spec {
spec.matches(&resolved, &entries)?
} else {
true
};
if matched {
let entries: Vec<EventEntry> = entries
let entries = entries
.into_iter()
.map(|entry| {
let (flags, key, codec, value) = entry.into_parts();
Expand Down
31 changes: 22 additions & 9 deletions src/state_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ use crate::state_manager::cache::TipsetStateCache;
use crate::state_manager::chain_rand::draw_randomness;
use crate::state_migration::run_state_migrations;
use crate::utils::ShallowClone as _;
use crate::utils::cache::SizeTrackingLruCache;
use crate::utils::get_size::{GetSize, vec_heap_size_helper};
use ahash::{HashMap, HashMapExt};
use anyhow::{Context as _, bail, ensure};
Expand Down Expand Up @@ -84,6 +85,7 @@ use tokio::sync::{RwLock, broadcast::error::RecvError};
use tracing::{error, info, instrument, warn};

const DEFAULT_TIPSET_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
const DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE: NonZeroUsize = nonzero!(1024usize);
pub const EVENTS_AMT_BITWIDTH: u32 = 5;

/// Result of executing an individual chain message in a tipset.
Expand Down Expand Up @@ -190,6 +192,7 @@ pub struct StateManager<DB> {
cs: Arc<ChainStore<DB>>,
/// This is a cache which indexes tipsets to their calculated state output (state root, receipt root).
cache: TipsetStateCache<ExecutedTipset>,
id_to_deterministic_address_cache: SizeTrackingLruCache<u64, Address>,
beacon: Arc<crate::beacon::BeaconSchedule>,
engine: Arc<MultiEngine>,
}
Expand Down Expand Up @@ -217,6 +220,10 @@ where
cache: TipsetStateCache::new("executed_tipset"), // For StateOutput
beacon,
engine,
id_to_deterministic_address_cache: SizeTrackingLruCache::new_with_metrics(
"id_to_deterministic_address".into(),
DEFAULT_ID_TO_DETERMINISTIC_ADDRESS_CACHE_SIZE,
),
})
}

Expand Down Expand Up @@ -1790,20 +1797,26 @@ where
match address.protocol() {
BLS | Secp256k1 | Delegated => Ok(address),
Actor => anyhow::bail!("cannot resolve actor address to key address"),
_ => {
ID => {
let id = address.id()?;
if let Some(cached) = self.id_to_deterministic_address_cache.get_cloned(&id) {
return Ok(cached);
}
// First try to resolve the actor in the parent state, so we don't have to compute anything.
if let Ok(state) =
let resolved = if let Ok(state) =
StateTree::new_from_root(self.blockstore_owned(), ts.parent_state())
&& let Ok(address) = state
.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
{
return Ok(address);
}

// If that fails, compute the tip-set and try again.
let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)
address
} else {
// If that fails, compute the tip-set and try again.
let TipsetState { state_root, .. } = self.load_tipset_state(ts).await?;
let state = StateTree::new_from_root(self.blockstore_owned(), &state_root)?;
state.resolve_to_deterministic_addr(self.chain_store().blockstore(), address)?
};
self.id_to_deterministic_address_cache.push(id, resolved);
Ok(resolved)
}
}
}
Expand Down
Loading