diff --git a/src/block_range_scanner.rs b/src/block_range_scanner.rs index 4f4e875..6cd9f24 100644 --- a/src/block_range_scanner.rs +++ b/src/block_range_scanner.rs @@ -64,24 +64,31 @@ use tokio::{ sync::{mpsc, oneshot}, try_join, }; -use tokio_stream::{StreamExt, wrappers::ReceiverStream}; +use tokio_stream::wrappers::ReceiverStream; use crate::{ ScannerMessage, + block_range_scanner::sync_handler::SyncHandler, error::ScannerError, robust_provider::{Error as RobustProviderError, IntoRobustProvider, RobustProvider}, types::{Notification, TryStream}, }; use alloy::{ consensus::BlockHeader, - eips::{BlockId, BlockNumberOrTag}, + eips::BlockId, network::{BlockResponse, Network, primitives::HeaderResponse}, - primitives::{B256, BlockNumber}, - pubsub::Subscription, + primitives::BlockNumber, transports::{RpcError, TransportErrorKind}, }; use tracing::{debug, error, info, warn}; +mod common; +mod reorg_handler; +mod ring_buffer; +mod sync_handler; + +use reorg_handler::ReorgHandler; + pub const DEFAULT_MAX_BLOCK_RANGE: u64 = 1000; // copied form https://github.com/taikoxyz/taiko-mono/blob/f4b3a0e830e42e2fee54829326389709dd422098/packages/taiko-client/pkg/chain_iterator/block_batch_iterator.go#L19 pub const DEFAULT_BLOCK_CONFIRMATIONS: u64 = 0; @@ -141,6 +148,7 @@ impl BlockRangeScanner { Self { max_block_range: DEFAULT_MAX_BLOCK_RANGE } } + /// Sets the maximum block range per RPC call for the scanner. #[must_use] pub fn max_block_range(mut self, max_block_range: u64) -> Self { self.max_block_range = max_block_range; @@ -216,6 +224,7 @@ pub enum Command { struct Service { provider: RobustProvider, + reorg_handler: ReorgHandler, max_block_range: u64, error_count: u64, command_receiver: mpsc::Receiver, @@ -225,9 +234,11 @@ struct Service { impl Service { pub fn new(provider: RobustProvider, max_block_range: u64) -> (Self, mpsc::Sender) { let (cmd_tx, cmd_rx) = mpsc::channel(100); + let reorg_handler = ReorgHandler::new(provider.clone()); let service = Self { provider, + reorg_handler, max_block_range, error_count: 0, command_receiver: cmd_rx, @@ -292,6 +303,8 @@ impl Service { ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; let latest = self.provider.get_block_number().await?; + let provider = self.provider.clone(); + let mut reorg_handler = self.reorg_handler.clone(); // the next block returned by the underlying subscription will always be `latest + 1`, // because `latest` was already mined and subscription by definition only streams after new @@ -303,12 +316,15 @@ impl Service { info!("WebSocket connected for live blocks"); tokio::spawn(async move { - Self::stream_live_blocks( + common::stream_live_blocks( range_start, subscription, - sender, + &sender, + &provider, block_confirmations, max_block_range, + &mut reorg_handler, + false, // (notification unnecessary) ) .await; }); @@ -323,6 +339,7 @@ impl Service { sender: mpsc::Sender, ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; + let provider = self.provider.clone(); let (start_block, end_block) = tokio::try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id))?; @@ -337,12 +354,17 @@ impl Service { info!(start_block = start_block_num, end_block = end_block_num, "Syncing historical data"); + let mut reorg_handler = self.reorg_handler.clone(); + tokio::spawn(async move { - Self::stream_historical_blocks( + common::stream_historical_blocks( + start_block_num, start_block_num, end_block_num, max_block_range, &sender, + &provider, + &mut reorg_handler, ) .await; }); @@ 
-351,106 +373,19 @@ impl Service { } async fn handle_sync( - &mut self, + &self, start_id: BlockId, block_confirmations: u64, sender: mpsc::Sender, ) -> Result<(), ScannerError> { - let provider = self.provider.clone(); - let max_block_range = self.max_block_range; - - let get_start_block = async || -> Result { - let block = match start_id { - BlockId::Number(BlockNumberOrTag::Number(num)) => num, - _ => provider.get_block(start_id).await?.header().number(), - }; - Ok(block) - }; - - let get_confirmed_tip = async || -> Result { - let confirmed_block = provider.get_latest_confirmed(block_confirmations).await?; - Ok(confirmed_block) - }; - - // Step 1: - // Fetches the starting block and confirmed tip for historical sync in parallel - let (start_block, confirmed_tip) = - tokio::try_join!(get_start_block(), get_confirmed_tip())?; - - let subscription = self.provider.subscribe_blocks().await?; - info!("Buffering live blocks"); - - // If start is beyond confirmed tip, skip historical and go straight to live - if start_block > confirmed_tip { - info!( - start_block = start_block, - confirmed_tip = confirmed_tip, - "Start block is beyond confirmed tip, starting live stream" - ); - - tokio::spawn(async move { - Self::stream_live_blocks( - start_block, - subscription, - sender, - block_confirmations, - max_block_range, - ) - .await; - }); - - return Ok(()); - } - - info!(start_block = start_block, end_block = confirmed_tip, "Syncing historical data"); - - // Step 2: Setup the live streaming buffer - // This channel will accumulate while historical sync is running - let (live_block_buffer_sender, live_block_buffer_receiver) = - mpsc::channel::(MAX_BUFFERED_MESSAGES); - - // The cutoff is the last block we have synced historically - // Any block > cutoff will come from the live stream - let cutoff = confirmed_tip; - - // This task runs independently, accumulating new blocks while wehistorical data is syncing - tokio::spawn(async move { - Self::stream_live_blocks( - cutoff + 1, - subscription, - live_block_buffer_sender, - block_confirmations, - max_block_range, - ) - .await; - }); - - tokio::spawn(async move { - // Step 4: Perform historical synchronization - // This processes blocks from start_block to end_block (cutoff) - // If this fails, we need to abort the live streaming task - Self::stream_historical_blocks(start_block, confirmed_tip, max_block_range, &sender) - .await; - - info!("Chain tip reached, switching to live"); - - if !sender.try_stream(Notification::SwitchingToLive).await { - return; - } - - info!("Successfully transitioned from historical to live data"); - - // Step 5: - // Spawn the buffer processor task - // This will: - // 1. Process all buffered blocks, filtering out any ≤ cutoff - // 2. Forward blocks > cutoff to the user - // 3. 
Continue forwarding until the buffer if exhausted (waits for new blocks from live - // stream) - Self::process_live_block_buffer(live_block_buffer_receiver, sender, cutoff).await; - }); - - Ok(()) + let sync_handler = SyncHandler::new( + self.provider.clone(), + self.max_block_range, + start_id, + block_confirmations, + sender, + ); + sync_handler.run().await } async fn handle_rewind( @@ -461,6 +396,7 @@ impl Service { ) -> Result<(), ScannerError> { let max_block_range = self.max_block_range; let provider = self.provider.clone(); + let mut reorg_handler = self.reorg_handler.clone(); let (start_block, end_block) = try_join!(self.provider.get_block(start_id), self.provider.get_block(end_id),)?; @@ -472,7 +408,8 @@ impl Service { }; tokio::spawn(async move { - Self::stream_rewind(from, to, max_block_range, &sender, &provider).await; + Self::stream_rewind(from, to, max_block_range, &sender, &provider, &mut reorg_handler) + .await; }); Ok(()) @@ -491,13 +428,14 @@ impl Service { max_block_range: u64, sender: &mpsc::Sender, provider: &RobustProvider, + reorg_handler: &mut ReorgHandler, ) { let mut batch_count = 0; // for checking whether reorg occurred - let mut tip_hash = from.header().hash(); + let mut tip = from; - let from = from.header().number(); + let from = tip.header().number(); let to = to.header().number(); // we're iterating in reverse @@ -522,10 +460,10 @@ impl Service { break; } - let reorged = match reorg_detected(provider, tip_hash).await { - Ok(detected) => { - info!(block_number = %from, hash = %tip_hash, "Reorg detected"); - detected + let reorged_opt = match reorg_handler.check(&tip).await { + Ok(opt) => { + info!(block_number = %from, hash = %tip.header().hash(), "Reorg detected"); + opt } Err(e) => { error!(error = %e, "Terminal RPC call error, shutting down"); @@ -534,8 +472,9 @@ impl Service { } }; - if reorged { - info!(block_number = %from, hash = %tip_hash, "Reorg detected"); + // for now we only care if a reorg occurred, not which block it was + if reorged_opt.is_some() { + info!(block_number = %from, hash = %tip.header().hash(), "Reorg detected"); if !sender.try_stream(Notification::ReorgDetected).await { break; @@ -544,8 +483,8 @@ impl Service { // restart rewind batch_from = from; // store the updated end block hash - tip_hash = match provider.get_block_by_number(from.into()).await { - Ok(block) => block.header().hash(), + tip = match provider.get_block_by_number(from.into()).await { + Ok(block) => block, Err(RobustProviderError::BlockNotFound(_)) => { panic!("Block with number '{from}' should exist post-reorg"); } @@ -564,143 +503,6 @@ impl Service { info!(batch_count = batch_count, "Rewind completed"); } - - async fn stream_historical_blocks( - start: BlockNumber, - end: BlockNumber, - max_block_range: u64, - sender: &mpsc::Sender, - ) { - let mut batch_count = 0; - - let mut next_start_block = start; - - // must be <= to include the edge case when start == end (i.e. 
return the single block - // range) - while next_start_block <= end { - let batch_end_block_number = - next_start_block.saturating_add(max_block_range - 1).min(end); - - if !sender.try_stream(next_start_block..=batch_end_block_number).await { - break; - } - - batch_count += 1; - if batch_count % 10 == 0 { - debug!(batch_count = batch_count, "Processed historical batches"); - } - - if batch_end_block_number == end { - break; - } - - // Next block number always exists as we checked end block previously - let next_start_block_number = batch_end_block_number.saturating_add(1); - - next_start_block = next_start_block_number; - } - - info!(batch_count = batch_count, "Historical sync completed"); - } - - async fn stream_live_blocks( - mut range_start: BlockNumber, - subscription: Subscription, - sender: mpsc::Sender, - block_confirmations: u64, - max_block_range: u64, - ) { - // ensure we start streaming only after the expected_next_block cutoff - let cutoff = range_start; - let mut stream = subscription.into_stream().skip_while(|header| header.number() < cutoff); - - while let Some(incoming_block) = stream.next().await { - let incoming_block_num = incoming_block.number(); - info!(block_number = incoming_block_num, "Received block header"); - - if incoming_block_num < range_start { - warn!("Reorg detected: sending forked range"); - if !sender.try_stream(Notification::ReorgDetected).await { - return; - } - - // Calculate the confirmed block position for the incoming block - let incoming_confirmed = incoming_block_num.saturating_sub(block_confirmations); - - // updated expected block to updated confirmed - range_start = incoming_confirmed; - } - - let confirmed = incoming_block_num.saturating_sub(block_confirmations); - if confirmed >= range_start { - // NOTE: Edge case when difference between range end and range start >= max - // reads - let range_end = confirmed.min(range_start.saturating_add(max_block_range - 1)); - - info!(range_start = range_start, range_end = range_end, "Sending live block range"); - - if !sender.try_stream(range_start..=range_end).await { - return; - } - - // Overflow can not realistically happen - range_start = range_end + 1; - } - } - } - - async fn process_live_block_buffer( - mut buffer_rx: mpsc::Receiver, - sender: mpsc::Sender, - cutoff: BlockNumber, - ) { - let mut processed = 0; - let mut discarded = 0; - - // Process all buffered messages - while let Some(data) = buffer_rx.recv().await { - match data { - Message::Data(range) => { - let (start, end) = (*range.start(), *range.end()); - if start >= cutoff { - if !sender.try_stream(range).await { - break; - } - processed += end - start; - } else if end >= cutoff { - discarded += cutoff - start; - - let start = cutoff; - if !sender.try_stream(start..=end).await { - break; - } - processed += end - start; - } else { - discarded += end - start; - } - } - other => { - // Could be error or notification - if !sender.try_stream(other).await { - break; - } - } - } - } - - info!(processed = processed, discarded = discarded, "Processed buffered messages"); - } -} - -async fn reorg_detected( - provider: &RobustProvider, - hash_to_check: B256, -) -> Result { - match provider.get_block_by_hash(hash_to_check).await { - Ok(_) => Ok(false), - Err(RobustProviderError::BlockNotFound(_)) => Ok(true), - Err(e) => Err(e.into()), - } } pub struct BlockRangeScannerClient { @@ -847,11 +649,7 @@ impl BlockRangeScannerClient { #[cfg(test)] mod tests { use super::*; - use crate::{assert_closed, assert_next}; - use alloy::{ - eips::{BlockId, 
BlockNumberOrTag}, - network::Ethereum, - }; + use alloy::eips::{BlockId, BlockNumberOrTag}; use tokio::sync::mpsc; #[test] @@ -870,88 +668,6 @@ mod tests { assert_eq!(scanner.max_block_range, max_block_range); } - #[tokio::test] - async fn buffered_messages_after_cutoff_are_all_passed() { - let cutoff = 50; - let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(51..=55)).await.unwrap(); - buffer_tx.send(Message::Data(56..=60)).await.unwrap(); - buffer_tx.send(Message::Data(61..=70)).await.unwrap(); - drop(buffer_tx); - - let (out_tx, out_rx) = mpsc::channel(8); - Service::::process_live_block_buffer(buffer_rx, out_tx, cutoff).await; - - let mut stream = ReceiverStream::new(out_rx); - - assert_next!(stream, 51..=55); - assert_next!(stream, 56..=60); - assert_next!(stream, 61..=70); - assert_closed!(stream); - } - - #[tokio::test] - async fn ranges_entirely_before_cutoff_are_discarded() { - let cutoff = 100; - - let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(40..=50)).await.unwrap(); - buffer_tx.send(Message::Data(51..=60)).await.unwrap(); - buffer_tx.send(Message::Data(61..=70)).await.unwrap(); - drop(buffer_tx); - - let (out_tx, out_rx) = mpsc::channel(8); - Service::::process_live_block_buffer(buffer_rx, out_tx, cutoff).await; - - let mut stream = ReceiverStream::new(out_rx); - - assert_closed!(stream); - } - - #[tokio::test] - async fn ranges_overlapping_cutoff_are_trimmed() { - let cutoff = 75; - - let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(60..=70)).await.unwrap(); - buffer_tx.send(Message::Data(71..=80)).await.unwrap(); - buffer_tx.send(Message::Data(81..=86)).await.unwrap(); - drop(buffer_tx); - - let (out_tx, out_rx) = mpsc::channel(8); - Service::::process_live_block_buffer(buffer_rx, out_tx, cutoff).await; - - let mut stream = ReceiverStream::new(out_rx); - - assert_next!(stream, 75..=80); - assert_next!(stream, 81..=86); - assert_closed!(stream); - } - - #[tokio::test] - async fn edge_case_range_exactly_at_cutoff() { - let cutoff = 100; - - let (buffer_tx, buffer_rx) = mpsc::channel(8); - buffer_tx.send(Message::Data(98..=98)).await.unwrap(); // Just before: discard - buffer_tx.send(Message::Data(99..=100)).await.unwrap(); // Includes cutoff: trim to 100..=100 - buffer_tx.send(Message::Data(100..=100)).await.unwrap(); // Exactly at: forward - buffer_tx.send(Message::Data(100..=101)).await.unwrap(); // Starts at cutoff: forward - buffer_tx.send(Message::Data(102..=102)).await.unwrap(); // After cutoff: forward - drop(buffer_tx); - - let (out_tx, out_rx) = mpsc::channel(8); - Service::::process_live_block_buffer(buffer_rx, out_tx, cutoff).await; - - let mut stream = ReceiverStream::new(out_rx); - - assert_next!(stream, 100..=100); - assert_next!(stream, 100..=100); - assert_next!(stream, 100..=101); - assert_next!(stream, 102..=102); - assert_closed!(stream); - } - #[tokio::test] async fn try_send_forwards_errors_to_subscribers() { let (tx, mut rx) = mpsc::channel::(1); diff --git a/src/block_range_scanner/common.rs b/src/block_range_scanner/common.rs new file mode 100644 index 0000000..16a1a9e --- /dev/null +++ b/src/block_range_scanner/common.rs @@ -0,0 +1,334 @@ +use tokio::sync::mpsc; +use tokio_stream::StreamExt; + +use crate::{ + block_range_scanner::{Message, reorg_handler::ReorgHandler}, + robust_provider::RobustProvider, + types::{Notification, TryStream}, +}; +use alloy::{ + consensus::BlockHeader, + network::{BlockResponse, Network}, + primitives::BlockNumber, + 
pubsub::Subscription, +}; +use tracing::{debug, error, info, warn}; + +#[allow(clippy::too_many_arguments)] +pub(crate) async fn stream_live_blocks( + stream_start: BlockNumber, + subscription: Subscription, + sender: &mpsc::Sender, + provider: &RobustProvider, + block_confirmations: u64, + max_block_range: u64, + reorg_handler: &mut ReorgHandler, + notify_after_first_block: bool, +) { + // Phase 1: Wait for first relevant block + let mut stream = skip_to_relevant_blocks::(subscription, stream_start, block_confirmations); + + let Some(first_block) = stream.next().await else { + warn!("Subscription channel closed before receiving any blocks"); + return; + }; + + if notify_after_first_block && !sender.try_stream(Notification::StartingLiveStream).await { + return; + } + + // Phase 2: Initialize streaming state with first block + let Some(mut state) = initialize_live_streaming_state( + first_block, + stream_start, + block_confirmations, + max_block_range, + sender, + provider, + reorg_handler, + ) + .await + else { + return; + }; + + // Phase 3: Continuously stream blocks with reorg handling + stream_blocks_continuously( + &mut stream, + &mut state, + stream_start, + block_confirmations, + max_block_range, + sender, + provider, + reorg_handler, + ) + .await; + + warn!("Live block subscription ended"); +} + +/// Skips blocks until we reach the first block that's relevant for streaming +fn skip_to_relevant_blocks( + subscription: Subscription, + stream_start: BlockNumber, + block_confirmations: u64, +) -> impl tokio_stream::Stream { + subscription.into_stream().skip_while(move |header| { + header.number().saturating_sub(block_confirmations) < stream_start + }) +} + +/// Initializes the streaming state after receiving the first block +/// Returns None if the channel is closed +async fn initialize_live_streaming_state( + first_block: N::HeaderResponse, + stream_start: BlockNumber, + block_confirmations: u64, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + reorg_handler: &mut ReorgHandler, +) -> Option> { + let incoming_block_num = first_block.number(); + info!(block_number = incoming_block_num, "Received first block header"); + + let confirmed = incoming_block_num.saturating_sub(block_confirmations); + + // Catch up on any confirmed blocks between stream_start and the confirmed tip + let previous_batch_end = stream_historical_blocks( + stream_start, + stream_start, + confirmed, + max_block_range, + sender, + provider, + reorg_handler, + ) + .await?; + + Some(LiveStreamingState { + batch_start: stream_start, + previous_batch_end: Some(previous_batch_end), + }) +} + +/// Continuously streams blocks, handling reorgs as they occur +#[allow(clippy::too_many_arguments)] +async fn stream_blocks_continuously< + N: Network, + S: tokio_stream::Stream + Unpin, +>( + stream: &mut S, + state: &mut LiveStreamingState, + stream_start: BlockNumber, + block_confirmations: u64, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + reorg_handler: &mut ReorgHandler, +) { + while let Some(incoming_block) = stream.next().await { + let incoming_block_num = incoming_block.number(); + info!(block_number = incoming_block_num, "Received block header"); + + // Check for reorgs and update state accordingly + let Some(common_ancestor) = + check_for_reorg(state.previous_batch_end.as_ref(), reorg_handler, sender).await + else { + return; + }; + + if let Some(common_ancestor) = common_ancestor { + if !handle_reorg_detected(common_ancestor, stream_start, state, 
sender).await { + return; // Channel closed + } + } else { + // No reorg: advance batch_start to after the previous batch + advance_batch_start_after_previous_end(state); + } + + // Stream the next batch of confirmed blocks + let batch_end_num = incoming_block_num.saturating_sub(block_confirmations); + if !stream_next_batch( + batch_end_num, + state, + stream_start, + max_block_range, + sender, + provider, + reorg_handler, + ) + .await + { + return; // Channel closed + } + } +} + +/// Checks if a reorg occurred by verifying the previous batch end block. +/// Returns `None` if the channel is closed. +async fn check_for_reorg( + previous_batch_end: Option<&N::BlockResponse>, + reorg_handler: &mut ReorgHandler, + sender: &mpsc::Sender, +) -> Option> { + let batch_end = previous_batch_end?; + + match reorg_handler.check(batch_end).await { + Ok(reorg_opt) => Some(reorg_opt), + Err(e) => { + error!(error = %e, "Failed to perform reorg check"); + _ = sender.try_stream(e).await; + None + } + } +} + +/// Handles a detected reorg by notifying and adjusting the streaming state +/// Returns false if the channel is closed +async fn handle_reorg_detected( + common_ancestor: N::BlockResponse, + stream_start: BlockNumber, + state: &mut LiveStreamingState, + sender: &mpsc::Sender, +) -> bool { + if !sender.try_stream(Notification::ReorgDetected).await { + return false; + } + + let ancestor_num = common_ancestor.header().number(); + + // Reset streaming position based on common ancestor + if ancestor_num < stream_start { + // Reorg went before our starting point - restart from stream_start + info!( + ancestor_block = ancestor_num, + stream_start = stream_start, + "Reorg detected before stream start, resetting to stream start" + ); + state.batch_start = stream_start; + state.previous_batch_end = None; + } else { + // Resume from after the common ancestor + info!(ancestor_block = ancestor_num, "Reorg detected, resuming from common ancestor"); + state.batch_start = ancestor_num + 1; + state.previous_batch_end = Some(common_ancestor); + } + + true +} + +/// Advances `batch_start` after processing a normal (non-reorg) block +fn advance_batch_start_after_previous_end(state: &mut LiveStreamingState) { + if let Some(prev_batch_end) = state.previous_batch_end.as_ref() { + state.batch_start = prev_batch_end.header().number() + 1; + } +} + +/// Streams the next batch of blocks up to `batch_end_num`. +/// Returns false if the channel is closed +async fn stream_next_batch( + batch_end_num: BlockNumber, + state: &mut LiveStreamingState, + stream_start: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + reorg_handler: &mut ReorgHandler, +) -> bool { + if batch_end_num < state.batch_start { + // No new confirmed blocks to stream yet + return true; + } + + state.previous_batch_end = stream_historical_blocks( + stream_start, + state.batch_start, + batch_end_num, + max_block_range, + sender, + provider, + reorg_handler, + ) + .await; + + if state.previous_batch_end.is_none() { + // Channel closed + return false; + } + + // SAFETY: Overflow cannot realistically happen + state.batch_start = batch_end_num + 1; + + true +} + +/// Tracks the current state of live streaming +struct LiveStreamingState { + /// The starting block number for the next batch to stream + batch_start: BlockNumber, + /// The last block from the previous batch (used for reorg detection) + previous_batch_end: Option, +} + +/// Assumes that `stream_start <= next_start_block <= end`. 
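+///
+/// Streams the `next_start_block..=end` range in batches of at most `max_block_range` blocks,
+/// re-checking the end block of each batch for reorgs and rewinding to just after the common
+/// ancestor (or back to `stream_start`) when one is detected.
+///
+/// Returns the most recently fetched batch-end block on completion (or once the receiver is
+/// dropped), and `None` if an RPC call fails or the reorg notification cannot be delivered.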
+pub(crate) async fn stream_historical_blocks( + stream_start: BlockNumber, + mut next_start_block: BlockNumber, + end: BlockNumber, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + reorg_handler: &mut ReorgHandler, +) -> Option { + let mut batch_count = 0; + + loop { + let batch_end_num = next_start_block.saturating_add(max_block_range - 1).min(end); + let batch_end = match provider.get_block_by_number(batch_end_num.into()).await { + Ok(block) => block, + Err(e) => { + error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch"); + _ = sender.try_stream(e).await; + return None; + } + }; + + if !sender.try_stream(next_start_block..=batch_end_num).await { + return Some(batch_end); + } + + batch_count += 1; + if batch_count % 10 == 0 { + debug!(batch_count = batch_count, "Processed historical batches"); + } + + let reorged_opt = match reorg_handler.check(&batch_end).await { + Ok(opt) => opt, + Err(e) => { + error!(error = %e, "Failed to perform reorg check"); + _ = sender.try_stream(e).await; + return None; + } + }; + + next_start_block = if let Some(common_ancestor) = reorged_opt { + if !sender.try_stream(Notification::ReorgDetected).await { + return None; + } + if common_ancestor.header().number() < stream_start { + stream_start + } else { + common_ancestor.header().number() + 1 + } + } else { + batch_end_num.saturating_add(1) + }; + + if next_start_block > end { + info!(batch_count = batch_count, "Historical sync completed"); + return Some(batch_end); + } + } +} diff --git a/src/block_range_scanner/reorg_handler.rs b/src/block_range_scanner/reorg_handler.rs new file mode 100644 index 0000000..243a3f2 --- /dev/null +++ b/src/block_range_scanner/reorg_handler.rs @@ -0,0 +1,99 @@ +use alloy::{ + consensus::BlockHeader, + eips::BlockNumberOrTag, + network::{BlockResponse, Ethereum, Network, primitives::HeaderResponse}, + primitives::BlockHash, +}; +use tracing::{info, warn}; + +use crate::{ + ScannerError, + robust_provider::{self, RobustProvider}, +}; + +use super::ring_buffer::RingBuffer; + +#[derive(Clone)] +pub(crate) struct ReorgHandler { + provider: RobustProvider, + buffer: RingBuffer, +} + +impl ReorgHandler { + pub fn new(provider: RobustProvider) -> Self { + Self { provider, buffer: RingBuffer::new(10) } + } + + pub async fn check( + &mut self, + block: &N::BlockResponse, + ) -> Result, ScannerError> { + let block = block.header(); + info!(block_hash = %block.hash(), block_number = block.number(), "Checking if block was reorged"); + if !self.reorg_detected(block).await? 
{
+            let block_hash = block.hash();
+            info!(block_hash = %block_hash, block_number = block.number(), "No reorg detected");
+            // store the incoming block's hash for future reference
+            if !matches!(self.buffer.back(), Some(&hash) if hash == block_hash) {
+                self.buffer.push(block_hash);
+            }
+            return Ok(None);
+        }
+
+        info!("Reorg detected, searching for common ancestor");
+
+        while let Some(&block_hash) = self.buffer.back() {
+            info!(block_hash = %block_hash, "Checking if block exists on-chain");
+            match self.provider.get_block_by_hash(block_hash).await {
+                Ok(common_ancestor) => {
+                    let common_ancestor_header = common_ancestor.header();
+
+                    let finalized =
+                        self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
+                    let finalized_header = finalized.header();
+
+                    let common_ancestor = if finalized_header.number() <=
+                        common_ancestor_header.number()
+                    {
+                        info!(common_ancestor = %common_ancestor_header.hash(), block_number = common_ancestor_header.number(), "Common ancestor found");
+                        common_ancestor
+                    } else {
+                        warn!(
+                            finalized_hash = %finalized_header.hash(), block_number = finalized_header.number(), "Possible deep reorg detected, using finalized block as common ancestor"
+                        );
+                        // all buffered blocks are finalized, so no more need to track them
+                        self.buffer.clear();
+                        finalized
+                    };
+
+                    return Ok(Some(common_ancestor));
+                }
+                Err(robust_provider::Error::BlockNotFound(_)) => {
+                    // block was reorged
+                    _ = self.buffer.pop_back();
+                }
+                Err(e) => return Err(e.into()),
+            }
+        }
+
+        warn!("Possible deep reorg detected, setting finalized block as common ancestor");
+
+        let finalized = self.provider.get_block_by_number(BlockNumberOrTag::Finalized).await?;
+
+        // no need to store the finalized block's hash in the buffer, as it is returned by default only
+        // if no buffered hashes exist on-chain
+
+        let header = finalized.header();
+        info!(finalized_hash = %header.hash(), block_number = header.number(), "Finalized block set as common ancestor");
+
+        Ok(Some(finalized))
+    }
+
+    async fn reorg_detected(&self, block: &N::HeaderResponse) -> Result<bool, ScannerError> {
+        match self.provider.get_block_by_hash(block.hash()).await {
+            Ok(_) => Ok(false),
+            Err(robust_provider::Error::BlockNotFound(_)) => Ok(true),
+            Err(e) => Err(e.into()),
+        }
+    }
+}
diff --git a/src/block_range_scanner/ring_buffer.rs b/src/block_range_scanner/ring_buffer.rs
new file mode 100644
index 0000000..f40d6e4
--- /dev/null
+++ b/src/block_range_scanner/ring_buffer.rs
@@ -0,0 +1,35 @@
+use std::collections::VecDeque;
+
+#[derive(Clone)]
+pub(crate) struct RingBuffer<T> {
+    inner: VecDeque<T>,
+    capacity: usize,
+}
+
+impl<T> RingBuffer<T> {
+    /// Creates an empty [`RingBuffer`] with a specific capacity.
+    pub fn new(capacity: usize) -> Self {
+        Self { inner: VecDeque::with_capacity(capacity), capacity }
+    }
+
+    /// Adds a new element to the buffer. If the buffer is full,
+    /// the oldest element is removed to make space.
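+    ///
+    /// Illustrative sketch of the eviction behavior (assuming a capacity of 2):
+    ///
+    /// ```ignore
+    /// let mut buffer = RingBuffer::new(2);
+    /// buffer.push(1);
+    /// buffer.push(2);
+    /// buffer.push(3); // buffer is full, so the oldest element (1) is evicted
+    /// assert_eq!(buffer.pop_back(), Some(3));
+    /// assert_eq!(buffer.pop_back(), Some(2));
+    /// assert_eq!(buffer.pop_back(), None);
+    /// ```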
+ pub fn push(&mut self, item: T) { + if self.inner.len() == self.capacity { + self.inner.pop_front(); // Remove the oldest element + } + self.inner.push_back(item); // Add the new element + } + + pub fn pop_back(&mut self) -> Option { + self.inner.pop_back() + } + + pub fn back(&self) -> Option<&T> { + self.inner.back() + } + + pub fn clear(&mut self) { + self.inner.clear(); + } +} diff --git a/src/block_range_scanner/sync_handler.rs b/src/block_range_scanner/sync_handler.rs new file mode 100644 index 0000000..74ee315 --- /dev/null +++ b/src/block_range_scanner/sync_handler.rs @@ -0,0 +1,218 @@ +use alloy::{eips::BlockId, network::Network, primitives::BlockNumber}; +use tokio::sync::mpsc; +use tracing::{error, info}; + +use crate::{ + Notification, ScannerError, + block_range_scanner::{Message, common, reorg_handler::ReorgHandler}, + robust_provider::RobustProvider, + types::TryStream, +}; + +/// Represents the initial state when starting a sync operation +enum SyncState { + /// Start block is already at or beyond the confirmed tip - go straight to live + AlreadyLive { start_block: BlockNumber }, + /// Start block is behind - need to catch up first, then go live + NeedsCatchup { start_block: BlockNumber, confirmed_tip: BlockNumber }, +} + +pub(crate) struct SyncHandler { + provider: RobustProvider, + max_block_range: u64, + start_id: BlockId, + block_confirmations: u64, + sender: mpsc::Sender, + reorg_handler: ReorgHandler, +} + +impl SyncHandler { + pub fn new( + provider: RobustProvider, + max_block_range: u64, + start_id: BlockId, + block_confirmations: u64, + sender: mpsc::Sender, + ) -> Self { + let reorg_handler = ReorgHandler::new(provider.clone()); + Self { provider, max_block_range, start_id, block_confirmations, sender, reorg_handler } + } + + pub async fn run(mut self) -> Result<(), ScannerError> { + let sync_state = self.determine_sync_state().await?; + + match sync_state { + SyncState::AlreadyLive { start_block } => { + info!( + start_block = start_block, + "Start block is beyond confirmed tip, waiting until starting block is confirmed before starting live stream" + ); + self.spawn_live_only(start_block).await?; + } + SyncState::NeedsCatchup { start_block, confirmed_tip } => { + info!( + start_block = start_block, + confirmed_tip = confirmed_tip, + "Start block is behind confirmed tip, catching up then transitioning to live" + ); + self.spawn_catchup_then_live(start_block, confirmed_tip); + } + } + + Ok(()) + } + + /// Determines whether we need to catch up or can start live immediately + async fn determine_sync_state(&self) -> Result { + let (start_block, confirmed_tip) = tokio::try_join!( + self.provider.get_block_number_by_id(self.start_id), + self.provider.get_latest_confirmed(self.block_confirmations) + )?; + + if start_block > confirmed_tip { + Ok(SyncState::AlreadyLive { start_block }) + } else { + Ok(SyncState::NeedsCatchup { start_block, confirmed_tip }) + } + } + + /// Spawns a task that only streams live blocks (no historical catchup needed) + async fn spawn_live_only(&mut self, start_block: BlockNumber) -> Result<(), ScannerError> { + let max_block_range = self.max_block_range; + let block_confirmations = self.block_confirmations; + let provider = self.provider.clone(); + let sender = self.sender.clone(); + let mut reorg_handler = self.reorg_handler.clone(); + + let subscription = provider.subscribe_blocks().await?; + + tokio::spawn(async move { + common::stream_live_blocks( + start_block, + subscription, + &sender, + &provider, + block_confirmations, + 
max_block_range, + &mut reorg_handler, + true, + ) + .await; + }); + + Ok(()) + } + + /// Spawns a task that catches up on historical blocks, then transitions to live streaming + fn spawn_catchup_then_live(&self, start_block: BlockNumber, confirmed_tip: BlockNumber) { + let max_block_range = self.max_block_range; + let block_confirmations = self.block_confirmations; + let provider = self.provider.clone(); + let mut reorg_handler = self.reorg_handler.clone(); + let sender = self.sender.clone(); + + tokio::spawn(async move { + // Phase 1: Catch up on any blocks that have been minted during the historical sync + let start_block = match Self::catchup_historical_blocks( + start_block, + confirmed_tip, + block_confirmations, + max_block_range, + &sender, + &provider, + &mut reorg_handler, + ) + .await + { + Ok(start_block) => start_block, + Err(e) => { + error!(error = %e, "Error during historical catchup, shutting down"); + _ = sender.try_stream(e).await; + return; + } + }; + + // Phase 2: Transition to live streaming + Self::transition_to_live( + start_block, + block_confirmations, + max_block_range, + &sender, + &provider, + &mut reorg_handler, + ) + .await; + }); + } + + /// Catches up on historical blocks until we reach the chain tip + /// Returns the block number where live streaming should begin + async fn catchup_historical_blocks( + mut start_block: BlockNumber, + mut confirmed_tip: BlockNumber, + block_confirmations: u64, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + reorg_handler: &mut ReorgHandler, + ) -> Result { + while start_block < confirmed_tip { + common::stream_historical_blocks( + start_block, + start_block, + confirmed_tip, + max_block_range, + sender, + provider, + reorg_handler, + ) + .await; + + let latest = provider.get_block_number().await?; + + start_block = confirmed_tip + 1; + confirmed_tip = latest.saturating_sub(block_confirmations); + } + + info!("Historical catchup complete, ready to transition to live"); + + Ok(start_block) + } + + /// Subscribes to live blocks and begins streaming + async fn transition_to_live( + start_block: BlockNumber, + block_confirmations: u64, + max_block_range: u64, + sender: &mpsc::Sender, + provider: &RobustProvider, + reorg_handler: &mut ReorgHandler, + ) { + let subscription = match provider.subscribe_blocks().await { + Ok(sub) => sub, + Err(e) => { + error!(error = %e, "Error subscribing to live blocks, shutting down"); + _ = sender.try_stream(e).await; + return; + } + }; + + if !sender.try_stream(Notification::StartingLiveStream).await { + return; + } + + info!("Successfully transitioned from historical to live streaming"); + + common::stream_live_blocks( + start_block, + subscription, + sender, + provider, + block_confirmations, + max_block_range, + reorg_handler, + false, // (already notified above) + ) + .await; + } +} diff --git a/src/event_scanner/scanner/common.rs b/src/event_scanner/scanner/common.rs index af736a7..9c446a1 100644 --- a/src/event_scanner/scanner/common.rs +++ b/src/event_scanner/scanner/common.rs @@ -148,12 +148,13 @@ pub fn spawn_log_consumers( } if let ConsumerMode::CollectLatest { .. 
} = mode { - if !collected.is_empty() { + if collected.is_empty() { + info!("No latest logs collected"); + } else { + info!("Sending collected logs to consumer"); collected.reverse(); // restore chronological order + _ = sender.try_stream(collected).await; } - - info!("Sending collected logs to consumer"); - _ = sender.try_stream(collected).await; } }); diff --git a/src/event_scanner/scanner/sync/from_latest.rs b/src/event_scanner/scanner/sync/from_latest.rs index 7be468d..6db88d7 100644 --- a/src/event_scanner/scanner/sync/from_latest.rs +++ b/src/event_scanner/scanner/sync/from_latest.rs @@ -4,13 +4,10 @@ use alloy::{ network::{BlockResponse, Network}, }; -use tokio::sync::mpsc; -use tokio_stream::{StreamExt, wrappers::ReceiverStream}; -use tracing::info; +use tracing::{error, info}; use crate::{ - EventScannerBuilder, Notification, ScannerError, - block_range_scanner::Message as BlockRangeMessage, + EventScannerBuilder, ScannerError, event_scanner::{ EventScanner, scanner::{ @@ -19,6 +16,7 @@ use crate::{ }, }, robust_provider::IntoRobustProvider, + types::TryStream, }; impl EventScannerBuilder { @@ -80,11 +78,6 @@ impl EventScanner { // Setup rewind and live streams to run in parallel. let rewind_stream = client.rewind(BlockNumberOrTag::Earliest, latest_block).await?; - // We actually rely on the sync mode for the live stream, to - // ensure that we don't miss any events in case a new block was minted while - // we were setting up the streams or a reorg happens. - let sync_stream = - client.stream_from(latest_block + 1, self.config.block_confirmations).await?; // Start streaming... tokio::spawn(async move { @@ -100,20 +93,20 @@ impl EventScanner { ) .await; - // Notify the client that we're now streaming live. - info!("Switching to live stream"); - - // Use a one-off channel for the notification. - let (tx, rx) = mpsc::channel::(1); - let stream = ReceiverStream::new(rx); - tx.send(BlockRangeMessage::Notification(Notification::SwitchingToLive)) - .await - .expect("receiver exists"); - - // close the channel to stop the stream - drop(tx); - - let sync_stream = stream.chain(sync_stream); + // We actually rely on the sync mode for the live stream, as more blocks could have been + // minted while the scanner was collecting the latest `count` events. + // Note: Sync mode will notify the client when it switches to live streaming. + let sync_stream = + match client.stream_from(latest_block + 1, self.config.block_confirmations).await { + Ok(stream) => stream, + Err(e) => { + error!(error = %e, "Error during sync mode setup"); + for listener in listeners { + _ = listener.sender.try_stream(e.clone()).await; + } + return; + } + }; // Start the live (sync) stream. handle_stream(sync_stream, &provider, &listeners, ConsumerMode::Stream).await; diff --git a/src/event_scanner/scanner/sync/mod.rs b/src/event_scanner/scanner/sync/mod.rs index 889a58e..a33fa83 100644 --- a/src/event_scanner/scanner/sync/mod.rs +++ b/src/event_scanner/scanner/sync/mod.rs @@ -16,7 +16,7 @@ impl EventScannerBuilder { /// /// 1. **Latest events phase**: Collects up to `count` most recent events by scanning backwards /// from the current chain tip. Events are delivered in chronological order. - /// 2. **Automatic transition**: Emits [`Notification::SwitchingToLive`][switch_to_live] to + /// 2. **Automatic transition**: Emits [`Notification::StartingLiveStream`][switch_to_live] to /// signal the mode change /// 3. 
**Live streaming phase**: Continuously monitors and streams new events as they arrive /// on-chain @@ -50,7 +50,7 @@ impl EventScannerBuilder { /// } /// Message::Notification(notification) => { /// println!("Notification received: {:?}", notification); - /// // You'll see Notification::SwitchingToLive when transitioning + /// // You'll see Notification::StartingLiveStream when transitioning /// } /// Message::Error(e) => { /// eprintln!("Error: {}", e); @@ -101,7 +101,7 @@ impl EventScannerBuilder { /// [subscribe]: crate::EventScanner::subscribe /// [start]: crate::event_scanner::EventScanner::start /// [reorg]: crate::types::Notification::ReorgDetected - /// [switch_to_live]: crate::types::Notification::SwitchingToLive + /// [switch_to_live]: crate::types::Notification::StartingLiveStream #[must_use] pub fn from_latest(self, count: usize) -> EventScannerBuilder { EventScannerBuilder::::new(count) @@ -114,7 +114,7 @@ impl EventScannerBuilder { /// /// 1. **Historical sync phase**: Streams events from `from_block` up to the current confirmed /// tip - /// 2. **Automatic transition**: Emits [`Notification::SwitchingToLive`][switch_to_live] to + /// 2. **Automatic transition**: Emits [`Notification::StartingLiveStream`][switch_to_live] to /// signal the mode change /// 3. **Live streaming phase**: Continuously monitors and streams new events as they arrive /// on-chain @@ -148,7 +148,7 @@ impl EventScannerBuilder { /// } /// Message::Notification(notification) => { /// println!("Notification received: {:?}", notification); - /// // You'll see Notification::SwitchingToLive when transitioning + /// // You'll see Notification::StartingLiveStream when transitioning /// } /// Message::Error(e) => { /// eprintln!("Error: {}", e); @@ -208,7 +208,7 @@ impl EventScannerBuilder { /// [subscribe]: crate::EventScanner::subscribe /// [start]: crate::event_scanner::EventScanner::start /// [reorg]: crate::types::Notification::ReorgDetected - /// [switch_to_live]: crate::types::Notification::SwitchingToLive + /// [switch_to_live]: crate::types::Notification::StartingLiveStream #[must_use] pub fn from_block(self, block_id: impl Into) -> EventScannerBuilder { EventScannerBuilder::::new(block_id.into()) diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index a5cbec2..210865e 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -98,6 +98,25 @@ impl RobustProvider { result } + /// Get the block number for a given block identifier. + /// + /// # Errors + /// + /// See [retry errors](#retry-errors). + pub async fn get_block_number_by_id(&self, id: BlockId) -> Result { + info!("get_block_number_by_id called"); + let result = self + .retry_with_total_timeout( + move |provider| async move { provider.get_block_number_by_id(id).await }, + false, + ) + .await; + if let Err(e) = &result { + error!(error = %e, "get_block_number_by_id failed"); + } + result?.ok_or_else(|| Error::BlockNotFound(id)) + } + /// Fetch the latest confirmed block number with retry and timeout. /// /// This method fetches the latest block number and subtracts the specified diff --git a/src/test_utils/macros.rs b/src/test_utils/macros.rs index 78e6032..b2106f5 100644 --- a/src/test_utils/macros.rs +++ b/src/test_utils/macros.rs @@ -176,14 +176,17 @@ pub async fn assert_event_sequence + Unpin>( assert!( elapsed < timeout_duration, - "Timed out waiting for events. 
Still expecting: {:#?}", + "Timed out waiting for events.\nNext Expected:\n{:#?}\nRemaining:\n{:#?}", + expected, remaining.collect::>() ); let time_left = timeout_duration - elapsed; let message = tokio::time::timeout(time_left, tokio_stream::StreamExt::next(stream)) .await - .expect("timed out waiting for next batch"); + .unwrap_or_else(|_| { + panic!("timed out waiting for next stream batch, expected event: {expected:#?}") + }); match message { Some(Message::Data(batch)) => { diff --git a/src/types.rs b/src/types.rs index d657920..0a8b80f 100644 --- a/src/types.rs +++ b/src/types.rs @@ -10,9 +10,11 @@ pub enum ScannerMessage { Notification(Notification), } +// TODO: implement Display for ScannerMessage + #[derive(Copy, Debug, Clone, PartialEq)] pub enum Notification { - SwitchingToLive, + StartingLiveStream, ReorgDetected, } diff --git a/tests/block_range_scanner.rs b/tests/block_range_scanner.rs index 0d18ad6..22314fb 100644 --- a/tests/block_range_scanner.rs +++ b/tests/block_range_scanner.rs @@ -48,7 +48,33 @@ async fn live_mode_processes_all_blocks_respecting_block_confirmations() -> anyh } #[tokio::test] -async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()> { +async fn live_with_block_confirmations_always_emits_genesis_block() -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn()?; + let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; + let client = BlockRangeScanner::new().connect(provider.clone()).await?.run()?; + + let mut stream = client.stream_live(3).await?; + + provider.anvil_mine(Some(1), None).await?; + assert_next!(stream, 0..=0); + let stream = assert_empty!(stream); + + provider.anvil_mine(Some(2), None).await?; + let mut stream = assert_empty!(stream); + + provider.anvil_mine(Some(5), None).await?; + assert_range_coverage!(stream, 1..=5); + let mut stream = assert_empty!(stream); + + provider.anvil_mine(Some(1), None).await?; + assert_next!(stream, 6..=6); + assert_empty!(stream); + + Ok(()) +} + +#[tokio::test] +async fn stream_from_starts_at_latest_once_it_has_enough_confirmations() -> anyhow::Result<()> { let anvil = Anvil::new().try_spawn()?; let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; @@ -64,6 +90,7 @@ async fn stream_from_latest_starts_at_tip_not_confirmed() -> anyhow::Result<()> let mut stream = assert_empty!(stream); provider.anvil_mine(Some(1), None).await?; + assert_next!(stream, Notification::StartingLiveStream); assert_next!(stream, 20..=20); let mut stream = assert_empty!(stream); @@ -128,11 +155,18 @@ async fn shallow_block_confirmation_does_not_mitigate_reorg() -> anyhow::Result< // reorg more blocks than the block_confirmation config provider.anvil_reorg(ReorgOptions { depth: 8, tx_block_pairs: vec![] }).await?; - // mint additional blocks - provider.anvil_mine(Some(3), None).await?; + + // mint 1 block to allow the scanner to process reorged blocks (previously streamed + the block + // confirmed now) + provider.anvil_mine(Some(1), None).await?; assert_next!(stream, Notification::ReorgDetected); - assert_range_coverage!(stream, 0..=10); + assert_range_coverage!(stream, 3..=8); + let mut stream = assert_empty!(stream); + + // mint additional blocks to allow the scanner to stream all of the pre-reorg blocks + provider.anvil_mine(Some(3), None).await?; + assert_range_coverage!(stream, 9..=10); assert_empty!(stream); Ok(()) diff --git a/tests/latest_events/basic.rs b/tests/latest_events/basic.rs index 20e4446..45aeeff 100644 --- 
a/tests/latest_events/basic.rs +++ b/tests/latest_events/basic.rs @@ -64,7 +64,7 @@ async fn fewer_available_than_count_returns_all() -> anyhow::Result<()> { } #[tokio::test] -async fn no_events_returns_empty() -> anyhow::Result<()> { +async fn no_past_events_returns_empty() -> anyhow::Result<()> { let count = 5; let setup = setup_latest_scanner(None, None, count, None, None).await?; let scanner = setup.scanner; @@ -72,9 +72,6 @@ async fn no_events_returns_empty() -> anyhow::Result<()> { scanner.start().await?; - let expected: &[TestCounter::CountIncreased] = &[]; - - assert_next!(stream, expected); assert_closed!(stream); Ok(()) diff --git a/tests/live/optional_fields.rs b/tests/live/optional_fields.rs index d8b33e6..5a9536f 100644 --- a/tests/live/optional_fields.rs +++ b/tests/live/optional_fields.rs @@ -87,7 +87,7 @@ async fn mixed_optional_and_required_filters() -> anyhow::Result<()> { scanner.start().await?; - // First increase the counter to have some balance + // First increase the contract_2 counter contract_2.increase().send().await?.watch().await?; contract_2.increase().send().await?.watch().await?; contract_2.increase().send().await?.watch().await?; diff --git a/tests/live/reorg.rs b/tests/live/reorg.rs index be70f44..36f79f4 100644 --- a/tests/live/reorg.rs +++ b/tests/live/reorg.rs @@ -22,15 +22,6 @@ async fn rescans_events_within_same_block() -> anyhow::Result<()> { contract.increase().send().await?.watch().await?; } - // reorg the chain - let tx_block_pairs = vec![ - (TransactionData::JSON(contract.increase().into_transaction_request()), 0), - (TransactionData::JSON(contract.increase().into_transaction_request()), 0), - (TransactionData::JSON(contract.increase().into_transaction_request()), 0), - ]; - - provider.primary().anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; - // assert initial events are emitted as expected assert_event_sequence!( stream, @@ -42,7 +33,15 @@ async fn rescans_events_within_same_block() -> anyhow::Result<()> { CountIncreased { newCount: U256::from(5) } ] ); - // assert expected messages post-reorg + + // reorg the chain + let tx_block_pairs = vec![ + (TransactionData::JSON(contract.increase().into_transaction_request()), 0), + (TransactionData::JSON(contract.increase().into_transaction_request()), 0), + (TransactionData::JSON(contract.increase().into_transaction_request()), 0), + ]; + provider.primary().anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; + assert_next!(stream, Notification::ReorgDetected); // assert the reorged events are emitted assert_next!( @@ -70,15 +69,6 @@ async fn rescans_events_with_ascending_blocks() -> anyhow::Result<()> { contract.increase().send().await?.watch().await?; } - // reorg the chain - let tx_block_pairs = vec![ - (TransactionData::JSON(contract.increase().into_transaction_request()), 0), - (TransactionData::JSON(contract.increase().into_transaction_request()), 1), - (TransactionData::JSON(contract.increase().into_transaction_request()), 2), - ]; - - provider.primary().anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; - // assert initial events are emitted as expected assert_event_sequence!( stream, @@ -90,7 +80,16 @@ async fn rescans_events_with_ascending_blocks() -> anyhow::Result<()> { CountIncreased { newCount: U256::from(5) } ] ); - // assert expected messages post-reorg + + // reorg the chain + let tx_block_pairs = vec![ + (TransactionData::JSON(contract.increase().into_transaction_request()), 0), + 
(TransactionData::JSON(contract.increase().into_transaction_request()), 1), + (TransactionData::JSON(contract.increase().into_transaction_request()), 2), + ]; + + provider.primary().anvil_reorg(ReorgOptions { depth: 4, tx_block_pairs }).await?; + assert_next!(stream, Notification::ReorgDetected); // assert the reorged events are emitted assert_event_sequence_final!( diff --git a/tests/sync/from_block.rs b/tests/sync/from_block.rs index 0aa36e5..43f6091 100644 --- a/tests/sync/from_block.rs +++ b/tests/sync/from_block.rs @@ -32,13 +32,13 @@ async fn replays_historical_then_switches_to_live() -> anyhow::Result<()> { ] ); + // chain tip reached + assert_next!(stream, Notification::StartingLiveStream); + // now emit live events contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; - // chain tip reached - assert_next!(stream, Notification::SwitchingToLive); - // live events assert_event_sequence_final!( stream, @@ -72,6 +72,9 @@ async fn sync_from_future_block_waits_until_minted() -> anyhow::Result<()> { // Act: emit an event that will be mined in block == future_start contract.increase().send().await?.watch().await?; + // only after the live event at `future_start_block` is emitted, will `StartingLiveStream` be + // streamed + assert_next!(stream, Notification::StartingLiveStream); // Assert: the first streamed message arrives and contains the expected event assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(3) }]); assert_empty!(stream); @@ -100,14 +103,13 @@ async fn block_confirmations_mitigate_reorgs() -> anyhow::Result<()> { TestCounter::CountIncreased { newCount: U256::from(2) } ] ); + assert_next!(stream, Notification::StartingLiveStream); // emit "live" events for _ in 0..2 { contract.increase().send().await?.watch().await?; } - // switching to "live" phase - assert_next!(stream, Notification::SwitchingToLive); // assert confirmed live events are streamed separately let stream = assert_event_sequence_final!( stream, diff --git a/tests/sync/from_latest.rs b/tests/sync/from_latest.rs index 622535d..e36cd77 100644 --- a/tests/sync/from_latest.rs +++ b/tests/sync/from_latest.rs @@ -1,4 +1,7 @@ +use std::time::Duration; + use alloy::{primitives::U256, providers::ext::AnvilApi}; +use tokio::time::sleep; use crate::common::{TestCounter, setup_sync_from_latest_scanner}; use event_scanner::{Notification, assert_empty, assert_event_sequence_final, assert_next}; @@ -30,13 +33,16 @@ async fn happy_path_no_duplicates() -> anyhow::Result<()> { TestCounter::CountIncreased { newCount: U256::from(6) }, ] ); - // Transition to live - assert_next!(stream, Notification::SwitchingToLive); + let mut stream = assert_empty!(stream); // Live phase: emit three more, should arrive in order without duplicating latest contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; + // Assert `StartingLiveStream` after emitting live events, because the test finishes the "latest + // events" phase before new events are emitted, thus the "live" phase actually starts from a + // future block. 
+ assert_next!(stream, Notification::StartingLiveStream); assert_event_sequence_final!( stream, &[ @@ -69,13 +75,16 @@ async fn fewer_historical_then_continues_live() -> anyhow::Result<()> { TestCounter::CountIncreased { newCount: U256::from(2) }, ] ); - assert_next!(stream, Notification::SwitchingToLive); let mut stream = assert_empty!(stream); // Live: two more arrive contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; + // Assert `StartingLiveStream` after emitting live events, because the test finishes the "latest + // events" phase before new events are emitted, thus the "live" phase actually starts from a + // future block. + assert_next!(stream, Notification::StartingLiveStream); assert_event_sequence_final!( stream, &[ @@ -111,11 +120,18 @@ async fn exact_historical_count_then_live() -> anyhow::Result<()> { TestCounter::CountIncreased { newCount: U256::from(4) }, ] ); - assert_next!(stream, Notification::SwitchingToLive); let mut stream = assert_empty!(stream); + // give scanner time to subscribe to live events + sleep(Duration::from_millis(10)).await; + // Live continues contract.increase().send().await?.watch().await?; + + // Assert `StartingLiveStream` after emitting live events, because the test finishes the "latest + // events" phase before new events are emitted, thus the "live" phase actually starts from a + // future block. + assert_next!(stream, Notification::StartingLiveStream); assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(5) }]); assert_empty!(stream); @@ -131,16 +147,19 @@ async fn no_historical_only_live_streams() -> anyhow::Result<()> { scanner.start().await?; - // Latest is empty - let expected: &[TestCounter::CountIncreased] = &[]; - assert_next!(stream, expected); - assert_next!(stream, Notification::SwitchingToLive); - let mut stream = assert_empty!(stream); + // give scanner time to start + sleep(Duration::from_millis(10)).await; // Live events arrive contract.increase().send().await?.watch().await?; contract.increase().send().await?.watch().await?; + // Latest events are empty + + // Assert `StartingLiveStream` after emitting live events, because the test finishes the "latest + // events" phase before new events are emitted, thus the "live" phase actually starts from a + // future block. + assert_next!(stream, Notification::StartingLiveStream); assert_event_sequence_final!( stream, &[ @@ -181,12 +200,15 @@ async fn block_gaps_do_not_affect_number_of_events_streamed() -> anyhow::Result< TestCounter::CountIncreased { newCount: U256::from(3) }, ] ); - assert_next!(stream, Notification::SwitchingToLive); let mut stream = assert_empty!(stream); // Immediately produce a new live event in a new block contract.increase().send().await?.watch().await?; + // Assert `StartingLiveStream` after emitting live events, because the test finishes the "latest + // events" phase before new events are emitted, thus the "live" phase actually starts from a + // future block. + assert_next!(stream, Notification::StartingLiveStream); assert_next!(stream, &[TestCounter::CountIncreased { newCount: U256::from(4) }]); assert_empty!(stream); @@ -216,8 +238,9 @@ async fn waiting_on_live_logs_arriving() -> anyhow::Result<()> { TestCounter::CountIncreased { newCount: U256::from(3) }, ] ); - assert_next!(stream, Notification::SwitchingToLive); assert_empty!(stream); + // `Notification::StartingLiveStream` arrives only on first live block received + Ok(()) }