diff --git a/crates/flashblocks-rpc/src/rpc.rs b/crates/flashblocks-rpc/src/rpc.rs index e0a9c61..60a1321 100644 --- a/crates/flashblocks-rpc/src/rpc.rs +++ b/crates/flashblocks-rpc/src/rpc.rs @@ -3,7 +3,6 @@ use std::time::Duration; use crate::metrics::Metrics; use crate::pending_blocks::PendingBlocks; -use crate::subscription::Flashblock; use alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_primitives::{Address, TxHash, U256}; use alloy_rpc_types::simulate::{SimBlock, SimulatePayload, SimulatedBlock}; @@ -42,7 +41,7 @@ pub trait FlashblocksAPI { /// Retrieves the pending blocks. fn get_pending_blocks(&self) -> Guard>>; - fn subscribe_to_flashblocks(&self) -> broadcast::Receiver; + fn subscribe_to_flashblocks(&self) -> broadcast::Receiver>; } pub trait PendingBlocksAPI { @@ -525,10 +524,9 @@ where loop { match receiver.recv().await { - Ok(flashblock) if flashblock.metadata.receipts.contains_key(&tx_hash) => { + Ok(pending_state) if pending_state.get_receipt(tx_hash).is_some() => { debug!(message = "found receipt in flashblock", tx_hash = %tx_hash); - let pending_blocks = self.flashblocks_state.get_pending_blocks(); - return pending_blocks.get_transaction_receipt(tx_hash); + return pending_state.get_receipt(tx_hash); } Ok(_) => { trace!(message = "flashblock does not contain receipt", tx_hash = %tx_hash); diff --git a/crates/flashblocks-rpc/src/state.rs b/crates/flashblocks-rpc/src/state.rs index e6f49b6..17e1aaa 100644 --- a/crates/flashblocks-rpc/src/state.rs +++ b/crates/flashblocks-rpc/src/state.rs @@ -51,7 +51,7 @@ enum StateUpdate { pub struct FlashblocksState { pending_blocks: Arc>, queue: mpsc::UnboundedSender, - flashblock_sender: Sender, + flashblock_sender: Sender>, state_processor: StateProcessor, } @@ -66,13 +66,18 @@ where pub fn new(client: Client) -> Self { let (tx, rx) = mpsc::unbounded_channel::(); let pending_blocks: Arc> = Arc::new(ArcSwapOption::new(None)); - let state_processor = - StateProcessor::new(client, pending_blocks.clone(), Arc::new(Mutex::new(rx))); + let (flashblock_sender, _) = broadcast::channel(BUFFER_SIZE); + let state_processor = StateProcessor::new( + client, + pending_blocks.clone(), + Arc::new(Mutex::new(rx)), + flashblock_sender.clone(), + ); Self { pending_blocks, queue: tx, - flashblock_sender: broadcast::channel(BUFFER_SIZE).0, + flashblock_sender, state_processor, } } @@ -113,8 +118,6 @@ impl FlashblocksReceiver for FlashblocksState { error!(message = "could not add flashblock to processing queue", block_number = flashblock.metadata.block_number, flashblock_index = flashblock.index, error = %e); } } - - _ = self.flashblock_sender.send(flashblock); } } @@ -123,7 +126,7 @@ impl FlashblocksAPI for FlashblocksState { self.pending_blocks.load() } - fn subscribe_to_flashblocks(&self) -> tokio::sync::broadcast::Receiver { + fn subscribe_to_flashblocks(&self) -> broadcast::Receiver> { self.flashblock_sender.subscribe() } } @@ -183,6 +186,7 @@ struct StateProcessor { pending_blocks: Arc>, metrics: Metrics, client: Client, + sender: Sender>, } impl StateProcessor @@ -197,12 +201,14 @@ where client: Client, pending_blocks: Arc>, rx: Arc>>, + sender: Sender>, ) -> Self { Self { metrics: Metrics::default(), pending_blocks, client, rx, + sender, } } @@ -233,7 +239,11 @@ where ); match self.process_flashblock(prev_pending_blocks, &flashblock) { Ok(new_pending_blocks) => { - self.pending_blocks.swap(new_pending_blocks); + if new_pending_blocks.is_some() { + _ = self.sender.send(new_pending_blocks.clone().unwrap()) + } + + self.pending_blocks.swap(new_pending_blocks.clone()); self.metrics .block_processing_duration .record(start_time.elapsed());