Skip to content

Commit

Permalink
fix: add local attestation without matching with remote
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Sep 1, 2023
1 parent 8a41797 commit ece7016
Show file tree
Hide file tree
Showing 2 changed files with 153 additions and 44 deletions.
5 changes: 0 additions & 5 deletions subgraph-radio/src/operator/attestation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1056,11 +1056,6 @@ mod tests {
let mut local_attestations: HashMap<String, HashMap<u64, Attestation>> = HashMap::new();
local_attestations.insert("hash".to_string(), local_blocks.clone());
local_attestations.insert("hash2".to_string(), local_blocks);
println!(
"find local comparison point: {:#?}\n{:#?}",
&local_attestations,
&test_msg_vec()
);
let (block_num, collect_window_end) = local_comparison_point(
&local_attestations,
&test_msg_vec(),
Expand Down
192 changes: 153 additions & 39 deletions subgraph-radio/src/server/model/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use async_graphql::{Context, EmptyMutation, EmptySubscription, Object, Schema, SimpleObject};
use chrono::Utc;

use std::{collections::HashMap, sync::Arc};
use thiserror::Error;

Expand Down Expand Up @@ -106,35 +107,10 @@ impl QueryRoot {

let mut ratios = vec![];
for r in res {
// Double check for local attestations to ensure there will be no divide by 0 during the ratio
let local_attestation = if let Some(local_attestation) = r.local_attestation {
local_attestation
} else {
continue;
};
let local_ppoi = local_attestation.ppoi.clone();

// Aggregate remote attestations with the local attestations
let mut aggregated_attestations: Vec<Attestation> = vec![];
for a in r.attestations {
if a.ppoi == local_attestation.ppoi {
let updateed_attestation = attestation::Attestation::update(
&a,
local_info.address.clone(),
local_info.stake,
Utc::now().timestamp(),
);
if let Ok(updated_a) = updateed_attestation {
aggregated_attestations.push(updated_a);
} else {
continue;
}
} else {
aggregated_attestations.push(a)
}
}
let sender_ratio = sender_count_str(&aggregated_attestations, local_ppoi.clone());
let stake_ratio = stake_weight_str(&aggregated_attestations, local_ppoi);
let (aggregated_attestations, local_ppoi) =
aggregate_attestation(r.clone(), &local_info);
let sender_ratio = sender_count_str(&aggregated_attestations, &local_ppoi);
let stake_ratio = stake_weight_str(&aggregated_attestations, &local_ppoi);
ratios.push(CompareRatio::new(
r.deployment,
r.block_number,
Expand Down Expand Up @@ -162,7 +138,7 @@ impl QueryRoot {
}

/// Helper function to order attestations by stake weight and then find the number of unique senders
pub fn sender_count_str(attestations: &[Attestation], local_ppoi: String) -> String {
pub fn sender_count_str(attestations: &[Attestation], local_ppoi: &str) -> String {
// Create a HashMap to store the attestation and senders
let mut temp_attestations = attestations.to_owned();
let mut output = String::new();
Expand All @@ -171,33 +147,51 @@ pub fn sender_count_str(attestations: &[Attestation], local_ppoi: String) -> Str
temp_attestations.sort_by(|a, b| b.stake_weight.cmp(&a.stake_weight));
// Iterate through the attestations and populate the maps
// No set is needed since uniqueness is garuanteeded by validation
for att in attestations.iter() {
let separator = if att.ppoi == local_ppoi { "*:" } else { ":" };
let mut matched = false;
for att in temp_attestations.iter() {
let separator = if att.ppoi == local_ppoi {
matched = true;
"*:"
} else {
":"
};

output.push_str(&format!("{}{}", att.senders.len(), separator));
}

output.pop(); // Remove the trailing ':'
if !matched {
output.push_str("0*");
} else {
output.pop(); // Remove the trailing ':'
}

output
}

/// Helper function to order attestations by stake weight and then find the number of unique senders
pub fn stake_weight_str(attestations: &[Attestation], local_ppoi: String) -> String {
pub fn stake_weight_str(attestations: &[Attestation], local_ppoi: &str) -> String {
// Create a HashMap to store the attestation and senders
let mut temp_attestations = attestations.to_owned();
let mut output = String::new();

let mut matched = false;
// Sort the attestations by descending stake weight
temp_attestations.sort_by(|a, b| b.stake_weight.cmp(&a.stake_weight));
// Iterate through the attestations and populate the maps
// No set is needed since uniqueness is garuanteeded by validation
for att in attestations.iter() {
let separator = if att.ppoi == local_ppoi { "*:" } else { ":" };
for att in temp_attestations.iter() {
let separator = if att.ppoi == local_ppoi {
matched = true;
"*:"
} else {
":"
};
output.push_str(&format!("{}{}", att.stake_weight, separator));
}
if !matched {
output.push_str("0*");
} else {
output.pop(); // Remove the trailing ':'
}

output.pop(); // Remove the trailing ':'
output
}

Expand Down Expand Up @@ -363,6 +357,45 @@ fn filter_remote_ppoi_messages(
is_matching_identifier && is_matching_block
}

// Return attestation aggregated between remote and local, and the local ppoi
fn aggregate_attestation(
r: ComparisonResult,
local_info: &IndexerInfo,
) -> (Vec<Attestation>, String) {
// Double check for local attestations to ensure there will be no divide by 0 during the ratio
let local_attestation = if let Some(local_attestation) = r.local_attestation {
local_attestation
} else {
// Simply return remote attestations
return (r.attestations, String::from("n/a"));
};

let local_ppoi = local_attestation.ppoi.clone();
// Aggregate remote attestations with the local attestations
let mut aggregated_attestations: Vec<Attestation> = vec![];
let mut matched = false;
for a in r.attestations {
if a.ppoi == local_ppoi {
let updated_attestation = attestation::Attestation::update(
&a,
local_info.address.clone(),
local_info.stake,
Utc::now().timestamp(),
);
if let Ok(updated_a) = updated_attestation {
aggregated_attestations.push(updated_a);
}
matched = true;
} else {
aggregated_attestations.push(a)
}
}
if !matched {
aggregated_attestations.push(local_attestation);
}
(aggregated_attestations, local_ppoi)
}

#[derive(Debug, PartialEq, Eq, Hash, SimpleObject)]
struct CompareRatio {
deployment: String,
Expand Down Expand Up @@ -413,3 +446,84 @@ pub enum HttpServiceError {
#[error("HTTP client error: {0}")]
HttpClientError(#[from] reqwest::Error),
}

#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_aggregate_attestations() {
let attestation1 =
Attestation::new("ppoi-1".to_string(), 1.0, vec!["0xa1".to_string()], vec![0]);

let attestation2 =
Attestation::new("ppoi-2".to_string(), 2.0, vec!["0xa2".to_string()], vec![1]);

let attestation3 =
Attestation::new("ppoi-3".to_string(), 3.0, vec!["0xa3".to_string()], vec![2]);

// Matched local attestation
let local_attestation = attestation1.clone();
let attestations = vec![attestation1.clone(), attestation2, attestation3];
let r = ComparisonResult {
deployment: "test_deployment".to_string(),
block_number: 42,
result_type: ComparisonResultType::Divergent,
local_attestation: Some(local_attestation.clone()),
attestations: attestations.clone(),
};

let local_info = IndexerInfo {
address: String::from("0xa0"),
stake: 1.0,
};

let aggregated_attestation = aggregate_attestation(r, &local_info);
assert_eq!(aggregated_attestation.0.len(), 3);
assert_eq!(&aggregated_attestation.1, &local_attestation.ppoi);
let sender_ratio = sender_count_str(&aggregated_attestation.0, &aggregated_attestation.1);
let stake_ratio = stake_weight_str(&aggregated_attestation.0, &aggregated_attestation.1);
assert_eq!(sender_ratio, "1:2*:1");
assert_eq!(stake_ratio, "3:2*:2");

// Uniquely diverged local attestation
let r = ComparisonResult {
deployment: "test_deployment".to_string(),
block_number: 42,
result_type: ComparisonResultType::Divergent,
local_attestation: Some(Attestation::new(
String::from("unique_ppoi"),
1.0,
vec![String::from("0xa0")],
vec![0],
)),
attestations: attestations.clone(),
};

let local_info = IndexerInfo {
address: String::from("0xa0"),
stake: 1.0,
};

let aggregated_attestation = aggregate_attestation(r, &local_info);
assert_eq!(aggregated_attestation.0.len(), 4);
let stake_ratio = stake_weight_str(&aggregated_attestation.0, &aggregated_attestation.1);
assert!((&stake_ratio == "3:2:1:1*" || &stake_ratio == "3:2:1*:1"));

// Missing local attestation, not found
let r = ComparisonResult {
deployment: "test_deployment".to_string(),
block_number: 42,
result_type: ComparisonResultType::NotFound,
local_attestation: None,
attestations: attestations.clone(),
};

let aggregated_attestation = aggregate_attestation(r, &local_info);
assert_eq!(aggregated_attestation.0.len(), 3);
assert_eq!(&aggregated_attestation.1, "n/a");
let sender_ratio = sender_count_str(&aggregated_attestation.0, &aggregated_attestation.1);
let stake_ratio = stake_weight_str(&aggregated_attestation.0, &aggregated_attestation.1);
assert_eq!(sender_ratio, "1:1:1:0*");
assert_eq!(stake_ratio, "3:2:1:0*");
}
}

0 comments on commit ece7016

Please sign in to comment.