From 72cf65d76cfdeac188cd51eed3093d2091f650ea Mon Sep 17 00:00:00 2001 From: Petko Date: Mon, 25 Sep 2023 13:28:15 +0300 Subject: [PATCH] feat: divergence summary notification on a given interval --- subgraph-radio/src/config.rs | 24 ++++++++++++++++-- subgraph-radio/src/operator/attestation.rs | 8 +++++- subgraph-radio/src/operator/mod.rs | 29 +++++++++++++++++++++- subgraph-radio/src/operator/notifier.rs | 7 ++++++ subgraph-radio/src/state.rs | 12 ++++++--- 5 files changed, 73 insertions(+), 7 deletions(-) diff --git a/subgraph-radio/src/config.rs b/subgraph-radio/src/config.rs index fe6231e..e16b9e1 100644 --- a/subgraph-radio/src/config.rs +++ b/subgraph-radio/src/config.rs @@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize}; use std::collections::HashSet; use tracing::{debug, info, trace}; +use crate::operator::notifier::NotificationMode; use crate::state::{panic_hook, PersistedState}; use crate::{active_allocation_hashes, syncing_deployment_hashes}; @@ -470,11 +471,28 @@ pub struct RadioInfrastructure { long, value_name = "LOG_FORMAT", env = "LOG_FORMAT", - help = "Support logging formats: pretty, json, full, compact", - long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parsible", + help = "Supported logging formats: pretty, json, full, compact", + long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parseable", default_value = "pretty" )] pub log_format: LogFormat, + #[clap( + long, + value_name = "NOTIFICATION_MODE", + env = "NOTIFICATION_MODE", + help = "Supported: live, interval", + long_help = "live: send a notification as soon as it finds a divergence; interval: send a notification on a specified interval (default is 24 hours but can be configured with ) with a list of divergent subgraphs", + default_value = "live" + )] + pub notification_mode: NotificationMode, + #[clap( + long, + value_name = "NOTIFICATION_INTERVAL", + env = "NOTIFICATION_INTERVAL", + help = "Interval (in hours) between sending a divergence summary notification", + default_value = "24" + )] + pub notification_interval: u64, } #[derive(Clone, Debug, Args, Serialize, Deserialize, Default)] @@ -613,6 +631,8 @@ mod tests { log_format: LogFormat::Pretty, graphcast_network: GraphcastNetworkName::Testnet, auto_upgrade: CoverageLevel::Comprehensive, + notification_mode: NotificationMode::Live, + notification_interval: 24, }, config_file: None, } diff --git a/subgraph-radio/src/operator/attestation.rs b/subgraph-radio/src/operator/attestation.rs index 4dbae71..c0ddb99 100644 --- a/subgraph-radio/src/operator/attestation.rs +++ b/subgraph-radio/src/operator/attestation.rs @@ -17,6 +17,7 @@ use graphcast_sdk::{ graphcast_agent::message_typing::{get_indexer_stake, BuildMessageError, GraphcastMessage}, }; +use crate::operator::notifier::NotificationMode; use crate::{ messages::poi::PublicPoiMessage, metrics::ACTIVE_INDEXERS, state::PersistedState, OperationError, @@ -613,6 +614,7 @@ pub async fn process_comparison_results( result_strings: Vec>, notifier: Notifier, persisted_state: PersistedState, + notification_mode: NotificationMode, ) { // Generate attestation summary let mut match_strings = vec![]; @@ -626,7 +628,11 @@ pub async fn process_comparison_results( match result { Ok(comparison_result) => { let result_type = persisted_state - .handle_comparison_result(comparison_result.clone(), notifier.clone()) + .handle_comparison_result( + comparison_result.clone(), + notifier.clone(), + notification_mode.clone(), + ) .await; match result_type { diff --git a/subgraph-radio/src/operator/mod.rs b/subgraph-radio/src/operator/mod.rs index d1f5fe9..c50d945 100644 --- a/subgraph-radio/src/operator/mod.rs +++ b/subgraph-radio/src/operator/mod.rs @@ -31,6 +31,7 @@ use crate::messages::upgrade::UpgradeIntentMessage; use crate::metrics::handle_serve_metrics; use crate::operator::attestation::log_gossip_summary; use crate::operator::attestation::process_comparison_results; +use crate::operator::notifier::NotificationMode; use crate::server::run_server; use crate::state::PersistedState; use crate::GRAPHCAST_AGENT; @@ -191,6 +192,11 @@ impl RadioOperator { let mut gossip_poi_interval = interval(Duration::from_secs(30)); let mut comparison_interval = interval(Duration::from_secs(30)); + let mut notification_interval = tokio::time::interval(Duration::from_secs( + self.config.radio_infrastructure.notification_interval * 3600, + )); + let mut first_tick = true; + let iteration_timeout = Duration::from_secs(180); let update_timeout = Duration::from_secs(5); let gossip_timeout = Duration::from_secs(120); @@ -351,7 +357,8 @@ impl RadioOperator { identifiers.len(), comparison_res, self.notifier.clone(), - self.persisted_state.clone() + self.persisted_state.clone(), + self.config.radio_infrastructure.notification_mode.clone() ) }).await; @@ -361,6 +368,26 @@ impl RadioOperator { debug!("compare_poi completed"); } }, + _ = notification_interval.tick() => { + if first_tick { + first_tick = false; + continue; + } + if self.config.radio_infrastructure.notification_mode == NotificationMode::Daily { + let lines = { + let comparison_results = self.persisted_state.comparison_results.lock().unwrap(); + let mut lines = Vec::new(); + for (identifier, res) in comparison_results.iter() { + if res.result_type == ComparisonResultType::Divergent { + lines.push(format!("Deployment {} diverging as on block {}.", identifier, res.block_number)); + } + } + lines + }; + self.notifier.notify(lines.join("\n")).await; + } + }, + else => break, } diff --git a/subgraph-radio/src/operator/notifier.rs b/subgraph-radio/src/operator/notifier.rs index 9f4432a..288a597 100644 --- a/subgraph-radio/src/operator/notifier.rs +++ b/subgraph-radio/src/operator/notifier.rs @@ -16,6 +16,13 @@ pub struct Notifier { telegram_chat_id: Option, } +#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default, PartialEq)] +pub enum NotificationMode { + Daily, + #[default] + Live, +} + impl Notifier { pub fn new( radio_name: String, diff --git a/subgraph-radio/src/state.rs b/subgraph-radio/src/state.rs index 5b91cc3..e399081 100644 --- a/subgraph-radio/src/state.rs +++ b/subgraph-radio/src/state.rs @@ -17,6 +17,7 @@ use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage; use crate::messages::upgrade::UpgradeIntentMessage; use crate::metrics::CACHED_PPOI_MESSAGES; +use crate::operator::notifier::NotificationMode; use crate::{ messages::poi::PublicPoiMessage, operator::attestation::{ @@ -251,6 +252,7 @@ impl PersistedState { &self, new_comparison_result: ComparisonResult, notifier: Notifier, + notification_mode: NotificationMode, ) -> ComparisonResultType { let (should_notify, updated_comparison_result, result_type) = { let mut results = self.comparison_results.lock().unwrap(); @@ -295,7 +297,7 @@ impl PersistedState { (should_notify, new_comparison_result.clone(), result_type) }; - if should_notify { + if notification_mode == NotificationMode::Live && should_notify { notifier.notify(updated_comparison_result.to_string()).await; } @@ -596,7 +598,9 @@ mod tests { attestations: Vec::new(), }; - state.handle_comparison_result(new_result, notifier).await; + state + .handle_comparison_result(new_result, notifier, NotificationMode::Live) + .await; let comparison_results = state.comparison_results.lock().unwrap(); assert!(comparison_results.contains_key(&String::from("new_deployment"))); @@ -637,7 +641,9 @@ mod tests { .lock() .unwrap() .insert(String::from("existing_deployment"), old_result.clone()); - state.handle_comparison_result(new_result, notifier).await; + state + .handle_comparison_result(new_result, notifier, NotificationMode::Live) + .await; let comparison_results = state.comparison_results.lock().unwrap(); let result = comparison_results