Merged
9 changes: 4 additions & 5 deletions src/error.rs
@@ -7,7 +7,7 @@ use alloy::{
};
use thiserror::Error;

-use crate::{block_range_scanner::Message, robust_provider::RobustProviderError};
+use crate::{block_range_scanner::Message, robust_provider::Error as RobustProviderError};

#[derive(Error, Debug, Clone)]
pub enum ScannerError {
@@ -40,16 +40,15 @@ pub enum ScannerError {
#[error("Operation timed out")]
Timeout,

#[error("Retry failed after {0} tries")]
RetryFail(usize),
#[error("RPC call failed after exhausting all retry attempts: {0}")]
RetryFailure(Arc<RpcError<TransportErrorKind>>),
Review comment from the PR author (Collaborator) on lines -43 to +44:
It should be safe to assume that the lib user already knows the max retry count, so we should only notify them that retries have failed and with what error.
}

impl From<RobustProviderError> for ScannerError {
fn from(error: RobustProviderError) -> ScannerError {
match error {
RobustProviderError::RpcError(err) => ScannerError::RpcError(err),
RobustProviderError::Timeout => ScannerError::Timeout,
-RobustProviderError::RetryFail(num) => ScannerError::RetryFail(num),
+RobustProviderError::RetryFailure(err) => ScannerError::RetryFailure(err),
RobustProviderError::BlockNotFound(block) => ScannerError::BlockNotFound(block),
}
}
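To illustrate the review comment above, a minimal consumer-side sketch of matching on the reworked variant (the import path and `handle` function are hypothetical, not part of this PR):

use event_scanner::ScannerError; // hypothetical crate path

fn handle(result: Result<(), ScannerError>) {
    match result {
        // The caller already knows its configured retry count, so the error
        // only needs to surface the final RPC failure it gave up on.
        Err(ScannerError::RetryFailure(rpc_err)) => {
            eprintln!("all retries exhausted: {rpc_err}");
        }
        Err(ScannerError::Timeout) => eprintln!("operation timed out"),
        Err(other) => eprintln!("scanner error: {other}"),
        Ok(()) => {}
    }
}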
2 changes: 1 addition & 1 deletion src/event_scanner/message.rs
@@ -1,6 +1,6 @@
use alloy::{rpc::types::Log, sol_types::SolEvent};

-use crate::{ScannerError, ScannerMessage, robust_provider::RobustProviderError};
+use crate::{ScannerError, ScannerMessage, robust_provider::Error as RobustProviderError};

pub type Message = ScannerMessage<Vec<Log>, ScannerError>;

2 changes: 1 addition & 1 deletion src/event_scanner/modes/common.rs
@@ -3,7 +3,7 @@ use std::ops::RangeInclusive;
use crate::{
block_range_scanner::{MAX_BUFFERED_MESSAGES, Message as BlockRangeMessage},
event_scanner::{filter::EventFilter, listener::EventListener, message::Message},
-robust_provider::{RobustProvider, RobustProviderError},
+robust_provider::{Error as RobustProviderError, RobustProvider},
};
use alloy::{
network::Network,
83 changes: 31 additions & 52 deletions src/robust_provider.rs
@@ -13,20 +13,18 @@
use tracing::{error, info};

#[derive(Error, Debug, Clone)]
-pub enum RobustProviderError {
-#[error("RPC error: {0}")]
-RpcError(Arc<RpcError<TransportErrorKind>>),
+pub enum Error {
#[error("Operation timed out")]
Timeout,
#[error("Retry failed after {0} tries")]
RetryFail(usize),
#[error("RPC call failed after exhausting all retry attempts: {0}")]
RetryFailure(Arc<RpcError<TransportErrorKind>>),
#[error("Block not found, block number: {0}")]
BlockNotFound(BlockNumberOrTag),
}

-impl From<RpcError<TransportErrorKind>> for RobustProviderError {
+impl From<RpcError<TransportErrorKind>> for Error {
fn from(err: RpcError<TransportErrorKind>) -> Self {
-RobustProviderError::RpcError(Arc::new(err))
+Error::RetryFailure(Arc::new(err))
}
}

@@ -89,17 +87,15 @@
pub async fn get_block_by_number(
&self,
number: BlockNumberOrTag,
-) -> Result<N::BlockResponse, RobustProviderError> {
+) -> Result<N::BlockResponse, Error> {
info!("eth_getBlockByNumber called");
-let operation = async || {
-self.provider.get_block_by_number(number).await.map_err(RobustProviderError::from)
-};
+let operation = async || self.provider.get_block_by_number(number).await;
let result = self.retry_with_total_timeout(operation).await;
if let Err(e) = &result {
error!(error = %e, "eth_getByBlockNumber failed");
}

result?.ok_or_else(|| RobustProviderError::BlockNotFound(number))

Check failure on line 98 in src/robust_provider.rs (GitHub Actions: Cargo Build, Cargo test, Cargo clippy): failed to resolve: use of undeclared type `RobustProviderError`
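The failing check points at the unchanged `ok_or_else` line above, which still names the old enum. A likely follow-up fix (an assumption, not shown in this diff) is to reference the renamed type instead:

result?.ok_or_else(|| Error::BlockNotFound(number))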
}

/// Fetch the latest block number with retry and timeout.
@@ -108,10 +104,9 @@
///
/// Returns an error if RPC call fails repeatedly even
/// after exhausting retries or if the call times out.
-pub async fn get_block_number(&self) -> Result<u64, RobustProviderError> {
+pub async fn get_block_number(&self) -> Result<u64, Error> {
info!("eth_getBlockNumber called");
-let operation =
-async || self.provider.get_block_number().await.map_err(RobustProviderError::from);
+let operation = async || self.provider.get_block_number().await;
let result = self.retry_with_total_timeout(operation).await;
if let Err(e) = &result {
error!(error = %e, "eth_getBlockNumber failed");
@@ -128,10 +123,9 @@
pub async fn get_block_by_hash(
&self,
hash: alloy::primitives::BlockHash,
-) -> Result<Option<N::BlockResponse>, RobustProviderError> {
+) -> Result<Option<N::BlockResponse>, Error> {
info!("eth_getBlockByHash called");
-let operation =
-async || self.provider.get_block_by_hash(hash).await.map_err(RobustProviderError::from);
+let operation = async || self.provider.get_block_by_hash(hash).await;
let result = self.retry_with_total_timeout(operation).await;
if let Err(e) = &result {
error!(error = %e, "eth_getBlockByHash failed");
@@ -145,10 +139,9 @@
///
/// Returns an error if RPC call fails repeatedly even
/// after exhausting retries or if the call times out.
-pub async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, RobustProviderError> {
+pub async fn get_logs(&self, filter: &Filter) -> Result<Vec<Log>, Error> {
info!("eth_getLogs called");
-let operation =
-async || self.provider.get_logs(filter).await.map_err(RobustProviderError::from);
+let operation = async || self.provider.get_logs(filter).await;
let result = self.retry_with_total_timeout(operation).await;
if let Err(e) = &result {
error!(error = %e, "eth_getLogs failed");
@@ -162,16 +155,10 @@
///
/// Returns an error if RPC call fails repeatedly even
/// after exhausting retries or if the call times out.
-pub async fn subscribe_blocks(
-&self,
-) -> Result<Subscription<N::HeaderResponse>, RobustProviderError> {
+pub async fn subscribe_blocks(&self) -> Result<Subscription<N::HeaderResponse>, Error> {
info!("eth_subscribe called");
-let provider = self.provider.clone();
-let result = self
-.retry_with_total_timeout(|| async {
-provider.subscribe_blocks().await.map_err(RobustProviderError::from)
-})
-.await;
+let operation = async || self.provider.subscribe_blocks().await;
+let result = self.retry_with_total_timeout(operation).await;
if let Err(e) = &result {
error!(error = %e, "eth_subscribe failed");
}
@@ -189,13 +176,10 @@
/// - Returns [`RpcError<TransportErrorKind>`] with message "total operation timeout exceeded"
/// if the overall timeout elapses.
/// - Propagates any [`RpcError<TransportErrorKind>`] from the underlying retries.
-async fn retry_with_total_timeout<T, F, Fut>(
-&self,
-operation: F,
-) -> Result<T, RobustProviderError>
+async fn retry_with_total_timeout<T, F, Fut>(&self, operation: F) -> Result<T, Error>
where
F: Fn() -> Fut,
-Fut: Future<Output = Result<T, RobustProviderError>>,
+Fut: Future<Output = Result<T, RpcError<TransportErrorKind>>>,
Review comment from @0xNeshi (Collaborator, Author), Oct 28, 2025:
Reverting the change asked for in #135 (comment). Why? Since we know we're working only with Alloy's providers, it is safe to specify RpcError as the expected error type for all operations.

{
let retry_strategy = ExponentialBuilder::default()
.with_max_times(self.max_retries)
@@ -207,9 +191,8 @@
)
.await
{
-Ok(Ok(res)) => Ok(res),
-Ok(Err(_)) => Err(RobustProviderError::RetryFail(self.max_retries + 1)),
-Err(_) => Err(RobustProviderError::Timeout),
+Ok(res) => res.map_err(Error::from),
+Err(_) => Err(Error::Timeout),
}
}
}
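For context, a self-contained sketch of the retry-inside-a-total-timeout pattern that `retry_with_total_timeout` implements. It uses a hand-rolled backoff loop and a `String` error as stand-ins (the real code uses the backon crate and `RpcError<TransportErrorKind>`), so the names and defaults here are illustrative assumptions:

use std::future::Future;
use std::time::Duration;
use tokio::time::{sleep, timeout};

// Stand-in for the crate's `Error` enum.
#[derive(Debug)]
enum SketchError {
    Timeout,
    RetryFailure(String),
}

// Retry `operation` with exponential backoff, but bound the *total* time
// spent (all attempts and delays included) by `max_timeout`.
async fn retry_with_total_timeout<T, F, Fut>(
    operation: F,
    max_retries: usize,
    max_timeout: Duration,
) -> Result<T, SketchError>
where
    F: Fn() -> Fut,
    Fut: Future<Output = Result<T, String>>,
{
    let retries = async {
        let mut attempt: usize = 0;
        let mut delay = Duration::from_millis(100);
        loop {
            match operation().await {
                Ok(value) => return Ok(value),
                // Out of attempts: surface the last underlying error.
                Err(e) if attempt >= max_retries => return Err(SketchError::RetryFailure(e)),
                Err(_) => {
                    attempt += 1;
                    sleep(delay).await;
                    delay *= 2; // exponential backoff between attempts
                }
            }
        }
    };
    match timeout(max_timeout, retries).await {
        Ok(result) => result,                // finished (or gave up) within budget
        Err(_) => Err(SketchError::Timeout), // overall time budget elapsed
    }
}

The key behavior mirrored from the diff: the timeout wraps the entire retry sequence, so `Error::Timeout` means the total budget was exceeded, while `Error::RetryFailure` preserves the last RPC error once retries are exhausted.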
@@ -243,7 +226,8 @@
let result = provider
.retry_with_total_timeout(|| async {
call_count.fetch_add(1, Ordering::SeqCst);
-Ok(call_count.load(Ordering::SeqCst))
+let count = call_count.load(Ordering::SeqCst);
+Ok(count)
})
.await;

@@ -259,12 +243,10 @@
let result = provider
.retry_with_total_timeout(|| async {
call_count.fetch_add(1, Ordering::SeqCst);
-if call_count.load(Ordering::SeqCst) < 3 {
-Err(RobustProviderError::RpcError(Arc::new(TransportErrorKind::custom_str(
-"temp error",
-))))
-} else {
-Ok(call_count.load(Ordering::SeqCst))
+let count = call_count.load(Ordering::SeqCst);
+match count {
+3 => Ok(count),
+_ => Err(TransportErrorKind::BackendGone.into()),
}
})
.await;
@@ -278,21 +260,19 @@

let call_count = AtomicUsize::new(0);

-let result = provider
+let result: Result<(), Error> = provider
.retry_with_total_timeout(|| async {
call_count.fetch_add(1, Ordering::SeqCst);
// permanent error
-Err::<i32, RobustProviderError>(RobustProviderError::Timeout)
+Err(TransportErrorKind::BackendGone.into())
Review comment from @0xNeshi (Collaborator, Author) on lines -285 to +266, Oct 28, 2025:
Presumably Error::Timeout was returned for simplicity, but to avoid future confusion about this retry... timing out, it is better for the inner error to be something else.

})
.await;

-let err = result.unwrap_err();
-assert!(matches!(err, RobustProviderError::RetryFail(3)));
+assert!(matches!(result, Err(Error::RetryFailure(_))));
assert_eq!(call_count.load(Ordering::SeqCst), 3);
}

#[tokio::test]
-async fn test_retry_with_timeout_respects_total_delay() {
+async fn test_retry_with_timeout_respects_max_timeout() {
let max_timeout = 50;
let provider = test_provider(max_timeout, 10, 1);

Expand All @@ -303,7 +283,6 @@
})
.await;

-let err = result.unwrap_err();
-assert!(matches!(err, RobustProviderError::Timeout));
+assert!(matches!(result, Err(Error::Timeout)));
}
}