diff --git a/CHANGELOG.md b/CHANGELOG.md index 11971d144e6..a6e7dd50480 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Changed +- [#1597](https://github.com/FuelLabs/fuel-core/pull/1597): Unify namespacing for `libp2p` modules - [#1591](https://github.com/FuelLabs/fuel-core/pull/1591): Simplify libp2p dependencies and not depend on all sub modules directly. - [#1590](https://github.com/FuelLabs/fuel-core/pull/1590): Use `AtomicView` in the `TxPool` to read the state of the database during insertion of the transactions. - [#1587](https://github.com/FuelLabs/fuel-core/pull/1587): Use `BlockHeight` as a primary key for the `FuelsBlock` table. diff --git a/benches/src/bin/collect.rs b/benches/src/bin/collect.rs index 0b81125d1d0..e0f5eb64ecf 100644 --- a/benches/src/bin/collect.rs +++ b/benches/src/bin/collect.rs @@ -292,7 +292,7 @@ fn decode_input(line: &str) -> Option { }, _ => return None, }; - let throughput = if let Some(t) = val.get("throughput")?.as_array()?.get(0) { + let throughput = if let Some(t) = val.get("throughput")?.as_array()?.first() { Some(t.as_object()?.get("per_iteration")?.as_u64()?) } else { None diff --git a/bin/fuel-core/src/cli/run/p2p.rs b/bin/fuel-core/src/cli/run/p2p.rs index f598e22381c..1b069db5625 100644 --- a/bin/fuel-core/src/cli/run/p2p.rs +++ b/bin/fuel-core/src/cli/run/p2p.rs @@ -12,7 +12,7 @@ use fuel_core::{ MAX_RESPONSE_SIZE, }, gossipsub_config::default_gossipsub_builder, - HeartbeatConfig, + heartbeat, Multiaddr, }, types::{ @@ -276,7 +276,7 @@ impl P2PArgs { let heartbeat_config = { let send_duration = Duration::from_secs(self.heartbeat_send_duration); let idle_duration = Duration::from_secs(self.heartbeat_idle_duration); - HeartbeatConfig::new( + heartbeat::Config::new( send_duration, idle_duration, self.heartbeat_max_failures, diff --git a/crates/services/p2p/src/behavior.rs b/crates/services/p2p/src/behavior.rs index 8246cf2af3e..a8ccd9a38f0 100644 --- a/crates/services/p2p/src/behavior.rs +++ b/crates/services/p2p/src/behavior.rs @@ -4,16 +4,13 @@ use crate::{ NetworkCodec, }, config::Config, - discovery::{ - DiscoveryBehaviour, - DiscoveryConfig, - }, + discovery, gossipsub::{ config::build_gossipsub_behaviour, topics::GossipTopic, }, heartbeat, - peer_report::PeerReportBehaviour, + peer_report, request_response::messages::{ RequestMessage, ResponseMessage, @@ -23,15 +20,14 @@ use fuel_core_types::fuel_types::BlockHeight; use libp2p::{ allow_block_list, gossipsub::{ - Behaviour as Gossipsub, + self, MessageAcceptance, MessageId, PublishError, }, identify, request_response::{ - Behaviour as RequestResponse, - Config as RequestResponseConfig, + self, OutboundRequestId, ProtocolSupport, ResponseChannel, @@ -50,22 +46,22 @@ pub struct FuelBehaviour { blocked_peer: allow_block_list::Behaviour, /// Message propagation for p2p - gossipsub: Gossipsub, + gossipsub: gossipsub::Behaviour, /// Handles regular heartbeats from peers - heartbeat: heartbeat::Heartbeat, + heartbeat: heartbeat::Behaviour, /// The Behaviour to identify peers. identify: identify::Behaviour, /// Identifies and periodically requests `BlockHeight` from connected nodes - peer_report: PeerReportBehaviour, + peer_report: peer_report::Behaviour, /// Node discovery - discovery: DiscoveryBehaviour, + discovery: discovery::Behaviour, /// RequestResponse protocol - request_response: RequestResponse, + request_response: request_response::Behaviour, } impl FuelBehaviour { @@ -75,7 +71,7 @@ impl FuelBehaviour { let discovery_config = { let mut discovery_config = - DiscoveryConfig::new(local_peer_id, p2p_config.network_name.clone()); + discovery::Config::new(local_peer_id, p2p_config.network_name.clone()); discovery_config .enable_mdns(p2p_config.enable_mdns) @@ -97,7 +93,7 @@ impl FuelBehaviour { let gossipsub = build_gossipsub_behaviour(p2p_config); - let peer_report = PeerReportBehaviour::new(p2p_config); + let peer_report = peer_report::Behaviour::new(p2p_config); let identify = { let identify_config = identify::Config::new( @@ -111,7 +107,7 @@ impl FuelBehaviour { } }; - let heartbeat = heartbeat::Heartbeat::new( + let heartbeat = heartbeat::Behaviour::new( p2p_config.heartbeat_config.clone(), BlockHeight::default(), ); @@ -119,13 +115,16 @@ impl FuelBehaviour { let req_res_protocol = core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full)); - let req_res_config = RequestResponseConfig::default(); + let req_res_config = request_response::Config::default(); req_res_config .clone() .with_request_timeout(p2p_config.set_request_timeout); - let request_response = - RequestResponse::with_codec(codec, req_res_protocol, req_res_config); + let request_response = request_response::Behaviour::with_codec( + codec, + req_res_protocol, + req_res_config, + ); Self { discovery: discovery_config.finish(), diff --git a/crates/services/p2p/src/codecs.rs b/crates/services/p2p/src/codecs.rs index 02b2c0ba7c7..c22aacd5671 100644 --- a/crates/services/p2p/src/codecs.rs +++ b/crates/services/p2p/src/codecs.rs @@ -11,7 +11,7 @@ use crate::{ ResponseMessage, }, }; -use libp2p::request_response::Codec as RequestResponseCodec; +use libp2p::request_response; use std::io; /// Implement this in order to handle serialization & deserialization of Gossipsub messages @@ -34,12 +34,12 @@ pub trait NetworkCodec: GossipsubCodec< RequestMessage = GossipsubBroadcastRequest, ResponseMessage = GossipsubMessage, - > + RequestResponseCodec + > + request_response::Codec + Clone + Send + 'static { /// Returns RequestResponse's Protocol /// Needed for initialization of RequestResponse Behaviour - fn get_req_res_protocol(&self) -> ::Protocol; + fn get_req_res_protocol(&self) -> ::Protocol; } diff --git a/crates/services/p2p/src/codecs/postcard.rs b/crates/services/p2p/src/codecs/postcard.rs index 1af88d08f18..94f23cd6fd2 100644 --- a/crates/services/p2p/src/codecs/postcard.rs +++ b/crates/services/p2p/src/codecs/postcard.rs @@ -20,7 +20,7 @@ use futures::{ AsyncReadExt, AsyncWriteExt, }; -use libp2p::request_response::Codec as RequestResponseCodec; +use libp2p::request_response; use serde::{ Deserialize, Serialize, @@ -68,7 +68,7 @@ impl PostcardCodec { /// If the substream was not properly closed when dropped, the sender would instead /// run into a timeout waiting for the response. #[async_trait] -impl RequestResponseCodec for PostcardCodec { +impl request_response::Codec for PostcardCodec { type Protocol = MessageExchangePostcardProtocol; type Request = RequestMessage; type Response = ResponseMessage; @@ -161,7 +161,7 @@ impl GossipsubCodec for PostcardCodec { } impl NetworkCodec for PostcardCodec { - fn get_req_res_protocol(&self) -> ::Protocol { + fn get_req_res_protocol(&self) -> ::Protocol { MessageExchangePostcardProtocol {} } } diff --git a/crates/services/p2p/src/config.rs b/crates/services/p2p/src/config.rs index 242e208abe3..1b333c87586 100644 --- a/crates/services/p2p/src/config.rs +++ b/crates/services/p2p/src/config.rs @@ -1,6 +1,6 @@ use crate::{ gossipsub::config::default_gossipsub_config, - heartbeat::HeartbeatConfig, + heartbeat, peer_manager::ConnectionState, TryPeerId, }; @@ -11,17 +11,17 @@ use libp2p::{ muxing::StreamMuxerBox, transport::Boxed, }, - gossipsub::Config as GossipsubConfig, + gossipsub, identity::{ secp256k1, Keypair, }, - noise::Config as NoiseConfig, + noise, tcp::{ - tokio::Transport as TokioTcpTransport, - Config as TcpConfig, + self, + tokio, }, - yamux::Config as YamuxConfig, + yamux, Multiaddr, PeerId, Transport, @@ -119,9 +119,9 @@ pub struct Config { pub info_interval: Option, // `Gossipsub` config - pub gossipsub_config: GossipsubConfig, + pub gossipsub_config: gossipsub::Config, - pub heartbeat_config: HeartbeatConfig, + pub heartbeat_config: heartbeat::Config, // RequestResponse related fields /// Sets the timeout for inbound and outbound requests. @@ -224,7 +224,7 @@ impl Config { reserved_nodes: vec![], reserved_nodes_only_mode: false, gossipsub_config: default_gossipsub_config(), - heartbeat_config: HeartbeatConfig::default(), + heartbeat_config: heartbeat::Config::default(), set_request_timeout: REQ_RES_TIMEOUT, set_connection_keep_alive: REQ_RES_TIMEOUT, heartbeat_check_interval: Duration::from_secs(10), @@ -262,7 +262,7 @@ pub(crate) fn build_transport_function( let transport_function = move |keypair: &Keypair| { let transport = { let generate_tcp_transport = || { - TokioTcpTransport::new(TcpConfig::new().port_reuse(true).nodelay(true)) + tokio::Transport::new(tcp::Config::new().port_reuse(true).nodelay(true)) }; let tcp = generate_tcp_transport(); @@ -275,12 +275,12 @@ pub(crate) fn build_transport_function( .upgrade(libp2p::core::upgrade::Version::V1Lazy); let noise_authenticated = - NoiseConfig::new(keypair).expect("Noise key generation failed"); + noise::Config::new(keypair).expect("Noise key generation failed"); let multiplex_config = { let mplex_config = MplexConfig::default(); - let mut yamux_config = YamuxConfig::default(); + let mut yamux_config = yamux::Config::default(); // TODO: remove deprecated method call https://github.com/FuelLabs/fuel-core/issues/1592 #[allow(deprecated)] yamux_config.set_max_buffer_size(MAX_RESPONSE_SIZE); diff --git a/crates/services/p2p/src/config/fuel_authenticated.rs b/crates/services/p2p/src/config/fuel_authenticated.rs index 912c2a9be12..2f92e0a9ac4 100644 --- a/crates/services/p2p/src/config/fuel_authenticated.rs +++ b/crates/services/p2p/src/config/fuel_authenticated.rs @@ -7,6 +7,7 @@ use futures::{ TryFutureExt, }; use libp2p::{ + self, core::{ upgrade::{ InboundConnectionUpgrade, @@ -14,11 +15,7 @@ use libp2p::{ }, UpgradeInfo, }, - noise::{ - Config as NoiseConfig, - Error as NoiseError, - Output as NoiseOutput, - }, + noise, PeerId, }; use std::pin::Pin; @@ -30,14 +27,14 @@ pub(crate) trait Approver { #[derive(Clone)] pub(crate) struct FuelAuthenticated { - noise_authenticated: NoiseConfig, + noise_authenticated: noise::Config, approver: A, checksum: Checksum, } impl FuelAuthenticated { pub(crate) fn new( - noise_authenticated: NoiseConfig, + noise_authenticated: noise::Config, approver: A, checksum: Checksum, ) -> Self { @@ -69,8 +66,8 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, A: Approver + Send + 'static, { - type Output = (PeerId, NoiseOutput); - type Error = NoiseError; + type Output = (PeerId, noise::Output); + type Error = noise::Error; type Future = Pin> + Send>>; fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future { @@ -81,7 +78,7 @@ where if self.approver.allow_peer(&remote_peer_id) { future::ok((remote_peer_id, io)) } else { - future::err(NoiseError::AuthenticationFailed) + future::err(noise::Error::AuthenticationFailed) } }), ) @@ -93,8 +90,8 @@ where T: AsyncRead + AsyncWrite + Unpin + Send + 'static, A: Approver + Send + 'static, { - type Output = (PeerId, NoiseOutput); - type Error = NoiseError; + type Output = (PeerId, noise::Output); + type Error = noise::Error; type Future = Pin> + Send>>; fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future { @@ -105,7 +102,7 @@ where if self.approver.allow_peer(&remote_peer_id) { future::ok((remote_peer_id, io)) } else { - future::err(NoiseError::AuthenticationFailed) + future::err(noise::Error::AuthenticationFailed) } }), ) diff --git a/crates/services/p2p/src/discovery.rs b/crates/services/p2p/src/discovery.rs index 2a5d832f933..46d8661a18e 100644 --- a/crates/services/p2p/src/discovery.rs +++ b/crates/services/p2p/src/discovery.rs @@ -1,13 +1,12 @@ -use self::mdns::MdnsWrapper; +use self::mdns_wrapper::MdnsWrapper; use futures::FutureExt; use libp2p::{ core::Endpoint, kad::{ + self, store::MemoryStore, - Behaviour as KademliaBehavior, - Event, }, - mdns::Event as MdnsEvent, + mdns, swarm::{ derive_prelude::{ ConnectionClosed, @@ -39,13 +38,15 @@ use std::{ }; use tracing::trace; mod discovery_config; -mod mdns; -pub use discovery_config::DiscoveryConfig; +mod mdns_wrapper; +pub use discovery_config::Config; const SIXTY_SECONDS: Duration = Duration::from_secs(60); +pub type Event = kad::Event; + /// NetworkBehavior for discovery of nodes -pub struct DiscoveryBehaviour { +pub struct Behaviour { /// Track the connected peers connected_peers: HashSet, @@ -53,7 +54,7 @@ pub struct DiscoveryBehaviour { mdns: MdnsWrapper, /// Kademlia with MemoryStore - kademlia: KademliaBehavior, + kademlia: kad::Behaviour, /// If enabled, the Stream that will fire after the delay expires, /// starting new random walk @@ -66,17 +67,17 @@ pub struct DiscoveryBehaviour { max_peers_connected: usize, } -impl DiscoveryBehaviour { +impl Behaviour { /// Adds a known listen address of a peer participating in the DHT to the routing table. pub fn add_address(&mut self, peer_id: &PeerId, address: Multiaddr) { self.kademlia.add_address(peer_id, address); } } -impl NetworkBehaviour for DiscoveryBehaviour { +impl NetworkBehaviour for Behaviour { type ConnectionHandler = - as NetworkBehaviour>::ConnectionHandler; - type ToSwarm = Event; + as NetworkBehaviour>::ConnectionHandler; + type ToSwarm = kad::Event; fn handle_established_inbound_connection( &mut self, @@ -203,7 +204,7 @@ impl NetworkBehaviour for DiscoveryBehaviour { while let Poll::Ready(mdns_event) = self.mdns.poll(cx) { match mdns_event { - ToSwarm::GenerateEvent(MdnsEvent::Discovered(list)) => { + ToSwarm::GenerateEvent(mdns::Event::Discovered(list)) => { for (peer_id, multiaddr) in list { self.kademlia.add_address(&peer_id, multiaddr); } @@ -227,9 +228,9 @@ impl NetworkBehaviour for DiscoveryBehaviour { #[cfg(test)] mod tests { use super::{ - DiscoveryBehaviour, - DiscoveryConfig, - Event as KademliaEvent, + Behaviour, + Config, + Event, }; use futures::{ future::poll_fn, @@ -260,12 +261,10 @@ mod tests { fn build_behavior_fn( bootstrap_nodes: Vec, - ) -> impl FnOnce(Keypair) -> DiscoveryBehaviour { + ) -> impl FnOnce(Keypair) -> Behaviour { |keypair| { - let mut config = DiscoveryConfig::new( - keypair.public().to_peer_id(), - "test_network".into(), - ); + let mut config = + Config::new(keypair.public().to_peer_id(), "test_network".into()); config .max_peers_connected(MAX_PEERS) .with_bootstrap_nodes(bootstrap_nodes) @@ -278,7 +277,7 @@ mod tests { /// helper function for building Discovery Behaviour for testing fn build_fuel_discovery( bootstrap_nodes: Vec, - ) -> (Swarm, Multiaddr, PeerId) { + ) -> (Swarm, Multiaddr, PeerId) { let behaviour_fn = build_behavior_fn(bootstrap_nodes); let listen_addr: Multiaddr = Protocol::Memory(rand::random::()).into(); @@ -354,7 +353,7 @@ mod tests { // if peer has connected - remove it from the set left_to_discover[swarm_index].remove(&peer_id); } - SwarmEvent::Behaviour(KademliaEvent::UnroutablePeer { + SwarmEvent::Behaviour(Event::UnroutablePeer { peer: peer_id, }) => { // kademlia discovered a peer but does not have it's address diff --git a/crates/services/p2p/src/discovery/discovery_config.rs b/crates/services/p2p/src/discovery/discovery_config.rs index ff6cb479808..34eca530bc2 100644 --- a/crates/services/p2p/src/discovery/discovery_config.rs +++ b/crates/services/p2p/src/discovery/discovery_config.rs @@ -1,15 +1,14 @@ use crate::{ discovery::{ - mdns::MdnsWrapper, - DiscoveryBehaviour, + mdns_wrapper::MdnsWrapper, + Behaviour, }, TryPeerId, }; use libp2p::{ kad::{ + self, store::MemoryStore, - Behaviour as KademliaBehaviour, - Config as KademliaConfig, Mode, }, swarm::StreamProtocol, @@ -23,7 +22,7 @@ use std::{ use tracing::warn; #[derive(Clone, Debug)] -pub struct DiscoveryConfig { +pub struct Config { local_peer_id: PeerId, bootstrap_nodes: Vec, reserved_nodes: Vec, @@ -35,7 +34,7 @@ pub struct DiscoveryConfig { connection_idle_timeout: Duration, } -impl DiscoveryConfig { +impl Config { pub fn new(local_peer_id: PeerId, network_name: String) -> Self { Self { local_peer_id, @@ -98,8 +97,8 @@ impl DiscoveryConfig { self } - pub fn finish(self) -> DiscoveryBehaviour { - let DiscoveryConfig { + pub fn finish(self) -> Behaviour { + let Config { local_peer_id, bootstrap_nodes, network_name, @@ -111,14 +110,14 @@ impl DiscoveryConfig { // kademlia setup let memory_store = MemoryStore::new(local_peer_id.to_owned()); - let mut kademlia_config = KademliaConfig::default(); + let mut kademlia_config = kad::Config::default(); let network = format!("/fuel/kad/{network_name}/kad/1.0.0"); kademlia_config.set_protocol_names(vec![ StreamProtocol::try_from_owned(network).expect("Invalid kad protocol") ]); let mut kademlia = - KademliaBehaviour::with_config(local_peer_id, memory_store, kademlia_config); + kad::Behaviour::with_config(local_peer_id, memory_store, kademlia_config); kademlia.set_mode(Some(Mode::Server)); // bootstrap nodes need to have their peer_id defined in the Multiaddr @@ -168,7 +167,7 @@ impl DiscoveryConfig { MdnsWrapper::disabled() }; - DiscoveryBehaviour { + Behaviour { connected_peers: HashSet::new(), kademlia, next_kad_random_walk, diff --git a/crates/services/p2p/src/discovery/mdns.rs b/crates/services/p2p/src/discovery/mdns_wrapper.rs similarity index 96% rename from crates/services/p2p/src/discovery/mdns.rs rename to crates/services/p2p/src/discovery/mdns_wrapper.rs index 0debed57e38..2b38b703759 100644 --- a/crates/services/p2p/src/discovery/mdns.rs +++ b/crates/services/p2p/src/discovery/mdns_wrapper.rs @@ -2,9 +2,8 @@ use crate::Multiaddr; use libp2p::{ core::Endpoint, mdns::{ + self, tokio::Behaviour as TokioMdns, - Config, - Event as MdnsEvent, }, swarm::{ dummy, @@ -33,7 +32,7 @@ pub enum MdnsWrapper { impl MdnsWrapper { pub fn new(peer_id: PeerId) -> Self { - match TokioMdns::new(Config::default(), peer_id) { + match TokioMdns::new(mdns::Config::default(), peer_id) { Ok(mdns) => Self::Ready(mdns), Err(err) => { warn!("Failed to initialize mDNS: {:?}", err); @@ -64,7 +63,7 @@ impl MdnsWrapper { impl NetworkBehaviour for MdnsWrapper { type ConnectionHandler = dummy::ConnectionHandler; - type ToSwarm = MdnsEvent; + type ToSwarm = mdns::Event; fn handle_established_inbound_connection( &mut self, diff --git a/crates/services/p2p/src/gossipsub/config.rs b/crates/services/p2p/src/gossipsub/config.rs index 334392c669b..7c816c47fe1 100644 --- a/crates/services/p2p/src/gossipsub/config.rs +++ b/crates/services/p2p/src/gossipsub/config.rs @@ -7,10 +7,7 @@ use crate::{ }; use fuel_core_metrics::p2p_metrics::p2p_metrics; use libp2p::gossipsub::{ - Behaviour as Gossipsub, - Config as GossipsubConfig, - ConfigBuilder as GossipsubConfigBuilder, - Message as GossipsubMessage, + self, MessageAuthenticity, MessageId, MetricsConfig, @@ -58,12 +55,12 @@ const NEW_TX_GOSSIP_WEIGHT: f64 = 0.05; pub const GRAYLIST_THRESHOLD: f64 = -16000.0; /// Creates `GossipsubConfigBuilder` with few of the Gossipsub values already defined -pub fn default_gossipsub_builder() -> GossipsubConfigBuilder { - let gossip_message_id = move |message: &GossipsubMessage| { +pub fn default_gossipsub_builder() -> gossipsub::ConfigBuilder { + let gossip_message_id = move |message: &gossipsub::Message| { MessageId::from(&Sha256::digest(&message.data)[..]) }; - let mut builder = GossipsubConfigBuilder::default(); + let mut builder = gossipsub::ConfigBuilder::default(); builder .protocol_id_prefix("/meshsub/1.0.0") @@ -75,7 +72,7 @@ pub fn default_gossipsub_builder() -> GossipsubConfigBuilder { /// Builds a default `GossipsubConfig`. /// Used in testing. -pub(crate) fn default_gossipsub_config() -> GossipsubConfig { +pub(crate) fn default_gossipsub_config() -> gossipsub::Config { default_gossipsub_builder() .mesh_n(MESH_SIZE) .mesh_n_low(6) @@ -175,14 +172,14 @@ fn initialize_peer_score_thresholds() -> PeerScoreThresholds { } /// Given a `P2pConfig` containing `GossipsubConfig` creates a Gossipsub Behaviour -pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { +pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> gossipsub::Behaviour { let mut gossipsub = if p2p_config.metrics { // Move to Metrics related feature flag let mut p2p_registry = prometheus_client::registry::Registry::default(); let metrics_config = MetricsConfig::default(); - let mut gossipsub = Gossipsub::new_with_metrics( + let mut gossipsub = gossipsub::Behaviour::new_with_metrics( MessageAuthenticity::Signed(p2p_config.keypair.clone()), p2p_config.gossipsub_config.clone(), &mut p2p_registry, @@ -200,7 +197,7 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { gossipsub } else { - let mut gossipsub = Gossipsub::new( + let mut gossipsub = gossipsub::Behaviour::new( MessageAuthenticity::Signed(p2p_config.keypair.clone()), p2p_config.gossipsub_config.clone(), ) @@ -221,7 +218,7 @@ pub(crate) fn build_gossipsub_behaviour(p2p_config: &Config) -> Gossipsub { gossipsub } -fn initialize_gossipsub(gossipsub: &mut Gossipsub, p2p_config: &Config) { +fn initialize_gossipsub(gossipsub: &mut gossipsub::Behaviour, p2p_config: &Config) { let peer_score_thresholds = initialize_peer_score_thresholds(); let peer_score_params = initialize_peer_score_params(&peer_score_thresholds); diff --git a/crates/services/p2p/src/heartbeat.rs b/crates/services/p2p/src/heartbeat.rs index e36f66a7dd3..5e0f4d4544a 100644 --- a/crates/services/p2p/src/heartbeat.rs +++ b/crates/services/p2p/src/heartbeat.rs @@ -1,6 +1,6 @@ use crate::Multiaddr; use fuel_core_types::fuel_types::BlockHeight; -pub use handler::HeartbeatConfig; +pub use handler::Config; use handler::{ HeartbeatHandler, HeartbeatInEvent, @@ -32,7 +32,7 @@ pub const HEARTBEAT_PROTOCOL: &str = "/fuel/heartbeat/0.0.1"; #[derive(Debug, Clone)] enum HeartbeatAction { - HeartbeatEvent(HeartbeatEvent), + HeartbeatEvent(Event), BlockHeightRequest { peer_id: PeerId, connection_id: ConnectionId, @@ -41,7 +41,7 @@ enum HeartbeatAction { } impl HeartbeatAction { - fn build(self) -> ToSwarm { + fn build(self) -> ToSwarm { match self { Self::HeartbeatEvent(event) => ToSwarm::GenerateEvent(event), Self::BlockHeightRequest { @@ -58,20 +58,20 @@ impl HeartbeatAction { } #[derive(Debug, Clone, Copy)] -pub struct HeartbeatEvent { +pub struct Event { pub peer_id: PeerId, pub latest_block_height: BlockHeight, } #[derive(Debug, Clone)] -pub struct Heartbeat { - config: HeartbeatConfig, +pub struct Behaviour { + config: Config, pending_events: VecDeque, current_block_height: BlockHeight, } -impl Heartbeat { - pub fn new(config: HeartbeatConfig, block_height: BlockHeight) -> Self { +impl Behaviour { + pub fn new(config: Config, block_height: BlockHeight) -> Self { Self { config, pending_events: VecDeque::default(), @@ -84,9 +84,9 @@ impl Heartbeat { } } -impl NetworkBehaviour for Heartbeat { +impl NetworkBehaviour for Behaviour { type ConnectionHandler = HeartbeatHandler; - type ToSwarm = HeartbeatEvent; + type ToSwarm = Event; fn handle_established_inbound_connection( &mut self, @@ -119,7 +119,7 @@ impl NetworkBehaviour for Heartbeat { match event { HeartbeatOutEvent::BlockHeight(latest_block_height) => self .pending_events - .push_back(HeartbeatAction::HeartbeatEvent(HeartbeatEvent { + .push_back(HeartbeatAction::HeartbeatEvent(Event { peer_id, latest_block_height, })), diff --git a/crates/services/p2p/src/heartbeat/handler.rs b/crates/services/p2p/src/heartbeat/handler.rs index 436c35b947d..6f7cd8fb899 100644 --- a/crates/services/p2p/src/heartbeat/handler.rs +++ b/crates/services/p2p/src/heartbeat/handler.rs @@ -46,7 +46,7 @@ pub enum HeartbeatOutEvent { } #[derive(Debug, Clone)] -pub struct HeartbeatConfig { +pub struct Config { /// Sending of `BlockHeight` should not take longer than this send_timeout: Duration, /// Idle time before sending next `BlockHeight` @@ -56,7 +56,7 @@ pub struct HeartbeatConfig { max_failures: NonZeroU32, } -impl HeartbeatConfig { +impl Config { pub fn new( send_timeout: Duration, idle_timeout: Duration, @@ -70,7 +70,7 @@ impl HeartbeatConfig { } } -impl Default for HeartbeatConfig { +impl Default for Config { fn default() -> Self { Self::new( Duration::from_secs(60), @@ -84,7 +84,7 @@ type InboundData = BoxFuture<'static, Result<(Stream, BlockHeight), std::io::Err type OutboundData = BoxFuture<'static, Result>; pub struct HeartbeatHandler { - config: HeartbeatConfig, + config: Config, inbound: Option, outbound: Option, timer: Pin>, @@ -92,7 +92,7 @@ pub struct HeartbeatHandler { } impl HeartbeatHandler { - pub fn new(config: HeartbeatConfig) -> Self { + pub fn new(config: Config) -> Self { Self { config, inbound: None, diff --git a/crates/services/p2p/src/lib.rs b/crates/services/p2p/src/lib.rs index 30efbe263ec..58f870e5cd6 100644 --- a/crates/services/p2p/src/lib.rs +++ b/crates/services/p2p/src/lib.rs @@ -15,7 +15,7 @@ pub mod request_response; pub mod service; pub use gossipsub::config as gossipsub_config; -pub use heartbeat::HeartbeatConfig; +pub use heartbeat::Config; pub use libp2p::{ multiaddr::Protocol, diff --git a/crates/services/p2p/src/p2p_service.rs b/crates/services/p2p/src/p2p_service.rs index afb2c9cf8e6..9f606582385 100644 --- a/crates/services/p2p/src/p2p_service.rs +++ b/crates/services/p2p/src/p2p_service.rs @@ -18,7 +18,7 @@ use crate::{ }, topics::GossipsubTopics, }, - heartbeat::HeartbeatEvent, + heartbeat, peer_manager::{ PeerManager, Punisher, @@ -41,7 +41,7 @@ use fuel_core_types::{ use futures::prelude::*; use libp2p::{ gossipsub::{ - Event as GossipsubEvent, + self, MessageAcceptance, MessageId, PublishError, @@ -50,9 +50,8 @@ use libp2p::{ identify, multiaddr::Protocol, request_response::{ - Event as RequestResponseEvent, + self, InboundRequestId, - Message as RequestResponseMessage, OutboundRequestId, ResponseChannel, }, @@ -453,8 +452,11 @@ impl FuelP2PService { } } - fn handle_gossipsub_event(&mut self, event: GossipsubEvent) -> Option { - if let GossipsubEvent::Message { + fn handle_gossipsub_event( + &mut self, + event: gossipsub::Event, + ) -> Option { + if let gossipsub::Event::Message { propagation_source, message, message_id, @@ -538,11 +540,11 @@ impl FuelP2PService { fn handle_request_response_event( &mut self, - event: RequestResponseEvent, + event: request_response::Event, ) -> Option { match event { - RequestResponseEvent::Message { peer, message } => match message { - RequestResponseMessage::Request { + request_response::Event::Message { peer, message } => match message { + request_response::Message::Request { request, channel, request_id, @@ -554,7 +556,7 @@ impl FuelP2PService { request_message: request, }) } - RequestResponseMessage::Response { + request_response::Message::Response { request_id, response, } => { @@ -591,14 +593,14 @@ impl FuelP2PService { } } }, - RequestResponseEvent::InboundFailure { + request_response::Event::InboundFailure { peer, error, request_id, } => { tracing::error!("RequestResponse inbound error for peer: {:?} with id: {:?} and error: {:?}", peer, request_id, error); } - RequestResponseEvent::OutboundFailure { + request_response::Event::OutboundFailure { peer, error, request_id, @@ -651,8 +653,11 @@ impl FuelP2PService { None } - fn handle_heartbeat_event(&mut self, event: HeartbeatEvent) -> Option { - let HeartbeatEvent { + fn handle_heartbeat_event( + &mut self, + event: heartbeat::Event, + ) -> Option { + let heartbeat::Event { peer_id, latest_block_height, } = event; diff --git a/crates/services/p2p/src/peer_report.rs b/crates/services/p2p/src/peer_report.rs index ed4b8ee38df..5b33fc3f28d 100644 --- a/crates/services/p2p/src/peer_report.rs +++ b/crates/services/p2p/src/peer_report.rs @@ -1,18 +1,15 @@ -use crate::{ - config::Config, - heartbeat::Heartbeat, -}; +use crate::config::Config; use libp2p::{ self, core::Endpoint, - identify::Behaviour as Identify, + identify, swarm::{ derive_prelude::{ ConnectionClosed, ConnectionEstablished, FromSwarm, }, - dummy::ConnectionHandler as DummyConnectionHandler, + dummy, ConnectionDenied, ConnectionId, NetworkBehaviour, @@ -57,14 +54,14 @@ pub enum PeerReportEvent { } // `Behaviour` that reports events about peers -pub struct PeerReportBehaviour { +pub struct Behaviour { pending_events: VecDeque, // regulary checks if reserved nodes are connected health_check: Interval, decay_interval: Interval, } -impl PeerReportBehaviour { +impl Behaviour { pub(crate) fn new(_config: &Config) -> Self { Self { pending_events: VecDeque::default(), @@ -78,8 +75,8 @@ impl PeerReportBehaviour { } } -impl NetworkBehaviour for PeerReportBehaviour { - type ConnectionHandler = DummyConnectionHandler; +impl NetworkBehaviour for Behaviour { + type ConnectionHandler = dummy::ConnectionHandler; type ToSwarm = PeerReportEvent; fn handle_established_inbound_connection( @@ -89,7 +86,7 @@ impl NetworkBehaviour for PeerReportBehaviour { _local_addr: &Multiaddr, _remote_addr: &Multiaddr, ) -> Result, ConnectionDenied> { - Ok(DummyConnectionHandler) + Ok(dummy::ConnectionHandler) } fn handle_established_outbound_connection( @@ -99,7 +96,7 @@ impl NetworkBehaviour for PeerReportBehaviour { _addr: &Multiaddr, _role_override: Endpoint, ) -> Result, ConnectionDenied> { - Ok(DummyConnectionHandler) + Ok(dummy::ConnectionHandler) } fn on_swarm_event(&mut self, event: FromSwarm) { @@ -170,8 +167,8 @@ trait FromAction: NetworkBehaviour { ) -> Option>>; } -impl FromSwarmEvent for Heartbeat {} -impl FromSwarmEvent for Identify {} +impl FromSwarmEvent for Behaviour {} +impl FromSwarmEvent for identify::Behaviour {} trait FromSwarmEvent: NetworkBehaviour { fn handle_swarm_event(&mut self, event: &FromSwarm) { diff --git a/crates/services/p2p/src/service.rs b/crates/services/p2p/src/service.rs index 46b2f8c333e..294fb974eae 100644 --- a/crates/services/p2p/src/service.rs +++ b/crates/services/p2p/src/service.rs @@ -938,10 +938,7 @@ pub mod tests { } fn get_all_peer_info(&self) -> Vec<(&PeerId, &PeerInfo)> { - self.peer_info - .iter() - .map(|(peer_id, peer_info)| (peer_id, peer_info)) - .collect() + self.peer_info.iter().map(|tup| (&tup.0, &tup.1)).collect() } fn get_peer_id_with_height(&self, _height: &BlockHeight) -> Option {