Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unify namespacing in P2P codebase #1597

Merged
merged 5 commits into from
Jan 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ Description of the upcoming release here.

### Changed

- [#1597](https://github.com/FuelLabs/fuel-core/pull/1597): Unify namespacing for `libp2p` modules
- [#1591](https://github.com/FuelLabs/fuel-core/pull/1591): Simplify libp2p dependencies and not depend on all sub modules directly.
- [#1590](https://github.com/FuelLabs/fuel-core/pull/1590): Use `AtomicView` in the `TxPool` to read the state of the database during insertion of the transactions.
- [#1587](https://github.com/FuelLabs/fuel-core/pull/1587): Use `BlockHeight` as a primary key for the `FuelsBlock` table.
Expand Down
2 changes: 1 addition & 1 deletion benches/src/bin/collect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ fn decode_input(line: &str) -> Option<Output> {
},
_ => return None,
};
let throughput = if let Some(t) = val.get("throughput")?.as_array()?.get(0) {
let throughput = if let Some(t) = val.get("throughput")?.as_array()?.first() {
Some(t.as_object()?.get("per_iteration")?.as_u64()?)
} else {
None
Expand Down
4 changes: 2 additions & 2 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use fuel_core::{
MAX_RESPONSE_SIZE,
},
gossipsub_config::default_gossipsub_builder,
HeartbeatConfig,
heartbeat,
Multiaddr,
},
types::{
Expand Down Expand Up @@ -276,7 +276,7 @@ impl P2PArgs {
let heartbeat_config = {
let send_duration = Duration::from_secs(self.heartbeat_send_duration);
let idle_duration = Duration::from_secs(self.heartbeat_idle_duration);
HeartbeatConfig::new(
heartbeat::Config::new(
send_duration,
idle_duration,
self.heartbeat_max_failures,
Expand Down
37 changes: 18 additions & 19 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,13 @@ use crate::{
NetworkCodec,
},
config::Config,
discovery::{
DiscoveryBehaviour,
DiscoveryConfig,
},
discovery,
gossipsub::{
config::build_gossipsub_behaviour,
topics::GossipTopic,
},
heartbeat,
peer_report::PeerReportBehaviour,
peer_report,
request_response::messages::{
RequestMessage,
ResponseMessage,
Expand All @@ -23,15 +20,14 @@ use fuel_core_types::fuel_types::BlockHeight;
use libp2p::{
allow_block_list,
gossipsub::{
Behaviour as Gossipsub,
self,
MessageAcceptance,
MessageId,
PublishError,
},
identify,
request_response::{
Behaviour as RequestResponse,
Config as RequestResponseConfig,
self,
OutboundRequestId,
ProtocolSupport,
ResponseChannel,
Expand All @@ -50,22 +46,22 @@ pub struct FuelBehaviour {
blocked_peer: allow_block_list::Behaviour<allow_block_list::BlockedPeers>,

/// Message propagation for p2p
gossipsub: Gossipsub,
gossipsub: gossipsub::Behaviour,

/// Handles regular heartbeats from peers
heartbeat: heartbeat::Heartbeat,
heartbeat: heartbeat::Behaviour,

/// The Behaviour to identify peers.
identify: identify::Behaviour,

/// Identifies and periodically requests `BlockHeight` from connected nodes
peer_report: PeerReportBehaviour,
peer_report: peer_report::Behaviour,

/// Node discovery
discovery: DiscoveryBehaviour,
discovery: discovery::Behaviour,

/// RequestResponse protocol
request_response: RequestResponse<PostcardCodec>,
request_response: request_response::Behaviour<PostcardCodec>,
}

impl FuelBehaviour {
Expand All @@ -75,7 +71,7 @@ impl FuelBehaviour {

let discovery_config = {
let mut discovery_config =
DiscoveryConfig::new(local_peer_id, p2p_config.network_name.clone());
discovery::Config::new(local_peer_id, p2p_config.network_name.clone());

discovery_config
.enable_mdns(p2p_config.enable_mdns)
Expand All @@ -97,7 +93,7 @@ impl FuelBehaviour {

let gossipsub = build_gossipsub_behaviour(p2p_config);

let peer_report = PeerReportBehaviour::new(p2p_config);
let peer_report = peer_report::Behaviour::new(p2p_config);

let identify = {
let identify_config = identify::Config::new(
Expand All @@ -111,21 +107,24 @@ impl FuelBehaviour {
}
};

let heartbeat = heartbeat::Heartbeat::new(
let heartbeat = heartbeat::Behaviour::new(
p2p_config.heartbeat_config.clone(),
BlockHeight::default(),
);

let req_res_protocol =
core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full));

let req_res_config = RequestResponseConfig::default();
let req_res_config = request_response::Config::default();
req_res_config
.clone()
.with_request_timeout(p2p_config.set_request_timeout);

let request_response =
RequestResponse::with_codec(codec, req_res_protocol, req_res_config);
let request_response = request_response::Behaviour::with_codec(
codec,
req_res_protocol,
req_res_config,
);

Self {
discovery: discovery_config.finish(),
Expand Down
6 changes: 3 additions & 3 deletions crates/services/p2p/src/codecs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::{
ResponseMessage,
},
};
use libp2p::request_response::Codec as RequestResponseCodec;
use libp2p::request_response;
use std::io;

/// Implement this in order to handle serialization & deserialization of Gossipsub messages
Expand All @@ -34,12 +34,12 @@ pub trait NetworkCodec:
GossipsubCodec<
RequestMessage = GossipsubBroadcastRequest,
ResponseMessage = GossipsubMessage,
> + RequestResponseCodec<Request = RequestMessage, Response = ResponseMessage>
> + request_response::Codec<Request = RequestMessage, Response = ResponseMessage>
+ Clone
+ Send
+ 'static
{
/// Returns RequestResponse's Protocol
/// Needed for initialization of RequestResponse Behaviour
fn get_req_res_protocol(&self) -> <Self as RequestResponseCodec>::Protocol;
fn get_req_res_protocol(&self) -> <Self as request_response::Codec>::Protocol;
}
6 changes: 3 additions & 3 deletions crates/services/p2p/src/codecs/postcard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use futures::{
AsyncReadExt,
AsyncWriteExt,
};
use libp2p::request_response::Codec as RequestResponseCodec;
use libp2p::request_response;
use serde::{
Deserialize,
Serialize,
Expand Down Expand Up @@ -68,7 +68,7 @@ impl PostcardCodec {
/// If the substream was not properly closed when dropped, the sender would instead
/// run into a timeout waiting for the response.
#[async_trait]
impl RequestResponseCodec for PostcardCodec {
impl request_response::Codec for PostcardCodec {
type Protocol = MessageExchangePostcardProtocol;
type Request = RequestMessage;
type Response = ResponseMessage;
Expand Down Expand Up @@ -161,7 +161,7 @@ impl GossipsubCodec for PostcardCodec {
}

impl NetworkCodec for PostcardCodec {
fn get_req_res_protocol(&self) -> <Self as RequestResponseCodec>::Protocol {
fn get_req_res_protocol(&self) -> <Self as request_response::Codec>::Protocol {
MessageExchangePostcardProtocol {}
}
}
Expand Down
24 changes: 12 additions & 12 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{
gossipsub::config::default_gossipsub_config,
heartbeat::HeartbeatConfig,
heartbeat,
peer_manager::ConnectionState,
TryPeerId,
};
Expand All @@ -11,17 +11,17 @@ use libp2p::{
muxing::StreamMuxerBox,
transport::Boxed,
},
gossipsub::Config as GossipsubConfig,
gossipsub,
identity::{
secp256k1,
Keypair,
},
noise::Config as NoiseConfig,
noise,
tcp::{
tokio::Transport as TokioTcpTransport,
Config as TcpConfig,
self,
tokio,
},
yamux::Config as YamuxConfig,
yamux,
Multiaddr,
PeerId,
Transport,
Expand Down Expand Up @@ -119,9 +119,9 @@ pub struct Config<State = Initialized> {
pub info_interval: Option<Duration>,

// `Gossipsub` config
pub gossipsub_config: GossipsubConfig,
pub gossipsub_config: gossipsub::Config,

pub heartbeat_config: HeartbeatConfig,
pub heartbeat_config: heartbeat::Config,

// RequestResponse related fields
/// Sets the timeout for inbound and outbound requests.
Expand Down Expand Up @@ -224,7 +224,7 @@ impl Config<NotInitialized> {
reserved_nodes: vec![],
reserved_nodes_only_mode: false,
gossipsub_config: default_gossipsub_config(),
heartbeat_config: HeartbeatConfig::default(),
heartbeat_config: heartbeat::Config::default(),
set_request_timeout: REQ_RES_TIMEOUT,
set_connection_keep_alive: REQ_RES_TIMEOUT,
heartbeat_check_interval: Duration::from_secs(10),
Expand Down Expand Up @@ -262,7 +262,7 @@ pub(crate) fn build_transport_function(
let transport_function = move |keypair: &Keypair| {
let transport = {
let generate_tcp_transport = || {
TokioTcpTransport::new(TcpConfig::new().port_reuse(true).nodelay(true))
tokio::Transport::new(tcp::Config::new().port_reuse(true).nodelay(true))
};

let tcp = generate_tcp_transport();
Expand All @@ -275,12 +275,12 @@ pub(crate) fn build_transport_function(
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

let noise_authenticated =
NoiseConfig::new(keypair).expect("Noise key generation failed");
noise::Config::new(keypair).expect("Noise key generation failed");

let multiplex_config = {
let mplex_config = MplexConfig::default();

let mut yamux_config = YamuxConfig::default();
let mut yamux_config = yamux::Config::default();
// TODO: remove deprecated method call https://github.com/FuelLabs/fuel-core/issues/1592
#[allow(deprecated)]
yamux_config.set_max_buffer_size(MAX_RESPONSE_SIZE);
Expand Down
23 changes: 10 additions & 13 deletions crates/services/p2p/src/config/fuel_authenticated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,15 @@ use futures::{
TryFutureExt,
};
use libp2p::{
self,
core::{
upgrade::{
InboundConnectionUpgrade,
OutboundConnectionUpgrade,
},
UpgradeInfo,
},
noise::{
Config as NoiseConfig,
Error as NoiseError,
Output as NoiseOutput,
},
noise,
PeerId,
};
use std::pin::Pin;
Expand All @@ -30,14 +27,14 @@ pub(crate) trait Approver {

#[derive(Clone)]
pub(crate) struct FuelAuthenticated<A: Approver> {
noise_authenticated: NoiseConfig,
noise_authenticated: noise::Config,
approver: A,
checksum: Checksum,
}

impl<A: Approver> FuelAuthenticated<A> {
pub(crate) fn new(
noise_authenticated: NoiseConfig,
noise_authenticated: noise::Config,
approver: A,
checksum: Checksum,
) -> Self {
Expand Down Expand Up @@ -69,8 +66,8 @@ where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
A: Approver + Send + 'static,
{
type Output = (PeerId, NoiseOutput<T>);
type Error = NoiseError;
type Output = (PeerId, noise::Output<T>);
type Error = noise::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future {
Expand All @@ -81,7 +78,7 @@ where
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(NoiseError::AuthenticationFailed)
future::err(noise::Error::AuthenticationFailed)
}
}),
)
Expand All @@ -93,8 +90,8 @@ where
T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
A: Approver + Send + 'static,
{
type Output = (PeerId, NoiseOutput<T>);
type Error = NoiseError;
type Output = (PeerId, noise::Output<T>);
type Error = noise::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future {
Expand All @@ -105,7 +102,7 @@ where
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(NoiseError::AuthenticationFailed)
future::err(noise::Error::AuthenticationFailed)
}
}),
)
Expand Down