Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions crates/p2p/src/behaviours/pluto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ use std::sync::LazyLock;

use libp2p::{identify, identity::Keypair, ping, relay, swarm::NetworkBehaviour};

use crate::{config::default_ping_config, gater::ConnGater};
use crate::{config::default_ping_config, gater::ConnGater, peerstore::PeerStore};

/// Pluto network behaviour.
#[derive(NetworkBehaviour)]
pub struct PlutoBehaviour {
/// Peer store.
pub peerstore: PeerStore,
/// Connection gater behaviour.
pub gater: ConnGater,
/// Relay client behaviour.
Expand Down Expand Up @@ -84,8 +86,10 @@ impl PlutoBehaviourBuilder {
/// Builds the [`PlutoBehaviour`] with the provided keypair and relay
/// client.
pub fn build(self, key: &Keypair, relay_client: relay::client::Behaviour) -> PlutoBehaviour {
let peerstore = PeerStore::new();
PlutoBehaviour {
gater: self.gater.unwrap_or_else(ConnGater::new_open_gater),
peerstore: peerstore.clone(),
gater: self.gater.unwrap_or_else(|| ConnGater::new_open_gater(peerstore.clone())),
relay: relay_client,
identify: identify::Behaviour::new(
identify::Config::new(self.identify_protocol, key.public())
Expand Down
16 changes: 11 additions & 5 deletions crates/p2p/src/gater/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ use libp2p::{
},
};

use crate::peer::MutablePeer;
use crate::{peer::MutablePeer, peerstore::PeerStore};

mod handler;
pub mod handler;

/// Configuration for the connection gater.
#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -68,18 +68,20 @@ impl Config {
}

/// ConnGater filters incoming and outgoing connections by the cluster peers.
#[derive(Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct ConnGater {
config: Config,
events: VecDeque<Event>,
peerstore: PeerStore,
}

impl ConnGater {
/// Creates a new connection gater with the given configuration.
pub fn new(config: Config) -> Self {
pub fn new(config: Config, peerstore: PeerStore) -> Self {
Self {
config,
events: VecDeque::new(),
peerstore,
}
}

Expand All @@ -89,14 +91,16 @@ impl ConnGater {
Self {
config: Config::closed().with_peer_ids(peers).with_relays(relays),
events: VecDeque::new(),
peerstore: PeerStore::new(),
}
}

/// Creates a new open gater that does not gate any connections.
pub fn new_open_gater() -> Self {
pub fn new_open_gater(peerstore: PeerStore) -> Self {
Self {
config: Config::open(),
events: VecDeque::new(),
peerstore,
}
}

Expand Down Expand Up @@ -169,6 +173,8 @@ impl NetworkBehaviour for ConnGater {

fn on_swarm_event(&mut self, _event: FromSwarm) {
// No special handling needed for swarm events
tracing::warn!(active_peers = ?self.peerstore.peers::<Vec<_>>(), "Active peers (with clone)");
tracing::warn!(active_peers = ?self.peerstore.peers_lock(), "Active peers (without clone)");
}

fn on_connection_handler_event(
Expand Down
3 changes: 3 additions & 0 deletions crates/p2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,6 @@ pub mod behaviours;

/// K1 utilities.
pub mod k1;

/// Peer store.
pub mod peerstore;
130 changes: 130 additions & 0 deletions crates/p2p/src/peerstore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#![allow(missing_docs)]

use std::{
collections::HashSet,
sync::{Arc, RwLock, RwLockReadGuard}, task::{Context, Poll},
};

use libp2p::{Multiaddr, PeerId, swarm::{ConnectionClosed, ConnectionDenied, ConnectionId, FromSwarm, NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm, behaviour::ConnectionEstablished}};

#[derive(Debug, Clone)]
pub struct PeerStore {
inner: Arc<RwLock<PeerStoreInner>>,
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Peer {
pub id: PeerId,
pub connection_id: ConnectionId,
}

#[derive(Debug, Clone)]
pub struct PeerStoreInner {
active_peers: HashSet<Peer>,
inactive_peers: HashSet<Peer>,
}

impl PeerStore {
pub fn new() -> Self {
Self {
inner: Arc::new(RwLock::new(PeerStoreInner {
active_peers: HashSet::new(),
inactive_peers: HashSet::new(),
})),
}
}

pub fn add_active_peer(&self, peer: Peer) {
let mut inner = self.inner.write().unwrap();
inner.inactive_peers.remove(&peer);
inner.active_peers.insert(peer);
}

pub fn remove_active_peer(&self, peer: &Peer) {
let mut inner = self.inner.write().unwrap();
inner.active_peers.remove(peer);
}

pub fn peers_lock<'a>(&'a self) -> RwLockReadGuard<'a, PeerStoreInner> {
self.inner.read().unwrap()
}

pub fn peers<T: FromIterator<Peer>>(&self) -> T {
let inner = self.inner.read().unwrap();
inner.active_peers.iter().cloned().collect()
}

pub fn inactive_peers<T: FromIterator<Peer>>(&self) -> T {
let inner = self.inner.read().unwrap();
inner.inactive_peers.iter().cloned().collect()
}

pub fn all_peers<T: FromIterator<Peer>>(&self) -> T {
let inner = self.peers_lock();
inner.active_peers.iter().chain(inner.inactive_peers.iter()).cloned().collect()
}
}


#[derive(Debug, Clone)]
pub enum Event {
/// A peer was added to the peer store.
PeerAdded(Peer),
/// A peer was removed from the peer store.
PeerRemoved(Peer),
}


impl NetworkBehaviour for PeerStore {
type ConnectionHandler = crate::gater::handler::Handler;
type ToSwarm = Event;

fn handle_established_inbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_local_addr: &Multiaddr,
_remote_addr: &Multiaddr,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Self::ConnectionHandler::new())
}

fn handle_established_outbound_connection(
&mut self,
_connection_id: ConnectionId,
_peer: PeerId,
_addr: &Multiaddr,
_role_override: libp2p::core::Endpoint,
_port_use: libp2p::core::transport::PortUse,
) -> Result<THandler<Self>, ConnectionDenied> {
Ok(Self::ConnectionHandler::new())
}

fn on_swarm_event(&mut self, event: FromSwarm) {
match event {
FromSwarm::ConnectionEstablished(ConnectionEstablished { peer_id, connection_id, .. }) => {
self.add_active_peer(Peer { id: peer_id, connection_id });
}
FromSwarm::ConnectionClosed(ConnectionClosed { peer_id, connection_id, .. }) => {
self.remove_active_peer(&Peer { id: peer_id, connection_id });
}
_ => {}
}
}

fn on_connection_handler_event(
&mut self,
_peer_id: PeerId,
_connection_id: ConnectionId,
_event: THandlerOutEvent<Self>,
) {
// Handler events are Void, so this is unreachable
}

fn poll(
&mut self,
_cx: &mut Context<'_>,
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
Poll::Pending
}
}
12 changes: 8 additions & 4 deletions crates/peerinfo/examples/peerinfo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use libp2p::{
use pluto_cluster::lock::Lock;
use pluto_core::version::{VERSION, git_commit};
use pluto_p2p::{
config::P2PConfig,
k1,
name::peer_name,
p2p::{Node, NodeType},
config::P2PConfig, gater::ConnGater, k1, name::peer_name, p2p::{Node, NodeType}, peerstore::PeerStore
};
use pluto_peerinfo::{Behaviour, Config, Event, LocalPeerInfo};
use pluto_tracing::{LokiConfig, TracingConfig};
Expand Down Expand Up @@ -86,6 +83,8 @@ fn parse_key_value(s: &str) -> Result<(String, String), String> {
/// Combined behaviour with peerinfo, identify, ping, and mdns
#[derive(NetworkBehaviour)]
pub struct CombinedBehaviour {
pub peerstore: PeerStore,
pub gater: ConnGater,
pub peer_info: Behaviour,
pub identify: identify::Behaviour,
pub ping: ping::Behaviour,
Expand Down Expand Up @@ -277,12 +276,17 @@ async fn main() -> anyhow::Result<()> {
&args.nickname,
);

let peerstore = PeerStore::new();
let gater = ConnGater::new_open_gater(peerstore.clone());

let Node { mut swarm, .. } = Node::new(
P2PConfig::default(),
key,
false,
NodeType::TCP,
|key, relay_client| CombinedBehaviour {
peerstore: peerstore.clone(),
gater: gater.clone(),
peer_info: Behaviour::new(
Config::new(local_info.clone())
.with_peers(peers.clone())
Expand Down
4 changes: 2 additions & 2 deletions crates/relay-server/src/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::{
error::RelayP2PError,
web::enr_server,
};
use pluto_p2p::{gater::ConnGater, p2p::Node};
use pluto_p2p::{gater::ConnGater, p2p::Node, peerstore::PeerStore};

/// Runs a relay P2P node.
#[instrument(skip(config, key, ct))]
Expand All @@ -26,7 +26,7 @@ pub async fn run_relay_p2p_node(
) -> Result<Node<RelayServerBehaviour>> {
let mut node = Node::new_relay_server(config.p2p_config.clone(), key.clone(), false, |key| {
RelayServerBehaviour::builder()
.with_gater(ConnGater::new_open_gater())
.with_gater(ConnGater::new_open_gater(PeerStore::new()))
.with_relay_config(create_relay_config(config))
.build(key)
})?;
Expand Down
Loading