Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add RadioPayload trait alias and requirements #303

Merged
merged 2 commits into from Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 18 additions & 1 deletion examples/ping-pong/src/types.rs
Expand Up @@ -3,6 +3,9 @@ use ethers_contract::EthAbiType;
use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;

use graphcast_sdk::graphcast_agent::message_typing::{
GraphcastMessage, MessageError, RadioPayload,
};
use prost::Message;
use serde::{Deserialize, Serialize};

Expand All @@ -19,7 +22,7 @@ pub static MESSAGES: OnceCell<Arc<Mutex<Vec<SimpleMessage>>>> = OnceCell::new();
/// Make a test radio type
#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, SimpleObject)]
#[eip712(
name = "Graphcast Ping-Pong Radio",
name = "SimpleMessage",
version = "0",
chain_id = 1,
verifying_contract = "0xc944e90c64b2c07662a292be6244bdf05cda44a7"
Expand All @@ -31,6 +34,20 @@ pub struct SimpleMessage {
pub content: String,
}

impl RadioPayload for SimpleMessage {
fn valid_outer(&self, outer: &GraphcastMessage<Self>) -> Result<&Self, MessageError> {
if self.identifier == outer.identifier {
Ok(self)
} else {
Err(MessageError::InvalidFields(anyhow::anyhow!(
"Radio message wrapped by inconsistent GraphcastMessage: {:#?} <- {:#?}",
&self,
&outer,
)))
}
}
}

impl SimpleMessage {
pub fn new(identifier: String, content: String) -> Self {
SimpleMessage {
Expand Down
133 changes: 72 additions & 61 deletions src/graphcast_agent/message_typing.rs
Expand Up @@ -2,7 +2,10 @@ use anyhow::anyhow;
use async_graphql::SimpleObject;
use chrono::Utc;
use ethers::signers::{Signer, Wallet};
use ethers_core::{k256::ecdsa::SigningKey, types::Signature};
use ethers_core::{
k256::ecdsa::SigningKey,
types::{transaction::eip712::Eip712Error, Signature},
};
use prost::Message;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt, str::FromStr, sync::Arc};
Expand Down Expand Up @@ -39,17 +42,26 @@ pub async fn get_indexer_stake(
.indexer_stake())
}

//TODO: add required functions for RadioPayload, such as
// Build, new, validations, ...; may need to be async trait for valid checks
pub trait RadioPayload:
Message
+ ethers::types::transaction::eip712::Eip712<Error = Eip712Error>
+ Default
+ Clone
+ 'static
+ Serialize
+ async_graphql::OutputType
{
// type ExternalValidation;
// async fn validity_check(&self, gc: GraphcastMessage<Self>, input: Self::ExternalValidation) -> Result<&Self, MessageError>;

fn valid_outer(&self, outer: &GraphcastMessage<Self>) -> Result<&Self, MessageError>;
}

/// GraphcastMessage type casts over radio payload
#[derive(Clone, Message, Serialize, Deserialize, SimpleObject)]
pub struct GraphcastMessage<T>
where
T: Message
+ ethers::types::transaction::eip712::Eip712
+ Default
+ Clone
+ 'static
+ async_graphql::OutputType,
{
pub struct GraphcastMessage<T: RadioPayload> {
/// Graph identifier for the entity the radio is communicating about
#[prost(string, tag = "1")]
pub identifier: String,
Expand All @@ -67,23 +79,15 @@ where
pub signature: String,
}

impl<
T: Message
+ ethers::types::transaction::eip712::Eip712
+ Default
+ Clone
+ 'static
+ async_graphql::OutputType,
> GraphcastMessage<T>
{
impl<T: RadioPayload> GraphcastMessage<T> {
/// Create a graphcast message
pub fn new(
identifier: String,
nonce: i64,
graph_account: String,
payload: T,
signature: String,
) -> Result<Self, BuildMessageError> {
) -> Result<Self, MessageError> {
Ok(GraphcastMessage {
identifier,
nonce,
Expand All @@ -100,11 +104,11 @@ impl<
graph_account: String,
nonce: i64,
payload: T,
) -> Result<Self, BuildMessageError> {
) -> Result<Self, MessageError> {
let sig = wallet
.sign_typed_data(&payload)
.await
.map_err(|_| BuildMessageError::Signing)?;
.map_err(|_| MessageError::Signing)?;

GraphcastMessage::new(identifier, nonce, graph_account, payload, sig.to_string())
}
Expand Down Expand Up @@ -147,7 +151,7 @@ impl<
network_subgraph: &str,
local_sender_id: String,
id_validation: &IdentityValidation,
) -> Result<&Self, BuildMessageError> {
) -> Result<&Self, MessageError> {
if id_validation == &IdentityValidation::NoCheck {
return Ok(self);
};
Expand All @@ -161,28 +165,28 @@ impl<
}

/// Check timestamp: prevent past message replay
pub fn valid_time(&self) -> Result<&Self, BuildMessageError> {
pub fn valid_time(&self) -> Result<&Self, MessageError> {
//Can store for measuring overall Graphcast message latency
let message_age = Utc::now().timestamp() - self.nonce;
// 0 allow instant atomic messaging, use 1 to exclude them
if (0..MSG_REPLAY_LIMIT).contains(&message_age) {
Ok(self)
} else {
Err(BuildMessageError::InvalidFields(anyhow!(
Err(MessageError::InvalidFields(anyhow!(
"Message timestamp {} outside acceptable range {}, drop message",
message_age,
MSG_REPLAY_LIMIT
)))
}
}

pub fn remote_account(&self, local_sender_id: String) -> Result<Account, BuildMessageError> {
pub fn remote_account(&self, local_sender_id: String) -> Result<Account, MessageError> {
let sender_address = self.recover_sender_address().and_then(|a| {
trace!("recovered sender address: {:#?}\n", a,);
if a != local_sender_id {
Ok(a)
} else {
Err(BuildMessageError::InvalidFields(anyhow!(
Err(MessageError::InvalidFields(anyhow!(
"Message is from self, drop message"
)))
}
Expand All @@ -191,22 +195,19 @@ impl<
}

/// Recover sender address from Graphcast message radio payload
pub fn recover_sender_address(&self) -> Result<String, BuildMessageError> {
pub fn recover_sender_address(&self) -> Result<String, MessageError> {
let signed_data = self
.payload
.encode_eip712()
.expect("Could not encode payload using EIP712");
match Signature::from_str(&self.signature).and_then(|sig| sig.recover(signed_data)) {
Ok(addr) => Ok(format!("{addr:#x}")),
Err(x) => Err(BuildMessageError::InvalidFields(x.into())),
Err(x) => Err(MessageError::InvalidFields(x.into())),
}
}

/// Check historic nonce: ensure message sequencing
pub async fn valid_nonce(
&self,
nonces: &Arc<Mutex<NoncesMap>>,
) -> Result<&Self, BuildMessageError> {
pub async fn valid_nonce(&self, nonces: &Arc<Mutex<NoncesMap>>) -> Result<&Self, MessageError> {
let address = self.recover_sender_address()?;

let mut nonces = nonces.lock().await;
Expand All @@ -225,7 +226,7 @@ impl<
);

if nonce > &self.nonce {
Err(BuildMessageError::InvalidFields(anyhow!(
Err(MessageError::InvalidFields(anyhow!(
"Invalid nonce for subgraph {} and address {}! Received nonce - {} is smaller than currently saved one - {}, skipping message...",
self.identifier, address, self.nonce, nonce
)))
Expand All @@ -240,7 +241,7 @@ impl<
let updated_nonces =
prepare_nonces(nonces_per_subgraph, address.clone(), self.nonce);
nonces.insert(self.identifier.clone(), updated_nonces);
Err(BuildMessageError::InvalidFields(anyhow!(
Err(MessageError::InvalidFields(anyhow!(
"No saved nonce for address {} on topic {}, saving this one and skipping message...",
address, self.identifier
)))
Expand All @@ -250,7 +251,7 @@ impl<
None => {
let updated_nonces = prepare_nonces(&HashMap::new(), address, self.nonce);
nonces.insert(self.identifier.clone(), updated_nonces);
Err(BuildMessageError::InvalidFields(anyhow!(
Err(MessageError::InvalidFields(anyhow!(
"First time receiving message for subgraph {}. Saving sender and nonce, skipping message...",
self.identifier
)))
Expand All @@ -259,12 +260,11 @@ impl<
}

pub fn decode(payload: &[u8]) -> Result<Self, WakuHandlingError> {
match <GraphcastMessage<T> as Message>::decode(payload) {
Ok(graphcast_message) => Ok(graphcast_message),
Err(e) => Err(WakuHandlingError::InvalidMessage(format!(
<GraphcastMessage<T> as Message>::decode(payload).map_err(|e| {
WakuHandlingError::InvalidMessage(format!(
"Waku message not interpretated as a Graphcast message\nError occurred: {e:?}"
))),
}
))
})
}
}

Expand All @@ -273,20 +273,13 @@ impl<
/// Time check verifies that message was from within the acceptable timestamp
/// Block hash check verifies sender's access to valid Ethereum node provider and blocks
/// Nonce check ensures the ordering of the messages and avoids past messages
pub async fn check_message_validity<
T: Message
+ ethers::types::transaction::eip712::Eip712
+ Default
+ Clone
+ 'static
+ async_graphql::OutputType,
>(
pub async fn check_message_validity<T: RadioPayload>(
graphcast_message: GraphcastMessage<T>,
nonces: &Arc<Mutex<NoncesMap>>,
callbook: CallBook,
local_sender_id: String,
id_validation: &IdentityValidation,
) -> Result<GraphcastMessage<T>, BuildMessageError> {
) -> Result<GraphcastMessage<T>, MessageError> {
graphcast_message
.valid_sender(
callbook.graphcast_registry(),
Expand All @@ -307,7 +300,7 @@ pub async fn check_message_validity<
}

#[derive(Debug, thiserror::Error)]
pub enum BuildMessageError {
pub enum MessageError {
#[error("Radio payload failed to satisfy the defined Eip712 typing")]
Payload,
#[error("Could not sign payload")]
Expand All @@ -326,17 +319,17 @@ pub enum BuildMessageError {
TypeCast(String),
}

impl BuildMessageError {
impl MessageError {
pub fn type_string(&self) -> &'static str {
match self {
BuildMessageError::Payload => "Payload",
BuildMessageError::Signing => "Signing",
BuildMessageError::Encoding => "Encoding",
BuildMessageError::Decoding => "Decoding",
BuildMessageError::InvalidFields(_) => "InvalidFields",
BuildMessageError::Network(_) => "Network",
BuildMessageError::FieldDerivations(_) => "FieldDerivations",
BuildMessageError::TypeCast(_) => "TypeCast",
MessageError::Payload => "Payload",
MessageError::Signing => "Signing",
MessageError::Encoding => "Encoding",
MessageError::Decoding => "Decoding",
MessageError::InvalidFields(_) => "InvalidFields",
MessageError::Network(_) => "Network",
MessageError::FieldDerivations(_) => "FieldDerivations",
MessageError::TypeCast(_) => "TypeCast",
}
}
}
Expand Down Expand Up @@ -385,7 +378,6 @@ mod tests {
use ethers_core::rand::thread_rng;
use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;
use prost::Message;
use serde::{Deserialize, Serialize};

/// Make a test radio type
Expand All @@ -403,6 +395,25 @@ mod tests {
pub content: String,
}

impl RadioPayload for SimpleMessage {
//TODO: Add various requirements to RadioPayload
// type ExternalValidation = Option<String>;
// fn validity_check(&self, _gc: GraphcastMessage<Self>, _val: Option<String>) -> Result<&Self, MessageError> {
// Ok(self)
// }
fn valid_outer(&self, outer: &GraphcastMessage<Self>) -> Result<&Self, MessageError> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is something we expect all custom Radio message types to implement, how custom should/could the impl be? If we expect valid_outer to always be the same, shouldn't we enforce it on SDK side?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should require all Radios to check if the inner payload is consistent with the general Graphcast message, although I don't expect them to always be the same. For example SimpleMessage which doesn't contain fields like nonce even though it is more of an example message type. Since we don't have a restriction on the fields for the payload type, I don't think it will be easy to check the fields between the inner and outer types.
Let me know if you think we should remove valid_outer altogether; maybe there's some other way to refactor the messages so the outer type doesn't have overlapping fields (currently identifier, nonce, graph_account, etc must be in inner for the signature and they are on the outer type for interacting with gossips and generic validations

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah this makes total sense, in the current setup this is the best way then, thank you for explaining! 💯

if self.identifier == outer.identifier {
Ok(self)
} else {
Err(MessageError::InvalidFields(anyhow::anyhow!(
"Radio message wrapped by inconsistent GraphcastMessage: {:#?} <- {:#?}",
&self,
&outer,
)))
}
}
}

impl SimpleMessage {
pub fn new(identifier: String, content: String) -> Self {
SimpleMessage {
Expand Down