From a3672d5c8aaca87f82ba8bef521d4a674cb30481 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 10 Aug 2022 15:32:04 +0200 Subject: [PATCH 1/2] refactor(anvil): flip filter expiration timestamp --- anvil/src/filter.rs | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/anvil/src/filter.rs b/anvil/src/filter.rs index 578e628808357..6e1994c684b63 100644 --- a/anvil/src/filter.rs +++ b/anvil/src/filter.rs @@ -8,11 +8,9 @@ use anvil_core::eth::subscription::SubscriptionId; use anvil_rpc::response::ResponseResult; use ethers::{ prelude::{Log as EthersLog, H256 as TxHash}, - types::FilteredParams, + types::{Filter, FilteredParams}, }; use futures::{channel::mpsc::Receiver, Stream, StreamExt}; - -use ethers::types::Filter; use std::{ collections::HashMap, pin::Pin, @@ -23,6 +21,7 @@ use std::{ use tokio::sync::Mutex; use tracing::{trace, warn}; +/// Type alias for filters identified by their id and their expiration timestamp type FilterMap = Arc>>; /// timeout after which to remove an active filter if it wasn't polled since then @@ -45,19 +44,19 @@ impl Filters { let id = new_id(); trace!(target: "node::filter", "Adding new filter id {}", id); let mut filters = self.active_filters.lock().await; - filters.insert(id.clone(), (filter, Instant::now())); + filters.insert(id.clone(), (filter, self.next_deadline())); id } pub async fn get_filter_changes(&self, id: &str) -> ResponseResult { { let mut filters = self.active_filters.lock().await; - if let Some((filter, timestamp)) = filters.get_mut(id) { + if let Some((filter, deadline)) = filters.get_mut(id) { let resp = filter .next() .await .unwrap_or_else(|| ResponseResult::success(Vec::<()>::new())); - *timestamp = Instant::now(); + *deadline = self.next_deadline(); return resp } } @@ -85,11 +84,17 @@ impl Filters { self.keepalive } + /// Returns the timestamp after which a filter should expire + fn next_deadline(&self) -> Instant { + Instant::now() + self.keep_alive() + } + + /// Evict all filters that weren't updated and reached there deadline pub async fn evict(&self) { trace!(target: "node::filter", "Evicting stale filters"); - let deadline = Instant::now() - self.keepalive; + let now = Instant::now(); let mut active_filters = self.active_filters.lock().await; - active_filters.retain(|_, (_, timestamp)| *timestamp > deadline); + active_filters.retain(|_, (_, deadline)| *deadline > now); } } From da38b0da5530285ad5e09881d4434d2115836c05 Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 10 Aug 2022 15:39:42 +0200 Subject: [PATCH 2/2] fix: use interval_at --- anvil/src/service.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/anvil/src/service.rs b/anvil/src/service.rs index c21d3b6947c28..d01e7aeedee5a 100644 --- a/anvil/src/service.rs +++ b/anvil/src/service.rs @@ -49,12 +49,14 @@ impl NodeService { fee_history: FeeHistoryService, filters: Filters, ) -> Self { + let start = tokio::time::Instant::now() + filters.keep_alive(); + let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive()); Self { pool, block_producer: BlockProducer::new(backend), miner, fee_history, - filter_eviction_interval: tokio::time::interval(filters.keep_alive()), + filter_eviction_interval, filters, } }