Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: persist and clean upgrade intent msg, ratelimit, and API #52

Merged
merged 1 commit into from
Aug 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ data
.vscode
*.json
.DS_Store
configs
1 change: 1 addition & 0 deletions subgraph-radio/benches/gossips.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ fn gossip_poi_bench(c: &mut Criterion) {
)],
coverage: CoverageLevel::Comprehensive,
collect_message_duration: 10,
ratelimit_threshold: 60000,
log_level: String::from("info"),
slack_token: None,
slack_channel: None,
Expand Down
11 changes: 10 additions & 1 deletion subgraph-radio/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl Config {
state
} else {
debug!("Created new state");
PersistedState::new(None, None, None)
PersistedState::new(None, None, None, None)
}
}

Expand Down Expand Up @@ -311,6 +311,15 @@ pub struct RadioInfrastructure {
none: no automatic upgrade, only notifications.\nDefault: comprehensive"
)]
pub auto_upgrade: CoverageLevel,
#[clap(
long,
value_parser = value_parser!(i64).range(1..),
default_value = "86400",
value_name = "RATELIMIT_THRESHOLD",
env = "RATELIMIT_THRESHOLD",
help = "Set upgrade intent ratelimit in seconds: only one upgrade per subgraph within the threshold (default: 86400 seconds = 1 day)"
)]
pub ratelimit_threshold: i64,
#[clap(
long,
value_parser = value_parser!(i64).range(1..),
Expand Down
59 changes: 38 additions & 21 deletions subgraph-radio/src/messages/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,21 @@ use async_graphql::SimpleObject;
use ethers_contract::EthAbiType;
use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;
use prost::Message;
use serde::{Deserialize, Serialize};
use tracing::{debug, info};

use graphcast_sdk::{
graphcast_agent::message_typing::{BuildMessageError, GraphcastMessage},
graphql::client_graph_account::{owned_subgraphs, subgraph_hash_by_id},
};
use prost::Message;
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::config::Config;
use crate::operator::indexer_management::{check_decision_basis, offchain_sync_indexing_rules};
use crate::operator::notifier::Notifier;
use crate::{config::Config, state::PersistedState};
use crate::{
operator::indexer_management::{check_decision_basis, offchain_sync_indexing_rules},
OperationError,
};

#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, PartialEq, SimpleObject)]
#[eip712(
Expand Down Expand Up @@ -78,20 +81,36 @@ impl UpgradeIntentMessage {
Ok(self)
}

/// Format the notification for an UpgradeIntentMessage
fn notification_formatter(&self) -> String {
format!(
"Subgraph owner for a deployment announced an upgrade intent:\nSubgraph ID: {}\nNew deployment hash: {}",
self.subgraph_id,
self.new_hash,
)
}

/// Process the validated upgrade intent messages
/// If notification is set up, then notify the indexer
/// If indexer management server endpoint is set up, radio checks `auto_upgrade` for
pub async fn process_valid_message(&self, config: &Config, notifier: &Notifier) {
pub async fn process_valid_message(
&self,
config: &Config,
notifier: &Notifier,
state: &PersistedState,
) -> Result<&Self, OperationError> {
// ratelimit upgrades: return early if there was a recent upgrade
if state.recent_upgrade(self, config.radio_infrastructure.ratelimit_threshold) {
info!(subgraph = &self.subgraph_id, "Received an Upgrade Intent Message for a recently upgraded subgraph, skiping notification and auto deployment");
return Ok(self);
}
// send notifications
notifier.notify(format!(
"Subgraph owner for a deployment announced an upgrade intent:\nSubgraph ID: {}\nNew deployment hash: {}",
self.subgraph_id,
self.new_hash,
)).await;
// auto deployment
// If the identifier satisfy the config coverage level
// and if indexer management server endpoint is provided
notifier.notify(self.notification_formatter()).await;

// auto-deployment
// require configured indexer management server endpoint
if let Some(url) = &config.graph_stack().indexer_management_server_endpoint {
// If the identifier satisfy the config coverage level
let covered_topics = config
.generate_topics(
&config.radio_infrastructure().auto_upgrade,
Expand All @@ -100,17 +119,14 @@ impl UpgradeIntentMessage {
.await;
// Get the current deployment hash by querying network subgraph and take the latest hash of the subgraph id
// Should be able to assume valid identifier since the message is valid
let identifier = if let Ok(hash) = subgraph_hash_by_id(
let identifier = subgraph_hash_by_id(
config.graph_stack().network_subgraph(),
&self.graph_account,
&self.subgraph_id,
)
.await
{
hash
} else {
return;
};
.map_err(OperationError::Query)?;

if covered_topics
.clone()
.into_iter()
Expand All @@ -122,7 +138,8 @@ impl UpgradeIntentMessage {
res = tracing::field::debug(&res),
decision_basis, "New deployment setting"
);
}
};
}
Ok(self)
}
}
8 changes: 7 additions & 1 deletion subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,13 @@ pub async fn process_message(
VALIDATED_MESSAGES
.with_label_values(&[&msg.identifier, "upgrade_intent_message"])
.inc();
radio_msg.process_valid_message(&config, &notifier).await;
if radio_msg
.process_valid_message(&config, &notifier, &state)
.await
.is_ok()
{
state.add_upgrade_intent_message(msg.clone());
};
};
} else {
trace!("Waku message not decoded or validated, skipped message",);
Expand Down
34 changes: 32 additions & 2 deletions subgraph-radio/src/server/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use thiserror::Error;

use crate::{
config::Config,
messages::poi::PublicPoiMessage,
messages::{poi::PublicPoiMessage, upgrade::UpgradeIntentMessage},
operator::attestation::{
self, attestations_to_vec, compare_attestation, process_ppoi_message, Attestation,
AttestationEntry, AttestationError, ComparisonResult, ComparisonResultType,
Expand All @@ -23,7 +23,7 @@ pub struct QueryRoot;

#[Object]
impl QueryRoot {
async fn radio_payload_messages(
async fn public_poi_messages(
&self,
ctx: &Context<'_>,
identifier: Option<String>,
Expand All @@ -35,6 +35,20 @@ impl QueryRoot {
Ok(msgs)
}

async fn upgrade_intent_messages(
&self,
ctx: &Context<'_>,
subgraph_id: Option<String>,
) -> Result<Vec<UpgradeIntentMessage>, HttpServiceError> {
let msgs = ctx
.data_unchecked::<Arc<SubgraphRadioContext>>()
.upgrade_intent_messages_filtered(&subgraph_id)
.into_iter()
.map(|m| m.payload)
.collect();
Ok(msgs)
}

async fn local_attestations(
&self,
ctx: &Context<'_>,
Expand Down Expand Up @@ -257,6 +271,22 @@ impl SubgraphRadioContext {
filtered
}

pub fn upgrade_intent_messages(
&self,
) -> HashMap<String, GraphcastMessage<UpgradeIntentMessage>> {
self.persisted_state.upgrade_intent_messages()
}

pub fn upgrade_intent_messages_filtered(
&self,
subgraph_id: &Option<String>,
) -> Vec<GraphcastMessage<UpgradeIntentMessage>> {
subgraph_id
.as_ref()
.and_then(|id| self.upgrade_intent_messages().get(id).cloned())
.map_or(vec![], |m| vec![m])
}

pub fn comparison_result(&self, identifier: String) -> Option<ComparisonResult> {
let cmp_results = self.persisted_state.comparison_results();
cmp_results.get(&identifier).cloned()
Expand Down
Loading
Loading