Skip to content

Commit

Permalink
feat: divergence summary notification on a given interval
Browse files Browse the repository at this point in the history
  • Loading branch information
petkodes committed Sep 25, 2023
1 parent e19df4b commit 72cf65d
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 7 deletions.
24 changes: 22 additions & 2 deletions subgraph-radio/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use serde::{Deserialize, Serialize};
use std::collections::HashSet;
use tracing::{debug, info, trace};

use crate::operator::notifier::NotificationMode;
use crate::state::{panic_hook, PersistedState};
use crate::{active_allocation_hashes, syncing_deployment_hashes};

Expand Down Expand Up @@ -470,11 +471,28 @@ pub struct RadioInfrastructure {
long,
value_name = "LOG_FORMAT",
env = "LOG_FORMAT",
help = "Support logging formats: pretty, json, full, compact",
long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parsible",
help = "Supported logging formats: pretty, json, full, compact",
long_help = "pretty: verbose and human readable; json: not verbose and parsable; compact: not verbose and not parsable; full: verbose and not parseable",
default_value = "pretty"
)]
pub log_format: LogFormat,
#[clap(
long,
value_name = "NOTIFICATION_MODE",
env = "NOTIFICATION_MODE",
help = "Supported: live, interval",
long_help = "live: send a notification as soon as it finds a divergence; interval: send a notification on a specified interval (default is 24 hours but can be configured with ) with a list of divergent subgraphs",
default_value = "live"
)]
pub notification_mode: NotificationMode,
#[clap(
long,
value_name = "NOTIFICATION_INTERVAL",
env = "NOTIFICATION_INTERVAL",
help = "Interval (in hours) between sending a divergence summary notification",
default_value = "24"
)]
pub notification_interval: u64,
}

#[derive(Clone, Debug, Args, Serialize, Deserialize, Default)]
Expand Down Expand Up @@ -613,6 +631,8 @@ mod tests {
log_format: LogFormat::Pretty,
graphcast_network: GraphcastNetworkName::Testnet,
auto_upgrade: CoverageLevel::Comprehensive,
notification_mode: NotificationMode::Live,
notification_interval: 24,
},
config_file: None,
}
Expand Down
8 changes: 7 additions & 1 deletion subgraph-radio/src/operator/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use graphcast_sdk::{
graphcast_agent::message_typing::{get_indexer_stake, BuildMessageError, GraphcastMessage},
};

use crate::operator::notifier::NotificationMode;
use crate::{
messages::poi::PublicPoiMessage, metrics::ACTIVE_INDEXERS, state::PersistedState,
OperationError,
Expand Down Expand Up @@ -613,6 +614,7 @@ pub async fn process_comparison_results(
result_strings: Vec<Result<ComparisonResult, OperationError>>,
notifier: Notifier,
persisted_state: PersistedState,
notification_mode: NotificationMode,
) {
// Generate attestation summary
let mut match_strings = vec![];
Expand All @@ -626,7 +628,11 @@ pub async fn process_comparison_results(
match result {
Ok(comparison_result) => {
let result_type = persisted_state
.handle_comparison_result(comparison_result.clone(), notifier.clone())
.handle_comparison_result(
comparison_result.clone(),
notifier.clone(),
notification_mode.clone(),
)
.await;

match result_type {
Expand Down
29 changes: 28 additions & 1 deletion subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::messages::upgrade::UpgradeIntentMessage;
use crate::metrics::handle_serve_metrics;
use crate::operator::attestation::log_gossip_summary;
use crate::operator::attestation::process_comparison_results;
use crate::operator::notifier::NotificationMode;
use crate::server::run_server;
use crate::state::PersistedState;
use crate::GRAPHCAST_AGENT;
Expand Down Expand Up @@ -191,6 +192,11 @@ impl RadioOperator {
let mut gossip_poi_interval = interval(Duration::from_secs(30));
let mut comparison_interval = interval(Duration::from_secs(30));

let mut notification_interval = tokio::time::interval(Duration::from_secs(
self.config.radio_infrastructure.notification_interval * 3600,
));
let mut first_tick = true;

let iteration_timeout = Duration::from_secs(180);
let update_timeout = Duration::from_secs(5);
let gossip_timeout = Duration::from_secs(120);
Expand Down Expand Up @@ -351,7 +357,8 @@ impl RadioOperator {
identifiers.len(),
comparison_res,
self.notifier.clone(),
self.persisted_state.clone()
self.persisted_state.clone(),
self.config.radio_infrastructure.notification_mode.clone()
)
}).await;

Expand All @@ -361,6 +368,26 @@ impl RadioOperator {
debug!("compare_poi completed");
}
},
_ = notification_interval.tick() => {
if first_tick {
first_tick = false;
continue;
}
if self.config.radio_infrastructure.notification_mode == NotificationMode::Daily {
let lines = {
let comparison_results = self.persisted_state.comparison_results.lock().unwrap();
let mut lines = Vec::new();
for (identifier, res) in comparison_results.iter() {
if res.result_type == ComparisonResultType::Divergent {
lines.push(format!("Deployment {} diverging as on block {}.", identifier, res.block_number));
}
}
lines
};
self.notifier.notify(lines.join("\n")).await;
}
},

else => break,
}

Expand Down
7 changes: 7 additions & 0 deletions subgraph-radio/src/operator/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@ pub struct Notifier {
telegram_chat_id: Option<i64>,
}

#[derive(clap::ValueEnum, Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub enum NotificationMode {
Daily,
#[default]
Live,
}

impl Notifier {
pub fn new(
radio_name: String,
Expand Down
12 changes: 9 additions & 3 deletions subgraph-radio/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use graphcast_sdk::graphcast_agent::message_typing::GraphcastMessage;

use crate::messages::upgrade::UpgradeIntentMessage;
use crate::metrics::CACHED_PPOI_MESSAGES;
use crate::operator::notifier::NotificationMode;
use crate::{
messages::poi::PublicPoiMessage,
operator::attestation::{
Expand Down Expand Up @@ -251,6 +252,7 @@ impl PersistedState {
&self,
new_comparison_result: ComparisonResult,
notifier: Notifier,
notification_mode: NotificationMode,
) -> ComparisonResultType {
let (should_notify, updated_comparison_result, result_type) = {
let mut results = self.comparison_results.lock().unwrap();
Expand Down Expand Up @@ -295,7 +297,7 @@ impl PersistedState {
(should_notify, new_comparison_result.clone(), result_type)
};

if should_notify {
if notification_mode == NotificationMode::Live && should_notify {
notifier.notify(updated_comparison_result.to_string()).await;
}

Expand Down Expand Up @@ -596,7 +598,9 @@ mod tests {
attestations: Vec::new(),
};

state.handle_comparison_result(new_result, notifier).await;
state
.handle_comparison_result(new_result, notifier, NotificationMode::Live)
.await;

let comparison_results = state.comparison_results.lock().unwrap();
assert!(comparison_results.contains_key(&String::from("new_deployment")));
Expand Down Expand Up @@ -637,7 +641,9 @@ mod tests {
.lock()
.unwrap()
.insert(String::from("existing_deployment"), old_result.clone());
state.handle_comparison_result(new_result, notifier).await;
state
.handle_comparison_result(new_result, notifier, NotificationMode::Live)
.await;

let comparison_results = state.comparison_results.lock().unwrap();
let result = comparison_results
Expand Down

0 comments on commit 72cf65d

Please sign in to comment.