diff --git a/docs/graphman.md b/docs/graphman.md index 31353fbabc3..8c857703dda 100644 --- a/docs/graphman.md +++ b/docs/graphman.md @@ -371,21 +371,30 @@ Inspect all blocks after block `13000000`: Remove the call cache of the specified chain. -If block numbers are not mentioned in `--from` and `--to`, then all the call cache will be removed. +Either remove entries in the range `--from` and `--to`, remove stale contracts which have not been accessed for a specified duration `--ttl_days`, or remove the entire cache with `--remove-entire-cache`. Removing the entire cache can reduce indexing performance significantly and should generally be avoided. -USAGE: - graphman chain call-cache remove [OPTIONS] + Usage: graphman chain call-cache remove [OPTIONS] -OPTIONS: - -f, --from - Starting block number + Options: + --remove-entire-cache + Remove the entire cache + + --ttl-days + Remove stale contracts based on call_meta table - -h, --help - Print help information + --ttl-max-contracts + Limit the number of contracts to consider for stale contract removal + + -f, --from + Starting block number - -t, --to + -t, --to Ending block number + -h, --help + Print help (see a summary with '-h') + + ### DESCRIPTION Remove the call cache of a specified chain. @@ -404,6 +413,15 @@ the first block number will be used as the starting block number. The `to` option is used to specify the ending block number of the block range. In the absence of `to` option, the last block number will be used as the ending block number. +#### `--remove-entire-cache` +The `--remove-entire-cache` option is used to remove the entire call cache of the specified chain. + +#### `--ttl-days ` +The `--ttl-days` option is used to remove stale contracts based on the `call_meta.accessed_at` field. For example, if `--ttl-days` is set to 7, all calls to a contract that has not been accessed in the last 7 days will be removed from the call cache. + +#### `--ttl-max-contracts ` +The `--ttl-max-contracts` option is used to limit the maximum number of contracts to be removed when using the `--ttl-days` option. For example, if `--ttl-max-contracts` is set to 100, at most 100 contracts will be removed from the call cache even if more contracts meet the TTL criteria. + ### EXAMPLES Remove the call cache for all blocks numbered from 10 to 20: @@ -412,5 +430,12 @@ Remove the call cache for all blocks numbered from 10 to 20: Remove all the call cache of the specified chain: - graphman --config config.toml chain call-cache ethereum remove + graphman --config config.toml chain call-cache ethereum remove --remove-entire-cache + +Remove stale contracts from the call cache that have not been accessed in the last 7 days: + + graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 + +Remove stale contracts from the call cache that have not been accessed in the last 7 days, limiting the removal to a maximum of 100 contracts: + graphman --config config.toml chain call-cache ethereum remove --ttl-days 7 --ttl-max-contracts 100 diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 8f0bc565e6c..b2d9bf71df2 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -571,6 +571,13 @@ impl ChainStore for MockChainStore { async fn clear_call_cache(&self, _from: BlockNumber, _to: BlockNumber) -> Result<(), Error> { unimplemented!() } + async fn clear_stale_call_cache( + &self, + _ttl_days: i32, + _ttl_max_contracts: Option, + ) -> Result<(), Error> { + unimplemented!() + } fn chain_identifier(&self) -> Result { unimplemented!() } diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index f29c66f4784..2d115aeff07 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -599,6 +599,13 @@ pub trait ChainStore: ChainHeadStore { /// Clears call cache of the chain for the given `from` and `to` block number. async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>; + /// Clears stale call cache entries for the given TTL in days. + async fn clear_stale_call_cache( + &self, + ttl_days: i32, + ttl_max_contracts: Option, + ) -> Result<(), Error>; + /// Return the chain identifier for this store. fn chain_identifier(&self) -> Result; diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index aba6595f1c9..9e67a532a8c 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -555,14 +555,21 @@ pub enum ChainCommand { pub enum CallCacheCommand { /// Remove the call cache of the specified chain. /// - /// Either remove entries in the range `--from` and `--to`, or remove - /// the entire cache with `--remove-entire-cache`. Removing the entire + /// Either remove entries in the range `--from` and `--to`, + /// remove the cache for contracts that have not been accessed for the specified duration --ttl_days, + /// or remove the entire cache with `--remove-entire-cache`. Removing the entire /// cache can reduce indexing performance significantly and should /// generally be avoided. Remove { /// Remove the entire cache #[clap(long, conflicts_with_all = &["from", "to"])] remove_entire_cache: bool, + /// Remove the cache for contracts that have not been accessed in the last days + #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(i32).range(1..))] + ttl_days: Option, + /// Limits the number of contracts to consider for cache removal when using --ttl_days + #[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(i64).range(1..))] + ttl_max_contracts: Option, /// Starting block number #[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")] from: Option, @@ -1472,8 +1479,19 @@ async fn main() -> anyhow::Result<()> { from, to, remove_entire_cache, + ttl_days, + ttl_max_contracts, } => { let chain_store = ctx.chain_store(&chain_name)?; + if let Some(ttl_days) = ttl_days { + return commands::chain::clear_stale_call_cache( + chain_store, + ttl_days, + ttl_max_contracts, + ) + .await; + } + if !remove_entire_cache && from.is_none() && to.is_none() { bail!("you must specify either --from and --to or --remove-entire-cache"); } diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index 905568a5637..11622dca2da 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -81,6 +81,21 @@ pub async fn clear_call_cache( Ok(()) } +pub async fn clear_stale_call_cache( + chain_store: Arc, + ttl_days: i32, + ttl_max_contracts: Option, +) -> Result<(), Error> { + println!( + "Removing stale entries from the call cache for `{}`", + chain_store.chain + ); + chain_store + .clear_stale_call_cache(ttl_days, ttl_max_contracts) + .await?; + Ok(()) +} + pub async fn info( primary: ConnectionPool, store: Arc, diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index db83199a56c..45d5a7f27a9 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -83,6 +83,7 @@ pub use data::Storage; /// Encapuslate access to the blocks table for a chain. mod data { + use crate::diesel::dsl::IntervalDsl; use diesel::sql_types::{Array, Binary, Bool, Nullable}; use diesel::{connection::SimpleConnection, insert_into}; use diesel::{delete, prelude::*, sql_query}; @@ -104,8 +105,10 @@ mod data { use graph::prelude::transaction_receipt::LightTransactionReceipt; use graph::prelude::web3::types::H256; use graph::prelude::{ - serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, StoreError, + info, serde_json as json, BlockNumber, BlockPtr, CachedEthereumCall, Error, Logger, + StoreError, }; + use std::collections::HashMap; use std::convert::TryFrom; use std::fmt; @@ -1398,6 +1401,190 @@ mod data { } } + pub fn clear_stale_call_cache( + &self, + conn: &mut PgConnection, + logger: &Logger, + ttl_days: i32, + ttl_max_contracts: Option, + ) -> Result<(), Error> { + let mut total_calls: usize = 0; + let mut total_contracts: i64 = 0; + // We process contracts in batches to avoid loading too many entries into memory + // at once. Each contract can have many calls, so we also delete calls in batches. + // Note: The batch sizes were chosen based on experimentation. Potentially, they + // could be made configurable via ENV vars. + let contracts_batch_size: i64 = 2000; + let cache_batch_size: usize = 10000; + + // Limits the number of contracts to process if ttl_max_contracts is set. + // Used also to adjust the final batch size, so we don't process more + // contracts than the set limit. + let remaining_contracts = |processed: i64| -> Option { + ttl_max_contracts.map(|limit| limit.saturating_sub(processed)) + }; + + match self { + Storage::Shared => { + use public::eth_call_cache as cache; + use public::eth_call_meta as meta; + + loop { + if let Some(0) = remaining_contracts(total_contracts) { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", + total_calls, + total_contracts + ); + break; + } + + let batch_limit = remaining_contracts(total_contracts) + .map(|left| left.min(contracts_batch_size)) + .unwrap_or(contracts_batch_size); + + let stale_contracts = meta::table + .select(meta::contract_address) + .filter( + meta::accessed_at + .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + ) + .limit(batch_limit) + .get_results::>(conn)?; + + if stale_contracts.is_empty() { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts", + total_calls, + total_contracts + ); + break; + } + + loop { + let next_batch = cache::table + .select(cache::id) + .filter(cache::contract_address.eq_any(&stale_contracts)) + .limit(cache_batch_size as i64) + .get_results::>(conn)?; + let deleted_count = + diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) + .execute(conn)?; + + total_calls += deleted_count; + + if deleted_count < cache_batch_size { + break; + } + } + + let deleted_contracts = diesel::delete( + meta::table.filter(meta::contract_address.eq_any(&stale_contracts)), + ) + .execute(conn)?; + + total_contracts += deleted_contracts as i64; + } + + Ok(()) + } + Storage::Private(Schema { + call_cache, + call_meta, + .. + }) => { + let select_query = format!( + "WITH stale_contracts AS ( + SELECT contract_address + FROM {} + WHERE accessed_at < current_date - interval '{} days' + LIMIT $1 + ) + SELECT contract_address FROM stale_contracts", + call_meta.qname, ttl_days + ); + + let delete_cache_query = format!( + "WITH targets AS ( + SELECT id + FROM {} + WHERE contract_address = ANY($1) + LIMIT {} + ) + DELETE FROM {} USING targets + WHERE {}.id = targets.id", + call_cache.qname, cache_batch_size, call_cache.qname, call_cache.qname + ); + + let delete_meta_query = format!( + "DELETE FROM {} WHERE contract_address = ANY($1)", + call_meta.qname + ); + + #[derive(QueryableByName)] + struct ContractAddress { + #[diesel(sql_type = Bytea)] + contract_address: Vec, + } + + loop { + if let Some(0) = remaining_contracts(total_contracts) { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", + total_calls, + total_contracts + ); + break; + } + + let batch_limit = remaining_contracts(total_contracts) + .map(|left| left.min(contracts_batch_size)) + .unwrap_or(contracts_batch_size); + + let stale_contracts: Vec> = sql_query(&select_query) + .bind::(batch_limit) + .load::(conn)? + .into_iter() + .map(|r| r.contract_address) + .collect(); + + if stale_contracts.is_empty() { + info!( + logger, + "Finished cleaning call cache: deleted {} entries for {} contracts", + total_calls, + total_contracts + ); + break; + } + + loop { + let deleted_count = sql_query(&delete_cache_query) + .bind::, _>(&stale_contracts) + .execute(conn)?; + + total_calls += deleted_count; + + if deleted_count < cache_batch_size { + break; + } + } + + let deleted_contracts = sql_query(&delete_meta_query) + .bind::, _>(&stale_contracts) + .execute(conn)?; + + total_contracts += deleted_contracts as i64; + } + + Ok(()) + } + } + } + pub(super) fn update_accessed_at( &self, conn: &mut PgConnection, @@ -2508,6 +2695,16 @@ impl ChainStoreTrait for ChainStore { Ok(()) } + async fn clear_stale_call_cache( + &self, + ttl_days: i32, + ttl_max_contracts: Option, + ) -> Result<(), Error> { + let conn = &mut *self.get_conn()?; + self.storage + .clear_stale_call_cache(conn, &self.logger, ttl_days, ttl_max_contracts) + } + async fn transaction_receipts_in_block( &self, block_hash: &H256, diff --git a/store/test-store/tests/postgres/chain_head.rs b/store/test-store/tests/postgres/chain_head.rs index cf501f1438f..73baf71b008 100644 --- a/store/test-store/tests/postgres/chain_head.rs +++ b/store/test-store/tests/postgres/chain_head.rs @@ -1,6 +1,7 @@ //! Test ChainStore implementation of Store, in particular, how //! the chain head pointer gets updated in various situations +use diesel::RunQueryDsl; use graph::blockchain::{BlockHash, BlockPtr}; use graph::data::store::ethereum::call; use graph::data::store::scalar::Bytes; @@ -490,6 +491,72 @@ fn eth_call_cache() { }) } +#[test] +/// Tests mainly query correctness. Requires data in order not to hit early returns when no stale contracts are found. +fn test_clear_stale_call_cache() { + let chain = vec![]; + + #[derive(diesel::QueryableByName)] + struct Namespace { + #[diesel(sql_type = diesel::sql_types::Text)] + namespace: String, + } + + run_test_async(chain, |chain_store, _, _| async move { + let logger = LOGGER.cheap_clone(); + let address = H160([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 2, 3]); + let call: [u8; 6] = [1, 2, 3, 4, 5, 6]; + let return_value: [u8; 3] = [7, 8, 9]; + + let mut conn = PRIMARY_POOL.get().unwrap(); + + // Insert a call cache entry, otherwise it will hit an early return and won't test all queries + let call = call::Request::new(address, call.to_vec(), 0); + chain_store + .set_call( + &logger, + call.cheap_clone(), + BLOCK_ONE.block_ptr(), + call::Retval::Value(Bytes::from(return_value)), + ) + .unwrap(); + + // Confirm the call cache entry is there + let ret = chain_store.get_call(&call, BLOCK_ONE.block_ptr()).unwrap(); + assert!(ret.is_some()); + + // Now we need to update the accessed_at timestamp to be stale, so it gets deleted + // Get namespace from chains table + let namespace: String = diesel::sql_query(format!( + "SELECT namespace FROM public.chains WHERE name = '{}'", + chain_store.chain + )) + .get_result::(&mut conn) + .unwrap() + .namespace; + + // Determine the correct meta table name + let meta_table: String = match namespace.as_str() { + "public" => "eth_call_meta".to_owned(), + _ => format!("{namespace}.call_meta"), + }; + + // Update accessed_at to be 8 days ago, so it's stale for a 7 day threshold + let _ = diesel::sql_query(format!( + "UPDATE {meta_table} SET accessed_at = NOW() - INTERVAL '8 days' WHERE contract_address = $1" + )).bind::(address.as_bytes()) + .execute(&mut conn) + .unwrap(); + + let result = chain_store.clear_stale_call_cache(7, None).await; + assert!(result.is_ok()); + + // Confirm the call cache entry was removed + let ret = chain_store.get_call(&call, BLOCK_ONE.block_ptr()).unwrap(); + assert!(ret.is_none()); + }); +} + #[test] /// Tests only query correctness. No data is involved. fn test_transaction_receipts_in_block_function() {