Skip to content

Commit

Permalink
feat: version upgrade triggers offchain sync request to indexer
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Aug 9, 2023
1 parent a6e7f8d commit 9a66e38
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 81 deletions.
42 changes: 33 additions & 9 deletions subgraph-radio/src/messages/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ use graphcast_sdk::{
};
use prost::Message;
use serde::{Deserialize, Serialize};
use tracing::debug;

use crate::config::Config;
use crate::operator::indexer_management::{check_decision_basis, offchain_sync_indexing_rules};
use crate::operator::notifier::Notifier;

#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, PartialEq, SimpleObject)]
Expand Down Expand Up @@ -106,7 +109,7 @@ impl VersionUpgradeMessage {
let subgraphs = owned_subgraphs(network_subgraph, &self.graph_account)
.await
.map_err(BuildMessageError::FieldDerivations)?;
if !subgraphs.contains(&self.identifier) {
if !subgraphs.contains(&self.subgraph_id) {
return Err(BuildMessageError::InvalidFields(anyhow::anyhow!(format!(
"Verified account failed to be subgraph owner. Verified account: {:#?}",
self.graph_account
Expand All @@ -129,14 +132,35 @@ impl VersionUpgradeMessage {
}

/// process the validated version upgrade messages, currently just notify
pub async fn process_valid_message(&self, notifier: &Notifier) {
// send notifications, later can optionally automate deployment
pub async fn process_valid_message(&self, config: &Config, notifier: &Notifier) {
// send notifications
notifier.notify(format!(
"Subgraph owner for a deployment has shared version upgrade info:\nold deployment: {}\nnew deployment: {}\nplanned migrate time: {}\nnetwork: {}",
self.identifier,
self.new_hash,
self.migrate_time,
self.network
)).await;
"Subgraph owner for a deployment has shared version upgrade info:\nold deployment: {}\nnew deployment: {}\nplanned migrate time: {}\nnetwork: {}",
self.identifier,
self.new_hash,
self.migrate_time,
self.network
)).await;
// auto deployment
// If the identifier satisfy the config coverage level
// and if indexer management server endpoint is provided
if let Some(url) = &config.graph_stack().indexer_management_server_endpoint {
let covered_topics = config
.generate_topics(config.graph_stack().indexer_address.clone())
.await;

if covered_topics
.clone()
.into_iter()
.any(|x| x == self.identifier.clone())
{
let res = offchain_sync_indexing_rules(url, &self.new_hash).await;
let decision_basis = check_decision_basis(url, &self.new_hash).await;
debug!(
res = tracing::field::debug(&res),
decision_basis, "New deployment setting"
);
}
}
}
}
51 changes: 27 additions & 24 deletions subgraph-radio/src/operator/indexer_management.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,28 @@ pub async fn offchain_sync_indexing_rules(
.map_err(|e| OperationError::Query(graphcast_sdk::graphql::QueryError::Transport(e)))
}

pub async fn check_decision_basis(url: &str, deployment: &str) -> Option<String> {
indexing_rules(url)
.await
.ok()?
.as_object()?
.get("data")?
.as_object()?
.get("indexingRules")?
.as_array()?
.iter()
.find(|o| {
o.as_object()
.and_then(|rule| rule.get("identifier"))
.and_then(|identifier| identifier.as_str())
.map_or(false, |i| i == deployment)
})?
.as_object()?
.get("decisionBasis")?
.as_str()
.map(|s| s.to_string())
}

// // NOTE: this set of tests can only run in context of running indexer_management server
// #[cfg(test)]
// mod tests {
Expand All @@ -138,33 +160,14 @@ pub async fn offchain_sync_indexing_rules(
// async fn test_set_offchain_sync() {
// let res_json = offchain_sync_indexing_rules(
// "http://127.0.0.1:18000",
// "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss",
// "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB",
// )
// .await;
// assert!(res_json.is_ok());

// let check_setting = indexing_rules("http://127.0.0.1:18000").await.unwrap();

// assert!(check_setting
// .as_object()
// .unwrap()
// .get("data")
// .unwrap()
// .as_object()
// .unwrap()
// .get("iiterles")
// .unwrap()
// .as_array()
// .unwrap()
// .into_iter()
// .any(|o| o
// .as_object()
// .unwrap()
// .get("identifier")
// .unwrap()
// .as_str()
// .unwrap()
// == "Qmb5Ysp5oCUXhLA8NmxmYKDAX2nCMnh7Vvb5uffb9n5vss")
// );
// let rule_decision = check_decision_basis("http://127.0.0.1:18000",
// "QmacQnSgia4iDPWHpeY6aWxesRFdb8o5DKZUx96zZqEWrB",).await.unwrap();

// assert!(rule_decision == "offchain".to_string());
// }
// }
119 changes: 71 additions & 48 deletions subgraph-radio/src/operator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl RadioOperator {

let state_ref = persisted_state.clone();
let upgrade_notifier = notifier.clone();
let graph_node = config.graph_stack.graph_node_status_endpoint.clone();
let upgrade_config = config.clone();

// try message format in order of PublicPOIMessage, VersionUpgradeMessage
tokio::spawn(async move {
Expand All @@ -131,12 +131,13 @@ impl RadioOperator {
let callbook = agent.callbook.clone();
let nonces = agent.nonces.clone();
let local_sender = agent.graphcast_identity.graphcast_id.clone();
if let Ok(msg) = agent.decode::<PublicPoiMessage>(msg.payload()).await {
let parsed = if let Ok(msg) = agent.decode::<PublicPoiMessage>(msg.payload()).await
{
trace!(
message = tracing::field::debug(&msg),
"Parseable as Public PoI message, now validate",
);
let msg = match check_message_validity(
match check_message_validity(
msg,
&nonces,
callbook.clone(),
Expand All @@ -146,60 +147,82 @@ impl RadioOperator {
.await
.map_err(|e| WakuHandlingError::InvalidMessage(e.to_string()))
{
Ok(msg) => msg,
Err(e) => {
debug!(
err = tracing::field::debug(e),
"Failed to validate by Graphcast"
);
continue;
Ok(msg) => {
let is_valid = msg
.payload
.validity_check(
&msg,
&upgrade_config.graph_stack.graph_node_status_endpoint,
)
.await
.is_ok();

if is_valid {
VALIDATED_MESSAGES
.with_label_values(&[&msg.identifier, "public_poi_message"])
.inc();
process_valid_message(msg.clone(), &state_ref).await;
};
is_valid
}
};

let is_valid = msg.payload.validity_check(&msg, &graph_node).await;

if is_valid.is_ok() {
VALIDATED_MESSAGES
.with_label_values(&[&msg.identifier, "public_poi_message"])
.inc();
process_valid_message(msg, &state_ref).await;
};
} else if let Ok(msg) = agent.decode::<VersionUpgradeMessage>(msg.payload()).await {
trace!(
message = tracing::field::debug(&msg),
"Parseable as Version Upgrade message, now validate",
);
let msg = match check_message_validity(
msg,
&nonces,
callbook.clone(),
local_sender.clone(),
&id_validation,
)
.await
.map_err(|e| WakuHandlingError::InvalidMessage(e.to_string()))
{
Ok(msg) => msg,
Err(e) => {
debug!(
err = tracing::field::debug(e),
"Failed to validate by Graphcast"
);
continue;
false
}
};

let is_valid = msg.payload.validity_check(&msg, &graph_node).await;

if let Ok(radio_msg) = is_valid {
VALIDATED_MESSAGES
.with_label_values(&[&msg.identifier, "version_upgrade_message"])
.inc();
radio_msg.process_valid_message(&upgrade_notifier).await;
};
}
} else {
trace!("Waku message not decoded or validated, skipped message",);
false
};

if !parsed {
if let Ok(msg) = agent.decode::<VersionUpgradeMessage>(msg.payload()).await {
trace!(
message = tracing::field::debug(&msg),
"Parseable as Version Upgrade message, now validate",
);
// Skip general first time sender nonce check and timestamp check
let msg = match msg
.valid_sender(
callbook.graphcast_registry(),
callbook.graph_network(),
local_sender.clone(),
&id_validation,
)
.await
.map_err(|e| WakuHandlingError::InvalidMessage(e.to_string()))
{
Ok(msg) => msg,
Err(e) => {
debug!(
err = tracing::field::debug(e),
"Failed to validate Graphcast sender"
);
continue;
}
};
let is_valid = msg
.payload
.validity_check(
msg,
&upgrade_config.graph_stack.network_subgraph.clone(),
)
.await;

if let Ok(radio_msg) = is_valid {
VALIDATED_MESSAGES
.with_label_values(&[&msg.identifier, "version_upgrade_message"])
.inc();
radio_msg
.process_valid_message(&upgrade_config, &upgrade_notifier)
.await;
};
} else {
trace!("Waku message not decoded or validated, skipped message",);
};
}
}
});

Expand Down

0 comments on commit 9a66e38

Please sign in to comment.