Skip to content

Commit

Permalink
Unify namespacing in P2P codebase (#1597)
Browse files Browse the repository at this point in the history
Closes #1556
  • Loading branch information
MitchTurner committed Jan 22, 2024
1 parent 7de49ae commit 1965f9d
Show file tree
Hide file tree
Showing 18 changed files with 142 additions and 152 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Changed

- [#1597](https://github.com/FuelLabs/fuel-core/pull/1597): Unify namespacing for `libp2p` modules
- [#1591](https://github.com/FuelLabs/fuel-core/pull/1591): Simplify libp2p dependencies and not depend on all sub modules directly.
- [#1590](https://github.com/FuelLabs/fuel-core/pull/1590): Use `AtomicView` in the `TxPool` to read the state of the database during insertion of the transactions.
- [#1587](https://github.com/FuelLabs/fuel-core/pull/1587): Use `BlockHeight` as a primary key for the `FuelsBlock` table.
Expand Down
2 changes: 1 addition & 1 deletion benches/src/bin/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ fn decode_input(line: &str) -> Option<Output> {
},
_ => return None,
};
let throughput = if let Some(t) = val.get("throughput")?.as_array()?.get(0) {
let throughput = if let Some(t) = val.get("throughput")?.as_array()?.first() {
Some(t.as_object()?.get("per_iteration")?.as_u64()?)
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use fuel_core::{
MAX_RESPONSE_SIZE,
},
gossipsub_config::default_gossipsub_builder,
HeartbeatConfig,
heartbeat,
Multiaddr,
},
types::{
Expand Down Expand Up @@ -276,7 +276,7 @@ impl P2PArgs {
let heartbeat_config = {
let send_duration = Duration::from_secs(self.heartbeat_send_duration);
let idle_duration = Duration::from_secs(self.heartbeat_idle_duration);
HeartbeatConfig::new(
heartbeat::Config::new(
send_duration,
idle_duration,
self.heartbeat_max_failures,
Expand Down
37 changes: 18 additions & 19 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@ use crate::{
NetworkCodec,
},
config::Config,
discovery::{
DiscoveryBehaviour,
DiscoveryConfig,
},
discovery,
gossipsub::{
config::build_gossipsub_behaviour,
topics::GossipTopic,
},
heartbeat,
peer_report::PeerReportBehaviour,
peer_report,
request_response::messages::{
RequestMessage,
ResponseMessage,
Expand All @@ -23,15 +20,14 @@ use fuel_core_types::fuel_types::BlockHeight;
use libp2p::{
allow_block_list,
gossipsub::{
Behaviour as Gossipsub,
self,
MessageAcceptance,
MessageId,
PublishError,
},
identify,
request_response::{
Behaviour as RequestResponse,
Config as RequestResponseConfig,
self,
OutboundRequestId,
ProtocolSupport,
ResponseChannel,
Expand All @@ -50,22 +46,22 @@ pub struct FuelBehaviour {
blocked_peer: allow_block_list::Behaviour<allow_block_list::BlockedPeers>,

/// Message propagation for p2p
gossipsub: Gossipsub,
gossipsub: gossipsub::Behaviour,

/// Handles regular heartbeats from peers
heartbeat: heartbeat::Heartbeat,
heartbeat: heartbeat::Behaviour,

/// The Behaviour to identify peers.
identify: identify::Behaviour,

/// Identifies and periodically requests `BlockHeight` from connected nodes
peer_report: PeerReportBehaviour,
peer_report: peer_report::Behaviour,

/// Node discovery
discovery: DiscoveryBehaviour,
discovery: discovery::Behaviour,

/// RequestResponse protocol
request_response: RequestResponse<PostcardCodec>,
request_response: request_response::Behaviour<PostcardCodec>,
}

impl FuelBehaviour {
Expand All @@ -75,7 +71,7 @@ impl FuelBehaviour {

let discovery_config = {
let mut discovery_config =
DiscoveryConfig::new(local_peer_id, p2p_config.network_name.clone());
discovery::Config::new(local_peer_id, p2p_config.network_name.clone());

discovery_config
.enable_mdns(p2p_config.enable_mdns)
Expand All @@ -97,7 +93,7 @@ impl FuelBehaviour {

let gossipsub = build_gossipsub_behaviour(p2p_config);

let peer_report = PeerReportBehaviour::new(p2p_config);
let peer_report = peer_report::Behaviour::new(p2p_config);

let identify = {
let identify_config = identify::Config::new(
Expand All @@ -111,21 +107,24 @@ impl FuelBehaviour {
}
};

let heartbeat = heartbeat::Heartbeat::new(
let heartbeat = heartbeat::Behaviour::new(
p2p_config.heartbeat_config.clone(),
BlockHeight::default(),
);

let req_res_protocol =
core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full));

let req_res_config = RequestResponseConfig::default();
let req_res_config = request_response::Config::default();
req_res_config
.clone()
.with_request_timeout(p2p_config.set_request_timeout);

let request_response =
RequestResponse::with_codec(codec, req_res_protocol, req_res_config);
let request_response = request_response::Behaviour::with_codec(
codec,
req_res_protocol,
req_res_config,
);

Self {
discovery: discovery_config.finish(),
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
ResponseMessage,
},
};
use libp2p::request_response::Codec as RequestResponseCodec;
use libp2p::request_response;
use std::io;

/// Implement this in order to handle serialization & deserialization of Gossipsub messages
Expand All @@ -34,12 +34,12 @@ pub trait NetworkCodec:
GossipsubCodec<
RequestMessage = GossipsubBroadcastRequest,
ResponseMessage = GossipsubMessage,
> + RequestResponseCodec<Request = RequestMessage, Response = ResponseMessage>
> + request_response::Codec<Request = RequestMessage, Response = ResponseMessage>
+ Clone
+ Send
+ 'static
{
/// Returns RequestResponse's Protocol
/// Needed for initialization of RequestResponse Behaviour
fn get_req_res_protocol(&self) -> <Self as RequestResponseCodec>::Protocol;
fn get_req_res_protocol(&self) -> <Self as request_response::Codec>::Protocol;
}
6 changes: 3 additions & 3 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::{
AsyncReadExt,
AsyncWriteExt,
};
use libp2p::request_response::Codec as RequestResponseCodec;
use libp2p::request_response;
use serde::{
Deserialize,
Serialize,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl PostcardCodec {
/// If the substream was not properly closed when dropped, the sender would instead
/// run into a timeout waiting for the response.
#[async_trait]
impl RequestResponseCodec for PostcardCodec {
impl request_response::Codec for PostcardCodec {
type Protocol = MessageExchangePostcardProtocol;
type Request = RequestMessage;
type Response = ResponseMessage;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl GossipsubCodec for PostcardCodec {
}

impl NetworkCodec for PostcardCodec {
fn get_req_res_protocol(&self) -> <Self as RequestResponseCodec>::Protocol {
fn get_req_res_protocol(&self) -> <Self as request_response::Codec>::Protocol {
MessageExchangePostcardProtocol {}
}
}
Expand Down
24 changes: 12 additions & 12 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
gossipsub::config::default_gossipsub_config,
heartbeat::HeartbeatConfig,
heartbeat,
peer_manager::ConnectionState,
TryPeerId,
};
Expand All @@ -11,17 +11,17 @@ use libp2p::{
muxing::StreamMuxerBox,
transport::Boxed,
},
gossipsub::Config as GossipsubConfig,
gossipsub,
identity::{
secp256k1,
Keypair,
},
noise::Config as NoiseConfig,
noise,
tcp::{
tokio::Transport as TokioTcpTransport,
Config as TcpConfig,
self,
tokio,
},
yamux::Config as YamuxConfig,
yamux,
Multiaddr,
PeerId,
Transport,
Expand Down Expand Up @@ -119,9 +119,9 @@ pub struct Config<State = Initialized> {
pub info_interval: Option<Duration>,

// `Gossipsub` config
pub gossipsub_config: GossipsubConfig,
pub gossipsub_config: gossipsub::Config,

pub heartbeat_config: HeartbeatConfig,
pub heartbeat_config: heartbeat::Config,

// RequestResponse related fields
/// Sets the timeout for inbound and outbound requests.
Expand Down Expand Up @@ -224,7 +224,7 @@ impl Config<NotInitialized> {
reserved_nodes: vec![],
reserved_nodes_only_mode: false,
gossipsub_config: default_gossipsub_config(),
heartbeat_config: HeartbeatConfig::default(),
heartbeat_config: heartbeat::Config::default(),
set_request_timeout: REQ_RES_TIMEOUT,
set_connection_keep_alive: REQ_RES_TIMEOUT,
heartbeat_check_interval: Duration::from_secs(10),
Expand Down Expand Up @@ -262,7 +262,7 @@ pub(crate) fn build_transport_function(
let transport_function = move |keypair: &Keypair| {
let transport = {
let generate_tcp_transport = || {
TokioTcpTransport::new(TcpConfig::new().port_reuse(true).nodelay(true))
tokio::Transport::new(tcp::Config::new().port_reuse(true).nodelay(true))
};

let tcp = generate_tcp_transport();
Expand All @@ -275,12 +275,12 @@ pub(crate) fn build_transport_function(
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

let noise_authenticated =
NoiseConfig::new(keypair).expect("Noise key generation failed");
noise::Config::new(keypair).expect("Noise key generation failed");

let multiplex_config = {
let mplex_config = MplexConfig::default();

let mut yamux_config = YamuxConfig::default();
let mut yamux_config = yamux::Config::default();
// TODO: remove deprecated method call https://github.com/FuelLabs/fuel-core/issues/1592
#[allow(deprecated)]
yamux_config.set_max_buffer_size(MAX_RESPONSE_SIZE);
Expand Down
23 changes: 10 additions & 13 deletions crates/services/p2p/src/config/fuel_authenticated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@ use futures::{
TryFutureExt,
};
use libp2p::{
self,
core::{
upgrade::{
InboundConnectionUpgrade,
OutboundConnectionUpgrade,
},
UpgradeInfo,
},
noise::{
Config as NoiseConfig,
Error as NoiseError,
Output as NoiseOutput,
},
noise,
PeerId,
};
use std::pin::Pin;
Expand All @@ -30,14 +27,14 @@ pub(crate) trait Approver {

#[derive(Clone)]
pub(crate) struct FuelAuthenticated<A: Approver> {
noise_authenticated: NoiseConfig,
noise_authenticated: noise::Config,
approver: A,
checksum: Checksum,
}

impl<A: Approver> FuelAuthenticated<A> {
pub(crate) fn new(
noise_authenticated: NoiseConfig,
noise_authenticated: noise::Config,
approver: A,
checksum: Checksum,
) -> Self {
Expand Down Expand Up @@ -69,8 +66,8 @@ where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
A: Approver + Send + 'static,
{
type Output = (PeerId, NoiseOutput<T>);
type Error = NoiseError;
type Output = (PeerId, noise::Output<T>);
type Error = noise::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future {
Expand All @@ -81,7 +78,7 @@ where
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(NoiseError::AuthenticationFailed)
future::err(noise::Error::AuthenticationFailed)
}
}),
)
Expand All @@ -93,8 +90,8 @@ where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
A: Approver + Send + 'static,
{
type Output = (PeerId, NoiseOutput<T>);
type Error = NoiseError;
type Output = (PeerId, noise::Output<T>);
type Error = noise::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future {
Expand All @@ -105,7 +102,7 @@ where
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(NoiseError::AuthenticationFailed)
future::err(noise::Error::AuthenticationFailed)
}
}),
)
Expand Down

0 comments on commit 1965f9d

Please sign in to comment.