Skip to content

Commit

Permalink
refactor: agent signal handler and waku msg receiver repositioned
Browse files Browse the repository at this point in the history
  • Loading branch information
hopeyen committed Sep 23, 2023
1 parent 4c23e58 commit 5331734
Show file tree
Hide file tree
Showing 5 changed files with 154 additions and 88 deletions.
150 changes: 102 additions & 48 deletions examples/ping-pong/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use chrono::Utc;
// Load environment variables from .env file
use dotenv::dotenv;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::signal;

// Import Arc and Mutex for thread-safe sharing of data across threads
use std::sync::{mpsc, Arc, Mutex};
// Import Graphcast SDK types and functions for agent configuration, message handling, and more
Expand All @@ -24,7 +27,7 @@ use types::SimpleMessage;
// Import Config from the crate's config module
use config::Config;

use crate::types::{GRAPHCAST_AGENT, MESSAGES};
use crate::types::MESSAGES;

// Include the local config and types modules
mod config;
Expand All @@ -37,6 +40,40 @@ async fn main() {
// Loads the environment variables from .env
dotenv().ok();

let running = Arc::new(AtomicBool::new(true));
let listen_running = running.clone();

tokio::spawn(async move {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let sigterm = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let sigterm = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {
println!("Ctrl+C received! Shutting down...");
}
_ = sigterm => {
println!("SIGTERM received! Shutting down...");
}
}
// Set running boolean to false
debug!("Finish the current running processes...");
listen_running.store(false, Ordering::SeqCst)
});

// Instantiates the configuration struct based on provided environment variables or CLI args
let config = Config::args();
let _parent_span = tracing::info_span!("main").entered();
Expand Down Expand Up @@ -71,70 +108,65 @@ async fn main() {

let (sender, receiver) = mpsc::channel::<WakuMessage>();
debug!("Initializing the Graphcast Agent");
let graphcast_agent = GraphcastAgent::new(graphcast_agent_config, sender)
let graphcast_agent = GraphcastAgent::new(graphcast_agent_config, sender.clone())
.await
.expect("Could not create Graphcast agent");

// A one-off setter to load the Graphcast Agent into the global static variable
_ = GRAPHCAST_AGENT.set(graphcast_agent);
// Original sender not used, simply drop it
drop(sender);

// A one-off setter to instantiate an empty vec before populating it with incoming messages
_ = MESSAGES.set(Arc::new(Mutex::new(vec![])));

// The handler specifies what to do with incoming messages.
// This is where you can define multiple message types and how they gets handled by the radio
// by chaining radio payload typed decode and handler functions
tokio::spawn(async move {
for msg in receiver {
trace!(
"Radio operator received a Waku message from Graphcast agent, now try to fit it to Graphcast Message with Radio specified payload"
);
let _ = GraphcastMessage::<SimpleMessage>::decode(msg.payload())
.await
.map(|msg| {
msg.payload.radio_handler();
})
.map_err(|err| {
error!(
error = tracing::field::debug(&err),
"Failed to handle Waku signal"
let receiver_running = running.clone();
let receiver_handler = tokio::spawn(async move {
while receiver_running.load(Ordering::SeqCst) {
match receiver.recv() {
Ok(msg) => {
trace!(
"Radio operator received a Waku message from Graphcast agent, now try to fit it to Graphcast Message with Radio specified payload"
);
err
});
let _ = GraphcastMessage::<SimpleMessage>::decode(msg.payload())
.map(|msg| {
msg.payload.radio_handler();
})
.map_err(|err| {
error!(
error = tracing::field::debug(&err),
"Failed to handle Waku signal"
);
err
});
}
Err(e) => {
trace!(e = e.to_string(), "All senders have been dropped, exiting");
break;
}
}
}
});

GRAPHCAST_AGENT
.get()
.expect("Could not retrieve Graphcast agent")
.register_handler()
.expect("Could not register handler");
// Main loop of the application
_ = main_loop(&graphcast_agent, running).await;

main_loop().await;
}
match graphcast_agent.stop() {
Ok(_) => {
debug!("Graphcast agent successful shutdown");
receiver_handler.await.unwrap();
debug!("Operator message receiver successful shutdown");
}
Err(e) => panic!("Cannot shutdown Graphcast agent: {:#?}", e),
}

// Helper function to reuse message sending code
async fn send_message(payload: SimpleMessage) {
if let Err(e) = GRAPHCAST_AGENT
.get()
.expect("Could not retrieve Graphcast agent")
.send_message(
// The identifier can be any string that suits your Radio logic
// If it doesn't matter for your Radio logic (like in this case), you can just use a UUID or a hardcoded string
"ping-pong-content-topic",
payload,
Utc::now().timestamp(),
)
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
};
debug!("Exiting the program");
}

/// Main event loop to send ping and respond pong
async fn main_loop() {
async fn main_loop(agent: &GraphcastAgent, running: Arc<AtomicBool>) {
let mut block_number = 0;
loop {
while running.load(Ordering::SeqCst) {
block_number += 1;
info!(block = block_number, "🔗 Block number");
if block_number & 2 == 0 {
Expand All @@ -143,7 +175,19 @@ async fn main_loop() {
"table".to_string(),
std::env::args().nth(1).unwrap_or("Ping".to_string()),
);
send_message(msg).await;
if let Err(e) = agent
.send_message(
// The identifier can be any string that suits your Radio logic
// If it doesn't matter for your Radio logic (like in this case), you can just use a UUID or a hardcoded string
"ping-pong-content-topic",
msg,
Utc::now().timestamp(),
)
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
};
// agent.send_message(msg).await;
} else {
// If block number is odd, process received messages
let messages = AsyncMutex::new(
Expand All @@ -156,7 +200,17 @@ async fn main_loop() {
for msg in messages.lock().await.iter() {
if msg.content == *"Ping" {
let replay_msg = SimpleMessage::new("table".to_string(), "Pong".to_string());
send_message(replay_msg).await;
// send_message(replay_msg).await;
if let Err(e) = agent
.send_message(
"ping-pong-content-topic",
replay_msg,
Utc::now().timestamp(),
)
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
};
};
}

Expand Down
7 changes: 1 addition & 6 deletions examples/ping-pong/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use async_graphql::SimpleObject;
use ethers_contract::EthAbiType;
use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;
use graphcast_sdk::graphcast_agent::GraphcastAgent;

use prost::Message;
use serde::{Deserialize, Serialize};

Expand All @@ -16,11 +16,6 @@ use std::sync::{Arc, Mutex};
/// it is not allowed in the handler itself.
pub static MESSAGES: OnceCell<Arc<Mutex<Vec<SimpleMessage>>>> = OnceCell::new();

/// The Graphcast Agent instance must be a global static variable (for the time being).
/// This is because the Radio handler requires a static immutable context and
/// the handler itself is being passed into the Graphcast Agent, so it needs to be static as well.
pub static GRAPHCAST_AGENT: OnceCell<GraphcastAgent> = OnceCell::new();

/// Make a test radio type
#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, SimpleObject)]
#[eip712(
Expand Down
2 changes: 1 addition & 1 deletion src/graphcast_agent/message_typing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl<
}
}

pub async fn decode(payload: &[u8]) -> Result<Self, WakuHandlingError> {
pub fn decode(payload: &[u8]) -> Result<Self, WakuHandlingError> {
match <GraphcastMessage<T> as Message>::decode(payload) {
Ok(graphcast_message) => Ok(graphcast_message),
Err(e) => Err(WakuHandlingError::InvalidMessage(format!(
Expand Down
71 changes: 42 additions & 29 deletions src/graphcast_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex as SyncMutex};
use tokio::runtime::Runtime;

use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, error, info, trace, warn};
use url::ParseError;
Expand Down Expand Up @@ -200,9 +200,9 @@ pub struct GraphcastAgent {
pub nonces: Arc<AsyncMutex<NoncesMap>>,
/// Callbook that make query requests
pub callbook: CallBook,
/// TODO: remove after confirming that gossippub seen_ttl works
/// msg_seen_ttl is for only relay messages they have not seen before, not effective for client nodes
/// A set of message ids sent from the agent
pub old_message_ids: Arc<AsyncMutex<HashSet<String>>>,
pub seen_msg_ids: Arc<SyncMutex<HashSet<String>>>,
/// Sender identity validation mechanism used by the Graphcast agent
pub id_validation: IdentityValidation,
/// Upon receiving a valid waku signal event of Message type, sender send WakuMessage through mpsc.
Expand Down Expand Up @@ -261,7 +261,6 @@ impl GraphcastAgent {
///
/// let agent = GraphcastAgent::new(config).await?;
/// ```

pub async fn new(
GraphcastAgentConfig {
wallet_key,
Expand Down Expand Up @@ -321,6 +320,10 @@ impl GraphcastAgent {

let callbook = CallBook::new(registry_subgraph, network_subgraph, graph_node_endpoint);

let sender = Arc::new(SyncMutex::new(sender));
let seen_msg_ids = Arc::new(SyncMutex::new(HashSet::new()));
register_handler(sender.clone(), seen_msg_ids.clone()).expect("Could not register handler");

Ok(GraphcastAgent {
graphcast_identity,
radio_name,
Expand All @@ -329,12 +332,25 @@ impl GraphcastAgent {
node_handle,
nonces: Arc::new(AsyncMutex::new(HashMap::new())),
callbook,
old_message_ids: Arc::new(AsyncMutex::new(HashSet::new())),
seen_msg_ids,
id_validation,
sender: Arc::new(SyncMutex::new(sender)),
sender,
})
}

/// Stop a GraphcastAgent instance
pub fn stop(self) -> Result<(), GraphcastAgentError> {
debug!("Stop Waku node");
let r = self
.node_handle
.stop()
.map_err(|e| GraphcastAgentError::WakuNodeError(WakuHandlingError::StopNodeError(e)));
debug!("Drop Arc std sync mutexes");
drop(self.seen_msg_ids);
drop(self.sender);
r
}

/// Get the number of peers excluding self
pub fn number_of_peers(&self) -> usize {
self.node_handle.peer_count().unwrap_or({
Expand Down Expand Up @@ -401,27 +417,6 @@ impl GraphcastAgent {
}
}

/// Establish handler for incoming Waku messages
pub fn register_handler(&'static self) -> Result<(), GraphcastAgentError> {
let sender = self.sender.clone();
let old_message_ids: &Arc<AsyncMutex<HashSet<String>>> = &self.old_message_ids;
let handle_async = move |signal: Signal| {
let rt = Runtime::new().expect("Could not create Tokio runtime");
rt.block_on(async {
let msg = handle_signal(signal, old_message_ids).await;

if let Ok(m) = msg {
match sender.clone().lock().unwrap().send(m) {
Ok(_) => trace!("Sent received message to radio operator"),
Err(e) => error!("Could not send message to channel: {:#?}", e),
}
}
});
};
waku_set_event_callback(handle_async);
Ok(())
}

/// For each topic, construct with custom write function and send
#[allow(unused_must_use)]
pub async fn send_message<
Expand All @@ -445,7 +440,6 @@ impl GraphcastAgent {

// Check network before sending a message
network_check(&self.node_handle).map_err(GraphcastAgentError::WakuNodeError)?;
let mut ids = self.old_message_ids.lock().await;
trace!(
address = &wallet_address(&self.graphcast_identity.wallet),
"local sender id"
Expand All @@ -462,7 +456,7 @@ impl GraphcastAgent {
.send_to_waku(&self.node_handle, self.pubsub_topic.clone(), content_topic)
.map_err(GraphcastAgentError::WakuNodeError)
.map(|id| {
ids.insert(id.clone());
self.seen_msg_ids.lock().unwrap().insert(id.clone());
trace!(id = id, "Sent message");
id
})
Expand Down Expand Up @@ -521,6 +515,25 @@ pub enum GraphcastAgentError {
Other(anyhow::Error),
}

/// Establish handler for incoming Waku messages
pub fn register_handler(
sender: Arc<SyncMutex<Sender<WakuMessage>>>,
seen_msg_ids: Arc<SyncMutex<HashSet<String>>>,
) -> Result<(), GraphcastAgentError> {
let handle_async = move |signal: Signal| {
let msg = handle_signal(signal, &seen_msg_ids);

if let Ok(m) = msg {
match sender.clone().lock().unwrap().send(m) {
Ok(_) => trace!("Sent received message to radio operator"),
Err(e) => error!("Could not send message to channel: {:#?}", e),
}
}
};
waku_set_event_callback(handle_async);
Ok(())
}

impl GraphcastAgentError {
pub fn type_string(&self) -> &'static str {
match self {
Expand Down
Loading

0 comments on commit 5331734

Please sign in to comment.