Skip to content

Commit

Permalink
fix(dht): limit peer sync and ban on server-caused errors (tari-project#5714)

Browse files Browse the repository at this point in the history

Description
---
request a maximum of 500 peers
ban on server-caused errors

Motivation and Context
---
Previously, a peer could send an endless stream of peers during sync.

TARI-026

How Has This Been Tested?
---
Manually and existing tests - ban cases not explicitly tested

What process can a PR reviewer use to test or verify this change?
---

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->

Co-authored-by: SW van Heerden <swvheerden@gmail.com>
  • Loading branch information
sdbondi and SWvheerden committed Sep 1, 2023
1 parent e70c752 commit b3f2dca
Show file tree
Hide file tree
Showing 9 changed files with 137 additions and 56 deletions.
2 changes: 1 addition & 1 deletion comms/core/src/protocol/rpc/client/mod.rs
Expand Up @@ -938,7 +938,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin
}

if chunk_count >= RPC_CHUNKING_MAX_CHUNKS {
return Err(RpcError::ExceededMaxChunkCount {
return Err(RpcError::RemotePeerExceededMaxChunkCount {
expected: RPC_CHUNKING_MAX_CHUNKS,
});
}
Expand Down
39 changes: 36 additions & 3 deletions comms/core/src/protocol/rpc/error.rs
Expand Up @@ -62,14 +62,14 @@ pub enum RpcError {
PeerManagerError(#[from] PeerManagerError),
#[error("Connectivity error: {0}")]
ConnectivityError(#[from] ConnectivityError),
#[error("Reply Timeout")]
#[error("Reply from peer timed out")]
ReplyTimeout,
#[error("Received an invalid ping response")]
InvalidPingResponse,
#[error("Unexpected ACK response. This is likely because of a previous ACK timeout")]
UnexpectedAckResponse,
#[error("Attempted to send more than {expected} payload chunks")]
ExceededMaxChunkCount { expected: usize },
#[error("Remote peer attempted to send more than {expected} payload chunks")]
RemotePeerExceededMaxChunkCount { expected: usize },
#[error("Request body was too large. Expected <= {expected} but got {got}")]
MaxRequestSizeExceeded { got: usize, expected: usize },
#[error(transparent)]
Expand All @@ -80,6 +80,39 @@ impl RpcError {
pub fn client_internal_error<T: ToString>(err: &T) -> Self {
RpcError::ClientInternalError(err.to_string())
}

/// Returns true if the server directly caused the error, otherwise false.
///
/// Used by callers to decide whether an RPC failure is attributable to the
/// remote peer (and therefore potentially bannable) or could have a local /
/// ambiguous cause.
pub fn is_caused_by_server(&self) -> bool {
    match self {
        // Handshake failures: attribution depends on the inner handshake error.
        RpcError::HandshakeError(hs) => matches!(
            hs,
            RpcHandshakeError::DecodeError(_) |
                RpcHandshakeError::ServerClosedRequest |
                RpcHandshakeError::Rejected(_) |
                RpcHandshakeError::TimedOut
        ),

        // Errors the remote server directly caused
        RpcError::ReplyTimeout |
        RpcError::DecodeError(_) |
        RpcError::RemotePeerExceededMaxChunkCount { .. } |
        RpcError::ServerClosedRequest |
        RpcError::UnexpectedAckResponse |
        RpcError::ResponseIdDidNotMatchRequest { .. } => true,

        // Some of these may be caused by the server, but not with 100% certainty
        RpcError::RequestFailed(_) |
        RpcError::Io(_) |
        RpcError::ClientClosed |
        RpcError::RequestCancelled |
        RpcError::ClientInternalError(_) |
        RpcError::ServerError(_) |
        RpcError::PeerConnectionError(_) |
        RpcError::PeerManagerError(_) |
        RpcError::ConnectivityError(_) |
        RpcError::InvalidPingResponse |
        RpcError::MaxRequestSizeExceeded { .. } |
        RpcError::UnknownError(_) => false,
    }
}
}

#[derive(Debug, Error, Clone, Copy)]
Expand Down
8 changes: 4 additions & 4 deletions comms/dht/src/config.rs
Expand Up @@ -84,11 +84,11 @@ pub struct DhtConfig {
/// Network discovery config
pub network_discovery: NetworkDiscoveryConfig,
/// Length of time to ban a peer if the peer misbehaves at the DHT-level.
/// Default: 6 hrs
/// Default: 2 hrs
#[serde(with = "serializers::seconds")]
pub ban_duration: Duration,
/// Length of time to ban a peer for a "short" duration.
/// Default: 30 mins
/// Default: 10 mins
#[serde(with = "serializers::seconds")]
pub ban_duration_short: Duration,

Expand Down Expand Up @@ -181,8 +181,8 @@ impl Default for DhtConfig {
auto_join: false,
join_cooldown_interval: Duration::from_secs(10 * 60),
network_discovery: Default::default(),
ban_duration: Duration::from_secs(6 * 60 * 60),
ban_duration_short: Duration::from_secs(60 * 60),
ban_duration: Duration::from_secs(2 * 60 * 60),
ban_duration_short: Duration::from_secs(10 * 60),
flood_ban_max_msg_count: 100_000,
flood_ban_timespan: Duration::from_secs(100),
max_permitted_peer_claims: 5,
Expand Down
4 changes: 4 additions & 0 deletions comms/dht/src/network_discovery/config.rs
Expand Up @@ -50,6 +50,9 @@ pub struct NetworkDiscoveryConfig {
/// current state.
/// Default: 5
pub max_sync_peers: usize,
/// The maximum number of peers we allow per round of sync.
/// Default: 500
pub max_peers_to_sync_per_round: u32,
}

impl Default for NetworkDiscoveryConfig {
Expand All @@ -61,6 +64,7 @@ impl Default for NetworkDiscoveryConfig {
idle_after_num_rounds: 10,
on_failure_idle_period: Duration::from_secs(5),
max_sync_peers: 5,
max_peers_to_sync_per_round: 500,
}
}
}
112 changes: 74 additions & 38 deletions comms/dht/src/network_discovery/discovering.rs
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::convert::{TryFrom, TryInto};
use std::convert::TryInto;

use futures::{stream::FuturesUnordered, Stream, StreamExt};
use log::*;
Expand All @@ -35,7 +35,14 @@ use super::{
state_machine::{DhtNetworkDiscoveryRoundInfo, DiscoveryParams, NetworkDiscoveryContext, StateEvent},
NetworkDiscoveryError,
};
use crate::{peer_validator::PeerValidator, proto::rpc::GetPeersRequest, rpc, rpc::UnvalidatedPeerInfo, DhtConfig};
use crate::{
actor::OffenceSeverity,
peer_validator::PeerValidator,
proto::rpc::{GetPeersRequest, GetPeersResponse},
rpc,
rpc::UnvalidatedPeerInfo,
DhtConfig,
};

const LOG_TARGET: &str = "comms::dht::network_discovery";

Expand Down Expand Up @@ -126,7 +133,8 @@ impl Discovering {
target: LOG_TARGET,
"Established RPC connection to peer `{}`", peer_node_id
);
self.request_peers(peer_node_id, client).await?;
let result = self.request_peers(peer_node_id, client).await;
self.ban_on_offence(peer_node_id.clone(), result).await?;

Ok(())
}
Expand All @@ -140,19 +148,12 @@ impl Discovering {
target: LOG_TARGET,
"Requesting {} peers from `{}`",
self.params
.num_peers_to_request
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "∞".into()),
.num_peers_to_request,
sync_peer
);
match client
let mut stream = client
.get_peers(GetPeersRequest {
n: self
.params
.num_peers_to_request
.map(|v| u32::try_from(v).unwrap())
.unwrap_or_default(),
n: self.params.num_peers_to_request,
include_clients: true,
max_claims: self.config().max_permitted_peer_claims.try_into().unwrap_or_else(|_| {
error!(target: LOG_TARGET, "Node configured to accept more than u32::MAX claims per peer");
Expand All @@ -168,31 +169,16 @@ impl Discovering {
u32::MAX
}),
})
.await
{
Ok(mut stream) => {
while let Some(resp) = stream.next().await {
match resp {
Ok(resp) => match resp.peer.and_then(|peer| peer.try_into().ok()) {
Some(peer) => {
self.validate_and_add_peer(sync_peer, peer).await?;
},
None => {
debug!(target: LOG_TARGET, "Invalid response from peer `{}`", sync_peer);
},
},
Err(err) => {
debug!(target: LOG_TARGET, "Error response from peer `{}`: {}", sync_peer, err);
},
}
}
},
Err(err) => {
debug!(
target: LOG_TARGET,
"Failed to request for peers from peer `{}`: {}", sync_peer, err
);
},
.await?;

while let Some(resp) = stream.next().await {
let GetPeersResponse { peer } = resp?;

let peer = peer.ok_or_else(|| NetworkDiscoveryError::EmptyPeerMessageReceived)?;
let new_peer = peer
.try_into()
.map_err(NetworkDiscoveryError::InvalidPeerDataReceived)?;
self.validate_and_add_peer(sync_peer, new_peer).await?;
}

Ok(())
Expand Down Expand Up @@ -233,6 +219,56 @@ impl Discovering {
}
}

/// Inspects `result` and, when the error indicates misbehaviour by `peer`,
/// bans the peer with an appropriate severity before propagating the error.
///
/// Returns the original `result` unchanged in either case so callers can
/// simply `?` the outcome.
async fn ban_on_offence<T>(
    &mut self,
    peer: NodeId,
    result: Result<T, NetworkDiscoveryError>,
) -> Result<T, NetworkDiscoveryError> {
    // Success needs no inspection — hand it straight back.
    let err = match result {
        Ok(t) => return Ok(t),
        Err(e) => e,
    };

    match &err {
        // Protocol violations in the peer's sync data: ban hard.
        NetworkDiscoveryError::EmptyPeerMessageReceived |
        NetworkDiscoveryError::InvalidPeerDataReceived(_) |
        NetworkDiscoveryError::PeerValidationError(_) => {
            self.ban_peer(peer, OffenceSeverity::High, &err).await;
        },
        // RPC errors that the remote server demonstrably caused: ban hard.
        NetworkDiscoveryError::RpcError(rpc_err) if rpc_err.is_caused_by_server() => {
            self.ban_peer(peer, OffenceSeverity::High, &err).await;
        },
        // A non-OK RPC status is suspicious but less certain: short ban.
        NetworkDiscoveryError::RpcStatus(status) if !status.is_ok() => {
            self.ban_peer(peer, OffenceSeverity::Low, &err).await;
        },
        // Other errors — not attributable to the peer, no ban.
        NetworkDiscoveryError::RpcStatus(_) |
        NetworkDiscoveryError::NoSyncPeers |
        NetworkDiscoveryError::PeerManagerError(_) |
        NetworkDiscoveryError::RpcError(_) |
        NetworkDiscoveryError::ConnectivityError(_) => {},
    }

    Err(err)
}

/// Bans `peer` for a duration derived from `severity`, citing `err` as the reason.
///
/// A failure to apply the ban is logged and otherwise swallowed: banning is
/// best-effort and must not abort the discovery round.
async fn ban_peer<T: ToString>(&mut self, peer: NodeId, severity: OffenceSeverity, err: T) {
    let ban_duration = self.config().ban_duration_from_severity(severity);
    let reason = err.to_string();
    let ban_result = self
        .context
        .connectivity
        .ban_peer_until(peer.clone(), ban_duration, reason)
        .await;
    if let Err(e) = ban_result {
        warn!(
            target: LOG_TARGET,
            "Failed to ban peer `{}`: {}", peer, e
        );
    }
}

fn config(&self) -> &DhtConfig {
&self.context.config
}
Expand Down
12 changes: 11 additions & 1 deletion comms/dht/src/network_discovery/error.rs
Expand Up @@ -20,14 +20,20 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use tari_comms::{connectivity::ConnectivityError, peer_manager::PeerManagerError, protocol::rpc::RpcError};
use tari_comms::{
connectivity::ConnectivityError,
peer_manager::PeerManagerError,
protocol::rpc::{RpcError, RpcStatus},
};

use crate::peer_validator::DhtPeerValidatorError;

#[derive(thiserror::Error, Debug)]
pub enum NetworkDiscoveryError {
#[error("RPC error: {0}")]
RpcError(#[from] RpcError),
#[error("RPC status error: {0}")]
RpcStatus(#[from] RpcStatus),
#[error("Peer manager error: {0}")]
PeerManagerError(#[from] PeerManagerError),
#[error("Connectivity error: {0}")]
Expand All @@ -36,4 +42,8 @@ pub enum NetworkDiscoveryError {
NoSyncPeers,
#[error("Sync peer sent invalid peer: {0}")]
PeerValidationError(#[from] DhtPeerValidatorError),
#[error("Sync peer sent empty peer message")]
EmptyPeerMessageReceived,
#[error("Sync peer sent invalid peer data: {0}")]
InvalidPeerDataReceived(anyhow::Error),
}
6 changes: 2 additions & 4 deletions comms/dht/src/network_discovery/ready.rs
Expand Up @@ -97,8 +97,7 @@ impl DiscoveryReady {
}

return Ok(StateEvent::BeginDiscovery(DiscoveryParams {
// All peers
num_peers_to_request: None,
num_peers_to_request: self.config().network_discovery.max_peers_to_sync_per_round,
peers,
}));
}
Expand Down Expand Up @@ -188,8 +187,7 @@ impl DiscoveryReady {
}

Ok(StateEvent::BeginDiscovery(DiscoveryParams {
// Request all peers
num_peers_to_request: None,
num_peers_to_request: self.config().network_discovery.max_peers_to_sync_per_round,
peers,
}))
}
Expand Down
5 changes: 1 addition & 4 deletions comms/dht/src/network_discovery/state_machine.rs
Expand Up @@ -295,7 +295,7 @@ where Fut: Future<Output = StateEvent> + Unpin {
#[derive(Debug, Clone)]
pub struct DiscoveryParams {
pub peers: Vec<NodeId>,
pub num_peers_to_request: Option<usize>,
pub num_peers_to_request: u32,
}

impl Display for DiscoveryParams {
Expand All @@ -306,9 +306,6 @@ impl Display for DiscoveryParams {
self.peers.len(),
self.peers.iter().map(|p| format!("{}, ", p)).collect::<String>(),
self.num_peers_to_request
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "∞".into()),
)
}
}
Expand Down
5 changes: 4 additions & 1 deletion comms/dht/src/network_discovery/test.rs
Expand Up @@ -212,7 +212,10 @@ mod discovery_ready {
}
let state_event = ready.next_event().await;
unpack_enum!(StateEvent::BeginDiscovery(params) = state_event);
assert!(params.num_peers_to_request.is_none());
assert_eq!(
params.num_peers_to_request,
NetworkDiscoveryConfig::default().max_peers_to_sync_per_round
);
}

#[tokio::test]
Expand Down

0 comments on commit b3f2dca

Please sign in to comment.