diff --git a/Cargo.lock b/Cargo.lock index 3c97be17671c..111e2cfcd4bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6735,6 +6735,7 @@ version = "0.2.0-beta.2" dependencies = [ "assert_matches", "clap", + "parking_lot 0.12.1", "rayon", "reth-db", "reth-interfaces", diff --git a/bin/reth/src/commands/import.rs b/bin/reth/src/commands/import.rs index fa5d4488aa99..81b50dd8aea2 100644 --- a/bin/reth/src/commands/import.rs +++ b/bin/reth/src/commands/import.rs @@ -107,19 +107,17 @@ impl ImportCommand { let tip = file_client.tip().expect("file client has no tip"); info!(target: "reth::cli", "Chain file imported"); - let static_file_producer = StaticFileProducer::new( - provider_factory.clone(), - provider_factory.static_file_provider(), - PruneModes::default(), - ); - let (mut pipeline, events) = self .build_import_pipeline( config, provider_factory.clone(), &consensus, file_client, - static_file_producer, + StaticFileProducer::new( + provider_factory.clone(), + provider_factory.static_file_provider(), + PruneModes::default(), + ), ) .await?; diff --git a/crates/consensus/beacon/src/engine/hooks/static_file.rs b/crates/consensus/beacon/src/engine/hooks/static_file.rs index 112a80580d4a..5b77ec39d638 100644 --- a/crates/consensus/beacon/src/engine/hooks/static_file.rs +++ b/crates/consensus/beacon/src/engine/hooks/static_file.rs @@ -69,14 +69,15 @@ impl StaticFileHook { /// This will try to spawn the static_file_producer if it is idle: /// 1. Check if producing static files is needed through - /// [StaticFileProducer::get_static_file_targets] and then - /// [StaticFileTargets::any](reth_static_file::StaticFileTargets::any). + /// [StaticFileProducer::get_static_file_targets](reth_static_file::StaticFileProducerInner::get_static_file_targets) + /// and then [StaticFileTargets::any](reth_static_file::StaticFileTargets::any). /// 2. /// 1. If producing static files is needed, pass static file request to the - /// [StaticFileProducer::run] and spawn it in a separate task. Set static file producer - /// state to [StaticFileProducerState::Running]. + /// [StaticFileProducer::run](reth_static_file::StaticFileProducerInner::run) and spawn + /// it in a separate task. Set static file producer state to + /// [StaticFileProducerState::Running]. /// 2. If producing static files is not needed, set static file producer state back to - /// [StaticFileProducerState::Idle]. + /// [StaticFileProducerState::Idle]. /// /// If static_file_producer is already running, do nothing. fn try_spawn_static_file_producer( @@ -85,16 +86,23 @@ impl StaticFileHook { ) -> RethResult> { Ok(match &mut self.state { StaticFileProducerState::Idle(static_file_producer) => { - let Some(mut static_file_producer) = static_file_producer.take() else { + let Some(static_file_producer) = static_file_producer.take() else { trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle"); return Ok(None) }; - let targets = static_file_producer.get_static_file_targets(HighestStaticFiles { - headers: Some(finalized_block_number), - receipts: Some(finalized_block_number), - transactions: Some(finalized_block_number), - })?; + let Some(mut locked_static_file_producer) = static_file_producer.try_lock_arc() + else { + trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken"); + return Ok(None) + }; + + let targets = + locked_static_file_producer.get_static_file_targets(HighestStaticFiles { + headers: Some(finalized_block_number), + receipts: Some(finalized_block_number), + transactions: Some(finalized_block_number), + })?; // Check if the moving data to static files has been requested. if targets.any() { @@ -102,7 +110,7 @@ impl StaticFileHook { self.task_spawner.spawn_critical_blocking( "static_file_producer task", Box::pin(async move { - let result = static_file_producer.run(targets); + let result = locked_static_file_producer.run(targets); let _ = tx.send((static_file_producer, result)); }), ); diff --git a/crates/node-builder/src/builder.rs b/crates/node-builder/src/builder.rs index 80840603bf1d..9e6c950cf391 100644 --- a/crates/node-builder/src/builder.rs +++ b/crates/node-builder/src/builder.rs @@ -548,12 +548,12 @@ where let max_block = config.max_block(&network_client, provider_factory.clone()).await?; let mut hooks = EngineHooks::new(); - let mut static_file_producer = StaticFileProducer::new( + let static_file_producer = StaticFileProducer::new( provider_factory.clone(), provider_factory.static_file_provider(), prune_config.clone().unwrap_or_default().segments, ); - let static_file_producer_events = static_file_producer.events(); + let static_file_producer_events = static_file_producer.lock().events(); hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone()))); info!(target: "reth::cli", "StaticFileProducer initialized"); diff --git a/crates/revm/src/stack.rs b/crates/revm/src/stack.rs index 9d4b6b02a877..8f8bfa5ce0dd 100644 --- a/crates/revm/src/stack.rs +++ b/crates/revm/src/stack.rs @@ -131,7 +131,7 @@ where ) -> Option { call_inspectors!([&mut self.custom_print_tracer], |inspector| { if let Some(outcome) = inspector.call(context, inputs) { - return Some(outcome); + return Some(outcome) } }); @@ -151,7 +151,7 @@ where // If the inspector returns a different ret or a revert with a non-empty message, // we assume it wants to tell us something if new_ret != outcome { - return new_ret; + return new_ret } }); @@ -166,7 +166,7 @@ where ) -> Option { call_inspectors!([&mut self.custom_print_tracer], |inspector| { if let Some(out) = inspector.create(context, inputs) { - return Some(out); + return Some(out) } }); @@ -186,7 +186,7 @@ where // If the inspector returns a different ret or a revert with a non-empty message, // we assume it wants to tell us something if new_ret != outcome { - return new_ret; + return new_ret } }); diff --git a/crates/rpc/rpc/src/eth/bundle.rs b/crates/rpc/rpc/src/eth/bundle.rs index da95e41a299b..c2d56df3132d 100644 --- a/crates/rpc/rpc/src/eth/bundle.rs +++ b/crates/rpc/rpc/src/eth/bundle.rs @@ -82,7 +82,7 @@ where { return Err(EthApiError::InvalidParams( EthBundleError::Eip4844BlobGasExceeded.to_string(), - )); + )) } let block_id: reth_rpc_types::BlockId = state_block_number.into(); diff --git a/crates/stages/src/pipeline/mod.rs b/crates/stages/src/pipeline/mod.rs index 40d010f48608..9fc78846bcbc 100644 --- a/crates/stages/src/pipeline/mod.rs +++ b/crates/stages/src/pipeline/mod.rs @@ -229,9 +229,14 @@ where /// -> [StageId::Execution] /// - [StaticFileSegment::Transactions](reth_primitives::static_file::StaticFileSegment::Transactions) /// -> [StageId::Bodies] + /// + /// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the + /// lock is occupied. fn produce_static_files(&mut self) -> RethResult<()> { + let mut static_file_producer = self.static_file_producer.lock(); + let provider = self.provider_factory.provider()?; - let targets = self.static_file_producer.get_static_file_targets(HighestStaticFiles { + let targets = static_file_producer.get_static_file_targets(HighestStaticFiles { headers: provider .get_stage_checkpoint(StageId::Headers)? .map(|checkpoint| checkpoint.block_number), @@ -242,7 +247,7 @@ where .get_stage_checkpoint(StageId::Bodies)? .map(|checkpoint| checkpoint.block_number), })?; - self.static_file_producer.run(targets)?; + static_file_producer.run(targets)?; Ok(()) } diff --git a/crates/static-file/Cargo.toml b/crates/static-file/Cargo.toml index 0f7d867b3ab1..b3fc1b93d368 100644 --- a/crates/static-file/Cargo.toml +++ b/crates/static-file/Cargo.toml @@ -27,6 +27,7 @@ tokio-stream.workspace = true tracing.workspace = true clap = { workspace = true, features = ["derive"], optional = true } rayon.workspace = true +parking_lot = { workspace = true, features = ["send_guard", "arc_lock"] } [dev-dependencies] reth-db = { workspace = true, features = ["test-utils"] } diff --git a/crates/static-file/src/lib.rs b/crates/static-file/src/lib.rs index 2c6c11dfd4ff..f545298ebd47 100644 --- a/crates/static-file/src/lib.rs +++ b/crates/static-file/src/lib.rs @@ -13,5 +13,6 @@ mod static_file_producer; pub use event::StaticFileProducerEvent; pub use static_file_producer::{ - StaticFileProducer, StaticFileProducerResult, StaticFileProducerWithResult, StaticFileTargets, + StaticFileProducer, StaticFileProducerInner, StaticFileProducerResult, + StaticFileProducerWithResult, StaticFileTargets, }; diff --git a/crates/static-file/src/static_file_producer.rs b/crates/static-file/src/static_file_producer.rs index 52b115e9fee8..c5ee9818eee6 100644 --- a/crates/static-file/src/static_file_producer.rs +++ b/crates/static-file/src/static_file_producer.rs @@ -1,6 +1,7 @@ //! Support for producing static files. use crate::{segments, segments::Segment, StaticFileProducerEvent}; +use parking_lot::Mutex; use rayon::prelude::*; use reth_db::database::Database; use reth_interfaces::RethResult; @@ -10,26 +11,58 @@ use reth_provider::{ ProviderFactory, }; use reth_tokio_util::EventListeners; -use std::{ops::RangeInclusive, time::Instant}; +use std::{ + ops::{Deref, RangeInclusive}, + sync::Arc, + time::Instant, +}; use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, trace}; -/// Result of [StaticFileProducer::run] execution. +/// Result of [StaticFileProducerInner::run] execution. pub type StaticFileProducerResult = RethResult; -/// The [StaticFileProducer] instance itself with the result of [StaticFileProducer::run] +/// The [StaticFileProducer] instance itself with the result of [StaticFileProducerInner::run] pub type StaticFileProducerWithResult = (StaticFileProducer, StaticFileProducerResult); -/// Static File producer routine. See [StaticFileProducer::run] for more detailed description. +/// Static File producer. It's a wrapper around [StaticFileProducer] that allows to share it +/// between threads. #[derive(Debug, Clone)] -pub struct StaticFileProducer { +pub struct StaticFileProducer(Arc>>); + +impl StaticFileProducer { + /// Creates a new [StaticFileProducer]. + pub fn new( + provider_factory: ProviderFactory, + static_file_provider: StaticFileProvider, + prune_modes: PruneModes, + ) -> Self { + Self(Arc::new(Mutex::new(StaticFileProducerInner::new( + provider_factory, + static_file_provider, + prune_modes, + )))) + } +} + +impl Deref for StaticFileProducer { + type Target = Arc>>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +/// Static File producer routine. See [StaticFileProducerInner::run] for more detailed description. +#[derive(Debug)] +pub struct StaticFileProducerInner { /// Provider factory provider_factory: ProviderFactory, /// Static File provider static_file_provider: StaticFileProvider, /// Pruning configuration for every part of the data that can be pruned. Set by user, and - /// needed in [StaticFileProducer] to prevent attempting to move prunable data to static files. - /// See [StaticFileProducer::get_static_file_targets]. + /// needed in [StaticFileProducerInner] to prevent attempting to move prunable data to static + /// files. See [StaticFileProducerInner::get_static_file_targets]. prune_modes: PruneModes, listeners: EventListeners, } @@ -68,9 +101,8 @@ impl StaticFileTargets { } } -impl StaticFileProducer { - /// Creates a new [StaticFileProducer]. - pub fn new( +impl StaticFileProducerInner { + fn new( provider_factory: ProviderFactory, static_file_provider: StaticFileProvider, prune_modes: PruneModes, @@ -200,9 +232,11 @@ impl StaticFileProducer { #[cfg(test)] mod tests { - use crate::{static_file_producer::StaticFileTargets, StaticFileProducer}; + use crate::static_file_producer::{ + StaticFileProducer, StaticFileProducerInner, StaticFileTargets, + }; use assert_matches::assert_matches; - use reth_db::{database::Database, transaction::DbTx}; + use reth_db::{database::Database, test_utils::TempDatabase, transaction::DbTx, DatabaseEnv}; use reth_interfaces::{ provider::ProviderError, test_utils::{ @@ -214,13 +248,18 @@ mod tests { use reth_primitives::{ static_file::HighestStaticFiles, PruneModes, StaticFileSegment, B256, U256, }; - use reth_provider::providers::StaticFileWriter; + use reth_provider::{ + providers::{StaticFileProvider, StaticFileWriter}, + ProviderFactory, + }; use reth_stages::test_utils::{StorageKind, TestStageDB}; + use std::{ + sync::{mpsc::channel, Arc}, + time::Duration, + }; - #[test] - fn run() { + fn setup() -> (ProviderFactory>>, StaticFileProvider) { let mut rng = generators::rng(); - let db = TestStageDB::default(); let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3); @@ -251,8 +290,14 @@ mod tests { let provider_factory = db.factory; let static_file_provider = provider_factory.static_file_provider(); + (provider_factory, static_file_provider) + } - let mut static_file_producer = StaticFileProducer::new( + #[test] + fn run() { + let (provider_factory, static_file_provider) = setup(); + + let mut static_file_producer = StaticFileProducerInner::new( provider_factory, static_file_provider.clone(), PruneModes::default(), @@ -324,4 +369,48 @@ mod tests { HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) } ); } + + /// Tests that a cloneable [`StaticFileProducer`] type is not susceptible to any race condition. + #[test] + fn only_one() { + let (provider_factory, static_file_provider) = setup(); + + let static_file_producer = StaticFileProducer::new( + provider_factory, + static_file_provider.clone(), + PruneModes::default(), + ); + + let (tx, rx) = channel(); + + for i in 0..5 { + let producer = static_file_producer.clone(); + let tx = tx.clone(); + + std::thread::spawn(move || { + let mut locked_producer = producer.lock(); + if i == 0 { + // Let other threads spawn as well. + std::thread::sleep(Duration::from_millis(100)); + } + let targets = locked_producer + .get_static_file_targets(HighestStaticFiles { + headers: Some(1), + receipts: Some(1), + transactions: Some(1), + }) + .expect("get static file targets"); + assert_matches!(locked_producer.run(targets.clone()), Ok(_)); + tx.send(targets).unwrap(); + }); + } + + drop(tx); + + let mut only_one = Some(()); + for target in rx { + // Only the first spawn should have any meaningful target. + assert!(only_one.take().is_some_and(|_| target.any()) || !target.any()) + } + } }