Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: peerData, localPeerData GraphQL APIs #107

Merged
merged 2 commits into from Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 5 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -12,4 +12,4 @@ panic = 'unwind'
opt-level = 3

[workspace.dependencies]
graphcast-sdk = "0.5.0"
graphcast-sdk = { git = "https://github.com/graphops/graphcast-sdk" }
2 changes: 1 addition & 1 deletion subgraph-radio/src/config.rs
Expand Up @@ -100,7 +100,7 @@ impl Config {
) -> Result<GraphcastAgentConfig, GraphcastAgentError> {
let wallet_key = self.wallet_input().unwrap().to_string();
let topics = self.radio_infrastructure().topics.clone();
let mut discv5_enrs = self.waku().discv5_enrs.clone().unwrap_or(vec![]);
let mut discv5_enrs = self.waku().discv5_enrs.clone().unwrap_or_default();
// Discovery network
discv5_enrs.push("enr:-P-4QJI8tS1WTdIQxq_yIrD05oIIW1Xg-tm_qfP0CHfJGnp9dfr6ttQJmHwTNxGEl4Le8Q7YHcmi-kXTtphxFysS11oBgmlkgnY0gmlwhLymh5GKbXVsdGlhZGRyc7hgAC02KG5vZGUtMDEuZG8tYW1zMy53YWt1djIucHJvZC5zdGF0dXNpbS5uZXQGdl8ALzYobm9kZS0wMS5kby1hbXMzLndha3V2Mi5wcm9kLnN0YXR1c2ltLm5ldAYfQN4DiXNlY3AyNTZrMaEDbl1X_zJIw3EAJGtmHMVn4Z2xhpSoUaP5ElsHKCv7hlWDdGNwgnZfg3VkcIIjKIV3YWt1Mg8".to_string());

Expand Down
19 changes: 9 additions & 10 deletions subgraph-radio/src/operator/mod.rs
Expand Up @@ -3,9 +3,7 @@ use std::time::Duration;

use graphcast_sdk::{
graphcast_agent::{
message_typing::check_message_validity,
waku_handling::{connected_peer_count, WakuHandlingError},
GraphcastAgent,
message_typing::check_message_validity, waku_handling::WakuHandlingError, GraphcastAgent,
},
graphql::client_graph_node::{subgraph_network_blocks, update_network_chainheads},
WakuMessage,
Expand Down Expand Up @@ -151,6 +149,7 @@ impl RadioOperator {
tokio::spawn(run_server(
config_cloned,
state_ref,
self.graphcast_agent(),
self.control_flow.server_handle.clone(),
));
}
Expand Down Expand Up @@ -185,10 +184,10 @@ impl RadioOperator {
continue;
}
// Update the number of peers connected
let connected_peers = connected_peer_count(&self.graphcast_agent().node_handle).unwrap_or_default();
let gossip_peers = self.graphcast_agent.number_of_peers();
CONNECTED_PEERS.set(connected_peers.try_into().unwrap_or_default());
GOSSIP_PEERS.set(gossip_peers.try_into().unwrap_or_default());
let connected_peers = self.graphcast_agent.connected_peer_count().unwrap_or_default() as i64;
let gossip_peers = self.graphcast_agent.number_of_peers() as i64;
CONNECTED_PEERS.set(connected_peers);
GOSSIP_PEERS.set(gossip_peers);

let diverged_num = self.persisted_state.comparison_result_typed(ComparisonResultType::Divergent).len();
DIVERGING_SUBGRAPHS.set(diverged_num.try_into().unwrap());
Expand Down Expand Up @@ -235,13 +234,13 @@ impl RadioOperator {
// Function takes in an identifier string and make specific queries regarding the identifier
// The example here combines a single function provided query endpoint, current block info based on the subgraph's indexing network
// Then the function gets sent to agent for making identifier independent queries
let identifiers = self.graphcast_agent.content_identifiers().await;
let identifiers = self.graphcast_agent.content_identifiers();
let num_topics = identifiers.len();
let blocks_str = chainhead_block_str(&network_chainhead_blocks);
info!(
chainhead = blocks_str.clone(),
num_gossip_peers = self.graphcast_agent.number_of_peers(),
num_connected_peers = connected_peer_count(&self.graphcast_agent.node_handle).unwrap_or_default(),
num_connected_peers = self.graphcast_agent.connected_peer_count().unwrap_or_default(),
num_topics,
"Network statuses",
);
Expand Down Expand Up @@ -284,7 +283,7 @@ impl RadioOperator {
let network_chainhead_blocks = update_network_chainheads(
indexing_status,
);
let identifiers = self.graphcast_agent().content_identifiers().await;
let identifiers = self.graphcast_agent().content_identifiers();
let blocks_str = chainhead_block_str(&network_chainhead_blocks);

trace!(
Expand Down
14 changes: 12 additions & 2 deletions subgraph-radio/src/server/mod.rs
@@ -1,5 +1,6 @@
use axum::{extract::Extension, routing::get, Router};
use axum_server::Handle;
use graphcast_sdk::graphcast_agent::GraphcastAgent;
use std::net::SocketAddr;
use std::str::FromStr;

Expand All @@ -22,12 +23,21 @@ pub mod routes;
/// Set up the routes for a radio health endpoint at `/health`
/// and a versioned GraphQL endpoint at `api/v1/graphql`
/// This function starts a API server at the configured server_host and server_port
pub async fn run_server(config: Config, persisted_state: &'static PersistedState, handle: Handle) {
pub async fn run_server(
config: Config,
persisted_state: &'static PersistedState,
graphcast_agent: &'static GraphcastAgent,
handle: Handle,
) {
if config.radio_infrastructure().server_port.is_none() {
return;
}
let port = config.radio_infrastructure().server_port.unwrap();
let context = Arc::new(SubgraphRadioContext::init(config.clone(), persisted_state));
let context = Arc::new(SubgraphRadioContext::init(
config.clone(),
persisted_state,
graphcast_agent,
));

let schema = build_schema(Arc::clone(&context)).await;

Expand Down
55 changes: 53 additions & 2 deletions subgraph-radio/src/server/model/mod.rs
Expand Up @@ -14,7 +14,10 @@ use crate::{
},
state::PersistedState,
};
use graphcast_sdk::{graphcast_agent::message_typing::GraphcastMessage, graphql::QueryError};
use graphcast_sdk::{
graphcast_agent::{message_typing::GraphcastMessage, GraphcastAgent, PeerData},
graphql::QueryError,
};

pub(crate) type SubgraphRadioSchema = Schema<QueryRoot, EmptyMutation, EmptySubscription>;

Expand Down Expand Up @@ -139,6 +142,25 @@ impl QueryRoot {
stake: basic_info.1,
})
}

/// Return Waku Peer data excluding local waku node
async fn gossip_peers(&self, ctx: &Context<'_>) -> Result<Vec<PeerData>, HttpServiceError> {
let peers = ctx
.data_unchecked::<Arc<SubgraphRadioContext>>()
.gossip_peers();
Ok(peers)
}

/// Return Waku Peer data for the local waku node
async fn local_gossip_peer(
&self,
ctx: &Context<'_>,
) -> Result<Option<PeerData>, HttpServiceError> {
let peer = ctx
.data_unchecked::<Arc<SubgraphRadioContext>>()
.local_gossip_node();
Ok(peer)
}
}

/// Helper function to order attestations by stake weight and then calculate stake and sender ratios
Expand Down Expand Up @@ -186,13 +208,19 @@ pub async fn build_schema(ctx: Arc<SubgraphRadioContext>) -> SubgraphRadioSchema
pub struct SubgraphRadioContext {
pub radio_config: Config,
pub persisted_state: &'static PersistedState,
pub graphcast_agent: &'static GraphcastAgent,
}

impl SubgraphRadioContext {
pub fn init(radio_config: Config, persisted_state: &'static PersistedState) -> Self {
pub fn init(
radio_config: Config,
persisted_state: &'static PersistedState,
graphcast_agent: &'static GraphcastAgent,
) -> Self {
Self {
radio_config,
persisted_state,
graphcast_agent,
}
}

Expand Down Expand Up @@ -320,6 +348,29 @@ impl SubgraphRadioContext {
pub fn radio_config(&self) -> Config {
self.radio_config.clone()
}

pub fn gossip_peers(&self) -> Vec<PeerData> {
self.graphcast_agent
.peers_data()
.unwrap_or_default()
.into_iter()
.map(|p| PeerData {
peer_id: p.peer_id().to_string(),
protocols: p.protocols().iter().map(|p| p.to_string()).collect(),
addresses: p.addresses().iter().map(|a| a.to_string()).collect(),
connected: p.connected(),
})
.collect()
}

pub fn local_gossip_node(&self) -> Option<PeerData> {
self.graphcast_agent.local_peer().map(|p| PeerData {
peer_id: p.peer_id().to_string(),
protocols: p.protocols().iter().map(|p| p.to_string()).collect(),
addresses: p.addresses().iter().map(|a| a.to_string()).collect(),
connected: p.connected(),
})
}
}

/// Filter funciton for Attestations on deployment and block
Expand Down
1 change: 0 additions & 1 deletion subgraph-radio/src/state.rs
Expand Up @@ -29,7 +29,6 @@ type Local = Arc<SyncMutex<HashMap<String, HashMap<u64, Attestation>>>>;
type Remote = Arc<SyncMutex<Vec<GraphcastMessage<PublicPoiMessage>>>>;
type UpgradeMessages = Arc<SyncMutex<HashMap<String, GraphcastMessage<UpgradeIntentMessage>>>>;
type ComparisonResults = Arc<SyncMutex<HashMap<String, ComparisonResult>>>;

type Notifications = Arc<SyncMutex<HashMap<String, String>>>;

#[derive(Serialize, Deserialize, Clone, Debug)]
Expand Down
2 changes: 1 addition & 1 deletion test-runner/Cargo.toml
Expand Up @@ -21,7 +21,7 @@ categories = [
]

[dependencies]
waku = { version = "0.3.0", package = "waku-bindings" }
waku = { version = "0.4.0", package = "waku-bindings" }
test-utils = { path = "../test-utils" }
graphcast-sdk = { workspace = true }
subgraph-radio = { path = "../subgraph-radio" }
Expand Down
2 changes: 1 addition & 1 deletion test-sender/Cargo.toml
Expand Up @@ -21,7 +21,7 @@ categories = [
]

[dependencies]
waku = { version = "0.3.0", package = "waku-bindings" }
waku = { version = "0.4.0", package = "waku-bindings" }
graphcast-sdk = { workspace = true }
test-utils = { path = "../test-utils" }
subgraph-radio = { path = "../subgraph-radio" }
Expand Down
6 changes: 2 additions & 4 deletions test-sender/src/main.rs
Expand Up @@ -8,15 +8,13 @@ use graphcast_sdk::{
},
init_tracing,
networks::NetworkName,
wallet_address,
wallet_address, WakuPubSubTopic,
};
use std::{net::IpAddr, str::FromStr, thread::sleep, time::Duration};
use subgraph_radio::messages::poi::PublicPoiMessage;
use test_utils::{config::TestSenderConfig, dummy_msg::DummyMsg, find_random_udp_port};
use tracing::{error, info};
use waku::{
waku_new, GossipSubParams, ProtocolId, WakuContentTopic, WakuNodeConfig, WakuPubSubTopic,
};
use waku::{waku_new, GossipSubParams, ProtocolId, WakuContentTopic, WakuNodeConfig};

async fn start_sender(config: TestSenderConfig) {
std::env::set_var(
Expand Down
2 changes: 1 addition & 1 deletion test-utils/Cargo.toml
Expand Up @@ -21,7 +21,7 @@ categories = [
]

[dependencies]
waku = { version = "0.3.0", package = "waku-bindings" }
waku = { version = "0.4.0", package = "waku-bindings" }
graphcast-sdk = { workspace = true }
subgraph-radio = { path = "../subgraph-radio" }
tokio = { version = "1.1.1", features = ["full", "rt"] }
Expand Down