diff --git a/anvil/src/filter.rs b/anvil/src/filter.rs index 578e628808357..6e1994c684b63 100644 --- a/anvil/src/filter.rs +++ b/anvil/src/filter.rs @@ -8,11 +8,9 @@ use anvil_core::eth::subscription::SubscriptionId; use anvil_rpc::response::ResponseResult; use ethers::{ prelude::{Log as EthersLog, H256 as TxHash}, - types::FilteredParams, + types::{Filter, FilteredParams}, }; use futures::{channel::mpsc::Receiver, Stream, StreamExt}; - -use ethers::types::Filter; use std::{ collections::HashMap, pin::Pin, @@ -23,6 +21,7 @@ use std::{ use tokio::sync::Mutex; use tracing::{trace, warn}; +/// Type alias for filters identified by their id and their expiration timestamp type FilterMap = Arc>>; /// timeout after which to remove an active filter if it wasn't polled since then @@ -45,19 +44,19 @@ impl Filters { let id = new_id(); trace!(target: "node::filter", "Adding new filter id {}", id); let mut filters = self.active_filters.lock().await; - filters.insert(id.clone(), (filter, Instant::now())); + filters.insert(id.clone(), (filter, self.next_deadline())); id } pub async fn get_filter_changes(&self, id: &str) -> ResponseResult { { let mut filters = self.active_filters.lock().await; - if let Some((filter, timestamp)) = filters.get_mut(id) { + if let Some((filter, deadline)) = filters.get_mut(id) { let resp = filter .next() .await .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new())); - *timestamp = Instant::now(); + *deadline = self.next_deadline(); return resp } } @@ -85,11 +84,17 @@ impl Filters { self.keepalive } + /// Returns the timestamp after which a filter should expire + fn next_deadline(&self) -> Instant { + Instant::now() + self.keep_alive() + } + + /// Evict all filters that weren't updated and reached there deadline pub async fn evict(&self) { trace!(target: "node::filter", "Evicting stale filters"); - let deadline = Instant::now() - self.keepalive; + let now = Instant::now(); let mut active_filters = self.active_filters.lock().await; - active_filters.retain(|_, (_, timestamp)| *timestamp > deadline); + active_filters.retain(|_, (_, deadline)| *deadline > now); } } diff --git a/anvil/src/service.rs b/anvil/src/service.rs index c21d3b6947c28..d01e7aeedee5a 100644 --- a/anvil/src/service.rs +++ b/anvil/src/service.rs @@ -49,12 +49,14 @@ impl NodeService { fee_history: FeeHistoryService, filters: Filters, ) -> Self { + let start = tokio::time::Instant::now() + filters.keep_alive(); + let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive()); Self { pool, block_producer: BlockProducer::new(backend), miner, fee_history, - filter_eviction_interval: tokio::time::interval(filters.keep_alive()), + filter_eviction_interval, filters, } }