diff --git a/src/error.rs b/src/error.rs index d168548a..cd4b4926 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,7 @@ use alloy::{ }; use thiserror::Error; -use crate::{block_range_scanner::Message, robust_provider::RobustProviderError}; +use crate::{block_range_scanner::Message, robust_provider::Error as RobustProviderError}; #[derive(Error, Debug, Clone)] pub enum ScannerError { @@ -40,16 +40,15 @@ pub enum ScannerError { #[error("Operation timed out")] Timeout, - #[error("Retry failed after {0} tries")] - RetryFail(usize), + #[error("RPC call failed after exhausting all retry attempts: {0}")] + RetryFailure(Arc>), } impl From for ScannerError { fn from(error: RobustProviderError) -> ScannerError { match error { - RobustProviderError::RpcError(err) => ScannerError::RpcError(err), RobustProviderError::Timeout => ScannerError::Timeout, - RobustProviderError::RetryFail(num) => ScannerError::RetryFail(num), + RobustProviderError::RetryFailure(err) => ScannerError::RetryFailure(err), RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block), } } diff --git a/src/event_scanner/message.rs b/src/event_scanner/message.rs index 33da6c61..ebd1081a 100644 --- a/src/event_scanner/message.rs +++ b/src/event_scanner/message.rs @@ -1,6 +1,6 @@ use alloy::{rpc::types::Log, sol_types::SolEvent}; -use crate::{ScannerError, ScannerMessage, robust_provider::RobustProviderError}; +use crate::{ScannerError, ScannerMessage, robust_provider::Error as RobustProviderError}; pub type Message = ScannerMessage, ScannerError>; diff --git a/src/event_scanner/modes/common.rs b/src/event_scanner/modes/common.rs index 59748dce..ef93e17e 100644 --- a/src/event_scanner/modes/common.rs +++ b/src/event_scanner/modes/common.rs @@ -3,7 +3,7 @@ use std::ops::RangeInclusive; use crate::{ block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage}, event_scanner::{filter::EventFilter, listener::EventListener, message::Message}, - robust_provider::{RobustProvider, RobustProviderError}, + robust_provider::{Error as RobustProviderError, RobustProvider}, }; use alloy::{ network::Network, diff --git a/src/robust_provider.rs b/src/robust_provider.rs index 2cf05eec..872c7ba6 100644 --- a/src/robust_provider.rs +++ b/src/robust_provider.rs @@ -13,20 +13,18 @@ use thiserror::Error; use tracing::{error, info}; #[derive(Error, Debug, Clone)] -pub enum RobustProviderError { - #[error("RPC error: {0}")] - RpcError(Arc>), +pub enum Error { #[error("Operation timed out")] Timeout, - #[error("Retry failed after {0} tries")] - RetryFail(usize), + #[error("RPC call failed after exhausting all retry attempts: {0}")] + RetryFailure(Arc>), #[error("Block not found, block number: {0}")] BlockNotFound(BlockNumberOrTag), } -impl From> for RobustProviderError { +impl From> for Error { fn from(err: RpcError) -> Self { - RobustProviderError::RpcError(Arc::new(err)) + Error::RetryFailure(Arc::new(err)) } } @@ -89,11 +87,9 @@ impl RobustProvider { pub async fn get_block_by_number( &self, number: BlockNumberOrTag, - ) -> Result { + ) -> Result { info!("eth_getBlockByNumber called"); - let operation = async || { - self.provider.get_block_by_number(number).await.map_err(RobustProviderError::from) - }; + let operation = async || self.provider.get_block_by_number(number).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getByBlockNumber failed"); @@ -108,10 +104,9 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_block_number(&self) -> Result { + pub async fn get_block_number(&self) -> Result { info!("eth_getBlockNumber called"); - let operation = - async || self.provider.get_block_number().await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_block_number().await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockNumber failed"); @@ -128,10 +123,9 @@ impl RobustProvider { pub async fn get_block_by_hash( &self, hash: alloy::primitives::BlockHash, - ) -> Result, RobustProviderError> { + ) -> Result, Error> { info!("eth_getBlockByHash called"); - let operation = - async || self.provider.get_block_by_hash(hash).await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_block_by_hash(hash).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getBlockByHash failed"); @@ -145,10 +139,9 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn get_logs(&self, filter: &Filter) -> Result, RobustProviderError> { + pub async fn get_logs(&self, filter: &Filter) -> Result, Error> { info!("eth_getLogs called"); - let operation = - async || self.provider.get_logs(filter).await.map_err(RobustProviderError::from); + let operation = async || self.provider.get_logs(filter).await; let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_getLogs failed"); @@ -162,16 +155,10 @@ impl RobustProvider { /// /// Returns an error if RPC call fails repeatedly even /// after exhausting retries or if the call times out. - pub async fn subscribe_blocks( - &self, - ) -> Result, RobustProviderError> { + pub async fn subscribe_blocks(&self) -> Result, Error> { info!("eth_subscribe called"); - let provider = self.provider.clone(); - let result = self - .retry_with_total_timeout(|| async { - provider.subscribe_blocks().await.map_err(RobustProviderError::from) - }) - .await; + let operation = async || self.provider.subscribe_blocks().await; + let result = self.retry_with_total_timeout(operation).await; if let Err(e) = &result { error!(error = %e, "eth_subscribe failed"); } @@ -189,13 +176,10 @@ impl RobustProvider { /// - Returns [`RpcError`] with message "total operation timeout exceeded" /// if the overall timeout elapses. /// - Propagates any [`RpcError`] from the underlying retries. - async fn retry_with_total_timeout( - &self, - operation: F, - ) -> Result + async fn retry_with_total_timeout(&self, operation: F) -> Result where F: Fn() -> Fut, - Fut: Future>, + Fut: Future>>, { let retry_strategy = ExponentialBuilder::default() .with_max_times(self.max_retries) @@ -207,9 +191,8 @@ impl RobustProvider { ) .await { - Ok(Ok(res)) => Ok(res), - Ok(Err(_)) => Err(RobustProviderError::RetryFail(self.max_retries + 1)), - Err(_) => Err(RobustProviderError::Timeout), + Ok(res) => res.map_err(Error::from), + Err(_) => Err(Error::Timeout), } } } @@ -243,7 +226,8 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - Ok(call_count.load(Ordering::SeqCst)) + let count = call_count.load(Ordering::SeqCst); + Ok(count) }) .await; @@ -259,12 +243,10 @@ mod tests { let result = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - if call_count.load(Ordering::SeqCst) < 3 { - Err(RobustProviderError::RpcError(Arc::new(TransportErrorKind::custom_str( - "temp error", - )))) - } else { - Ok(call_count.load(Ordering::SeqCst)) + let count = call_count.load(Ordering::SeqCst); + match count { + 3 => Ok(count), + _ => Err(TransportErrorKind::BackendGone.into()), } }) .await; @@ -278,21 +260,19 @@ mod tests { let call_count = AtomicUsize::new(0); - let result = provider + let result: Result<(), Error> = provider .retry_with_total_timeout(|| async { call_count.fetch_add(1, Ordering::SeqCst); - // permanent error - Err::(RobustProviderError::Timeout) + Err(TransportErrorKind::BackendGone.into()) }) .await; - let err = result.unwrap_err(); - assert!(matches!(err, RobustProviderError::RetryFail(3))); + assert!(matches!(result, Err(Error::RetryFailure(_)))); assert_eq!(call_count.load(Ordering::SeqCst), 3); } #[tokio::test] - async fn test_retry_with_timeout_respects_total_delay() { + async fn test_retry_with_timeout_respects_max_timeout() { let max_timeout = 50; let provider = test_provider(max_timeout, 10, 1); @@ -303,7 +283,6 @@ mod tests { }) .await; - let err = result.unwrap_err(); - assert!(matches!(err, RobustProviderError::Timeout)); + assert!(matches!(result, Err(Error::Timeout))); } }