Merged
12 changes: 11 additions & 1 deletion crates/evm/core/src/backend/mod.rs
@@ -1024,6 +1024,16 @@ impl DatabaseExt for Backend {
return Ok(());
}

// Update block number and timestamp of active fork (if any) with current env values,
// in order to preserve values changed by using `roll` and `warp` cheatcodes.
if let Some(active_fork_id) = self.active_fork_id() {
self.forks.update_block(
self.ensure_fork_id(active_fork_id).cloned()?,
env.block.number,
env.block.timestamp,
)?;
}

let fork_id = self.ensure_fork_id(id).cloned()?;
let idx = self.inner.ensure_fork_index(&fork_id)?;
let fork_env = self
@@ -1100,7 +1110,7 @@
}

self.active_fork_ids = Some((id, idx));
// update the environment accordingly
// Update current environment with environment of newly selected fork.
update_current_env_with_fork_env(env, fork_env);

Ok(())
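The hunk above is the core of the fix: before switching forks, `select_fork` now writes the live env's `block.number` and `block.timestamp` back into the stored env of the fork being left, so values set via the `roll`/`warp` cheatcodes survive switching away and back. The sketch below models that write-back/restore cycle; `ToyBackend`, `BlockEnv`, and `switch_fork` are hypothetical stand-ins for illustration, not Foundry's actual types.

```rust
use std::collections::HashMap;

#[derive(Clone, Debug, PartialEq)]
struct BlockEnv {
    number: u64,
    timestamp: u64,
}

struct ToyBackend {
    /// Stored env per fork id (what the multi-fork handler keeps around).
    fork_envs: HashMap<&'static str, BlockEnv>,
    /// Currently selected fork, if any.
    active: Option<&'static str>,
}

impl ToyBackend {
    fn switch_fork(&mut self, id: &'static str, live_env: &mut BlockEnv) {
        // The fix: persist `roll`/`warp` changes by writing the live env back
        // into the stored env of the fork we are leaving.
        if let Some(active) = self.active {
            if let Some(stored) = self.fork_envs.get_mut(active) {
                stored.number = live_env.number;
                stored.timestamp = live_env.timestamp;
            }
        }
        // Then adopt the stored env of the newly selected fork.
        if let Some(stored) = self.fork_envs.get(id) {
            *live_env = stored.clone();
        }
        self.active = Some(id);
    }
}

fn main() {
    let mut backend = ToyBackend {
        fork_envs: HashMap::from([
            ("fork-a", BlockEnv { number: 100, timestamp: 1_000 }),
            ("fork-b", BlockEnv { number: 200, timestamp: 2_000 }),
        ]),
        active: None,
    };
    let mut live = BlockEnv { number: 0, timestamp: 0 };

    backend.switch_fork("fork-a", &mut live);
    // Simulate `vm.roll` / `vm.warp` mutating the live env.
    live.number = 123;
    live.timestamp = 4_567;

    backend.switch_fork("fork-b", &mut live);
    backend.switch_fork("fork-a", &mut live);

    // The rolled/warped values survived the round trip.
    assert_eq!(live, BlockEnv { number: 123, timestamp: 4_567 });
}
```

Without the write-back step, re-selecting `fork-a` in this model would hand back the original snapshot (block 100, timestamp 1_000) — the stale-values behavior this PR fixes.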
124 changes: 74 additions & 50 deletions crates/evm/core/src/fork/multi.rs
@@ -1,9 +1,10 @@
//! Support for running multiple fork backends
//! Support for running multiple fork backends.
//!
//! The design is similar to the single `SharedBackend`, `BackendHandler` but supports multiple
//! concurrently active pairs at once.

use super::CreateFork;
use alloy_primitives::U256;
use alloy_transport::layers::RetryBackoffService;
use foundry_common::provider::{
runtime_transport::RuntimeTransport, ProviderBuilder, RetryProvider,
@@ -65,13 +66,13 @@ impl<T: Into<String>> From<T> for ForkId {
}

/// The Sender half of multi fork pair.
/// Can send requests to the `MultiForkHandler` to create forks
/// Can send requests to the `MultiForkHandler` to create forks.
#[derive(Clone, Debug)]
#[must_use]
pub struct MultiFork {
/// Channel to send `Request`s to the handler
/// Channel to send `Request`s to the handler.
handler: Sender<Request>,
/// Ensures that all rpc resources get flushed properly
/// Ensures that all rpc resources get flushed properly.
_shutdown: Arc<ShutDownMultiFork>,
}

@@ -81,8 +82,8 @@ impl MultiFork {
trace!(target: "fork::multi", "spawning multifork");

let (fork, mut handler) = Self::new();
// spawn a light-weight thread with a thread-local async runtime just for
// sending and receiving data from the remote client(s)
// Spawn a light-weight thread with a thread-local async runtime just for
// sending and receiving data from the remote client(s).
std::thread::Builder::new()
.name("multi-fork-backend".into())
.spawn(move || {
@@ -92,10 +93,10 @@
.expect("failed to build tokio runtime");

rt.block_on(async move {
// flush cache every 60s, this ensures that long-running fork tests get their
// cache flushed from time to time
// Flush cache every 60s, this ensures that long-running fork tests get their
// cache flushed from time to time.
// NOTE: we install the interval here because the `tokio::timer::Interval`
// requires a rt
// requires a rt.
handler.set_flush_cache_interval(Duration::from_secs(60));
handler.await
});
@@ -115,9 +116,9 @@ impl MultiFork {
(Self { handler, _shutdown }, MultiForkHandler::new(handler_rx))
}

/// Returns a fork backend
/// Returns a fork backend.
///
/// If no matching fork backend exists it will be created
/// If no matching fork backend exists it will be created.
pub fn create_fork(&self, fork: CreateFork) -> eyre::Result<(ForkId, SharedBackend, Env)> {
trace!("Creating new fork, url={}, block={:?}", fork.url, fork.evm_opts.fork_block_number);
let (sender, rx) = oneshot_channel();
@@ -126,9 +127,9 @@
rx.recv()?
}

/// Rolls the block of the fork
/// Rolls the block of the fork.
///
/// If no matching fork backend exists it will be created
/// If no matching fork backend exists it will be created.
pub fn roll_fork(
&self,
fork: ForkId,
@@ -141,7 +142,7 @@
rx.recv()?
}

/// Returns the `Env` of the given fork, if any
/// Returns the `Env` of the given fork, if any.
pub fn get_env(&self, fork: ForkId) -> eyre::Result<Option<Env>> {
trace!(?fork, "getting env config");
let (sender, rx) = oneshot_channel();
@@ -150,7 +151,16 @@
Ok(rx.recv()?)
}

/// Returns the corresponding fork if it exists
/// Updates block number and timestamp of given fork with new values.
pub fn update_block(&self, fork: ForkId, number: U256, timestamp: U256) -> eyre::Result<()> {
trace!(?fork, ?number, ?timestamp, "update fork block");
self.handler
.clone()
.try_send(Request::UpdateBlock(fork, number, timestamp))
.map_err(|e| eyre::eyre!("{:?}", e))
}

/// Returns the corresponding fork if it exists.
///
/// Returns `None` if no matching fork backend is available.
pub fn get_fork(&self, id: impl Into<ForkId>) -> eyre::Result<Option<SharedBackend>> {
@@ -162,7 +172,7 @@
Ok(rx.recv()?)
}

/// Returns the corresponding fork url if it exists
/// Returns the corresponding fork url if it exists.
///
/// Returns `None` if no matching fork is available.
pub fn get_fork_url(&self, id: impl Into<ForkId>) -> eyre::Result<Option<String>> {
@@ -180,37 +190,39 @@ type CreateFuture =
type CreateSender = OneshotSender<eyre::Result<(ForkId, SharedBackend, Env)>>;
type GetEnvSender = OneshotSender<Option<Env>>;

/// Request that's send to the handler
/// Request that's sent to the handler.
#[derive(Debug)]
enum Request {
/// Creates a new ForkBackend
/// Creates a new ForkBackend.
CreateFork(Box<CreateFork>, CreateSender),
/// Returns the Fork backend for the `ForkId` if it exists
/// Returns the Fork backend for the `ForkId` if it exists.
GetFork(ForkId, OneshotSender<Option<SharedBackend>>),
/// Adjusts the block that's being forked, by creating a new fork at the new block
/// Adjusts the block that's being forked, by creating a new fork at the new block.
RollFork(ForkId, u64, CreateSender),
/// Returns the environment of the fork
/// Returns the environment of the fork.
GetEnv(ForkId, GetEnvSender),
/// Updates the block number and timestamp of the fork.
UpdateBlock(ForkId, U256, U256),
/// Shutdowns the entire `MultiForkHandler`, see `ShutDownMultiFork`
ShutDown(OneshotSender<()>),
/// Returns the Fork Url for the `ForkId` if it exists
/// Returns the Fork Url for the `ForkId` if it exists.
GetForkUrl(ForkId, OneshotSender<Option<String>>),
}

enum ForkTask {
/// Contains the future that will establish a new fork
/// Contains the future that will establish a new fork.
Create(CreateFuture, ForkId, CreateSender, Vec<CreateSender>),
}

/// The type that manages connections in the background
/// The type that manages connections in the background.
#[must_use = "futures do nothing unless polled"]
pub struct MultiForkHandler {
/// Incoming requests from the `MultiFork`.
incoming: Fuse<Receiver<Request>>,

/// All active handlers
/// All active handlers.
///
/// It's expected that this list will be rather small (<10)
/// It's expected that this list will be rather small (<10).
handlers: Vec<(ForkId, Handler)>,

// tasks currently in progress
@@ -222,7 +234,7 @@ pub struct MultiForkHandler {
/// block number.
forks: HashMap<ForkId, CreatedFork>,

/// Optional periodic interval to flush rpc cache
/// Optional periodic interval to flush rpc cache.
flush_cache_interval: Option<tokio::time::Interval>,
}

@@ -237,7 +249,7 @@ impl MultiForkHandler {
}
}

/// Sets the interval after which all rpc caches should be flushed periodically
/// Sets the interval after which all rpc caches should be flushed periodically.
pub fn set_flush_cache_interval(&mut self, period: Duration) -> &mut Self {
self.flush_cache_interval =
Some(tokio::time::interval_at(tokio::time::Instant::now() + period, period));
@@ -261,13 +273,13 @@ impl MultiForkHandler {
let fork_id = ForkId::new(&fork.url, fork.evm_opts.fork_block_number);
trace!(?fork_id, "created new forkId");

// there could already be a task for the requested fork in progress
// There could already be a task for the requested fork in progress.
if let Some(in_progress) = self.find_in_progress_task(&fork_id) {
in_progress.push(sender);
return;
}

// need to create a new fork
// Need to create a new fork.
let task = Box::pin(create_fork(fork));
self.pending_tasks.push(ForkTask::Create(task, fork_id, sender, Vec::new()));
}
@@ -282,14 +294,23 @@
self.forks.insert(fork_id.clone(), fork.clone());
let _ = sender.send(Ok((fork_id.clone(), fork.backend.clone(), fork.opts.env.clone())));

// notify all additional senders and track unique forkIds
// Notify all additional senders and track unique forkIds.
for sender in additional_senders {
let next_fork_id = fork.inc_senders(fork_id.clone());
self.forks.insert(next_fork_id.clone(), fork.clone());
let _ = sender.send(Ok((next_fork_id, fork.backend.clone(), fork.opts.env.clone())));
}
}

/// Update fork block number and timestamp. Used to preserve values set by the `roll` and `warp`
/// cheatcodes when a new fork is selected.
fn update_block(&mut self, fork_id: ForkId, block_number: U256, block_timestamp: U256) {
if let Some(fork) = self.forks.get_mut(&fork_id) {
fork.opts.env.block.number = block_number;
fork.opts.env.block.timestamp = block_timestamp;
}
}
Comment on lines +305 to +312 (Member): I see, makes sense

fn on_request(&mut self, req: Request) {
match req {
Request::CreateFork(fork, sender) => self.create_fork(*fork, sender),
@@ -310,9 +331,12 @@ impl MultiForkHandler {
Request::GetEnv(fork_id, sender) => {
let _ = sender.send(self.forks.get(&fork_id).map(|fork| fork.opts.env.clone()));
}
Request::UpdateBlock(fork_id, block_number, block_timestamp) => {
self.update_block(fork_id, block_number, block_timestamp);
}
Request::ShutDown(sender) => {
trace!(target: "fork::multi", "received shutdown signal");
// we're emptying all fork backends, this way we ensure all caches get flushed
// We're emptying all fork backends, this way we ensure all caches get flushed.
self.forks.clear();
self.handlers.clear();
let _ = sender.send(());
@@ -325,30 +349,30 @@
}
}
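
Note the shape of the new `Request::UpdateBlock` arm above: unlike `CreateFork` or `GetEnv` it carries no response sender, so `MultiFork::update_block` is fire-and-forget and the handler simply mutates the stored fork env in place. The sketch below shows that request/response split using `std::sync::mpsc` in place of the async channels used here; `Request`, `Handler`, and the field layout are simplified stand-ins rather than Foundry's actual types.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

enum Request {
    /// Fire-and-forget: no response channel, mirroring `Request::UpdateBlock`.
    UpdateBlock(String, u64, u64),
    /// Request/response: carries a reply sender, like `GetEnv` does.
    GetBlock(String, mpsc::Sender<Option<(u64, u64)>>),
}

struct Handler {
    incoming: mpsc::Receiver<Request>,
    /// fork id -> (block number, timestamp)
    forks: HashMap<String, (u64, u64)>,
}

impl Handler {
    fn run(mut self) {
        while let Ok(req) = self.incoming.recv() {
            match req {
                Request::UpdateBlock(id, number, timestamp) => {
                    // Mutate the stored fork entry in place; nothing to send back.
                    if let Some(entry) = self.forks.get_mut(&id) {
                        *entry = (number, timestamp);
                    }
                }
                Request::GetBlock(id, reply) => {
                    let _ = reply.send(self.forks.get(&id).copied());
                }
            }
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel();
    let handler = Handler {
        incoming: rx,
        forks: HashMap::from([("fork-a".to_string(), (100, 1_000))]),
    };
    let worker = thread::spawn(move || handler.run());

    // Fire-and-forget update, then read the value back through the handler.
    tx.send(Request::UpdateBlock("fork-a".into(), 123, 4_567)).unwrap();
    let (reply_tx, reply_rx) = mpsc::channel();
    tx.send(Request::GetBlock("fork-a".into(), reply_tx)).unwrap();
    assert_eq!(reply_rx.recv().unwrap(), Some((123, 4_567)));

    // Dropping the sender closes the channel and ends the toy handler loop.
    drop(tx);
    worker.join().unwrap();
}
```

In this toy model, closing the channel ends the loop; the real handler instead keeps driving its fork backends after the request channel closes and also handles an explicit `ShutDown` request so caches get flushed.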

// Drives all handler to completion
// This future will finish once all underlying BackendHandler are completed
// Drives all handlers to completion.
// This future will finish once all underlying BackendHandlers are completed.
impl Future for MultiForkHandler {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let pin = self.get_mut();

// receive new requests
// Receive new requests.
loop {
match Pin::new(&mut pin.incoming).poll_next(cx) {
Poll::Ready(Some(req)) => {
pin.on_request(req);
}
Poll::Ready(None) => {
// channel closed, but we still need to drive the fork handlers to completion
// Channel closed, but we still need to drive the fork handlers to completion.
trace!(target: "fork::multi", "request channel closed");
break;
}
Poll::Pending => break,
}
}

// advance all tasks
// Advance all tasks.
for n in (0..pin.pending_tasks.len()).rev() {
let task = pin.pending_tasks.swap_remove(n);
match task {
@@ -387,7 +411,7 @@ impl Future for MultiForkHandler {
}
}

// advance all handlers
// Advance all handlers.
for n in (0..pin.handlers.len()).rev() {
let (id, mut handler) = pin.handlers.swap_remove(n);
match handler.poll_unpin(cx) {
@@ -405,7 +429,7 @@
return Poll::Ready(());
}

// periodically flush cached RPC state
// Periodically flush cached RPC state.
if pin
.flush_cache_interval
.as_mut()
@@ -415,7 +439,7 @@
{
trace!(target: "fork::multi", "tick flushing caches");
let forks = pin.forks.values().map(|f| f.backend.clone()).collect::<Vec<_>>();
// flush this on new thread to not block here
// Flush this on new thread to not block here.
std::thread::Builder::new()
.name("flusher".into())
.spawn(move || {
@@ -431,12 +455,12 @@
/// Tracks the created Fork
#[derive(Debug, Clone)]
struct CreatedFork {
/// How the fork was initially created
/// How the fork was initially created.
opts: CreateFork,
/// Copy of the sender
/// Copy of the sender.
backend: SharedBackend,
/// How many consumers there are, since a `SharedBackend` can be used by multiple
/// consumers
/// consumers.
num_senders: Arc<AtomicUsize>,
}

@@ -445,7 +469,7 @@ impl CreatedFork {
Self { opts, backend, num_senders: Arc::new(AtomicUsize::new(1)) }
}

/// Increment senders and return unique identifier of the fork
/// Increment senders and return unique identifier of the fork.
fn inc_senders(&self, fork_id: ForkId) -> ForkId {
format!(
"{}-{}",
@@ -458,7 +482,7 @@

/// A type that's used to signal the `MultiForkHandler` when it's time to shut down.
///
/// This is essentially a sync on drop, so that the `MultiForkHandler` can flush all rpc cashes
/// This is essentially a sync on drop, so that the `MultiForkHandler` can flush all rpc caches.
///
/// This type intentionally does not implement `Clone` since it's intended that there's only one
/// instance.
@@ -481,9 +505,9 @@ impl Drop for ShutDownMultiFork {
}
}

/// Creates a new fork
/// Creates a new fork.
///
/// This will establish a new `Provider` to the endpoint and return the Fork Backend
/// This will establish a new `Provider` to the endpoint and return the Fork Backend.
async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork, Handler)> {
let provider = Arc::new(
ProviderBuilder::new(fork.url.as_str())
@@ -493,16 +517,16 @@ async fn create_fork(mut fork: CreateFork) -> eyre::Result<(ForkId, CreatedFork,
.build()?,
);

// initialise the fork environment
// Initialise the fork environment.
let (env, block) = fork.evm_opts.fork_evm_env(&fork.url).await?;
fork.env = env;
let meta = BlockchainDbMeta::new(fork.env.clone(), fork.url.clone());

// we need to use the block number from the block because the env's number can be different on
// We need to use the block number from the block because the env's number can be different on
// some L2s (e.g. Arbitrum).
let number = block.header.number.unwrap_or(meta.block_env.number.to());

// determine the cache path if caching is enabled
// Determine the cache path if caching is enabled.
let cache_path = if fork.enable_caching {
Config::foundry_block_cache_dir(meta.cfg_env.chain_id, number)
} else {
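One smaller detail in `create_fork` above: the on-disk cache path is keyed on the block number taken from the fetched block header, falling back to the env's `block.number` only when the header carries none, because (as the comment notes) the two can differ on some L2s such as Arbitrum. A minimal sketch of that fallback, with placeholder types and made-up numbers:

```rust
/// Placeholder for the RPC block header; `None` mirrors a pending block.
struct BlockHeader {
    number: Option<u64>,
}

/// Placeholder for the fork env's block section.
struct EnvBlock {
    number: u64,
}

/// Pick the block number used to key the RPC cache, preferring the header.
fn cache_block_number(header: &BlockHeader, env_block: &EnvBlock) -> u64 {
    header.number.unwrap_or(env_block.number)
}

fn main() {
    // Illustrative only: on some L2s the header height and the env number differ.
    let header = BlockHeader { number: Some(220_000_000) };
    let env_block = EnvBlock { number: 20_000_000 };
    assert_eq!(cache_block_number(&header, &env_block), 220_000_000);

    // No header number available: fall back to the env value.
    let pending = BlockHeader { number: None };
    assert_eq!(cache_block_number(&pending, &env_block), 20_000_000);
}
```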
3 changes: 3 additions & 0 deletions crates/forge/tests/it/repros.rs
@@ -358,3 +358,6 @@ test_repro!(8277);

// https://github.com/foundry-rs/foundry/issues/8287
test_repro!(8287);

// https://github.com/foundry-rs/foundry/issues/8168
test_repro!(8168);