
chore: refactor importer to reduce repeated code#2373

Merged
carneiro-cw merged 18 commits into main from importer_refac
Oct 17, 2025

Conversation

Contributor

@carneiro-cw carneiro-cw commented Oct 16, 2025

PR Type

Enhancement


Description

  • Refactored importer to reduce code duplication

  • Introduced generic block fetcher for common logic

  • Improved error handling and retry mechanisms

  • Enhanced BlockNumber functionality with new conversions


Diagram Walkthrough

flowchart LR
  A["Importer"]
  B["Generic Block Fetcher"]
  C["Error Handling"]
  D["BlockNumber"]
  A --> B
  A --> C
  A --> D
  B --> C

File Walkthrough

Relevant files
Enhancement
importer.rs
Refactor importer for improved code reuse and error handling

src/eth/follower/importer/importer.rs

  • Introduced generic_block_fetcher for common fetching logic
  • Refactored start_block_fetcher and start_block_with_changes_fetcher
  • Added helper functions for common tasks (receive_with_timeout,
    send_block_to_kafka)
  • Improved error handling with fetch_with_retry function
+170/-179
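The deduplication idea the walkthrough describes can be sketched with a single generic driver parameterized over a fetch closure and a process closure, so the concrete fetchers share one loop. This is a minimal synchronous, std-only sketch with illustrative names, not the PR's actual async signatures.

```rust
/// One generic driver shared by all fetchers: fetch each block number,
/// transform it, and push the result into the backlog. Errors from the
/// process step propagate to the caller via `?`.
fn generic_fetch_loop<T, U>(
    start: u64,
    end: u64,
    fetch: impl Fn(u64) -> T,
    process: impl Fn(T) -> Result<U, String>,
    sink: &mut Vec<U>,
) -> Result<(), String> {
    for n in start..=end {
        let raw = fetch(n);       // fetch one block's data
        let item = process(raw)?; // transform it; errors propagate
        sink.push(item);          // hand it to the backlog
    }
    Ok(())
}

fn main() -> Result<(), String> {
    let mut backlog = Vec::new();
    // A toy "block fetcher": fetch returns the number, process wraps it.
    generic_fetch_loop(1, 3, |n| n, |n| Ok(format!("block {n}")), &mut backlog)?;
    assert_eq!(backlog, vec!["block 1", "block 2", "block 3"]);
    Ok(())
}
```

Each concrete fetcher then only supplies its own `fetch`/`process` pair, which is the repetition the PR removes.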
block_number.rs
Enhance BlockNumber functionality with new conversions     

src/eth/primitives/block_number.rs

  • Modified count_to method to accept impl Into<u64>
  • Added a From implementation for u64 conversion
+8/-2     
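The two conversions listed above can be illustrated over a hypothetical `u64` newtype (the real type lives in `src/eth/primitives/block_number.rs`; the direction of the `From` impl is an assumption here):

```rust
/// Illustrative stand-in for the crate's BlockNumber newtype.
#[derive(Clone, Copy, Debug, PartialEq)]
struct BlockNumber(u64);

impl BlockNumber {
    fn as_u64(self) -> u64 {
        self.0
    }

    /// Count of blocks from `self` up to and including `higher_end`.
    /// Accepts anything convertible into u64 (the PR's `impl Into` change).
    fn count_to(self, higher_end: impl Into<u64>) -> u64 {
        higher_end.into().saturating_sub(self.0) + 1
    }
}

/// The new conversion: build a BlockNumber directly from a u64.
impl From<u64> for BlockNumber {
    fn from(n: u64) -> Self {
        BlockNumber(n)
    }
}

fn main() {
    let bn = BlockNumber::from(10u64);
    assert_eq!(bn.as_u64(), 10);
    assert_eq!(bn.count_to(12u64), 3); // blocks 10, 11, 12
    assert_eq!(bn.count_to(5u64), 1);  // clamped: saturating_sub yields 0, then +1
}
```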

@github-actions
Contributor

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 No relevant tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Error Handling

The new generic_block_fetcher function doesn't properly handle errors from the process_fn. It should propagate errors instead of silently continuing.

/// Generic block fetcher that handles the common logic of fetching blocks in parallel
async fn generic_block_fetcher<T, U, FetchFut, ProcessFut, FetchFn, ProcessFn>(
    task_name: &'static str,
    backlog_tx: mpsc::Sender<U>,
    mut importer_block_number: BlockNumber,
    fetch_fn: FetchFn,
    process_fn: ProcessFn,
) -> anyhow::Result<()>
where
    FetchFut: std::future::Future<Output = T>,
    ProcessFut: std::future::Future<Output = anyhow::Result<U>>,
    FetchFn: Fn(BlockNumber) -> FetchFut,
    ProcessFn: Fn(T) -> ProcessFut,
{
    let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;

    loop {
        if Self::should_shutdown(task_name) {
            return Ok(());
        }

        // if we are ahead of current block number, await until we are behind again
        let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
        if importer_block_number.as_u64() > external_rpc_current_block {
            yield_now().await;
            continue;
        }

        // we are behind current, so we will fetch multiple blocks in parallel to catch up
        let blocks_behind = importer_block_number.count_to(external_rpc_current_block);
        let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
        tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");

        let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
        while blocks_to_fetch > 0 {
            blocks_to_fetch -= 1;
            tasks.push(fetch_fn(importer_block_number));
            importer_block_number = importer_block_number.next_block_number();
        }

        // keep fetching in order
        let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
        while let Some(fetched_data) = tasks.next().await {
            let processed = process_fn(fetched_data).await?;
            if backlog_tx.send(processed).await.is_err() {
                warn_task_rx_closed(task_name);
                return Ok(());
            }
        }
    }
}
Logging Improvement

The fetch_with_retry function logs warnings for both None and Err cases. Consider using different log levels or messages to distinguish between these scenarios.

/// Generic retry logic for fetching data from blockchain
async fn fetch_with_retry<T, F, Fut>(block_number: BlockNumber, fetch_fn: F, operation_name: &str) -> T
where
    F: Fn(BlockNumber) -> Fut,
    Fut: std::future::Future<Output = anyhow::Result<Option<T>>>,
{
    const RETRY_DELAY: Duration = Duration::from_millis(10);
    Span::with(|s| {
        s.rec_str("block_number", &block_number);
    });

    loop {
        tracing::info!(%block_number, "fetching {}", operation_name);

        match fetch_fn(block_number).await {
            Ok(Some(response)) => return response,
            Ok(None) => {
                tracing::warn!(
                    %block_number,
                    delay_ms = %RETRY_DELAY.as_millis(),
                    "{} not available yet, retrying with delay.",
                    operation_name
                );
                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
            }
            Err(e) => {
                tracing::warn!(
                    reason = ?e,
                    %block_number,
                    delay_ms = %RETRY_DELAY.as_millis(),
                    "failed to fetch {}, retrying with delay.",
                    operation_name
                );
                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
            }
        };
    }
}
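One way to act on this observation is to map the two retry causes to distinct variants, so call sites (or log levels) can tell "not yet available" apart from "fetch failed". A minimal std-only sketch with illustrative names:

```rust
/// Distinguish the two reasons a fetch attempt did not yield data.
#[derive(Debug, PartialEq)]
enum RetryCause {
    /// Upstream hasn't produced the block yet: expected, log at debug/info.
    NotYetAvailable,
    /// An actual error occurred: log at warn/error.
    FetchFailed(String),
}

/// Collapse `Result<Option<T>, _>` into either a value or a typed cause.
fn classify<T>(outcome: Result<Option<T>, String>) -> Result<T, RetryCause> {
    match outcome {
        Ok(Some(v)) => Ok(v),
        Ok(None) => Err(RetryCause::NotYetAvailable),
        Err(e) => Err(RetryCause::FetchFailed(e)),
    }
}

fn main() {
    assert_eq!(classify(Ok(Some(7))).unwrap(), 7);
    assert_eq!(classify::<u8>(Ok(None)).unwrap_err(), RetryCause::NotYetAvailable);
    assert!(matches!(
        classify::<u8>(Err("timeout".into())).unwrap_err(),
        RetryCause::FetchFailed(_)
    ));
}
```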
Potential Overflow

The count_to method uses saturating_sub, so when higher_end is less than self.as_u64() the subtraction clamps to zero and the method still returns 1, which may silently misreport how far behind the importer is. Consider adding a check or using checked_sub.

pub fn count_to(self, higher_end: impl Into<u64>) -> u64 {
    higher_end.into().saturating_sub(self.as_u64()) + 1
}
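A defensive variant along the lines the reviewer suggests would return `None` when `higher_end` is below `self`, instead of silently clamping to 1. This is a sketch over a hypothetical `u64` newtype, not the crate's real type:

```rust
#[derive(Clone, Copy)]
struct BlockNumber(u64);

impl BlockNumber {
    /// Like count_to, but surfaces the "higher_end is actually lower" case
    /// via checked_sub instead of clamping it to a count of 1.
    fn checked_count_to(self, higher_end: impl Into<u64>) -> Option<u64> {
        higher_end.into().checked_sub(self.0).map(|d| d + 1)
    }
}

fn main() {
    let bn = BlockNumber(10);
    assert_eq!(bn.checked_count_to(12u64), Some(3)); // blocks 10, 11, 12
    assert_eq!(bn.checked_count_to(9u64), None);     // caller decides what "behind" means
}
```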

@github-actions
Copy link
Contributor

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
General
Implement exponential backoff for retries

Implement an exponential backoff strategy for retries instead of using a constant
delay. This can help reduce unnecessary load on the system during prolonged issues
while still allowing for quick recovery from transient failures.

src/eth/follower/importer/importer.rs [616-652]

 async fn fetch_with_retry<T, F, Fut>(block_number: BlockNumber, fetch_fn: F, operation_name: &str) -> T
 where
     F: Fn(BlockNumber) -> Fut,
     Fut: std::future::Future<Output = anyhow::Result<Option<T>>>,
 {
-    const RETRY_DELAY: Duration = Duration::from_millis(10);
+    const INITIAL_RETRY_DELAY: Duration = Duration::from_millis(10);
+    const MAX_RETRY_DELAY: Duration = Duration::from_secs(60);
     Span::with(|s| {
         s.rec_str("block_number", &block_number);
     });
 
+    let mut retry_delay = INITIAL_RETRY_DELAY;
     loop {
         tracing::info!(%block_number, "fetching {}", operation_name);
 
         match fetch_fn(block_number).await {
             Ok(Some(response)) => return response,
             Ok(None) => {
                 tracing::warn!(
                     %block_number,
-                    delay_ms = %RETRY_DELAY.as_millis(),
+                    delay_ms = %retry_delay.as_millis(),
                     "{} not available yet, retrying with delay.",
                     operation_name
                 );
-                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
             }
             Err(e) => {
                 tracing::warn!(
                     reason = ?e,
                     %block_number,
-                    delay_ms = %RETRY_DELAY.as_millis(),
+                    delay_ms = %retry_delay.as_millis(),
                     "failed to fetch {}, retrying with delay.",
                     operation_name
                 );
-                traced_sleep(RETRY_DELAY, SleepReason::RetryBackoff).await;
             }
         };
+        traced_sleep(retry_delay, SleepReason::RetryBackoff).await;
+        retry_delay = std::cmp::min(retry_delay * 2, MAX_RETRY_DELAY);
     }
 }
Suggestion importance[1-10]: 8


Why: Implementing exponential backoff for retries is a significant improvement. It reduces system load during prolonged issues while allowing quick recovery from transient failures, enhancing overall system resilience and efficiency.

Medium
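The doubling-with-cap schedule this suggestion introduces can be isolated to make the progression concrete (a minimal std-only sketch; the constants mirror the suggestion's 10ms initial delay, with an illustrative cap):

```rust
use std::time::Duration;

/// Compute the first `attempts` delays of an exponential backoff:
/// each delay doubles the previous one, clamped at `max`.
fn backoff_schedule(initial: Duration, max: Duration, attempts: usize) -> Vec<Duration> {
    let mut delay = initial;
    let mut out = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        out.push(delay);
        delay = std::cmp::min(delay * 2, max); // cap prevents unbounded growth
    }
    out
}

fn main() {
    let s = backoff_schedule(Duration::from_millis(10), Duration::from_millis(60), 5);
    assert_eq!(
        s,
        vec![
            Duration::from_millis(10),
            Duration::from_millis(20),
            Duration::from_millis(40),
            Duration::from_millis(60), // capped (80ms would exceed the max)
            Duration::from_millis(60),
        ]
    );
}
```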
Possible issue
Improve error handling in block processing

Consider implementing error handling for the process_fn call. Currently, if
process_fn returns an error, it will cause the entire function to return
immediately, potentially leaving some blocks unfetched. Instead, you could log the
error and continue processing the remaining blocks.

src/eth/follower/importer/importer.rs [475-518]

 async fn generic_block_fetcher<T, U, FetchFut, ProcessFut, FetchFn, ProcessFn>(
     task_name: &'static str,
     backlog_tx: mpsc::Sender<U>,
     mut importer_block_number: BlockNumber,
     fetch_fn: FetchFn,
     process_fn: ProcessFn,
 ) -> anyhow::Result<()>
 where
     FetchFut: std::future::Future<Output = T>,
     ProcessFut: std::future::Future<Output = anyhow::Result<U>>,
     FetchFn: Fn(BlockNumber) -> FetchFut,
     ProcessFn: Fn(T) -> ProcessFut,
 {
     let _permit = IMPORTER_ONLINE_TASKS_SEMAPHORE.acquire().await;
 
     loop {
         if Self::should_shutdown(task_name) {
             return Ok(());
         }
 
         // if we are ahead of current block number, await until we are behind again
         let external_rpc_current_block = EXTERNAL_RPC_CURRENT_BLOCK.load(Ordering::Relaxed);
         if importer_block_number.as_u64() > external_rpc_current_block {
             yield_now().await;
             continue;
         }
 
         // we are behind current, so we will fetch multiple blocks in parallel to catch up
         let blocks_behind = importer_block_number.count_to(external_rpc_current_block);
         let mut blocks_to_fetch = min(blocks_behind, 1_000); // avoid spawning millions of tasks (not parallelism), at least until we know it is safe
         tracing::info!(%blocks_behind, blocks_to_fetch, "catching up with blocks");
 
         let mut tasks = Vec::with_capacity(blocks_to_fetch as usize);
         while blocks_to_fetch > 0 {
             blocks_to_fetch -= 1;
             tasks.push(fetch_fn(importer_block_number));
             importer_block_number = importer_block_number.next_block_number();
         }
 
         // keep fetching in order
         let mut tasks = futures::stream::iter(tasks).buffered(PARALLEL_BLOCKS);
         while let Some(fetched_data) = tasks.next().await {
-            let processed = process_fn(fetched_data).await?;
-            if backlog_tx.send(processed).await.is_err() {
-                warn_task_rx_closed(task_name);
-                return Ok(());
+            match process_fn(fetched_data).await {
+                Ok(processed) => {
+                    if backlog_tx.send(processed).await.is_err() {
+                        warn_task_rx_closed(task_name);
+                        return Ok(());
+                    }
+                }
+                Err(e) => {
+                    tracing::error!("Error processing fetched data: {:?}", e);
+                    // Optionally, you could add some retry logic here
+                }
             }
         }
     }
 }
Suggestion importance[1-10]: 7


Why: The suggestion improves error handling by logging errors and continuing processing, which enhances robustness. This change prevents the entire function from failing due to a single block processing error.

Medium

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

💡 Codex Review

Here are some automated review suggestions for this pull request.


@codecov

codecov bot commented Oct 16, 2025

Codecov Report

❌ Patch coverage is 85.14056% with 37 lines in your changes missing coverage. Please review.
✅ Project coverage is 84.41%. Comparing base (8b7c8c6) to head (589c557).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
src/eth/follower/importer/mod.rs 79.84% 26 Missing ⚠️
src/eth/follower/importer/importer_supervisor.rs 88.23% 8 Missing ⚠️
src/eth/follower/importer/importers/fake_leader.rs 0.00% 2 Missing ⚠️
src/eth/follower/importer/importer_config.rs 90.90% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2373      +/-   ##
==========================================
- Coverage   84.93%   84.41%   -0.52%     
==========================================
  Files         130      138       +8     
  Lines       10728    10647      -81     
==========================================
- Hits         9112     8988     -124     
- Misses       1616     1659      +43     
Flag Coverage Δ
contracts-rocks- 45.32% <0.00%> (+0.77%) ⬆️
e2e-admin-password 22.84% <0.00%> (+0.14%) ⬆️
e2e-clock-stratus 25.72% <0.00%> (+0.16%) ⬆️
e2e-genesis 27.28% <0.00%> (+0.17%) ⬆️
e2e-importer-offline 60.44% <1.60%> (+0.68%) ⬆️
e2e-rpc-downloader 55.41% <1.60%> (+0.64%) ⬆️
e2e-stratus 57.51% <1.60%> (+0.46%) ⬆️
leader-follower- 62.62% <84.27%> (-0.32%) ⬇️
rust-tests 30.60% <0.00%> (-1.01%) ⬇️

Flags with carried forward coverage won't be shown.


@carneiro-cw carneiro-cw merged commit 87fcb3a into main Oct 17, 2025
41 of 42 checks passed
@carneiro-cw carneiro-cw deleted the importer_refac branch October 17, 2025 20:25
@stratus-benchmark

Final benchmark:
Run ID: bench-fbbede9f

Git Info:

Leader Stats:
RPS Stats: Max: 10498.00, Min: 896.00, Avg: 3916.02, StdDev: 488.64
TPS Stats: Max: 4550.00, Min: 238.00, Avg: 3879.97, StdDev: 344.40

Follower Stats:
Imported Blocks/s: Max: 8.00, Min: 5.00, Avg: 6.14, StdDev: 0.66
Imported Transactions/s: Max: 28780.00, Min: 12692.00, Avg: 23686.96, StdDev: 2886.15

Plots:
