Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
24232e1
feat: imlp eq for array and message
0xNeshi Nov 3, 2025
07d1f7c
test: ref reorg_rescans_events_within_same_block
0xNeshi Nov 3, 2025
08d6097
test: ref
0xNeshi Nov 3, 2025
e510702
test: ref
0xNeshi Nov 3, 2025
780e402
test: ref
0xNeshi Nov 3, 2025
783a043
feat: design live mode reorg API
0xNeshi Nov 3, 2025
3d59bff
ref: pass ws_stream to the live task (not create it within)
0xNeshi Nov 3, 2025
8e49d9a
chore: merge
0xNeshi Nov 10, 2025
d3ac82c
feat: implement ReorgHandler::check
0xNeshi Nov 10, 2025
81212dd
test: fix shallow_block_confirmation_does_not_mitigate_reorg
0xNeshi Nov 10, 2025
73388f3
fix: range_start should be updated only when reorg common_ancestor is…
0xNeshi Nov 10, 2025
ec19227
ref: refactor sync mode
0xNeshi Nov 10, 2025
744b859
Merge remote-tracking branch 'origin/main' into reorg-update
0xNeshi Nov 12, 2025
b58e3a9
feat: major update to reorg logic (incl. historic reorg)
0xNeshi Nov 12, 2025
431e202
fix: always check reorgs by block hash
0xNeshi Nov 12, 2025
f681208
test: sync::block_confirmations_mitigate_reorgs assert historic befor…
0xNeshi Nov 13, 2025
42e6e2c
test: reorgs assert historic before emitting live assert historic bef…
0xNeshi Nov 13, 2025
a315ff6
fix: update batch_end on reorg
0xNeshi Nov 13, 2025
556a775
chore: fix conflicts
0xNeshi Nov 13, 2025
9a63182
ref: use get_latest_confirmed instead of manual calc
0xNeshi Nov 13, 2025
60111c3
Revert "ref: use get_latest_confirmed instead of manual calc"
0xNeshi Nov 13, 2025
43d8621
test: remove test_log from reorg_depth_one
0xNeshi Nov 13, 2025
e07a671
test: add edge case live_with_block_confirmations_always_emits_genesi…
0xNeshi Nov 13, 2025
7f3c963
ref: remove inner_batch_start
0xNeshi Nov 13, 2025
dd247f9
test: fix wording in assert_next macro
0xNeshi Nov 14, 2025
f892327
fix: update rewind logic + major switchingtolive refactor
0xNeshi Nov 14, 2025
078c4c6
feat: SwitchingToLive -> StartingLiveStream
0xNeshi Nov 14, 2025
8169ed7
test: remove assert_next_any
0xNeshi Nov 14, 2025
78cd3e6
Revert "test: remove assert_next_any"
0xNeshi Nov 17, 2025
5c095cf
test: implement assert_event_sequence
0xNeshi Nov 17, 2025
cb8577e
docs: add todo in macros for similar macro for ranges
0xNeshi Nov 17, 2025
3cb37e2
test: update all relevant test assertions to assert_event_sequence
0xNeshi Nov 17, 2025
678f369
Revert "test: update all relevant test assertions to assert_event_seq…
0xNeshi Nov 17, 2025
52dfb0e
Revert "docs: add todo in macros for similar macro for ranges"
0xNeshi Nov 17, 2025
ec6258f
Revert "test: implement assert_event_sequence"
0xNeshi Nov 17, 2025
c108cc2
Revert "Revert "test: remove assert_next_any""
0xNeshi Nov 17, 2025
47c54ac
chore: merge main
0xNeshi Nov 18, 2025
0234360
ref: use historical mode fn for initial batch when live
0xNeshi Nov 18, 2025
aabd5e2
ref: live: batch_end -> previous_batch_end
0xNeshi Nov 18, 2025
667d08f
test: ref shallow_block_confirmation_does_not_mitigate_reorg
0xNeshi Nov 18, 2025
05fc91a
feat: use historic handling when streaming live range larger than 1
0xNeshi Nov 18, 2025
dd7f784
chore: merge main
0xNeshi Nov 19, 2025
49db650
chore: merge main
0xNeshi Nov 19, 2025
e17c724
ref: clippy
0xNeshi Nov 19, 2025
679bbf6
ref: update when 'start block before conf. tip' log is shown
0xNeshi Nov 19, 2025
d8607bd
test: update error messages for assert_event_sequence
0xNeshi Nov 20, 2025
bd77eaa
test: remove assert_empty from no_historical_only_live_streams
0xNeshi Nov 20, 2025
4dc736f
fix: emit StartingLiveStream before receiving first event on live and…
0xNeshi Nov 20, 2025
d0afb65
fix: stop emitting StartingLiveStream in live
0xNeshi Nov 20, 2025
f0e8fbd
test:fix no_historical_only_live_streams: introduce lag
0xNeshi Nov 20, 2025
377b243
remove unused partialeq impl
0xNeshi Nov 20, 2025
cf6f84c
test: ref from_latest
0xNeshi Nov 20, 2025
5209b79
test: ref: from_block
0xNeshi Nov 20, 2025
f45d3c9
test: revert mixed_events_and_filters_return_correct_streams
0xNeshi Nov 20, 2025
85d146c
test: ref: live_with_block_confirmations_always_emits_genesis_block
0xNeshi Nov 20, 2025
13bd108
ref: reorg_handler: store block hash only if not already stored
0xNeshi Nov 20, 2025
8af930e
Revert "ref: reorg_handler: store block hash only if not already stored"
0xNeshi Nov 20, 2025
d8cf028
ref: use reorg_handler in rewind
0xNeshi Nov 20, 2025
c06475a
ref: reorg_handler: get hash before logging & storing it
0xNeshi Nov 20, 2025
32c06f7
fix: limit stream_historical post-reorg next_start_block to 'start'
0xNeshi Nov 20, 2025
6eb3941
ref: stream_historical_blocks: add stream_start field
0xNeshi Nov 20, 2025
46ff151
doc: stream_historical_blocks: add comment about one of assumptions
0xNeshi Nov 20, 2025
c1b67a7
ref: reorg_handler: store block hash only if not already stored
0xNeshi Nov 20, 2025
38c4a12
chore: merge main
0xNeshi Nov 20, 2025
ca8b509
feat: impl get_block_number_by_id
0xNeshi Nov 20, 2025
e276d66
ref: handle_sync
0xNeshi Nov 20, 2025
3dfad2c
test: remove commented out
0xNeshi Nov 20, 2025
50a8a5a
test: exact_historical_count_then_live: add sleep to fix flakiness
0xNeshi Nov 20, 2025
d40c147
ref: stream_live_blocks
0xNeshi Nov 20, 2025
f49be2d
ref: clippy
0xNeshi Nov 21, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
392 changes: 54 additions & 338 deletions src/block_range_scanner.rs

Large diffs are not rendered by default.

334 changes: 334 additions & 0 deletions src/block_range_scanner/common.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,334 @@
use tokio::sync::mpsc;
use tokio_stream::StreamExt;

use crate::{
block_range_scanner::{Message, reorg_handler::ReorgHandler},
robust_provider::RobustProvider,
types::{Notification, TryStream},
};
use alloy::{
consensus::BlockHeader,
network::{BlockResponse, Network},
primitives::BlockNumber,
pubsub::Subscription,
};
use tracing::{debug, error, info, warn};

#[allow(clippy::too_many_arguments)]
pub(crate) async fn stream_live_blocks<N: Network>(
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the amount of changes to the block_range_scanner.rs, it was becoming hard to understand the implication of those changes.

I figured it would be a good time to extract these common functions and to refactor handle_sync for two reasons:

  1. Easier to review - stream_live_blocks and stream_historical_blocks behave completely differently from before, and can be regarded as completely new functions. Moving them to common.rs makes the reviewer look at these functions for what they are - new functions.
  2. Show what the future BlockRangeScanner refactor could look like (see sync_handler.rs) - each mode would be its own "strategy" type.

stream_start: BlockNumber,
subscription: Subscription<N::HeaderResponse>,
sender: &mpsc::Sender<Message>,
provider: &RobustProvider<N>,
block_confirmations: u64,
max_block_range: u64,
reorg_handler: &mut ReorgHandler<N>,
notify_after_first_block: bool,
) {
// Phase 1: Wait for first relevant block
let mut stream = skip_to_relevant_blocks::<N>(subscription, stream_start, block_confirmations);

let Some(first_block) = stream.next().await else {
warn!("Subscription channel closed before receiving any blocks");
return;
};

if notify_after_first_block && !sender.try_stream(Notification::StartingLiveStream).await {
return;
}

// Phase 2: Initialize streaming state with first block
let Some(mut state) = initialize_live_streaming_state(
first_block,
stream_start,
block_confirmations,
max_block_range,
sender,
provider,
reorg_handler,
)
.await
else {
return;
};

// Phase 3: Continuously stream blocks with reorg handling
stream_blocks_continuously(
&mut stream,
&mut state,
stream_start,
block_confirmations,
max_block_range,
sender,
provider,
reorg_handler,
)
.await;

warn!("Live block subscription ended");
}

/// Drops incoming headers until the first one whose confirmed height
/// (header number minus `block_confirmations`, saturating at zero) has
/// reached `stream_start`.
fn skip_to_relevant_blocks<N: Network>(
    subscription: Subscription<N::HeaderResponse>,
    stream_start: BlockNumber,
    block_confirmations: u64,
) -> impl tokio_stream::Stream<Item = N::HeaderResponse> {
    let not_yet_relevant = move |header: &N::HeaderResponse| {
        let confirmed = header.number().saturating_sub(block_confirmations);
        confirmed < stream_start
    };
    subscription.into_stream().skip_while(not_yet_relevant)
}

/// Builds the initial live-streaming state from the first received header.
///
/// Backfills the already-confirmed range `[stream_start, confirmed_tip]`
/// before live streaming begins.
///
/// Returns `None` if the receiving channel closed or backfilling failed.
async fn initialize_live_streaming_state<N: Network>(
    first_block: N::HeaderResponse,
    stream_start: BlockNumber,
    block_confirmations: u64,
    max_block_range: u64,
    sender: &mpsc::Sender<Message>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> Option<LiveStreamingState<N>> {
    let first_num = first_block.number();
    info!(block_number = first_num, "Received first block header");

    // Everything up to `first_num - block_confirmations` is already confirmed.
    let confirmed_tip = first_num.saturating_sub(block_confirmations);

    // Catch up on confirmed blocks between `stream_start` and the tip,
    // then seed the state with the last block of that backfill.
    stream_historical_blocks(
        stream_start,
        stream_start,
        confirmed_tip,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await
    .map(|last_streamed| LiveStreamingState {
        batch_start: stream_start,
        previous_batch_end: Some(last_streamed),
    })
}

/// Main live loop: for every incoming header, verifies the previous batch
/// end for reorgs, adjusts the streaming position, and streams the newly
/// confirmed batch. Returns when the subscription or the channel closes.
#[allow(clippy::too_many_arguments)]
async fn stream_blocks_continuously<
    N: Network,
    S: tokio_stream::Stream<Item = N::HeaderResponse> + Unpin,
>(
    stream: &mut S,
    state: &mut LiveStreamingState<N>,
    stream_start: BlockNumber,
    block_confirmations: u64,
    max_block_range: u64,
    sender: &mpsc::Sender<Message>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) {
    while let Some(header) = stream.next().await {
        let header_num = header.number();
        info!(block_number = header_num, "Received block header");

        // Reorg check against the last block of the previous batch.
        let reorg_check =
            check_for_reorg(state.previous_batch_end.as_ref(), reorg_handler, sender).await;
        let Some(maybe_ancestor) = reorg_check else {
            return;
        };

        match maybe_ancestor {
            // Reorg: rewind the streaming position (false = channel closed).
            Some(ancestor) => {
                if !handle_reorg_detected(ancestor, stream_start, state, sender).await {
                    return;
                }
            }
            // No reorg: move past the previous batch.
            None => advance_batch_start_after_previous_end(state),
        }

        // Stream everything confirmed by this header.
        let confirmed_tip = header_num.saturating_sub(block_confirmations);
        let channel_open = stream_next_batch(
            confirmed_tip,
            state,
            stream_start,
            max_block_range,
            sender,
            provider,
            reorg_handler,
        )
        .await;
        if !channel_open {
            return;
        }
    }
}

/// Checks whether a reorg occurred by verifying the previous batch's end
/// block against the chain.
///
/// Returns:
/// - `Some(Some(ancestor))` — reorg detected; `ancestor` is the common
///   ancestor to resume from.
/// - `Some(None)` — no reorg (or nothing to check yet).
/// - `None` — the reorg check failed; the error was forwarded (best effort)
///   and the caller should stop streaming.
async fn check_for_reorg<N: Network>(
    previous_batch_end: Option<&N::BlockResponse>,
    reorg_handler: &mut ReorgHandler<N>,
    sender: &mpsc::Sender<Message>,
) -> Option<Option<N::BlockResponse>> {
    // FIX: the previous `previous_batch_end?` returned `None` whenever there
    // was no block to verify — e.g. right after a reorg rewound past the
    // stream start and `handle_reorg_detected` cleared `previous_batch_end`.
    // Callers interpret `None` as fatal and terminate the live loop, so a
    // deep reorg permanently killed streaming. With no block to verify there
    // is nothing to check, so report "no reorg" instead.
    let Some(batch_end) = previous_batch_end else {
        return Some(None);
    };

    match reorg_handler.check(batch_end).await {
        Ok(reorg_opt) => Some(reorg_opt),
        Err(e) => {
            error!(error = %e, "Failed to perform reorg check");
            _ = sender.try_stream(e).await;
            None
        }
    }
}

/// Reacts to a detected reorg: notifies the consumer, then rewinds the
/// streaming position to just after the common ancestor (clamped so we never
/// rewind before `stream_start`).
///
/// Returns `false` when the notification channel is closed.
async fn handle_reorg_detected<N: Network>(
    common_ancestor: N::BlockResponse,
    stream_start: BlockNumber,
    state: &mut LiveStreamingState<N>,
    sender: &mpsc::Sender<Message>,
) -> bool {
    let delivered = sender.try_stream(Notification::ReorgDetected).await;
    if !delivered {
        return false;
    }

    let ancestor_num = common_ancestor.header().number();

    if ancestor_num >= stream_start {
        // Ancestor lies within our range: continue right after it and keep it
        // around for the next reorg check.
        info!(ancestor_block = ancestor_num, "Reorg detected, resuming from common ancestor");
        state.batch_start = ancestor_num + 1;
        state.previous_batch_end = Some(common_ancestor);
    } else {
        // The chain rewound past our starting point: restart from the top,
        // with no previous batch end to verify.
        info!(
            ancestor_block = ancestor_num,
            stream_start = stream_start,
            "Reorg detected before stream start, resetting to stream start"
        );
        state.batch_start = stream_start;
        state.previous_batch_end = None;
    }

    true
}

/// After a clean (reorg-free) header, moves `batch_start` just past the last
/// block of the previous batch. No-op when no batch has completed yet.
fn advance_batch_start_after_previous_end<N: Network>(state: &mut LiveStreamingState<N>) {
    let Some(prev) = state.previous_batch_end.as_ref() else {
        return;
    };
    state.batch_start = prev.header().number() + 1;
}

/// Streams the next batch of confirmed blocks, from `state.batch_start` up
/// to and including `batch_end_num`.
///
/// Returns `false` if the channel is closed.
async fn stream_next_batch<N: Network>(
    batch_end_num: BlockNumber,
    state: &mut LiveStreamingState<N>,
    stream_start: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<Message>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> bool {
    // Nothing confirmed beyond what we've already streamed.
    if state.batch_start > batch_end_num {
        return true;
    }

    let streamed = stream_historical_blocks(
        stream_start,
        state.batch_start,
        batch_end_num,
        max_block_range,
        sender,
        provider,
        reorg_handler,
    )
    .await;

    match streamed {
        Some(last_block) => {
            state.previous_batch_end = Some(last_block);
            // SAFETY: Overflow cannot realistically happen
            state.batch_start = batch_end_num + 1;
            true
        }
        None => {
            // Channel closed mid-batch.
            state.previous_batch_end = None;
            false
        }
    }
}

/// Tracks the current state of live streaming
struct LiveStreamingState<N: Network> {
    /// The starting block number for the next batch to stream
    batch_start: BlockNumber,
    /// The last block from the previous batch (used for reorg detection).
    /// `None` after a reorg that rewound past the stream start.
    previous_batch_end: Option<N::BlockResponse>,
}

/// Streams block ranges from `next_start_block` up to and including `end`,
/// in batches of at most `max_block_range` blocks, checking for reorgs after
/// every batch and rewinding (clamped to `stream_start`) when one is found.
///
/// Assumes that `stream_start <= next_start_block <= end`.
/// NOTE(review): also appears to assume `max_block_range >= 1` —
/// `max_block_range - 1` underflows (panics in debug builds) when it is 0;
/// confirm the call sites guarantee this.
///
/// Returns the last fetched batch-end block on completion — and also when
/// the receiving channel closes mid-stream, so the caller's state stays
/// consistent — or `None` on provider / reorg-check failure (after
/// best-effort error delivery through `sender`).
pub(crate) async fn stream_historical_blocks<N: Network>(
    stream_start: BlockNumber,
    mut next_start_block: BlockNumber,
    end: BlockNumber,
    max_block_range: u64,
    sender: &mpsc::Sender<Message>,
    provider: &RobustProvider<N>,
    reorg_handler: &mut ReorgHandler<N>,
) -> Option<N::BlockResponse> {
    let mut batch_count = 0;

    loop {
        // Cap the batch at `max_block_range` blocks, never exceeding `end`.
        let batch_end_num = next_start_block.saturating_add(max_block_range - 1).min(end);
        // Fetch the batch's final block so it can be verified by hash below.
        let batch_end = match provider.get_block_by_number(batch_end_num.into()).await {
            Ok(block) => block,
            Err(e) => {
                error!(batch_start = next_start_block, batch_end = batch_end_num, error = %e, "Failed to get ending block of the current batch");
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        // Receiver hung up: stop, but hand back the last fetched block.
        if !sender.try_stream(next_start_block..=batch_end_num).await {
            return Some(batch_end);
        }

        batch_count += 1;
        // Progress log every 10 batches.
        if batch_count % 10 == 0 {
            debug!(batch_count = batch_count, "Processed historical batches");
        }

        // Verify the just-streamed batch end is still on the canonical chain.
        let reorged_opt = match reorg_handler.check(&batch_end).await {
            Ok(opt) => opt,
            Err(e) => {
                error!(error = %e, "Failed to perform reorg check");
                _ = sender.try_stream(e).await;
                return None;
            }
        };

        // On reorg: resume just after the common ancestor, never rewinding
        // before `stream_start`. Otherwise advance past this batch.
        next_start_block = if let Some(common_ancestor) = reorged_opt {
            if !sender.try_stream(Notification::ReorgDetected).await {
                return None;
            }
            if common_ancestor.header().number() < stream_start {
                stream_start
            } else {
                common_ancestor.header().number() + 1
            }
        } else {
            batch_end_num.saturating_add(1)
        };

        if next_start_block > end {
            info!(batch_count = batch_count, "Historical sync completed");
            return Some(batch_end);
        }
    }
}
Loading
Loading