Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(connection-pool): Keep only fluence peers in the connection pool #1440

Merged
merged 11 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions connection-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ pub enum Command {
out: OneshotOutlet<Option<Contact>>,
},
Disconnect {
contact: Contact,
peer_id: PeerId,
out: OneshotOutlet<bool>,
},
IsConnected {
Expand Down Expand Up @@ -100,9 +100,9 @@ impl ConnectionPoolT for ConnectionPoolApi {
self.execute(|out| Command::Connect { contact, out })
}

fn disconnect(&self, contact: Contact) -> BoxFuture<'static, bool> {
fn disconnect(&self, peer_id: PeerId) -> BoxFuture<'static, bool> {
// TODO: timeout needed? will be clearer when disconnect is implemented
self.execute(|out| Command::Disconnect { contact, out })
self.execute(|out| Command::Disconnect { peer_id, out })
}

fn is_connected(&self, peer_id: PeerId) -> BoxFuture<'static, bool> {
Expand Down
16 changes: 9 additions & 7 deletions connection-pool/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use std::{
use futures::channel::mpsc;
use futures::StreamExt;
use libp2p::swarm::dial_opts::DialOpts;
use libp2p::swarm::CloseConnection::All;
use libp2p::swarm::{dial_opts, DialError, IntoConnectionHandler};
use libp2p::{
core::{connection::ConnectionId, ConnectedPoint, Multiaddr},
Expand Down Expand Up @@ -124,7 +125,7 @@ impl ConnectionPoolBehaviour {
match cmd {
Command::Dial { addr, out } => self.dial(addr, out),
Command::Connect { contact, out } => self.connect(contact, out),
Command::Disconnect { contact, out } => self.disconnect(contact, out),
Command::Disconnect { peer_id, out } => self.disconnect(peer_id, out),
Command::IsConnected { peer_id, out } => self.is_connected(peer_id, out),
Command::GetContact { peer_id, out } => self.get_contact(peer_id, out),
Command::Send { to, particle, out } => self.send(to, particle, out),
Expand Down Expand Up @@ -195,12 +196,13 @@ impl ConnectionPoolBehaviour {
}
}

// TODO: implement
pub fn disconnect(&mut self, contact: Contact, _outlet: OneshotOutlet<bool>) {
todo!(
"this doesn't make sense with OneShotHandler since connections are short-lived {:?}",
contact
)
pub fn disconnect(&mut self, peer_id: PeerId, outlet: OneshotOutlet<bool>) {
self.push_event(NetworkBehaviourAction::CloseConnection {
peer_id,
connection: All,
});
// TODO: signal disconnect completion only after `peer_removed` was called or Disconnect failed
outlet.send(true).ok();
folex marked this conversation as resolved.
Show resolved Hide resolved
}

/// Returns whether given peer is connected or not
Expand Down
2 changes: 1 addition & 1 deletion connection-pool/src/connection_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl Display for LifecycleEvent {
pub trait ConnectionPoolT {
fn dial(&self, addr: Multiaddr) -> BoxFuture<'static, Option<Contact>>;
fn connect(&self, contact: Contact) -> BoxFuture<'static, bool>;
fn disconnect(&self, contact: Contact) -> BoxFuture<'static, bool>;
fn disconnect(&self, peer_id: PeerId) -> BoxFuture<'static, bool>;
fn is_connected(&self, peer_id: PeerId) -> BoxFuture<'static, bool>;
fn get_contact(&self, peer_id: PeerId) -> BoxFuture<'static, Option<Contact>>;
fn send(&self, to: Contact, particle: Particle) -> BoxFuture<'static, SendStatus>;
Expand Down
53 changes: 41 additions & 12 deletions particle-node/src/behaviour/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
* limitations under the License.
*/

use futures::channel::oneshot;
use itertools::Itertools;
use libp2p::{
core::{multiaddr::Protocol, Multiaddr},
identify::Event as IdentifyEvent,
};
use particle_protocol::PROTOCOL_NAME;

use super::FluenceNetworkBehaviour;

Expand All @@ -36,17 +38,44 @@ impl FluenceNetworkBehaviour {
info.listen_addrs
);

let addresses = filter_addresses(info.listen_addrs, allow_local_addresses);
let addresses = filter_addresses(info.listen_addrs.clone(), allow_local_addresses);

// Add addresses to connection pool disregarding whether it supports kademlia or not
// we want to have full info on non-kademlia peers as well
self.connection_pool
.add_discovered_addresses(peer_id, addresses.clone());
let mut supports_kademlia = false;
let mut supports_fluence = false;

let supports_kademlia =
info.protocols.iter().any(|p| p.contains("/ipfs/kad/1.0.0"));
if supports_kademlia {
self.kademlia.add_kad_node(peer_id, addresses);
for protocol in info.protocols.iter() {
if !supports_kademlia && protocol.contains("/ipfs/kad/1.0.0") {
supports_kademlia = true;
}
if !supports_fluence && protocol.contains(PROTOCOL_NAME) {
supports_fluence = true;
}
if supports_fluence && supports_kademlia {
break;
}
}

if supports_fluence {
log::debug!(target: "network", "Found fluence peer {}: protocols: {:?} version: {} listen addrs {:?}", peer_id, info.protocols,
info.protocol_version,
info.listen_addrs);
// Add addresses to connection pool disregarding whether it supports kademlia or not
// we want to have full info on non-kademlia peers as well
self.connection_pool
.add_discovered_addresses(peer_id, addresses.clone());
if supports_kademlia {
self.kademlia.add_kad_node(peer_id, addresses);
}
} else {
log::debug!(
folex marked this conversation as resolved.
Show resolved Hide resolved
target: "blocked",
"Found peer {} not supported fluence protocol, protocols: {:?} version: {} listen addrs {:?}. skipping...",
peer_id, info.protocols,
info.protocol_version,
info.listen_addrs
);
let (out, _inlet) = oneshot::channel();
self.connection_pool.disconnect(peer_id, out);
}
}

Expand All @@ -63,16 +92,16 @@ impl FluenceNetworkBehaviour {

fn filter_addresses(addresses: Vec<Multiaddr>, allow_local: bool) -> Vec<Multiaddr> {
// Deduplicate addresses
let addresses: Vec<_> = addresses.into_iter().unique().collect();
let addresses = addresses.iter().unique();

if allow_local {
// Return all addresses
addresses
addresses.cloned().collect()
} else {
// Keep only global addresses
addresses
.into_iter()
.filter(|maddr| !is_local_maddr(maddr))
.cloned()
.collect()
}
}
Expand Down
2 changes: 1 addition & 1 deletion particle-node/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ impl<RT: AquaRuntime> Node<RT> {
e = swarm.select_next_some() => {
if let Some(m) = libp2p_metrics.as_ref() { m.record(&e) }
if let SwarmEvent::Behaviour(FluenceNetworkBehaviourEvent::Identify(i)) = e {
swarm.behaviour_mut().inject_identify_event(i, true)
swarm.behaviour_mut().inject_identify_event(i, true);
}
},
e = metrics_fut => {
Expand Down