diff --git a/crates/p2p/src/behaviours/pluto.rs b/crates/p2p/src/behaviours/pluto.rs index ea9b0826..73c28dc9 100644 --- a/crates/p2p/src/behaviours/pluto.rs +++ b/crates/p2p/src/behaviours/pluto.rs @@ -4,11 +4,13 @@ use std::sync::LazyLock; use libp2p::{identify, identity::Keypair, ping, relay, swarm::NetworkBehaviour}; -use crate::{config::default_ping_config, gater::ConnGater}; +use crate::{config::default_ping_config, gater::ConnGater, peerstore::PeerStore}; /// Pluto network behaviour. #[derive(NetworkBehaviour)] pub struct PlutoBehaviour { + /// Peer store. + pub peerstore: PeerStore, /// Connection gater behaviour. pub gater: ConnGater, /// Relay client behaviour. @@ -84,8 +86,10 @@ impl PlutoBehaviourBuilder { /// Builds the [`PlutoBehaviour`] with the provided keypair and relay /// client. pub fn build(self, key: &Keypair, relay_client: relay::client::Behaviour) -> PlutoBehaviour { + let peerstore = PeerStore::new(); PlutoBehaviour { - gater: self.gater.unwrap_or_else(ConnGater::new_open_gater), + peerstore: peerstore.clone(), + gater: self.gater.unwrap_or_else(|| ConnGater::new_open_gater(peerstore.clone())), relay: relay_client, identify: identify::Behaviour::new( identify::Config::new(self.identify_protocol, key.public()) diff --git a/crates/p2p/src/gater/mod.rs b/crates/p2p/src/gater/mod.rs index 9e06d9d4..c5065c95 100644 --- a/crates/p2p/src/gater/mod.rs +++ b/crates/p2p/src/gater/mod.rs @@ -21,9 +21,9 @@ use libp2p::{ }, }; -use crate::peer::MutablePeer; +use crate::{peer::MutablePeer, peerstore::PeerStore}; -mod handler; +pub mod handler; /// Configuration for the connection gater. #[derive(Debug, Clone, Default)] @@ -68,18 +68,20 @@ impl Config { } /// ConnGater filters incoming and outgoing connections by the cluster peers. -#[derive(Debug, Clone, Default)] +#[derive(Debug, Clone)] pub struct ConnGater { config: Config, events: VecDeque, + peerstore: PeerStore, } impl ConnGater { /// Creates a new connection gater with the given configuration. - pub fn new(config: Config) -> Self { + pub fn new(config: Config, peerstore: PeerStore) -> Self { Self { config, events: VecDeque::new(), + peerstore, } } @@ -89,14 +91,16 @@ impl ConnGater { Self { config: Config::closed().with_peer_ids(peers).with_relays(relays), events: VecDeque::new(), + peerstore: PeerStore::new(), } } /// Creates a new open gater that does not gate any connections. - pub fn new_open_gater() -> Self { + pub fn new_open_gater(peerstore: PeerStore) -> Self { Self { config: Config::open(), events: VecDeque::new(), + peerstore, } } @@ -169,6 +173,8 @@ impl NetworkBehaviour for ConnGater { fn on_swarm_event(&mut self, _event: FromSwarm) { // No special handling needed for swarm events + tracing::warn!(active_peers = ?self.peerstore.peers::>(), "Active peers (with clone)"); + tracing::warn!(active_peers = ?self.peerstore.peers_lock(), "Active peers (without clone)"); } fn on_connection_handler_event( diff --git a/crates/p2p/src/lib.rs b/crates/p2p/src/lib.rs index 2043c8e7..04adfd37 100644 --- a/crates/p2p/src/lib.rs +++ b/crates/p2p/src/lib.rs @@ -28,3 +28,6 @@ pub mod behaviours; /// K1 utilities. pub mod k1; + +/// Peer store. +pub mod peerstore; diff --git a/crates/p2p/src/peerstore.rs b/crates/p2p/src/peerstore.rs new file mode 100644 index 00000000..b073b070 --- /dev/null +++ b/crates/p2p/src/peerstore.rs @@ -0,0 +1,130 @@ +#![allow(missing_docs)] + +use std::{ + collections::HashSet, + sync::{Arc, RwLock, RwLockReadGuard}, task::{Context, Poll}, +}; + +use libp2p::{Multiaddr, PeerId, swarm::{ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, behaviour::ConnectionEstablished}}; + +#[derive(Debug, Clone)] +pub struct PeerStore { + inner: Arc>, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct Peer { + pub id: PeerId, + pub connection_id: ConnectionId, +} + +#[derive(Debug, Clone)] +pub struct PeerStoreInner { + active_peers: HashSet, + inactive_peers: HashSet, +} + +impl PeerStore { + pub fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(PeerStoreInner { + active_peers: HashSet::new(), + inactive_peers: HashSet::new(), + })), + } + } + + pub fn add_active_peer(&self, peer: Peer) { + let mut inner = self.inner.write().unwrap(); + inner.inactive_peers.remove(&peer); + inner.active_peers.insert(peer); + } + + pub fn remove_active_peer(&self, peer: &Peer) { + let mut inner = self.inner.write().unwrap(); + inner.active_peers.remove(peer); + } + + pub fn peers_lock<'a>(&'a self) -> RwLockReadGuard<'a, PeerStoreInner> { + self.inner.read().unwrap() + } + + pub fn peers>(&self) -> T { + let inner = self.inner.read().unwrap(); + inner.active_peers.iter().cloned().collect() + } + + pub fn inactive_peers>(&self) -> T { + let inner = self.inner.read().unwrap(); + inner.inactive_peers.iter().cloned().collect() + } + + pub fn all_peers>(&self) -> T { + let inner = self.peers_lock(); + inner.active_peers.iter().chain(inner.inactive_peers.iter()).cloned().collect() + } +} + + +#[derive(Debug, Clone)] +pub enum Event { + /// A peer was added to the peer store. + PeerAdded(Peer), + /// A peer was removed from the peer store. + PeerRemoved(Peer), +} + + +impl NetworkBehaviour for PeerStore { + type ConnectionHandler = crate::gater::handler::Handler; + type ToSwarm = Event; + + fn handle_established_inbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _local_addr: &Multiaddr, + _remote_addr: &Multiaddr, + ) -> Result, ConnectionDenied> { + Ok(Self::ConnectionHandler::new()) + } + + fn handle_established_outbound_connection( + &mut self, + _connection_id: ConnectionId, + _peer: PeerId, + _addr: &Multiaddr, + _role_override: libp2p::core::Endpoint, + _port_use: libp2p::core::transport::PortUse, + ) -> Result, ConnectionDenied> { + Ok(Self::ConnectionHandler::new()) + } + + fn on_swarm_event(&mut self, event: FromSwarm) { + match event { + FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, connection_id, .. }) => { + self.add_active_peer(Peer { id: peer_id, connection_id }); + } + FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => { + self.remove_active_peer(&Peer { id: peer_id, connection_id }); + } + _ => {} + } + } + + fn on_connection_handler_event( + &mut self, + _peer_id: PeerId, + _connection_id: ConnectionId, + _event: THandlerOutEvent, + ) { + // Handler events are Void, so this is unreachable + } + + fn poll( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll>> { + Poll::Pending + } +} diff --git a/crates/peerinfo/examples/peerinfo.rs b/crates/peerinfo/examples/peerinfo.rs index 1fbc7b7d..244461b5 100644 --- a/crates/peerinfo/examples/peerinfo.rs +++ b/crates/peerinfo/examples/peerinfo.rs @@ -20,10 +20,7 @@ use libp2p::{ use pluto_cluster::lock::Lock; use pluto_core::version::{VERSION, git_commit}; use pluto_p2p::{ - config::P2PConfig, - k1, - name::peer_name, - p2p::{Node, NodeType}, + config::P2PConfig, gater::ConnGater, k1, name::peer_name, p2p::{Node, NodeType}, peerstore::PeerStore }; use pluto_peerinfo::{Behaviour, Config, Event, LocalPeerInfo}; use pluto_tracing::{LokiConfig, TracingConfig}; @@ -86,6 +83,8 @@ fn parse_key_value(s: &str) -> Result<(String, String), String> { /// Combined behaviour with peerinfo, identify, ping, and mdns #[derive(NetworkBehaviour)] pub struct CombinedBehaviour { + pub peerstore: PeerStore, + pub gater: ConnGater, pub peer_info: Behaviour, pub identify: identify::Behaviour, pub ping: ping::Behaviour, @@ -277,12 +276,17 @@ async fn main() -> anyhow::Result<()> { &args.nickname, ); + let peerstore = PeerStore::new(); + let gater = ConnGater::new_open_gater(peerstore.clone()); + let Node { mut swarm, .. } = Node::new( P2PConfig::default(), key, false, NodeType::TCP, |key, relay_client| CombinedBehaviour { + peerstore: peerstore.clone(), + gater: gater.clone(), peer_info: Behaviour::new( Config::new(local_info.clone()) .with_peers(peers.clone()) diff --git a/crates/relay-server/src/p2p.rs b/crates/relay-server/src/p2p.rs index 923a2e53..244f3ce2 100644 --- a/crates/relay-server/src/p2p.rs +++ b/crates/relay-server/src/p2p.rs @@ -15,7 +15,7 @@ use crate::{ error::RelayP2PError, web::enr_server, }; -use pluto_p2p::{gater::ConnGater, p2p::Node}; +use pluto_p2p::{gater::ConnGater, p2p::Node, peerstore::PeerStore}; /// Runs a relay P2P node. #[instrument(skip(config, key, ct))] @@ -26,7 +26,7 @@ pub async fn run_relay_p2p_node( ) -> Result> { let mut node = Node::new_relay_server(config.p2p_config.clone(), key.clone(), false, |key| { RelayServerBehaviour::builder() - .with_gater(ConnGater::new_open_gater()) + .with_gater(ConnGater::new_open_gater(PeerStore::new())) .with_relay_config(create_relay_config(config)) .build(key) })?;