Skip to content

Commit

Permalink
fix(static-file): pass producer as Arc<Mutex<_>> to ensure only one…
Browse files Browse the repository at this point in the history
… is active (paradigmxyz#7143)

Co-authored-by: joshieDo <ranriver@protonmail.com>
  • Loading branch information
2 people authored and Ruteri committed Apr 17, 2024
1 parent c060c62 commit 03f00df
Show file tree
Hide file tree
Showing 10 changed files with 149 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 5 additions & 7 deletions bin/reth/src/commands/import.rs
Expand Up @@ -107,19 +107,17 @@ impl ImportCommand {
let tip = file_client.tip().expect("file client has no tip");
info!(target: "reth::cli", "Chain file imported");

let static_file_producer = StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
);

let (mut pipeline, events) = self
.build_import_pipeline(
config,
provider_factory.clone(),
&consensus,
file_client,
static_file_producer,
StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
PruneModes::default(),
),
)
.await?;

Expand Down
32 changes: 20 additions & 12 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
Expand Up @@ -69,14 +69,15 @@ impl<DB: Database + 'static> StaticFileHook<DB> {

/// This will try to spawn the static_file_producer if it is idle:
/// 1. Check if producing static files is needed through
/// [StaticFileProducer::get_static_file_targets] and then
/// [StaticFileTargets::any](reth_static_file::StaticFileTargets::any).
/// [StaticFileProducer::get_static_file_targets](reth_static_file::StaticFileProducerInner::get_static_file_targets)
/// and then [StaticFileTargets::any](reth_static_file::StaticFileTargets::any).
/// 2.
/// 1. If producing static files is needed, pass static file request to the
/// [StaticFileProducer::run] and spawn it in a separate task. Set static file producer
/// state to [StaticFileProducerState::Running].
/// [StaticFileProducer::run](reth_static_file::StaticFileProducerInner::run) and spawn
/// it in a separate task. Set static file producer state to
/// [StaticFileProducerState::Running].
/// 2. If producing static files is not needed, set static file producer state back to
/// [StaticFileProducerState::Idle].
/// [StaticFileProducerState::Idle].
///
/// If static_file_producer is already running, do nothing.
fn try_spawn_static_file_producer(
Expand All @@ -85,24 +86,31 @@ impl<DB: Database + 'static> StaticFileHook<DB> {
) -> RethResult<Option<EngineHookEvent>> {
Ok(match &mut self.state {
StaticFileProducerState::Idle(static_file_producer) => {
let Some(mut static_file_producer) = static_file_producer.take() else {
let Some(static_file_producer) = static_file_producer.take() else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer is already running but the state is idle");
return Ok(None)
};

let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;
let Some(mut locked_static_file_producer) = static_file_producer.try_lock_arc()
else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
return Ok(None)
};

let targets =
locked_static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: Some(finalized_block_number),
receipts: Some(finalized_block_number),
transactions: Some(finalized_block_number),
})?;

// Check if the moving data to static files has been requested.
if targets.any() {
let (tx, rx) = oneshot::channel();
self.task_spawner.spawn_critical_blocking(
"static_file_producer task",
Box::pin(async move {
let result = static_file_producer.run(targets);
let result = locked_static_file_producer.run(targets);
let _ = tx.send((static_file_producer, result));
}),
);
Expand Down
4 changes: 2 additions & 2 deletions crates/node-builder/src/builder.rs
Expand Up @@ -548,12 +548,12 @@ where
let max_block = config.max_block(&network_client, provider_factory.clone()).await?;
let mut hooks = EngineHooks::new();

let mut static_file_producer = StaticFileProducer::new(
let static_file_producer = StaticFileProducer::new(
provider_factory.clone(),
provider_factory.static_file_provider(),
prune_config.clone().unwrap_or_default().segments,
);
let static_file_producer_events = static_file_producer.events();
let static_file_producer_events = static_file_producer.lock().events();
hooks.add(StaticFileHook::new(static_file_producer.clone(), Box::new(executor.clone())));
info!(target: "reth::cli", "StaticFileProducer initialized");

Expand Down
8 changes: 4 additions & 4 deletions crates/revm/src/stack.rs
Expand Up @@ -131,7 +131,7 @@ where
) -> Option<CallOutcome> {
call_inspectors!([&mut self.custom_print_tracer], |inspector| {
if let Some(outcome) = inspector.call(context, inputs) {
return Some(outcome);
return Some(outcome)
}
});

Expand All @@ -151,7 +151,7 @@ where
// If the inspector returns a different ret or a revert with a non-empty message,
// we assume it wants to tell us something
if new_ret != outcome {
return new_ret;
return new_ret
}
});

Expand All @@ -166,7 +166,7 @@ where
) -> Option<CreateOutcome> {
call_inspectors!([&mut self.custom_print_tracer], |inspector| {
if let Some(out) = inspector.create(context, inputs) {
return Some(out);
return Some(out)
}
});

Expand All @@ -186,7 +186,7 @@ where
// If the inspector returns a different ret or a revert with a non-empty message,
// we assume it wants to tell us something
if new_ret != outcome {
return new_ret;
return new_ret
}
});

Expand Down
2 changes: 1 addition & 1 deletion crates/rpc/rpc/src/eth/bundle.rs
Expand Up @@ -82,7 +82,7 @@ where
{
return Err(EthApiError::InvalidParams(
EthBundleError::Eip4844BlobGasExceeded.to_string(),
));
))
}

let block_id: reth_rpc_types::BlockId = state_block_number.into();
Expand Down
9 changes: 7 additions & 2 deletions crates/stages/src/pipeline/mod.rs
Expand Up @@ -229,9 +229,14 @@ where
/// -> [StageId::Execution]
/// - [StaticFileSegment::Transactions](reth_primitives::static_file::StaticFileSegment::Transactions)
/// -> [StageId::Bodies]
///
/// CAUTION: This method locks the static file producer Mutex, hence can block the thread if the
/// lock is occupied.
fn produce_static_files(&mut self) -> RethResult<()> {
let mut static_file_producer = self.static_file_producer.lock();

let provider = self.provider_factory.provider()?;
let targets = self.static_file_producer.get_static_file_targets(HighestStaticFiles {
let targets = static_file_producer.get_static_file_targets(HighestStaticFiles {
headers: provider
.get_stage_checkpoint(StageId::Headers)?
.map(|checkpoint| checkpoint.block_number),
Expand All @@ -242,7 +247,7 @@ where
.get_stage_checkpoint(StageId::Bodies)?
.map(|checkpoint| checkpoint.block_number),
})?;
self.static_file_producer.run(targets)?;
static_file_producer.run(targets)?;

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/static-file/Cargo.toml
Expand Up @@ -27,6 +27,7 @@ tokio-stream.workspace = true
tracing.workspace = true
clap = { workspace = true, features = ["derive"], optional = true }
rayon.workspace = true
parking_lot = { workspace = true, features = ["send_guard", "arc_lock"] }

[dev-dependencies]
reth-db = { workspace = true, features = ["test-utils"] }
Expand Down
3 changes: 2 additions & 1 deletion crates/static-file/src/lib.rs
Expand Up @@ -13,5 +13,6 @@ mod static_file_producer;

pub use event::StaticFileProducerEvent;
pub use static_file_producer::{
StaticFileProducer, StaticFileProducerResult, StaticFileProducerWithResult, StaticFileTargets,
StaticFileProducer, StaticFileProducerInner, StaticFileProducerResult,
StaticFileProducerWithResult, StaticFileTargets,
};
123 changes: 106 additions & 17 deletions crates/static-file/src/static_file_producer.rs
@@ -1,6 +1,7 @@
//! Support for producing static files.

use crate::{segments, segments::Segment, StaticFileProducerEvent};
use parking_lot::Mutex;
use rayon::prelude::*;
use reth_db::database::Database;
use reth_interfaces::RethResult;
Expand All @@ -10,26 +11,58 @@ use reth_provider::{
ProviderFactory,
};
use reth_tokio_util::EventListeners;
use std::{ops::RangeInclusive, time::Instant};
use std::{
ops::{Deref, RangeInclusive},
sync::Arc,
time::Instant,
};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, trace};

/// Result of [StaticFileProducer::run] execution.
/// Result of [StaticFileProducerInner::run] execution.
pub type StaticFileProducerResult = RethResult<StaticFileTargets>;

/// The [StaticFileProducer] instance itself with the result of [StaticFileProducer::run]
/// The [StaticFileProducer] instance itself with the result of [StaticFileProducerInner::run]
pub type StaticFileProducerWithResult<DB> = (StaticFileProducer<DB>, StaticFileProducerResult);

/// Static File producer routine. See [StaticFileProducer::run] for more detailed description.
/// Static File producer. It's a wrapper around [StaticFileProducer] that allows to share it
/// between threads.
#[derive(Debug, Clone)]
pub struct StaticFileProducer<DB> {
pub struct StaticFileProducer<DB>(Arc<Mutex<StaticFileProducerInner<DB>>>);

impl<DB: Database> StaticFileProducer<DB> {
/// Creates a new [StaticFileProducer].
pub fn new(
provider_factory: ProviderFactory<DB>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
) -> Self {
Self(Arc::new(Mutex::new(StaticFileProducerInner::new(
provider_factory,
static_file_provider,
prune_modes,
))))
}
}

impl<DB> Deref for StaticFileProducer<DB> {
type Target = Arc<Mutex<StaticFileProducerInner<DB>>>;

fn deref(&self) -> &Self::Target {
&self.0
}
}

/// Static File producer routine. See [StaticFileProducerInner::run] for more detailed description.
#[derive(Debug)]
pub struct StaticFileProducerInner<DB> {
/// Provider factory
provider_factory: ProviderFactory<DB>,
/// Static File provider
static_file_provider: StaticFileProvider,
/// Pruning configuration for every part of the data that can be pruned. Set by user, and
/// needed in [StaticFileProducer] to prevent attempting to move prunable data to static files.
/// See [StaticFileProducer::get_static_file_targets].
/// needed in [StaticFileProducerInner] to prevent attempting to move prunable data to static
/// files. See [StaticFileProducerInner::get_static_file_targets].
prune_modes: PruneModes,
listeners: EventListeners<StaticFileProducerEvent>,
}
Expand Down Expand Up @@ -68,9 +101,8 @@ impl StaticFileTargets {
}
}

impl<DB: Database> StaticFileProducer<DB> {
/// Creates a new [StaticFileProducer].
pub fn new(
impl<DB: Database> StaticFileProducerInner<DB> {
fn new(
provider_factory: ProviderFactory<DB>,
static_file_provider: StaticFileProvider,
prune_modes: PruneModes,
Expand Down Expand Up @@ -200,9 +232,11 @@ impl<DB: Database> StaticFileProducer<DB> {

#[cfg(test)]
mod tests {
use crate::{static_file_producer::StaticFileTargets, StaticFileProducer};
use crate::static_file_producer::{
StaticFileProducer, StaticFileProducerInner, StaticFileTargets,
};
use assert_matches::assert_matches;
use reth_db::{database::Database, transaction::DbTx};
use reth_db::{database::Database, test_utils::TempDatabase, transaction::DbTx, DatabaseEnv};
use reth_interfaces::{
provider::ProviderError,
test_utils::{
Expand All @@ -214,13 +248,18 @@ mod tests {
use reth_primitives::{
static_file::HighestStaticFiles, PruneModes, StaticFileSegment, B256, U256,
};
use reth_provider::providers::StaticFileWriter;
use reth_provider::{
providers::{StaticFileProvider, StaticFileWriter},
ProviderFactory,
};
use reth_stages::test_utils::{StorageKind, TestStageDB};
use std::{
sync::{mpsc::channel, Arc},
time::Duration,
};

#[test]
fn run() {
fn setup() -> (ProviderFactory<Arc<TempDatabase<DatabaseEnv>>>, StaticFileProvider) {
let mut rng = generators::rng();

let db = TestStageDB::default();

let blocks = random_block_range(&mut rng, 0..=3, B256::ZERO, 2..3);
Expand Down Expand Up @@ -251,8 +290,14 @@ mod tests {

let provider_factory = db.factory;
let static_file_provider = provider_factory.static_file_provider();
(provider_factory, static_file_provider)
}

let mut static_file_producer = StaticFileProducer::new(
#[test]
fn run() {
let (provider_factory, static_file_provider) = setup();

let mut static_file_producer = StaticFileProducerInner::new(
provider_factory,
static_file_provider.clone(),
PruneModes::default(),
Expand Down Expand Up @@ -324,4 +369,48 @@ mod tests {
HighestStaticFiles { headers: Some(3), receipts: Some(3), transactions: Some(3) }
);
}

/// Tests that a cloneable [`StaticFileProducer`] type is not susceptible to any race condition.
#[test]
fn only_one() {
let (provider_factory, static_file_provider) = setup();

let static_file_producer = StaticFileProducer::new(
provider_factory,
static_file_provider.clone(),
PruneModes::default(),
);

let (tx, rx) = channel();

for i in 0..5 {
let producer = static_file_producer.clone();
let tx = tx.clone();

std::thread::spawn(move || {
let mut locked_producer = producer.lock();
if i == 0 {
// Let other threads spawn as well.
std::thread::sleep(Duration::from_millis(100));
}
let targets = locked_producer
.get_static_file_targets(HighestStaticFiles {
headers: Some(1),
receipts: Some(1),
transactions: Some(1),
})
.expect("get static file targets");
assert_matches!(locked_producer.run(targets.clone()), Ok(_));
tx.send(targets).unwrap();
});
}

drop(tx);

let mut only_one = Some(());
for target in rx {
// Only the first spawn should have any meaningful target.
assert!(only_one.take().is_some_and(|_| target.any()) || !target.any())
}
}
}

0 comments on commit 03f00df

Please sign in to comment.