diff --git a/graph/src/components/metrics/block_state.rs b/graph/src/components/metrics/block_state.rs
index 87984d46647..e46683638a9 100644
--- a/graph/src/components/metrics/block_state.rs
+++ b/graph/src/components/metrics/block_state.rs
@@ -10,19 +10,21 @@ use url::Url;
 use crate::{
     blockchain::BlockPtr,
     components::store::{DeploymentId, Entity},
-    data::store::Id,
+    data::{store::Id, value::Word},
     env::ENV_VARS,
     runtime::gas::Gas,
     schema::EntityType,
     util::cache_weight::CacheWeight,
 };
 
-#[derive(Debug)]
+#[derive(Default, Debug)]
 pub struct BlockStateMetrics {
     pub gas_counter: HashMap<CounterKey, u64>,
     pub op_counter: HashMap<CounterKey, u64>,
     pub read_bytes_counter: HashMap<CounterKey, u64>,
     pub write_bytes_counter: HashMap<CounterKey, u64>,
+    pub entity_count_changes: HashMap<CounterKey, u64>,
+    pub current_storage_size: HashMap<CounterKey, u64>,
 }
 
 #[derive(Hash, PartialEq, Eq, Debug, Clone)]
@@ -44,6 +46,8 @@ impl BlockStateMetrics {
             write_bytes_counter: HashMap::new(),
             gas_counter: HashMap::new(),
             op_counter: HashMap::new(),
+            entity_count_changes: HashMap::new(),
+            current_storage_size: HashMap::new(),
         }
     }
 
@@ -63,6 +67,14 @@ impl BlockStateMetrics {
         for (key, value) in other.op_counter {
             *self.op_counter.entry(key).or_insert(0) += value;
         }
+
+        for (key, value) in other.entity_count_changes {
+            *self.entity_count_changes.entry(key).or_insert(0) = value;
+        }
+
+        for (key, value) in other.current_storage_size {
+            *self.current_storage_size.entry(key).or_insert(0) = value;
+        }
     }
 
     fn serialize_to_csv<T: Serialize, U: IntoIterator<Item = T>>(
@@ -97,6 +109,25 @@ impl BlockStateMetrics {
         )
     }
 
+    pub fn counter_to_csv_i32(
+        data: &HashMap<CounterKey, i32>,
+        column_names: Vec<&str>,
+    ) -> Result<String> {
+        Self::serialize_to_csv(
+            data.iter().map(|(key, value)| match key {
+                CounterKey::Entity(typename, id) => {
+                    vec![
+                        typename.typename().to_string(),
+                        id.to_string(),
+                        value.to_string(),
+                    ]
+                }
+                CounterKey::String(key) => vec![key.to_string(), value.to_string()],
+            }),
+            column_names,
+        )
+    }
+
     async fn write_csv_to_store(bucket: &str, path: &str, data: String) -> Result<()> {
         let data_bytes = data.into_bytes();
 
@@ -158,6 +189,57 @@ impl BlockStateMetrics {
         }
     }
 
+    pub fn track_entity_count_change(&mut self, entity_type: &EntityType, change: i32) {
+        if ENV_VARS.enable_dips_metrics {
+            let key = CounterKey::Entity(entity_type.clone(), Id::String(Word::from("total")));
+            let counter = self.entity_count_changes.entry(key).or_insert(0);
+            if change < 0 {
+                *counter = counter.saturating_sub((-change) as u64);
+            } else {
+                *counter = counter.saturating_add(change as u64);
+            }
+        }
+    }
+
+    pub fn track_storage_size_change(
+        &mut self,
+        entity_type: &EntityType,
+        entity: &Entity,
+        is_removal: bool,
+    ) {
+        if ENV_VARS.enable_dips_metrics {
+            let key = CounterKey::Entity(entity_type.clone(), entity.id());
+            let size = entity.weight() as u64;
+
+            let storage = self.current_storage_size.entry(key).or_insert(0);
+            if is_removal {
+                *storage = storage.saturating_sub(size);
+            } else {
+                *storage = size;
+            }
+        }
+    }
+
+    pub fn track_storage_size_change_batch(
+        &mut self,
+        entity_type: &EntityType,
+        entities: &[Entity],
+        is_removal: bool,
+    ) {
+        if ENV_VARS.enable_dips_metrics {
+            for entity in entities {
+                self.track_storage_size_change(entity_type, entity, is_removal);
+            }
+        }
+    }
+
+    pub fn track_entity_count_change_batch(&mut self, entity_type: &EntityType, changes: &[i32]) {
+        if ENV_VARS.enable_dips_metrics {
+            let total_change: i32 = changes.iter().sum();
+            self.track_entity_count_change(entity_type, total_change);
+        }
+    }
+
     pub fn flush_metrics_to_store(
         &self,
         logger: &Logger,
@@ -180,6 +262,8 @@ impl BlockStateMetrics {
        let op_counter = self.op_counter.clone();
         let read_bytes_counter = self.read_bytes_counter.clone();
         let write_bytes_counter = self.write_bytes_counter.clone();
+        let entity_count_changes = self.entity_count_changes.clone();
+        let current_storage_size = self.current_storage_size.clone();
 
         // Spawn the async task
         crate::spawn(async move {
@@ -203,6 +287,16 @@ impl BlockStateMetrics {
                     Self::counter_to_csv(&write_bytes_counter, vec!["entity", "id", "bytes"])
                         .unwrap(),
                 ),
+                (
+                    "entity_changes",
+                    Self::counter_to_csv(&entity_count_changes, vec!["entity", "id", "count"])
+                        .unwrap(),
+                ),
+                (
+                    "storage_size",
+                    Self::counter_to_csv(&current_storage_size, vec!["entity", "id", "bytes"])
+                        .unwrap(),
+                ),
             ];
 
             // Convert each metrics upload into a future
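A note on the arithmetic introduced in `track_entity_count_change` above: signed per-block deltas are folded into an unsigned `u64` counter with saturating ops, clamping at zero instead of underflowing. A minimal standalone sketch of that fold (plain std Rust, not graph-node code; `apply_change` is a hypothetical name) — it uses `unsigned_abs`, since the patch's `(-change) as u64` would panic in debug builds for `change == i32::MIN`:

```rust
// Standalone sketch of the saturating fold used by track_entity_count_change:
// signed deltas are folded into an unsigned counter without underflow.
fn apply_change(counter: &mut u64, change: i32) {
    if change < 0 {
        // unsigned_abs is safe for i32::MIN, where negation would overflow.
        *counter = counter.saturating_sub(change.unsigned_abs() as u64);
    } else {
        *counter = counter.saturating_add(change as u64);
    }
}

fn main() {
    let mut total = 0u64;
    for change in [3, -1, -5] {
        apply_change(&mut total, change);
    }
    // 3 - 1 = 2, then subtracting 5 saturates at 0 rather than underflowing.
    assert_eq!(total, 0);
}
```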
diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs
index 721e3d80bc1..d382864d078 100644
--- a/graph/src/components/store/write.rs
+++ b/graph/src/components/store/write.rs
@@ -4,6 +4,7 @@ use std::{collections::HashSet, sync::Arc};
 use crate::{
     blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
     cheap_clone::CheapClone,
+    components::metrics::block_state::BlockStateMetrics,
     components::subgraph::Entity,
     constraint_violation,
     data::{store::Id, subgraph::schema::SubgraphError},
@@ -495,6 +496,32 @@ impl RowGroup {
     pub fn ids(&self) -> impl Iterator<Item = &Id> {
         self.rows.iter().map(|emod| emod.id())
     }
+
+    pub fn track_metrics(&self, metrics: &mut BlockStateMetrics) {
+        // Track entity count changes
+        let changes: Vec<i32> = self
+            .rows
+            .iter()
+            .map(|row| row.entity_count_change())
+            .collect();
+        metrics.track_entity_count_change_batch(&self.entity_type, &changes);
+
+        // Track writes only
+        let writes: Vec<Entity> = self
+            .rows
+            .iter()
+            .filter_map(|row| match row {
+                EntityModification::Insert { data, .. }
+                | EntityModification::Overwrite { data, .. } => Some(data.as_ref().clone()),
+                EntityModification::Remove { .. } => None,
+            })
+            .collect();
+
+        if !writes.is_empty() {
+            metrics.track_entity_write_batch(&self.entity_type, &writes);
+            metrics.track_storage_size_change_batch(&self.entity_type, &writes, false);
+        }
+    }
 }
 
 struct ClampsByBlockIterator<'a> {
@@ -679,10 +706,17 @@ impl Batch {
         let mut mods = RowGroups::new();
 
+        let mut metrics = BlockStateMetrics::default();
+
         for m in raw_mods {
             mods.group_entry(&m.key().entity_type).push(m, block)?;
         }
 
+        // Track metrics for each group
+        for group in &mods.groups {
+            group.track_metrics(&mut metrics);
+        }
+
         let data_sources = DataSources::new(block_ptr.cheap_clone(), data_sources);
         let offchain_to_remove = DataSources::new(block_ptr.cheap_clone(), offchain_to_remove);
         let first_block = block_ptr.number;
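For context on what `track_metrics` consumes: each row is an `EntityModification`, and the split above relies on the usual semantics of `entity_count_change` (+1 for an insert, 0 for an overwrite, -1 for a remove). A self-contained sketch with invented stand-in types (`Modification` here is not the real `EntityModification`) showing how the two aggregations relate:

```rust
// Simplified model of a row group: inserts/overwrites carry a payload that
// counts toward storage, removes only contribute a -1 entity-count delta.
enum Modification {
    Insert { data: String },
    Overwrite { data: String },
    Remove,
}

impl Modification {
    // Mirrors entity_count_change: insert +1, overwrite 0, remove -1.
    fn entity_count_change(&self) -> i32 {
        match self {
            Modification::Insert { .. } => 1,
            Modification::Overwrite { .. } => 0,
            Modification::Remove => -1,
        }
    }
}

fn main() {
    let rows = vec![
        Modification::Insert { data: "a".into() },
        Modification::Overwrite { data: "b".into() },
        Modification::Remove,
    ];

    // First aggregation: net entity-count change for the group.
    let total: i32 = rows.iter().map(|r| r.entity_count_change()).sum();

    // Second aggregation: only writes contribute payloads for size tracking.
    let writes: Vec<&String> = rows
        .iter()
        .filter_map(|r| match r {
            Modification::Insert { data } | Modification::Overwrite { data } => Some(data),
            Modification::Remove => None,
        })
        .collect();

    assert_eq!(total, 0); // +1 insert, 0 overwrite, -1 remove
    assert_eq!(writes.len(), 2);
}
```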
diff --git a/runtime/wasm/src/host_exports.rs b/runtime/wasm/src/host_exports.rs
index 78921bbcf34..999d7478cf7 100644
--- a/runtime/wasm/src/host_exports.rs
+++ b/runtime/wasm/src/host_exports.rs
@@ -33,6 +33,8 @@ use crate::{error::DeterminismLevel, module::IntoTrap};
 
 use super::module::WasmInstanceData;
 
+use graph::schema::EntityKey;
+
 fn write_poi_event(
     proof_of_indexing: &SharedProofOfIndexing,
     poi_event: &ProofOfIndexingEvent,
@@ -350,6 +352,19 @@ impl HostExports {
 
         state.metrics.track_entity_write(&entity_type, &entity);
 
+        state
+            .metrics
+            .track_storage_size_change(&entity_type, &entity, false);
+
+        if state
+            .entity_cache
+            .get(&key, GetScope::Store)
+            .map_err(|e| HostExportError::Deterministic(e.into()))?
+            .is_none()
+        {
+            state.metrics.track_entity_count_change(&entity_type, 1);
+        }
+
         state
             .entity_cache
             .set(key, entity, Some(&mut state.write_capacity_remaining))?;
@@ -388,7 +403,7 @@ impl HostExports {
             "store_remove",
         )?;
 
-        state.entity_cache.remove(key);
+        self.remove_entity(&key, state)?;
 
         Ok(())
     }
@@ -1233,6 +1248,28 @@ impl HostExports {
             .map(|mut tokens| tokens.pop().unwrap())
             .context("Failed to decode")
     }
+
+    fn remove_entity(
+        &self,
+        key: &EntityKey,
+        state: &mut BlockState,
+    ) -> Result<(), HostExportError> {
+        let entity_type = key.entity_type.clone();
+
+        if let Some(entity) = state
+            .entity_cache
+            .get(key, GetScope::Store)
+            .map_err(|e| HostExportError::Deterministic(e.into()))?
+        {
+            state
+                .metrics
+                .track_storage_size_change(&entity_type, &entity, true);
+            state.metrics.track_entity_count_change(&entity_type, -1);
+        }
+
+        state.entity_cache.remove(key.clone());
+        Ok(())
+    }
 }
 
 fn string_to_h160(string: &str) -> Result<H160, DeterministicHostError> {
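Taken together, the two host-export changes implement one counting rule: `store_set` adds +1 only when the key was not already present, and `store_remove` (via the new `remove_entity` helper) subtracts 1 only when it was. A standalone sketch of that rule, with a plain `HashMap` standing in for the entity cache (`MiniCache` is hypothetical, not graph-node's API):

```rust
use std::collections::HashMap;

// Toy cache demonstrating presence-gated counting: set increments the
// entity count only for new keys, remove decrements only on a hit.
struct MiniCache {
    entities: HashMap<String, String>,
    entity_count_change: i64,
}

impl MiniCache {
    fn set(&mut self, key: String, value: String) {
        if !self.entities.contains_key(&key) {
            // Counterpart of track_entity_count_change(.., 1) on a cache miss.
            self.entity_count_change += 1;
        }
        self.entities.insert(key, value);
    }

    fn remove(&mut self, key: &str) {
        if self.entities.remove(key).is_some() {
            // Counterpart of track_entity_count_change(.., -1) on a hit.
            self.entity_count_change -= 1;
        }
    }
}

fn main() {
    let mut cache = MiniCache { entities: HashMap::new(), entity_count_change: 0 };
    cache.set("user:1".into(), "alice".into());  // new key: +1
    cache.set("user:1".into(), "alice2".into()); // overwrite: no change
    cache.remove("user:1");                      // present: -1
    cache.remove("user:1");                      // absent: no change
    assert_eq!(cache.entity_count_change, 0);
}
```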