From 6f34d8e0f2890118f24bf3e0aa02ba1f4845f8f7 Mon Sep 17 00:00:00 2001 From: Zero Hero <0xzerohero@gmail.com> Date: Tue, 5 Dec 2023 19:09:49 +0200 Subject: [PATCH] fix: `async fn` resumed after completion --- ethers-providers/src/toolbox/log_query.rs | 43 +++++++++++++++++++++-- 1 file changed, 40 insertions(+), 3 deletions(-) diff --git a/ethers-providers/src/toolbox/log_query.rs b/ethers-providers/src/toolbox/log_query.rs index f71ab536a..be1828d20 100644 --- a/ethers-providers/src/toolbox/log_query.rs +++ b/ethers-providers/src/toolbox/log_query.rs @@ -26,6 +26,7 @@ enum LogQueryState<'a> { LoadLastBlock(PinBoxFut<'a, U64>), LoadLogs(PinBoxFut<'a, Vec>), Consume, + End, } impl<'a, P> LogQuery<'a, P> @@ -60,6 +61,14 @@ macro_rules! rewake_with_new_state { }; } +macro_rules! return_error { + ($ctx:ident, $this:ident, $error:expr) => { + $this.state = LogQueryState::End; + $ctx.waker().wake_by_ref(); + return Poll::Ready(Some(Err($error))) + }; +} + /// Errors while querying for logs #[derive(Error, Debug)] pub enum LogQueryError { @@ -111,7 +120,9 @@ where let fut = Box::pin(async move { provider.get_logs(&filter).await }); rewake_with_new_state!(ctx, self, LogQueryState::LoadLogs(fut)); } - Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLastBlockError(err)))), + Err(err) => { + return_error!(ctx, self, LogQueryError::LoadLastBlockError(err)); + } } } LogQueryState::LoadLogs(fut) => match futures_util::ready!(fut.as_mut().poll(ctx)) { @@ -119,7 +130,9 @@ where self.current_logs = VecDeque::from(logs); rewake_with_new_state!(ctx, self, LogQueryState::Consume); } - Err(err) => Poll::Ready(Some(Err(LogQueryError::LoadLogsError(err)))), + Err(err) => { + return_error!(ctx, self, LogQueryError::LoadLogsError(err)); + } }, LogQueryState::Consume => { let log = self.current_logs.pop_front(); @@ -136,7 +149,7 @@ where // no more pages to load, and everything is consumed // can safely assume this will always be set in this state if from_block > self.last_block.unwrap() { - return Poll::Ready(None) + return Poll::Ready(None); } // load next page self.from_block = Some(to_block + 1); @@ -151,6 +164,30 @@ where Poll::Ready(log.map(Ok)) } } + LogQueryState::End => Poll::Ready(None), } } } + +#[cfg(test)] +mod test { + use ethers_core::types::Filter; + use futures_util::StreamExt; + + use crate::{JsonRpcError, LogQuery, MockResponse, Provider}; + + #[tokio::test] + async fn return_error() { + let (provider, mock) = Provider::mocked(); + mock.push_response(MockResponse::Error(JsonRpcError {code: -32000, message: "One of the blocks specified in filter (fromBlock, toBlock or blockHash) cannot be found.".to_string(), data: None})); + + let filter = + Filter::new().from_block(1).to_block(11).event("Transfer(address,address,uint256)"); + + let log_query = LogQuery::new(&provider, &filter).with_page_size(5); + + let logs: Vec<_> = log_query.collect().await; + + assert!(logs[0].is_err()); + } +}