Skip to content

Commit

Permalink
fix(dht): add SAF bans (tari-project#5711)
Browse files Browse the repository at this point in the history
Description
---
Added bans for deviant peer behaviour when processing SAF messages 
Ban if peer sends more than a maximum number of SAF messages
Specify locally configured message limit over the wire to ensure that
peers can comply with this limit

Motivation and Context
---
Implement bans for bad peer responses from SAF messages

A peer could send many tiny messages in order to stay within the max
message size. Another buffering vec is allocated per message, which
allows the SAF responder to allocate a large vec in the recipient.

The addition of a limit to the SAF request is not a breaking change (0
== no limit which implies use the remote peer's configured limit). This
only matters if peers have non-default values.

How Has This Been Tested?
---
New unit test for bad message semantics. Existing SAF unit tests provide
ok coverage incl bad paths.

What process can a PR reviewer use to test or verify this change?
---
Test SAF
<!-- Checklist -->
<!-- 1. Is the title of your PR in the form that would make nice release
notes? The title, excluding the conventional commit
tag, will be included exactly as is in the CHANGELOG, so please think
about it carefully. -->


Breaking Changes
---

- [x] None
- [ ] Requires data directory on base node to be deleted
- [ ] Requires hard fork
- [ ] Other - Please specify

<!-- Does this include a breaking change? If so, include this line as a
footer -->
<!-- BREAKING CHANGE: Description what the user should do, e.g. delete a
database, resync the chain -->
  • Loading branch information
sdbondi committed Sep 1, 2023
1 parent 4b2b28b commit 594e03e
Show file tree
Hide file tree
Showing 14 changed files with 487 additions and 224 deletions.
2 changes: 2 additions & 0 deletions comms/dht/src/discovery/error.rs
Expand Up @@ -55,6 +55,8 @@ pub enum DhtDiscoveryError {
InvalidDiscoveryResponse { details: anyhow::Error },
#[error("DHT peer validator error: {0}")]
PeerValidatorError(#[from] DhtPeerValidatorError),
#[error("Cannot send discovery for this node")]
CannotDiscoverThisNode,
}

impl DhtDiscoveryError {
Expand Down
5 changes: 5 additions & 0 deletions comms/dht/src/discovery/service.rs
Expand Up @@ -303,6 +303,11 @@ impl DhtDiscoveryService {
reply_tx: oneshot::Sender<Result<Peer, DhtDiscoveryError>>,
) -> Result<(), DhtDiscoveryError> {
let nonce = OsRng.next_u64();
if *dest_pubkey == *self.node_identity.public_key() {
let _result = reply_tx.send(Err(DhtDiscoveryError::CannotDiscoverThisNode));
return Ok(());
}

if let Err(err) = self.send_discover(nonce, destination, dest_pubkey.clone()).await {
let _result = reply_tx.send(Err(err));
return Ok(());
Expand Down
32 changes: 27 additions & 5 deletions comms/dht/src/envelope.rs
Expand Up @@ -67,6 +67,7 @@ pub(crate) fn epochtime_to_datetime(datetime: EpochTime) -> DateTime<Utc> {
DateTime::from_utc(dt, Utc)
}

/// Message errors that should be verified by every node
#[derive(Debug, Error)]
pub enum DhtMessageError {
#[error("Invalid node destination")]
Expand All @@ -83,8 +84,10 @@ pub enum DhtMessageError {
InvalidMessageFlags,
#[error("Invalid ephemeral public key")]
InvalidEphemeralPublicKey,
#[error("Header was omitted from the message")]
#[error("Header is omitted from the message")]
HeaderOmitted,
#[error("Message Body is empty")]
BodyEmpty,
}

impl fmt::Display for DhtMessageType {
Expand Down Expand Up @@ -157,12 +160,31 @@ pub struct DhtMessageHeader {
}

impl DhtMessageHeader {
pub fn is_valid(&self) -> bool {
/// Checks if the DHT header is semantically valid. For example, if the message is flagged as encrypted, but sets a
/// empty signature or provides no ephemeral public key, this returns false.
pub fn is_semantically_valid(&self) -> bool {
// If the message is encrypted:
// - it needs a destination
// - it needs an ephemeral public key
// - it needs a signature
if self.flags.is_encrypted() {
!self.message_signature.is_empty() && self.ephemeral_public_key.is_some()
} else {
true
// Must have a destination
if self.destination.is_unknown() {
return false;
}

// Must have an ephemeral public key
if self.ephemeral_public_key.is_none() {
return false;
}

// Must have a signature
if self.message_signature.is_empty() {
return false;
}
}

true
}
}

Expand Down
37 changes: 10 additions & 27 deletions comms/dht/src/inbound/decryption.rs
Expand Up @@ -56,6 +56,8 @@ enum DecryptionError {
MessageRejectDecryptionFailed,
#[error("Failed to decode envelope body")]
EnvelopeBodyDecodeFailed,
#[error("Bad clear-text message semantics")]
BadClearTextMessageSemantics,
}

/// This layer is responsible for attempting to decrypt inbound messages.
Expand Down Expand Up @@ -294,36 +296,17 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
///
/// These failure modes are detectable by any node, so it is generally safe to ban an offending peer.
fn initial_validation(message: DhtInboundMessage) -> Result<ValidatedDhtInboundMessage, DecryptionError> {
// If an unencrypted message has no signature, it passes this validation automatically
if !message.dht_header.flags.is_encrypted() && message.dht_header.message_signature.is_empty() {
return Ok(ValidatedDhtInboundMessage::new(message, None));
}

// If the message is encrypted:
// - it must be nonempty
// - it needs a destination
// - it needs an ephemeral public key
// - it needs a signature
if message.dht_header.flags.is_encrypted() {
// Must be nonempty
if message.body.is_empty() {
return Err(DecryptionError::BadEncryptedMessageSemantics);
}

// Must have a destination
if message.dht_header.destination.is_unknown() {
return Err(DecryptionError::BadEncryptedMessageSemantics);
}

// Must have an ephemeral public key
if message.dht_header.ephemeral_public_key.is_none() {
if !message.is_semantically_valid() {
if message.dht_header.flags.is_encrypted() {
return Err(DecryptionError::BadEncryptedMessageSemantics);
} else {
return Err(DecryptionError::BadClearTextMessageSemantics);
}
}

// Must have a signature
if message.dht_header.message_signature.is_empty() {
return Err(DecryptionError::BadEncryptedMessageSemantics);
}
// If a signature is not present, the message is valid at this point
if message.dht_header.message_signature.is_empty() {
return Ok(ValidatedDhtInboundMessage::new(message, None));
}

// If a signature is present, it must be valid
Expand Down
18 changes: 2 additions & 16 deletions comms/dht/src/inbound/forward.rs
Expand Up @@ -186,26 +186,12 @@ where S: Service<DecryptedDhtMessage, Response = (), Error = PipelineError>
dht_header,
is_saf_stored,
is_already_forwarded,
authenticated_origin,
..
} = message;

if self.destination_matches_source(&dht_header.destination, source_peer) {
// #banheuristic - the origin of this message was the destination. Two things are wrong here:
// 1. The origin/destination should not have forwarded this (the destination node didnt do this
// destination_matches_source check)
// 1. The origin sent a message that the destination could not decrypt
// The authenticated source should be banned (malicious), and origin should be temporarily banned
// (bug?)
if let Some(authenticated_origin) = authenticated_origin {
self.dht
.ban_peer(
authenticated_origin.clone(),
OffenceSeverity::High,
"Received message from peer that is destined for that peer. This peer originally sent it.",
)
.await;
}
// The origin/destination should not have forwarded this (the source node didnt do this
// destination_matches_source check)
self.dht
.ban_peer(
source_peer.public_key.clone(),
Expand Down
16 changes: 16 additions & 0 deletions comms/dht/src/inbound/message.rs
Expand Up @@ -91,6 +91,22 @@ impl DhtInboundMessage {
body,
}
}

pub fn is_semantically_valid(&self) -> bool {
if !self.dht_header.is_semantically_valid() {
return false;
}

// If the message is encrypted:
// - it must be nonempty
if self.dht_header.flags.is_encrypted() {
// Body must be nonempty
if self.body.is_empty() {
return false;
}
}
true
}
}

impl Display for DhtInboundMessage {
Expand Down
2 changes: 1 addition & 1 deletion comms/dht/src/message_signature.rs
Expand Up @@ -123,7 +123,7 @@ pub struct ProtoMessageSignature {

#[derive(Debug, thiserror::Error, PartialEq)]
pub enum MessageSignatureError {
#[error("Failed to validate message signature")]
#[error("Message signature does not contain valid scalar bytes")]
InvalidSignatureBytes,
#[error("Message signature contained an invalid public nonce")]
InvalidPublicNonceBytes,
Expand Down
1 change: 1 addition & 0 deletions comms/dht/src/proto/store_forward.proto
Expand Up @@ -15,6 +15,7 @@ package tari.dht.store_forward;
message StoredMessagesRequest {
google.protobuf.Timestamp since = 1;
uint32 request_id = 2;
uint32 limit = 3;
}

// Storage for a single message envelope, including the date and time when the element was stored
Expand Down
38 changes: 12 additions & 26 deletions comms/dht/src/store_forward/error.rs
Expand Up @@ -27,14 +27,12 @@ use tari_comms::{
message::MessageError,
peer_manager::{NodeId, PeerManagerError},
};
use tari_utilities::{byte_array::ByteArrayError, epoch_time::EpochTime};
use thiserror::Error;

use crate::{
actor::DhtActorError,
envelope::DhtMessageError,
error::DhtEncryptError,
inbound::DhtInboundError,
message_signature::MessageSignatureError,
outbound::DhtOutboundError,
storage::StorageError,
Expand All @@ -55,14 +53,12 @@ pub enum StoreAndForwardError {
DhtEncryptError(#[from] DhtEncryptError),
#[error("Received stored message has an invalid destination")]
InvalidDestination,
#[error("DhtInboundError: {0}")]
DhtInboundError(#[from] DhtInboundError),
#[error("Received stored message has an invalid origin signature: {0}")]
InvalidMessageSignature(#[from] MessageSignatureError),
#[error("Invalid envelope body")]
InvalidEnvelopeBody,
#[error("DHT header is invalid")]
InvalidDhtHeader,
#[error("Envelope body is missing a required message part")]
EnvelopeBodyMissingMessagePart,
#[error("DHT header did not pass semantic validation rules")]
BadDhtHeaderSemanticallyInvalid,
#[error("Unable to decrypt received stored message")]
DecryptionFailed,
#[error("DhtActorError: {0}")]
Expand All @@ -71,10 +67,8 @@ pub enum StoreAndForwardError {
DuplicateMessage,
#[error("Unable to decode message: {0}")]
DecodeError(#[from] DecodeError),
#[error("Dht header was not provided")]
DhtHeaderNotProvided,
#[error("The message was malformed")]
MalformedMessage,
#[error("The message envelope was malformed: {0}")]
MalformedEnvelopeBody(DecodeError),
#[error("StorageError: {0}")]
StorageError(#[from] StorageError),
#[error("The store and forward service requester channel closed")]
Expand All @@ -83,24 +77,16 @@ pub enum StoreAndForwardError {
RequestCancelled,
#[error("The {field} field was not valid, discarding SAF response: {details}")]
InvalidSafResponseMessage { field: &'static str, details: String },
#[error("The message has expired, not storing message in SAF db (expiry: {expired}, now: {now})")]
NotStoringExpiredMessage { expired: EpochTime, now: EpochTime },
#[error("MalformedNodeId: {0}")]
MalformedNodeId(String),
#[error("DHT message type should not have been forwarded")]
InvalidDhtMessageType,
#[error("Failed to send request for store and forward messages: {0}")]
RequestMessagesFailed(DhtOutboundError),
#[error("DHT message type should not have been stored/forwarded")]
PeerSentDhtMessageViaSaf,
#[error("SAF message type should not have been stored/forwarded")]
PeerSentSafMessageViaSaf,
#[error("Received SAF messages that were not requested")]
ReceivedUnrequestedSafMessages,
#[error("SAF messages received from peer {peer} after deadline. Received after {message_age:.2?}")]
SafMessagesReceivedAfterDeadline { peer: NodeId, message_age: Duration },
#[error("Invalid SAF request: `stored_at` cannot be in the future")]
StoredAtWasInFuture,
}

impl From<ByteArrayError> for StoreAndForwardError {
fn from(e: ByteArrayError) -> Self {
StoreAndForwardError::MalformedNodeId(e.to_string())
}
#[error("Invariant error (POSSIBLE BUG): {0}")]
InvariantError(String),
}
3 changes: 2 additions & 1 deletion comms/dht/src/store_forward/message.rs
Expand Up @@ -40,14 +40,15 @@ impl StoredMessagesRequest {
Self {
since: None,
request_id: OsRng.next_u32(),
limit: 0,
}
}

#[allow(unused)]
pub fn since(since: DateTime<Utc>) -> Self {
Self {
since: Some(datetime_to_timestamp(since)),
request_id: OsRng.next_u32(),
limit: 0,
}
}
}
Expand Down

0 comments on commit 594e03e

Please sign in to comment.