Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: shutdown signaling in ping-pong #284

Merged
merged 5 commits into from Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
153 changes: 106 additions & 47 deletions examples/ping-pong/src/main.rs
@@ -1,6 +1,9 @@
use chrono::Utc;
// Load environment variables from .env file
use dotenv::dotenv;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::signal;

// Import Arc and Mutex for thread-safe sharing of data across threads
use std::sync::{mpsc, Arc, Mutex};
// Import Graphcast SDK types and functions for agent configuration, message handling, and more
Expand All @@ -24,7 +27,7 @@ use types::SimpleMessage;
// Import Config from the crate's config module
use config::Config;

use crate::types::{GRAPHCAST_AGENT, MESSAGES};
use crate::types::MESSAGES;

// Include the local config and types modules
mod config;
Expand All @@ -37,6 +40,40 @@ async fn main() {
// Loads the environment variables from .env
dotenv().ok();

let running = Arc::new(AtomicBool::new(true));
let listen_running = running.clone();

tokio::spawn(async move {
let ctrl_c = async {
signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};

#[cfg(unix)]
let sigterm = async {
signal::unix::signal(signal::unix::SignalKind::terminate())
.expect("failed to install signal handler")
.recv()
.await;
};

#[cfg(not(unix))]
let sigterm = std::future::pending::<()>();

tokio::select! {
_ = ctrl_c => {
println!("Ctrl+C received! Shutting down...");
}
_ = sigterm => {
println!("SIGTERM received! Shutting down...");
}
}
// Set running boolean to false
debug!("Finish the current running processes...");
listen_running.store(false, Ordering::SeqCst)
});

// Instantiates the configuration struct based on provided environment variables or CLI args
let config = Config::args();
let _parent_span = tracing::info_span!("main").entered();
Expand Down Expand Up @@ -71,65 +108,65 @@ async fn main() {

let (sender, receiver) = mpsc::channel::<WakuMessage>();
debug!("Initializing the Graphcast Agent");
let graphcast_agent = GraphcastAgent::new(graphcast_agent_config, sender)
let graphcast_agent = GraphcastAgent::new(graphcast_agent_config, sender.clone())
.await
.expect("Could not create Graphcast agent");

// A one-off setter to load the Graphcast Agent into the global static variable
_ = GRAPHCAST_AGENT.set(graphcast_agent);
// Original sender not used, simply drop it
drop(sender);

// A one-off setter to instantiate an empty vec before populating it with incoming messages
_ = MESSAGES.set(Arc::new(Mutex::new(vec![])));
// Helper function to reuse message sending code
async fn send_message(payload: SimpleMessage) {
if let Err(e) = GRAPHCAST_AGENT
.get()
.expect("Could not retrieve Graphcast agent")
.send_message(
// The identifier can be any string that suits your Radio logic
// If it doesn't matter for your Radio logic (like in this case), you can just use a UUID or a hardcoded string
"ping-pong-content-topic",
payload,
Utc::now().timestamp(),
)
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
};
}

// The handler specifies what to do with incoming messages.
// This is where you can define multiple message types and how they gets handled by the radio
// by chaining radio payload typed decode and handler functions
tokio::spawn(async move {
for msg in receiver {
trace!(
"Radio operator received a Waku message from Graphcast agent, now try to fit it to Graphcast Message with Radio specified payload"
);
let _ = GraphcastMessage::<SimpleMessage>::decode(msg.payload())
.await
.map(|msg| {
msg.payload.radio_handler();
})
.map_err(|err| {
error!(
error = tracing::field::debug(&err),
"Failed to handle Waku signal"
let receiver_running = running.clone();
let receiver_handler = tokio::spawn(async move {
while receiver_running.load(Ordering::SeqCst) {
match receiver.recv() {
Ok(msg) => {
trace!(
"Radio operator received a Waku message from Graphcast agent, now try to fit it to Graphcast Message with Radio specified payload"
);
err
});
let _ = GraphcastMessage::<SimpleMessage>::decode(msg.payload())
.map(|msg| {
msg.payload.radio_handler();
})
.map_err(|err| {
error!(
error = tracing::field::debug(&err),
"Failed to handle Waku signal"
);
err
});
}
Err(e) => {
trace!(e = e.to_string(), "All senders have been dropped, exiting");
break;
}
}
}
});

GRAPHCAST_AGENT
.get()
.expect("Could not retrieve Graphcast agent")
.register_handler()
.expect("Could not register handler");
// Main loop of the application
_ = main_loop(&graphcast_agent, running).await;

let mut block_number = 0;
match graphcast_agent.stop() {
Ok(_) => {
debug!("Graphcast agent successful shutdown");
receiver_handler.await.unwrap();
debug!("Operator message receiver successful shutdown");
}
Err(e) => panic!("Cannot shutdown Graphcast agent: {:#?}", e),
}

loop {
debug!("Exiting the program");
}

/// Main event loop to send ping and respond pong
async fn main_loop(agent: &GraphcastAgent, running: Arc<AtomicBool>) {
let mut block_number = 0;
while running.load(Ordering::SeqCst) {
block_number += 1;
info!(block = block_number, "🔗 Block number");
if block_number & 2 == 0 {
Expand All @@ -138,7 +175,19 @@ async fn main() {
"table".to_string(),
std::env::args().nth(1).unwrap_or("Ping".to_string()),
);
send_message(msg).await;
if let Err(e) = agent
.send_message(
// The identifier can be any string that suits your Radio logic
// If it doesn't matter for your Radio logic (like in this case), you can just use a UUID or a hardcoded string
"ping-pong-content-topic",
msg,
Utc::now().timestamp(),
)
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
};
// agent.send_message(msg).await;
} else {
// If block number is odd, process received messages
let messages = AsyncMutex::new(
Expand All @@ -151,7 +200,17 @@ async fn main() {
for msg in messages.lock().await.iter() {
if msg.content == *"Ping" {
let replay_msg = SimpleMessage::new("table".to_string(), "Pong".to_string());
send_message(replay_msg).await;
// send_message(replay_msg).await;
if let Err(e) = agent
.send_message(
"ping-pong-content-topic",
replay_msg,
Utc::now().timestamp(),
)
.await
{
error!(error = tracing::field::debug(&e), "Failed to send message");
};
};
}

Expand Down
7 changes: 1 addition & 6 deletions examples/ping-pong/src/types.rs
Expand Up @@ -2,7 +2,7 @@ use async_graphql::SimpleObject;
use ethers_contract::EthAbiType;
use ethers_core::types::transaction::eip712::Eip712;
use ethers_derive_eip712::*;
use graphcast_sdk::graphcast_agent::GraphcastAgent;

use prost::Message;
use serde::{Deserialize, Serialize};

Expand All @@ -16,11 +16,6 @@ use std::sync::{Arc, Mutex};
/// it is not allowed in the handler itself.
pub static MESSAGES: OnceCell<Arc<Mutex<Vec<SimpleMessage>>>> = OnceCell::new();

/// The Graphcast Agent instance must be a global static variable (for the time being).
/// This is because the Radio handler requires a static immutable context and
/// the handler itself is being passed into the Graphcast Agent, so it needs to be static as well.
pub static GRAPHCAST_AGENT: OnceCell<GraphcastAgent> = OnceCell::new();

/// Make a test radio type
#[derive(Eip712, EthAbiType, Clone, Message, Serialize, Deserialize, SimpleObject)]
#[eip712(
Expand Down
2 changes: 1 addition & 1 deletion src/graphcast_agent/message_typing.rs
Expand Up @@ -287,7 +287,7 @@ impl<
}
}

pub async fn decode(payload: &[u8]) -> Result<Self, WakuHandlingError> {
pub fn decode(payload: &[u8]) -> Result<Self, WakuHandlingError> {
match <GraphcastMessage<T> as Message>::decode(payload) {
Ok(graphcast_message) => Ok(graphcast_message),
Err(e) => Err(WakuHandlingError::InvalidMessage(format!(
Expand Down
71 changes: 42 additions & 29 deletions src/graphcast_agent/mod.rs
Expand Up @@ -20,7 +20,7 @@ use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex as SyncMutex};
use tokio::runtime::Runtime;

use tokio::sync::Mutex as AsyncMutex;
use tracing::{debug, error, info, trace, warn};
use url::ParseError;
Expand Down Expand Up @@ -200,9 +200,9 @@ pub struct GraphcastAgent {
pub nonces: Arc<AsyncMutex<NoncesMap>>,
/// Callbook that make query requests
pub callbook: CallBook,
/// TODO: remove after confirming that gossippub seen_ttl works
/// msg_seen_ttl is for only relay messages they have not seen before, not effective for client nodes
/// A set of message ids sent from the agent
pub old_message_ids: Arc<AsyncMutex<HashSet<String>>>,
pub seen_msg_ids: Arc<SyncMutex<HashSet<String>>>,
/// Sender identity validation mechanism used by the Graphcast agent
pub id_validation: IdentityValidation,
/// Upon receiving a valid waku signal event of Message type, sender send WakuMessage through mpsc.
Expand Down Expand Up @@ -261,7 +261,6 @@ impl GraphcastAgent {
///
/// let agent = GraphcastAgent::new(config).await?;
/// ```

pub async fn new(
GraphcastAgentConfig {
wallet_key,
Expand Down Expand Up @@ -321,6 +320,10 @@ impl GraphcastAgent {

let callbook = CallBook::new(registry_subgraph, network_subgraph, graph_node_endpoint);

let sender = Arc::new(SyncMutex::new(sender));
let seen_msg_ids = Arc::new(SyncMutex::new(HashSet::new()));
register_handler(sender.clone(), seen_msg_ids.clone()).expect("Could not register handler");

Ok(GraphcastAgent {
graphcast_identity,
radio_name,
Expand All @@ -329,12 +332,25 @@ impl GraphcastAgent {
node_handle,
nonces: Arc::new(AsyncMutex::new(HashMap::new())),
callbook,
old_message_ids: Arc::new(AsyncMutex::new(HashSet::new())),
seen_msg_ids,
id_validation,
sender: Arc::new(SyncMutex::new(sender)),
sender,
})
}

/// Stop a GraphcastAgent instance
pub fn stop(self) -> Result<(), GraphcastAgentError> {
debug!("Stop Waku node");
let r = self
.node_handle
.stop()
.map_err(|e| GraphcastAgentError::WakuNodeError(WakuHandlingError::StopNodeError(e)));
debug!("Drop Arc std sync mutexes");
drop(self.seen_msg_ids);
drop(self.sender);
r
}

/// Get the number of peers excluding self
pub fn number_of_peers(&self) -> usize {
self.node_handle.peer_count().unwrap_or({
Expand Down Expand Up @@ -401,27 +417,6 @@ impl GraphcastAgent {
}
}

/// Establish handler for incoming Waku messages
pub fn register_handler(&'static self) -> Result<(), GraphcastAgentError> {
let sender = self.sender.clone();
let old_message_ids: &Arc<AsyncMutex<HashSet<String>>> = &self.old_message_ids;
let handle_async = move |signal: Signal| {
let rt = Runtime::new().expect("Could not create Tokio runtime");
rt.block_on(async {
let msg = handle_signal(signal, old_message_ids).await;

if let Ok(m) = msg {
match sender.clone().lock().unwrap().send(m) {
Ok(_) => trace!("Sent received message to radio operator"),
Err(e) => error!("Could not send message to channel: {:#?}", e),
}
}
});
};
waku_set_event_callback(handle_async);
Ok(())
}

/// For each topic, construct with custom write function and send
#[allow(unused_must_use)]
pub async fn send_message<
Expand All @@ -445,7 +440,6 @@ impl GraphcastAgent {

// Check network before sending a message
network_check(&self.node_handle).map_err(GraphcastAgentError::WakuNodeError)?;
let mut ids = self.old_message_ids.lock().await;
trace!(
address = &wallet_address(&self.graphcast_identity.wallet),
"local sender id"
Expand All @@ -462,7 +456,7 @@ impl GraphcastAgent {
.send_to_waku(&self.node_handle, self.pubsub_topic.clone(), content_topic)
.map_err(GraphcastAgentError::WakuNodeError)
.map(|id| {
ids.insert(id.clone());
self.seen_msg_ids.lock().unwrap().insert(id.clone());
trace!(id = id, "Sent message");
id
})
Expand Down Expand Up @@ -521,6 +515,25 @@ pub enum GraphcastAgentError {
Other(anyhow::Error),
}

/// Establish handler for incoming Waku messages
pub fn register_handler(
sender: Arc<SyncMutex<Sender<WakuMessage>>>,
seen_msg_ids: Arc<SyncMutex<HashSet<String>>>,
) -> Result<(), GraphcastAgentError> {
let handle_async = move |signal: Signal| {
let msg = handle_signal(signal, &seen_msg_ids);

if let Ok(m) = msg {
match sender.clone().lock().unwrap().send(m) {
Ok(_) => trace!("Sent received message to radio operator"),
Err(e) => error!("Could not send message to channel: {:#?}", e),
}
}
};
waku_set_event_callback(handle_async);
Ok(())
}

impl GraphcastAgentError {
pub fn type_string(&self) -> &'static str {
match self {
Expand Down