Skip to content

Commit

Permalink
Fixed reconnection issue in the dev cluster with AWS cluster (#1915)
Browse files Browse the repository at this point in the history
Closes #1592

This change improves and simplifies some aspects of the P2P service. It
also fixes the issue of not reconnecting to the reserved nodes when the
reserved node is restarted and got new IP.

- The change moves the reconnection handling into the `PeerReport`
behavior. It removes ping-ponging reserved peers between the primary
behavior and the `PeerReport` behavior and encapsulates the logic inside
the `PeerReport`. Also, it eliminates the timer and replaces it with the
queue of reconnections, reducing noise in logs(before, we had much more
trash errors).
- Added logs for cases when the dial fails. They are very helpful to
debug issues with connection.
- Simplified initialization of the `ConnectionTracker` and
`FuelAuthenticated`. It allows the reuse of libp2p built-in connections
builder.
- Removed the usage of the Mplex since it doesn't have a backpressure
mechanism. Now we use Yamux by default. It is breaking the change since
nodes with the old codebase can't connect to new nodes.
- Propagated `max_concurrent_streams` for request-response protocol.

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog

### Before requesting review
- [x] I have reviewed the code myself

---------

Co-authored-by: Hannes Karppila <hannes.karppila@gmail.com>
  • Loading branch information
xgreenx and Dentosal committed Jun 4, 2024
1 parent f52102b commit 0a1d591
Show file tree
Hide file tree
Showing 10 changed files with 171 additions and 214 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [#1888](https://github.com/FuelLabs/fuel-core/pull/1888): Upgraded `fuel-vm` to `0.51.0`. See [release](https://github.com/FuelLabs/fuel-vm/releases/tag/v0.51.0) for more information.

### Fixed
- [#1915](https://github.com/FuelLabs/fuel-core/pull/1915): Fixed reconnection issue in the dev cluster with AWS cluster.
- [#1914](https://github.com/FuelLabs/fuel-core/pull/1914): Fixed halting of the node during synchronization in PoA service.

## [Version 0.27.0]
Expand Down
5 changes: 5 additions & 0 deletions bin/fuel-core/src/cli/run/p2p.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,10 @@ pub struct P2PArgs {
#[clap(long = "request-timeout", default_value = "20", env)]
pub request_timeout: u64,

/// Choose max concurrent streams for RequestResponse protocol
#[clap(long = "request-max-concurrent-streams", default_value = "256", env)]
pub max_concurrent_streams: usize,

/// Choose how long RequestResponse protocol connections will live if idle
#[clap(long = "connection-keep-alive", default_value = "20", env)]
pub connection_keep_alive: u64,
Expand Down Expand Up @@ -308,6 +312,7 @@ impl P2PArgs {
gossipsub_config,
heartbeat_config,
set_request_timeout: Duration::from_secs(self.request_timeout),
max_concurrent_streams: self.max_concurrent_streams,
set_connection_keep_alive: Duration::from_secs(self.connection_keep_alive),
heartbeat_check_interval: Duration::from_secs(self.heartbeat_check_interval),
heartbeat_max_avg_interval: Duration::from_secs(
Expand Down
7 changes: 3 additions & 4 deletions crates/services/p2p/src/behavior.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,9 @@ impl FuelBehaviour {
let req_res_protocol =
core::iter::once((codec.get_req_res_protocol(), ProtocolSupport::Full));

let req_res_config = request_response::Config::default();
req_res_config
.clone()
.with_request_timeout(p2p_config.set_request_timeout);
let req_res_config = request_response::Config::default()
.with_request_timeout(p2p_config.set_request_timeout)
.with_max_concurrent_streams(p2p_config.max_concurrent_streams);

let request_response = request_response::Behaviour::with_codec(
codec,
Expand Down
89 changes: 17 additions & 72 deletions crates/services/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,15 @@ use crate::{
use fuel_core_types::blockchain::consensus::Genesis;

use libp2p::{
core::{
muxing::StreamMuxerBox,
transport::Boxed,
},
gossipsub,
identity::{
secp256k1,
Keypair,
},
noise,
tcp::{
self,
tokio,
},
yamux,
Multiaddr,
PeerId,
Transport,
};
use libp2p_mplex::MplexConfig;
use std::{
collections::HashSet,
net::{
Expand All @@ -44,12 +33,10 @@ use self::{
connection_tracker::ConnectionTracker,
fuel_authenticated::FuelAuthenticated,
fuel_upgrade::Checksum,
guarded_node::GuardedNode,
};
mod connection_tracker;
mod fuel_authenticated;
pub(crate) mod fuel_upgrade;
mod guarded_node;

const REQ_RES_TIMEOUT: Duration = Duration::from_secs(20);

Expand All @@ -62,13 +49,9 @@ pub const MAX_RESPONSE_SIZE: usize = 18 * 1024 * 1024;
/// Maximum number of headers per request.
pub const MAX_HEADERS_PER_REQUEST: u32 = 100;

/// Adds a timeout to the setup and protocol upgrade process for all
/// inbound and outbound connections established through the transport.
const TRANSPORT_TIMEOUT: Duration = Duration::from_secs(20);

#[derive(Clone, Debug)]
pub struct Config<State = Initialized> {
/// The keypair used for for handshake during communication with other p2p nodes.
/// The keypair used for handshake during communication with other p2p nodes.
pub keypair: Keypair,

/// Name of the Network
Expand Down Expand Up @@ -126,6 +109,8 @@ pub struct Config<State = Initialized> {
// RequestResponse related fields
/// Sets the timeout for inbound and outbound requests.
pub set_request_timeout: Duration,
/// Sets the maximum number of concurrent streams for a connection.
pub max_concurrent_streams: usize,
/// Sets the keep-alive timeout of idle connections.
pub set_connection_keep_alive: Duration,

Expand Down Expand Up @@ -180,6 +165,7 @@ impl Config<NotInitialized> {
gossipsub_config: self.gossipsub_config,
heartbeat_config: self.heartbeat_config,
set_request_timeout: self.set_request_timeout,
max_concurrent_streams: self.max_concurrent_streams,
set_connection_keep_alive: self.set_connection_keep_alive,
heartbeat_check_interval: self.heartbeat_check_interval,
heartbeat_max_avg_interval: self.heartbeat_max_time_since_last,
Expand Down Expand Up @@ -226,6 +212,7 @@ impl Config<NotInitialized> {
gossipsub_config: default_gossipsub_config(),
heartbeat_config: heartbeat::Config::default(),
set_request_timeout: REQ_RES_TIMEOUT,
max_concurrent_streams: 256,
set_connection_keep_alive: REQ_RES_TIMEOUT,
heartbeat_check_interval: Duration::from_secs(10),
heartbeat_max_avg_interval: Duration::from_secs(20),
Expand Down Expand Up @@ -254,71 +241,29 @@ impl Config<Initialized> {
pub(crate) fn build_transport_function(
p2p_config: &Config,
) -> (
impl FnOnce(&Keypair) -> Boxed<(PeerId, StreamMuxerBox)> + '_,
impl FnOnce(&Keypair) -> Result<FuelAuthenticated<ConnectionTracker>, ()> + '_,
Arc<RwLock<ConnectionState>>,
) {
let connection_state = ConnectionState::new();
let kept_connection_state = connection_state.clone();
let transport_function = move |keypair: &Keypair| {
let transport = {
let generate_tcp_transport = || {
tokio::Transport::new(tcp::Config::new().port_reuse(true).nodelay(true))
};

let tcp = generate_tcp_transport();

let ws_tcp = libp2p::websocket::WsConfig::new(generate_tcp_transport())
.or_transport(tcp);

libp2p::dns::tokio::Transport::system(ws_tcp).unwrap()
}
.upgrade(libp2p::core::upgrade::Version::V1Lazy);

let noise_authenticated =
noise::Config::new(keypair).expect("Noise key generation failed");

let multiplex_config = {
let mplex_config = MplexConfig::default();

let mut yamux_config = yamux::Config::default();
// TODO: remove deprecated method call https://github.com/FuelLabs/fuel-core/issues/1592
#[allow(deprecated)]
yamux_config.set_max_buffer_size(MAX_RESPONSE_SIZE);
libp2p::core::upgrade::SelectUpgrade::new(yamux_config, mplex_config)
let connection_state = if p2p_config.reserved_nodes_only_mode {
None
} else {
Some(connection_state)
};

if p2p_config.reserved_nodes_only_mode {
let guarded_node = GuardedNode::new(&p2p_config.reserved_nodes);
let connection_tracker =
ConnectionTracker::new(&p2p_config.reserved_nodes, connection_state);

let fuel_authenticated = FuelAuthenticated::new(
noise_authenticated,
guarded_node,
p2p_config.checksum,
);

transport
.authenticate(fuel_authenticated)
.multiplex(multiplex_config)
.timeout(TRANSPORT_TIMEOUT)
.boxed()
} else {
let connection_tracker = ConnectionTracker::new(
&p2p_config.reserved_nodes,
connection_state.clone(),
);

let fuel_authenticated = FuelAuthenticated::new(
noise_authenticated,
connection_tracker,
p2p_config.checksum,
);

transport
.authenticate(fuel_authenticated)
.multiplex(multiplex_config)
.timeout(TRANSPORT_TIMEOUT)
.boxed()
}
Ok(FuelAuthenticated::new(
noise_authenticated,
connection_tracker,
p2p_config.checksum,
))
};

(transport_function, kept_connection_state)
Expand Down
10 changes: 6 additions & 4 deletions crates/services/p2p/src/config/connection_tracker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::{
#[derive(Debug, Clone)]
pub(crate) struct ConnectionTracker {
reserved_nodes: HashSet<PeerId>,
connection_state: Arc<RwLock<ConnectionState>>,
connection_state: Option<Arc<RwLock<ConnectionState>>>,
}

impl ConnectionTracker {
pub(crate) fn new(
reserved_nodes: &[Multiaddr],
connection_state: Arc<RwLock<ConnectionState>>,
connection_state: Option<Arc<RwLock<ConnectionState>>>,
) -> Self {
Self {
reserved_nodes: peer_ids_set_from(reserved_nodes),
Expand All @@ -41,8 +41,10 @@ impl Approver for ConnectionTracker {
return true
}

if let Ok(connection_state) = self.connection_state.read() {
return connection_state.available_slot()
if let Some(connection_state) = &self.connection_state {
if let Ok(connection_state) = connection_state.read() {
return connection_state.available_slot()
}
}

false
Expand Down
44 changes: 21 additions & 23 deletions crates/services/p2p/src/config/fuel_authenticated.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
use crate::config::fuel_upgrade::Checksum;
use futures::{
future,
AsyncRead,
AsyncWrite,
Future,
TryFutureExt,
};
use libp2p::{
self,
Expand Down Expand Up @@ -71,17 +69,16 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_inbound(self, socket: T, _: Self::Info) -> Self::Future {
Box::pin(
self.noise_authenticated
.upgrade_inbound(socket, "")
.and_then(move |(remote_peer_id, io)| {
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(noise::Error::AuthenticationFailed)
}
}),
)
Box::pin(async move {
let (remote_peer_id, io) =
self.noise_authenticated.upgrade_inbound(socket, "").await?;

if self.approver.allow_peer(&remote_peer_id) {
Ok((remote_peer_id, io))
} else {
Err(noise::Error::AuthenticationFailed)
}
})
}
}

Expand All @@ -95,16 +92,17 @@ where
type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;

fn upgrade_outbound(self, socket: T, _: Self::Info) -> Self::Future {
Box::pin(
self.noise_authenticated
Box::pin(async move {
let (remote_peer_id, io) = self
.noise_authenticated
.upgrade_outbound(socket, "")
.and_then(move |(remote_peer_id, io)| {
if self.approver.allow_peer(&remote_peer_id) {
future::ok((remote_peer_id, io))
} else {
future::err(noise::Error::AuthenticationFailed)
}
}),
)
.await?;

if self.approver.allow_peer(&remote_peer_id) {
Ok((remote_peer_id, io))
} else {
Err(noise::Error::AuthenticationFailed)
}
})
}
}
31 changes: 0 additions & 31 deletions crates/services/p2p/src/config/guarded_node.rs

This file was deleted.

Loading

0 comments on commit 0a1d591

Please sign in to comment.