Skip to content

Commit

Permalink
feat: add content topic check at message handle
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Oct 30, 2023
1 parent a18248d commit 3cbdb45
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 24 deletions.
21 changes: 21 additions & 0 deletions examples/ping-pong/src/config.rs
Expand Up @@ -88,6 +88,27 @@ pub struct Config {
indexer: must be registered at Graphcast Registry or is a Graph Account, correspond to and Indexer statisfying indexer minimum stake requirement"
)]
pub id_validation: IdentityValidation,
#[clap(
long,
value_name = "WAKU_PORT",
help = "Port for the Waku gossip client",
env = "WAKU_PORT"
)]
pub waku_port: Option<String>,
#[clap(
long,
value_name = "NODE_ADDRESSES",
help = "Comma separated static list of waku boot nodes to connect to",
env = "BOOT_NODE_ADDRESSES"
)]
pub boot_node_addresses: Vec<String>,
#[clap(
long,
value_name = "DISCV5_PORT",
help = "Waku node to expose discoverable udp port",
env = "DISCV5_PORT"
)]
pub discv5_port: Option<u16>,
}

impl Config {
Expand Down
10 changes: 5 additions & 5 deletions examples/ping-pong/src/main.rs
Expand Up @@ -92,16 +92,16 @@ async fn main() {
config.network_subgraph,
config.id_validation.clone(),
config.graph_node_endpoint,
None,
Some(config.boot_node_addresses),
Some("testnet".to_string()),
Some(subtopics),
None,
None,
None,
config.waku_port,
None,
Some(false),
Some(vec![discovery_enr]),
None,
config.discv5_port,
)
.await
.unwrap_or_else(|e| panic!("Could not create GraphcastAgentConfig: {e}"));
Expand Down Expand Up @@ -165,7 +165,7 @@ async fn main_loop(agent: &GraphcastAgent, running: Arc<AtomicBool>) {
.send_message(
// The identifier can be any string that suits your Radio logic
// If it doesn't matter for your Radio logic (like in this case), you can just use a UUID or a hardcoded string
"ping-pong-content-topic",
agent.content_identifiers().first().unwrap(),
msg,
Utc::now().timestamp(),
)
Expand All @@ -191,7 +191,7 @@ async fn main_loop(agent: &GraphcastAgent, running: Arc<AtomicBool>) {
// send_message(replay_msg).await;
if let Err(e) = agent
.send_message(
"ping-pong-content-topic",
agent.content_identifiers().first().unwrap(),
replay_msg,
Utc::now().timestamp(),
)
Expand Down
61 changes: 42 additions & 19 deletions src/graphcast_agent/mod.rs
Expand Up @@ -195,7 +195,7 @@ pub struct GraphcastAgent {
/// Graphcast agent waku instance's pubsub topic
pub pubsub_topic: WakuPubSubTopic,
/// Graphcast agent waku instance's content topics
pub content_topics: Arc<AsyncMutex<Vec<WakuContentTopic>>>,
pub content_topics: Arc<SyncMutex<Vec<WakuContentTopic>>>,
/// Nonces map for caching sender nonces in each subtopic
pub nonces: Arc<AsyncMutex<NoncesMap>>,
/// Callbook that make query requests
Expand Down Expand Up @@ -321,13 +321,15 @@ impl GraphcastAgent {
let callbook = CallBook::new(registry_subgraph, network_subgraph, graph_node_endpoint);

let seen_msg_ids = Arc::new(SyncMutex::new(HashSet::new()));
register_handler(sender, seen_msg_ids.clone()).expect("Could not register handler");
let content_topics = Arc::new(SyncMutex::new(content_topics));
register_handler(sender, seen_msg_ids.clone(), content_topics.clone())
.expect("Could not register handler");

Ok(GraphcastAgent {
graphcast_identity,
radio_name,
pubsub_topic,
content_topics: Arc::new(AsyncMutex::new(content_topics)),
content_topics,
node_handle,
nonces: Arc::new(AsyncMutex::new(HashMap::new())),
callbook,
Expand Down Expand Up @@ -360,36 +362,55 @@ impl GraphcastAgent {
})
}

/// Get Radio content topics in a Vec
pub fn content_topics(&self) -> Vec<WakuContentTopic> {
match self.content_topics.lock() {
Ok(topics) => topics.iter().cloned().collect(),
Err(e) => {
debug!(
err = e.to_string(),
"Graphcast Agent content topics poisoned"
);
vec![]
}
}
}

/// Get identifiers of Radio content topics
pub async fn content_identifiers(&self) -> Vec<String> {
self.content_topics
.lock()
.await
.iter()
.cloned()
.map(|topic| topic.content_topic_name.into_owned())
.collect()
pub fn content_identifiers(&self) -> Vec<String> {
match self.content_topics.lock() {
Ok(topics) => topics
.iter()
.cloned()
.map(|topic| topic.content_topic_name.into_owned())
.collect(),
Err(e) => {
debug!(
err = e.to_string(),
"Graphcast Agent content topics poisoned"
);
vec![]
}
}
}

pub async fn print_subscriptions(&self) {
info!(
pubsub_topic = tracing::field::debug(&self.pubsub_topic),
content_topic = tracing::field::debug(&self.content_identifiers().await),
content_topic = tracing::field::debug(&self.content_identifiers()),
"Subscriptions"
);
}

/// Find the subscribed content topic with an identifier
/// Error if topic doesn't exist
pub async fn match_content_topic(
pub fn match_content_topic_identifier(
&self,
identifier: &str,
) -> Result<WakuContentTopic, GraphcastAgentError> {
trace!(topic = identifier, "Target content topic");
match self
.content_topics
.lock()
.await
.content_topics()
.iter()
.find(|&x| x.content_topic_name == identifier)
{
Expand Down Expand Up @@ -433,7 +454,7 @@ impl GraphcastAgent {
payload: T,
nonce: i64,
) -> Result<String, GraphcastAgentError> {
let content_topic = self.match_content_topic(identifier).await?;
let content_topic = self.match_content_topic_identifier(identifier)?;
trace!(
topic = tracing::field::debug(&content_topic),
"Selected content topic from subscriptions"
Expand Down Expand Up @@ -467,7 +488,7 @@ impl GraphcastAgent {
pub async fn update_content_topics(&self, subtopics: Vec<String>) {
// build content topics
let new_topics = build_content_topics(&self.radio_name, 0, &subtopics);
let mut cur_topics = self.content_topics.lock().await;
let cur_topics = self.content_topics();

// Check if an update to the content topic is necessary
if *cur_topics != new_topics {
Expand All @@ -488,6 +509,7 @@ impl GraphcastAgent {
// Subscribe to the new content topics
filter_peer_subscriptions(&self.node_handle, &self.pubsub_topic, &new_topics)
.expect("Connect and subscribe to subtopics");
let mut cur_topics = self.content_topics.lock().unwrap();
*cur_topics = new_topics;
}
drop(cur_topics);
Expand Down Expand Up @@ -520,9 +542,10 @@ pub enum GraphcastAgentError {
pub fn register_handler(
sender: Sender<WakuMessage>,
seen_msg_ids: Arc<SyncMutex<HashSet<String>>>,
content_topics: Arc<SyncMutex<Vec<WakuContentTopic>>>,
) -> Result<(), GraphcastAgentError> {
let handle_async = move |signal: Signal| {
let msg = handle_signal(signal, &seen_msg_ids);
let msg = handle_signal(signal, &seen_msg_ids, &content_topics);

if let Ok(m) = msg {
match sender.send(m) {
Expand Down
23 changes: 23 additions & 0 deletions src/graphcast_agent/waku_handling.rs
Expand Up @@ -411,13 +411,15 @@ pub fn boot_node_handle(
pub fn handle_signal(
signal: Signal,
seen_msg_ids: &Arc<SyncMutex<HashSet<String>>>,
content_topics: &Arc<SyncMutex<Vec<WakuContentTopic>>>,
) -> Result<WakuMessage, WakuHandlingError> {
// Do not accept messages that were already received or sent by self
match signal.event() {
waku::Event::WakuMessage(event) => {
let msg_id = event.message_id();
trace!(msg_id, "Received message id",);
let mut ids = seen_msg_ids.lock().unwrap();
// Check if message has been received before or sent from local node
if ids.contains(msg_id) {
trace!(msg_id, "Skip repeated message");
return Err(WakuHandlingError::InvalidMessage(format!(
Expand All @@ -426,6 +428,18 @@ pub fn handle_signal(
)));
};
ids.insert(msg_id.to_string());
let content_topic = event.waku_message().content_topic();
// Check if message belongs to a relevant topic
if !match_content_topic(content_topics, content_topic) {
trace!(
topic = tracing::field::debug(content_topic),
"Skip irrelevant content topic"
);
return Err(WakuHandlingError::InvalidMessage(format!(
"Skip irrelevant content topic: {:#?}",
content_topic
)));
};
Ok(event.waku_message().clone())
}

Expand All @@ -439,6 +453,15 @@ pub fn handle_signal(
}
}

/// Check if a content topic exists in a list of topics
pub fn match_content_topic(
content_topics: &Arc<SyncMutex<Vec<WakuContentTopic>>>,
topic: &WakuContentTopic,
) -> bool {
trace!(topic = tracing::field::debug(topic), "Target content topic");
content_topics.lock().unwrap().iter().any(|t| t == topic)
}

/// Parse and validate incoming message
pub async fn generic_graphcast_check<
T: Message
Expand Down

0 comments on commit 3cbdb45

Please sign in to comment.