Skip to content

Commit

Permalink
fix(comms): timeout and ban for bad behaviour in protocol negotation (t…
Browse files Browse the repository at this point in the history
…ari-project#5679)

Description
---
Adds inbound protocol negotiation timeout
Adds ban for misbehaving peer in protocol negotiation 

Motivation and Context
---
Draft: TODO: create an issue

How Has This Been Tested?
---
TODO: write a test for this case

What process can a PR reviewer use to test or verify this change?
---

<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
sdbondi committed Aug 31, 2023
1 parent f7e2753 commit d03d0b5
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 27 deletions.
8 changes: 7 additions & 1 deletion comms/core/src/connection_manager/manager.rs
Expand Up @@ -68,6 +68,9 @@ pub enum ConnectionManagerEvent {

// Substreams
NewInboundSubstream(NodeId, ProtocolId, Substream),

// Other
PeerViolation { peer_node_id: NodeId, details: String },
}

impl fmt::Display for ConnectionManagerEvent {
Expand All @@ -85,6 +88,9 @@ impl fmt::Display for ConnectionManagerEvent {
node_id.short_str(),
String::from_utf8_lossy(protocol)
),
PeerViolation { peer_node_id, details } => {
write!(f, "PeerViolation({}, {})", peer_node_id.short_str(), details)
},
}
}
}
Expand Down Expand Up @@ -401,7 +407,7 @@ where
}

async fn handle_event(&mut self, event: ConnectionManagerEvent) {
use ConnectionManagerEvent::{NewInboundSubstream, PeerConnectFailed, PeerConnected, PeerInboundConnectFailed};
use ConnectionManagerEvent::*;

match event {
NewInboundSubstream(node_id, protocol, stream) => {
Expand Down
91 changes: 66 additions & 25 deletions comms/core/src/connection_manager/peer_connection.rs
Expand Up @@ -30,6 +30,7 @@ use std::{
time::{Duration, Instant},
};

use futures::{future::BoxFuture, stream::FuturesUnordered};
use log::*;
use multiaddr::Multiaddr;
use tokio::{
Expand Down Expand Up @@ -65,6 +66,8 @@ use crate::{

const LOG_TARGET: &str = "comms::connection_manager::peer_connection";

const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(5);

static ID_COUNTER: AtomicUsize = AtomicUsize::new(0);

pub fn try_create(
Expand Down Expand Up @@ -358,7 +361,9 @@ struct PeerConnectionActor {
incoming_substreams: IncomingSubstreams,
control: Control,
event_notifier: mpsc::Sender<ConnectionManagerEvent>,
our_supported_protocols: Vec<ProtocolId>,
our_supported_protocols: Arc<Vec<ProtocolId>>,
inbound_protocol_negotiations:
FuturesUnordered<BoxFuture<'static, Result<(ProtocolId, Substream), PeerConnectionError>>>,
their_supported_protocols: Vec<ProtocolId>,
}

Expand All @@ -381,7 +386,10 @@ impl PeerConnectionActor {
incoming_substreams: connection.into_incoming(),
request_rx,
event_notifier,
our_supported_protocols,
// our_supported_protocols never changes so we make it cheap to clone (used in inbound_protocol_negotiations
// futures)
our_supported_protocols: Arc::new(our_supported_protocols),
inbound_protocol_negotiations: FuturesUnordered::new(),
their_supported_protocols,
}
}
Expand All @@ -401,22 +409,16 @@ impl PeerConnectionActor {

maybe_substream = self.incoming_substreams.next() => {
match maybe_substream {
Some(substream) => {
if let Err(err) = self.handle_incoming_substream(substream).await {
error!(
target: LOG_TARGET,
"[{}] Incoming substream for peer '{}' failed to open because '{error}'",
self,
self.peer_node_id.short_str(),
error = err
)
}
},
Some(substream) => self.handle_incoming_substream(substream).await,
None => {
debug!(target: LOG_TARGET, "[{}] Peer '{}' closed the connection", self, self.peer_node_id.short_str());
break;
},
}
},

Some(result) = self.inbound_protocol_negotiations.next() => {
self.handle_inbound_protocol_negotiation_result(result).await;
}
}
}
Expand Down Expand Up @@ -461,27 +463,66 @@ impl PeerConnectionActor {
}

#[tracing::instrument(level="trace", skip(self, stream),fields(comms.direction="inbound"))]
async fn handle_incoming_substream(&mut self, mut stream: Substream) -> Result<(), PeerConnectionError> {
let selected_protocol = ProtocolNegotiation::new(&mut stream)
.negotiate_protocol_inbound(&self.our_supported_protocols)
.await?;
async fn handle_incoming_substream(&mut self, mut stream: Substream) {
let our_supported_protocols = self.our_supported_protocols.clone();
self.inbound_protocol_negotiations.push(Box::pin(async move {
let mut protocol_negotiation = ProtocolNegotiation::new(&mut stream);

let selected_protocol = time::timeout(
PROTOCOL_NEGOTIATION_TIMEOUT,
protocol_negotiation.negotiate_protocol_inbound(&our_supported_protocols),
)
.await
.map_err(|_| PeerConnectionError::ProtocolNegotiationTimeout)??;
Ok((selected_protocol, stream))
}));
}

self.notify_event(ConnectionManagerEvent::NewInboundSubstream(
self.peer_node_id.clone(),
selected_protocol,
stream,
))
.await;
async fn handle_inbound_protocol_negotiation_result(
&mut self,
result: Result<(ProtocolId, Substream), PeerConnectionError>,
) {
match result {
Ok((selected_protocol, stream)) => {
self.notify_event(ConnectionManagerEvent::NewInboundSubstream(
self.peer_node_id.clone(),
selected_protocol,
stream,
))
.await;
},
Err(PeerConnectionError::ProtocolError(err)) if err.is_ban_offence() => {
error!(
target: LOG_TARGET,
"[{}] PEER VIOLATION: Incoming substream for peer '{}' failed to open because '{}'",
self,
self.peer_node_id.short_str(),
err
);

Ok(())
self.notify_event(ConnectionManagerEvent::PeerViolation {
peer_node_id: self.peer_node_id.clone(),
details: err.to_string(),
})
.await;
},
Err(err) => {
error!(
target: LOG_TARGET,
"[{}] Incoming substream for peer '{}' failed to open because '{error}'",
self,
self.peer_node_id.short_str(),
error = err
);
},
}
}

#[tracing::instrument(skip(self))]
async fn open_negotiated_protocol_stream(
&mut self,
protocol: ProtocolId,
) -> Result<NegotiatedSubstream<Substream>, PeerConnectionError> {
const PROTOCOL_NEGOTIATION_TIMEOUT: Duration = Duration::from_secs(10);
debug!(
target: LOG_TARGET,
"[{}] Negotiating protocol '{}' on new substream for peer '{}'",
Expand Down
12 changes: 11 additions & 1 deletion comms/core/src/connectivity/manager.rs
Expand Up @@ -540,11 +540,12 @@ impl ConnectivityManagerActor {
Ok(())
}

#[allow(clippy::too_many_lines)]
async fn update_state_on_connectivity_event(
&mut self,
event: &ConnectionManagerEvent,
) -> Result<(), ConnectivityError> {
use ConnectionManagerEvent::{PeerConnectFailed, PeerConnected, PeerDisconnected};
use ConnectionManagerEvent::*;
match event {
PeerConnected(new_conn) => {
match self.on_new_connection(new_conn).await {
Expand Down Expand Up @@ -573,6 +574,15 @@ impl ConnectivityManagerActor {
}
}
},
PeerViolation { peer_node_id, details } => {
self.ban_peer(
peer_node_id,
Duration::from_secs(2 * 60 * 60),
format!("Peer violation: {details}"),
)
.await?;
return Ok(());
},
_ => {},
}

Expand Down
16 changes: 16 additions & 0 deletions comms/core/src/protocol/error.rs
Expand Up @@ -44,3 +44,19 @@ pub enum ProtocolError {
#[error("Failed to send notification because notification sender disconnected")]
NotificationSenderDisconnected,
}

impl ProtocolError {
pub fn is_ban_offence(&self) -> bool {
match self {
ProtocolError::IoError(_) |
ProtocolError::ProtocolNegotiationTerminatedByPeer |
ProtocolError::ProtocolOutboundNegotiationFailed { .. } |
ProtocolError::ProtocolNotRegistered |
ProtocolError::ProtocolInboundNegotiationFailed |
ProtocolError::ProtocolOptimisticNegotiationFailed |
ProtocolError::NotificationSenderDisconnected => false,

ProtocolError::ProtocolIdTooLong => true,
}
}
}
8 changes: 8 additions & 0 deletions comms/dht/examples/memory_net/utilities.rs
Expand Up @@ -657,6 +657,14 @@ fn connection_manager_logger(
node_name
);
},
PeerViolation { peer_node_id, details } => {
println!(
"'{}' violated protocol with '{}' because '{}'",
node_name,
get_name(peer_node_id),
details
);
},
}
event
}
Expand Down

0 comments on commit d03d0b5

Please sign in to comment.