Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions anvil/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,9 @@ use anvil_core::eth::subscription::SubscriptionId;
use anvil_rpc::response::ResponseResult;
use ethers::{
prelude::{Log as EthersLog, H256 as TxHash},
types::FilteredParams,
types::{Filter, FilteredParams},
};
use futures::{channel::mpsc::Receiver, Stream, StreamExt};

use ethers::types::Filter;
use std::{
collections::HashMap,
pin::Pin,
Expand All @@ -23,6 +21,7 @@ use std::{
use tokio::sync::Mutex;
use tracing::{trace, warn};

/// Type alias for filters identified by their id and their expiration timestamp
type FilterMap = Arc<Mutex<HashMap<String, (EthFilter, Instant)>>>;

/// timeout after which to remove an active filter if it wasn't polled since then
Expand All @@ -45,19 +44,19 @@ impl Filters {
let id = new_id();
trace!(target: "node::filter", "Adding new filter id {}", id);
let mut filters = self.active_filters.lock().await;
filters.insert(id.clone(), (filter, Instant::now()));
filters.insert(id.clone(), (filter, self.next_deadline()));
id
}

pub async fn get_filter_changes(&self, id: &str) -> ResponseResult {
{
let mut filters = self.active_filters.lock().await;
if let Some((filter, timestamp)) = filters.get_mut(id) {
if let Some((filter, deadline)) = filters.get_mut(id) {
let resp = filter
.next()
.await
.unwrap_or_else(|| ResponseResult::success(Vec::<()>::new()));
*timestamp = Instant::now();
*deadline = self.next_deadline();
return resp
}
}
Expand Down Expand Up @@ -85,11 +84,17 @@ impl Filters {
self.keepalive
}

/// Returns the timestamp after which a filter should expire
fn next_deadline(&self) -> Instant {
Instant::now() + self.keep_alive()
}

/// Evict all filters that weren't updated and reached there deadline
pub async fn evict(&self) {
trace!(target: "node::filter", "Evicting stale filters");
let deadline = Instant::now() - self.keepalive;
let now = Instant::now();
let mut active_filters = self.active_filters.lock().await;
active_filters.retain(|_, (_, timestamp)| *timestamp > deadline);
active_filters.retain(|_, (_, deadline)| *deadline > now);
}
}

Expand Down
4 changes: 3 additions & 1 deletion anvil/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ impl NodeService {
fee_history: FeeHistoryService,
filters: Filters,
) -> Self {
let start = tokio::time::Instant::now() + filters.keep_alive();
let filter_eviction_interval = tokio::time::interval_at(start, filters.keep_alive());
Self {
pool,
block_producer: BlockProducer::new(backend),
miner,
fee_history,
filter_eviction_interval: tokio::time::interval(filters.keep_alive()),
filter_eviction_interval,
filters,
}
}
Expand Down