From b94fb2a8eb506cb85ebda25257bb1991438b560e Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Tue, 28 Oct 2025 10:05:58 +0100 Subject: [PATCH 1/6] fix: pin expected error type to RpcError --- src/error.rs | 12 ++--- src/event_scanner/message.rs | 6 +-- src/event_scanner/modes/common.rs | 4 +- src/robust_provider.rs | 81 ++++++++++++------------------- 4 files changed, 42 insertions(+), 61 deletions(-) diff --git a/src/error.rs b/src/error.rs index 4ea30a15..dbc80f49 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use alloy::{ }; use thiserror::Error; -use crate::{block_range_scanner::Message, robust_provider::RobustProviderError}; +use crate::{block_range_scanner::Message, robust_provider::Error}; #[derive(Error, Debug, Clone)] pub enum ScannerError { @@ -50,12 +50,12 @@ pub enum ScannerError { RetryFail(usize), } -impl From for ScannerError { - fn from(error: RobustProviderError) -> ScannerError { +impl From for ScannerError { + fn from(error: Error) -> ScannerError { match error { - RobustProviderError::RpcError(err) => ScannerError::RpcError(err), - RobustProviderError::Timeout => ScannerError::Timeout, - RobustProviderError::RetryFail(num) => ScannerError::RetryFail(num), + Error::RpcError(err) => ScannerError::RpcError(err), + Error::Timeout => ScannerError::Timeout, + Error::RetryFailure(num) => ScannerError::RetryFail(num), } } } diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 33da6c61..647a603f 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,6 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, robust_provider::RobustProviderError}; +use crate::{ScannerError, ScannerMessage, robust_provider::Error}; pub type Message = ScannerMessage, ScannerError>; @@ -10,8 +10,8 @@ impl From> for Message { } } -impl From for Message { - fn from(error: RobustProviderError) -> Message { +impl From for Message { + fn from(error: Error) -> Message { let scanner_error: ScannerError = error.into(); scanner_error.into() } diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index 59748dce..cbd302ae 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,7 +3,7 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener, message::Message}, - robust_provider::{RobustProvider, RobustProviderError}, + robust_provider::{Error, RobustProvider}, }; use alloy::{ network::Network, @@ -129,7 +129,7 @@ async fn get_logs( event_filter: &EventFilter, log_filter: &Filter, provider: &RobustProvider, -) -> Result, RobustProviderError> { +) -> Result, Error> { let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end()); match provider.get_logs(&log_filter).await { diff --git a/src/robust_provider.rs b/src/robust_provider.rs index ef3890df..7504dc86 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -13,18 +13,16 @@ use thiserror::Error; use tracing::{error, info}; #[derive(Error, Debug, Clone)] -pub enum RobustProviderError { - #[error("RPC error: {0}")] - RpcError(Arc>), +pub enum Error { #[error("Operation timed out")] Timeout, - #[error("Retry failed after {0} tries")] - RetryFail(usize), + #[error("RPC call failed after exhausting all retry attempts: {0}")] + RetryFailure(Arc>), } -impl From> for RobustProviderError { +impl From> for Error { fn from(err: RpcError) -> Self { - RobustProviderError::RpcError(Arc::new(err)) + Error::RetryFailure(Arc::new(err)) } } @@ -87,11 +85,9 @@ impl RobustProvider { pub async fn get_block_by_number( &self, number: BlockNumberOrTag, - ) -> Result, RobustProviderError> { + ) -> Result, Error> { info!("eth_getBlockByNumber called"); - let operation = async || { - self.provider.get_block_by_number(number).await.map_err(RobustProviderError::from) - }; + let operation = async || self.provider.get_block_by_number(number).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); @@ -105,10 +101,9 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_block_number(&self) -> Result { + pub async fn get_block_number(&self) -> Result { info!("eth_getBlockNumber called"); - let operation = - async || self.provider.get_block_number().await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_block_number().await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockNumber failed"); @@ -125,10 +120,9 @@ impl RobustProvider { pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, - ) -> Result, RobustProviderError> { + ) -> Result, Error> { info!("eth_getBlockByHash called"); - let operation = - async || self.provider.get_block_by_hash(hash).await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_block_by_hash(hash).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); @@ -142,10 +136,9 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_logs(&self, filter: &Filter) -> Result, RobustProviderError> { + pub async fn get_logs(&self, filter: &Filter) -> Result, Error> { info!("eth_getLogs called"); - let operation = - async || self.provider.get_logs(filter).await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_logs(filter).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getLogs failed"); @@ -159,16 +152,11 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn subscribe_blocks( - &self, - ) -> Result, RobustProviderError> { + pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); let provider = self.provider.clone(); - let result = self - .retry_with_total_timeout(|| async { - provider.subscribe_blocks().await.map_err(RobustProviderError::from) - }) - .await; + let result = + self.retry_with_total_timeout(|| async { provider.subscribe_blocks().await }).await; if let Err(e) = &result { error!(error = %e, "eth_subscribe failed"); } @@ -186,13 +174,10 @@ impl RobustProvider { /// - Returns [`RpcError`] with message "total operation timeout exceeded" /// if the overall timeout elapses. /// - Propagates any [`RpcError`] from the underlying retries. - async fn retry_with_total_timeout( - &self, - operation: F, - ) -> Result + async fn retry_with_total_timeout(&self, operation: F) -> Result where F: Fn() -> Fut, - Fut: Future>, + Fut: Future>>, { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) @@ -205,8 +190,8 @@ impl RobustProvider { .await { Ok(Ok(res)) => Ok(res), - Ok(Err(_)) => Err(RobustProviderError::RetryFail(self.max_retries + 1)), - Err(_) => Err(RobustProviderError::Timeout), + Ok(Err(e)) => Err(e.into()), + Err(_) => Err(Error::Timeout), } } } @@ -240,7 +225,8 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - Ok(call_count.load(Ordering::SeqCst)) + let count = call_count.load(Ordering::SeqCst); + Ok(count) }) .await; @@ -256,12 +242,10 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - if call_count.load(Ordering::SeqCst) < 3 { - Err(RobustProviderError::RpcError(Arc::new(TransportErrorKind::custom_str( - "temp error", - )))) - } else { - Ok(call_count.load(Ordering::SeqCst)) + let count = call_count.load(Ordering::SeqCst); + match count { + 3 => Ok(count), + _ => Err(TransportErrorKind::BackendGone.into()), } }) .await; @@ -275,21 +259,19 @@ mod tests { let call_count = AtomicUsize::new(0); - let result = provider + let result: Result<(), Error> = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - // permanent error - Err::(RobustProviderError::Timeout) + Err(TransportErrorKind::BackendGone.into()) }) .await; - let err = result.unwrap_err(); - assert!(matches!(err, RobustProviderError::RetryFail(3))); + assert!(matches!(result, Err(Error::RetryFailure(_)))); assert_eq!(call_count.load(Ordering::SeqCst), 3); } #[tokio::test] - async fn test_retry_with_timeout_respects_total_delay() { + async fn test_retry_with_timeout_respects_max_timeout() { let max_timeout = 50; let provider = test_provider(max_timeout, 10, 1); @@ -300,7 +282,6 @@ mod tests { }) .await; - let err = result.unwrap_err(); - assert!(matches!(err, RobustProviderError::Timeout)); + assert!(matches!(result, Err(Error::Timeout))); } } From be0edd9ffb02cf3a9827de4536a28e1ee227739e Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Tue, 28 Oct 2025 10:08:43 +0100 Subject: [PATCH 2/6] fix: appropriate error conversion in errors.rs --- src/error.rs | 13 ++++++------- src/event_scanner/message.rs | 6 +++--- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/error.rs b/src/error.rs index dbc80f49..1a9cbe1d 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use alloy::{ }; use thiserror::Error; -use crate::{block_range_scanner::Message, robust_provider::Error}; +use crate::{block_range_scanner::Message, robust_provider::Error as RobustProviderError}; #[derive(Error, Debug, Clone)] pub enum ScannerError { @@ -47,15 +47,14 @@ pub enum ScannerError { Timeout, #[error("Retry failed after {0} tries")] - RetryFail(usize), + RetryFailure(Arc>), } -impl From for ScannerError { - fn from(error: Error) -> ScannerError { +impl From for ScannerError { + fn from(error: RobustProviderError) -> ScannerError { match error { - Error::RpcError(err) => ScannerError::RpcError(err), - Error::Timeout => ScannerError::Timeout, - Error::RetryFailure(num) => ScannerError::RetryFail(num), + RobustProviderError::Timeout => ScannerError::Timeout, + RobustProviderError::RetryFailure(err) => ScannerError::RetryFailure(err), } } } diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 647a603f..ebd1081a 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,6 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, robust_provider::Error}; +use crate::{ScannerError, ScannerMessage, robust_provider::Error as RobustProviderError}; pub type Message = ScannerMessage, ScannerError>; @@ -10,8 +10,8 @@ impl From> for Message { } } -impl From for Message { - fn from(error: Error) -> Message { +impl From for Message { + fn from(error: RobustProviderError) -> Message { let scanner_error: ScannerError = error.into(); scanner_error.into() } From 6c9c88bb4b2262a9bf8479c9df9c09b5846f1e0d Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Tue, 28 Oct 2025 10:11:08 +0100 Subject: [PATCH 3/6] ref: res conversion --- src/robust_provider.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 7504dc86..75da196e 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -189,8 +189,7 @@ impl RobustProvider { ) .await { - Ok(Ok(res)) => Ok(res), - Ok(Err(e)) => Err(e.into()), + Ok(res) => res.map_err(Error::from), Err(_) => Err(Error::Timeout), } } From e8186b96ad72cfb62e7dd234423304d7a9b50a03 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Tue, 28 Oct 2025 10:15:17 +0100 Subject: [PATCH 4/6] ref: extract operation in subscribe_blocks --- src/robust_provider.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 75da196e..6f2cde8b 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -154,9 +154,8 @@ impl RobustProvider { /// after exhausting retries or if the call times out. pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); - let provider = self.provider.clone(); - let result = - self.retry_with_total_timeout(|| async { provider.subscribe_blocks().await }).await; + let operation = async || self.provider.subscribe_blocks().await; + let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_subscribe failed"); } From 48eb18ce2ddd6919f4587931dcd6867605f03984 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Tue, 28 Oct 2025 10:18:47 +0100 Subject: [PATCH 5/6] ref: rename import --- src/event_scanner/modes/common.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index cbd302ae..ef93e17e 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,7 +3,7 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener, message::Message}, - robust_provider::{Error, RobustProvider}, + robust_provider::{Error as RobustProviderError, RobustProvider}, }; use alloy::{ network::Network, @@ -129,7 +129,7 @@ async fn get_logs( event_filter: &EventFilter, log_filter: &Filter, provider: &RobustProvider, -) -> Result, Error> { +) -> Result, RobustProviderError> { let log_filter = log_filter.clone().from_block(*range.start()).to_block(*range.end()); match provider.get_logs(&log_filter).await { From 71a6d5dfbddb1dde2071b48cca222b275b6ef50e Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Tue, 28 Oct 2025 10:28:05 +0100 Subject: [PATCH 6/6] fix: error string for RetryFailure --- src/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/error.rs b/src/error.rs index 1a9cbe1d..52b31725 100644 --- a/src/error.rs +++ b/src/error.rs @@ -46,7 +46,7 @@ pub enum ScannerError { #[error("Operation timed out")] Timeout, - #[error("Retry failed after {0} tries")] + #[error("RPC call failed after exhausting all retry attempts: {0}")] RetryFailure(Arc>), }