Skip to content

Commit

Permalink
add message validation for gossipsub messages from txpool (#936)
Browse files Browse the repository at this point in the history
Closes #595 

For gossipsub messages, we need to validate **all** the messages.
Messages will not be propagated unless a _timely_ validation is
reported.

I have updated the gossipsub tests, now instead of just checking that
Node A sends a gossipsub message to Node B,
we also check that Node C gets the propagated message from Node B.
Of course if the validation is positive, if the message is rejected Node
C should not receive it.

Co-authored-by: Green Baneling <XgreenX9999@gmail.com>
Co-authored-by: Voxelot <brandonkite92@gmail.com>
  • Loading branch information
3 people committed Jan 26, 2023
1 parent 805d5dc commit 1542d93
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 69 deletions.
7 changes: 4 additions & 3 deletions crates/fuel-core/src/service/adapters/txpool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use fuel_core_types::{
block_importer::ImportResult,
p2p::{
GossipsubMessageAcceptance,
GossipsubMessageInfo,
TransactionGossipData,
},
},
Expand Down Expand Up @@ -74,11 +75,11 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {

fn notify_gossip_transaction_validity(
&self,
message: &Self::GossipedTransaction,
message_info: GossipsubMessageInfo,
validity: GossipsubMessageAcceptance,
) -> anyhow::Result<()> {
self.service
.notify_gossip_transaction_validity(message, validity)
.notify_gossip_transaction_validity(message_info, validity)
}
}

Expand All @@ -99,7 +100,7 @@ impl fuel_core_txpool::ports::PeerToPeer for P2PAdapter {

fn notify_gossip_transaction_validity(
&self,
_message: &Self::GossipedTransaction,
_message_info: GossipsubMessageInfo,
_validity: GossipsubMessageAcceptance,
) -> anyhow::Result<()> {
Ok(())
Expand Down
106 changes: 90 additions & 16 deletions crates/services/p2p/src/p2p_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -566,6 +566,7 @@ mod tests {
RequestMessage,
ResponseChannelItem,
},
service::to_message_acceptance,
};
use fuel_core_types::{
blockchain::{
Expand All @@ -581,6 +582,7 @@ mod tests {
SealedBlockHeader,
},
fuel_tx::Transaction,
services::p2p::GossipsubMessageAcceptance,
};
use futures::StreamExt;
use libp2p::{
Expand Down Expand Up @@ -1160,33 +1162,69 @@ mod tests {

#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_tx() {
gossipsub_broadcast(GossipsubBroadcastRequest::NewTx(Arc::new(
Transaction::default(),
)))
async fn gossipsub_broadcast_tx_with_accept() {
gossipsub_broadcast(
GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default())),
GossipsubMessageAcceptance::Accept,
)
.await;
}

#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_tx_with_reject() {
gossipsub_broadcast(
GossipsubBroadcastRequest::NewTx(Arc::new(Transaction::default())),
GossipsubMessageAcceptance::Reject,
)
.await;
}

#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_vote_with_accept() {
gossipsub_broadcast(
GossipsubBroadcastRequest::ConsensusVote(Arc::new(ConsensusVote::default())),
GossipsubMessageAcceptance::Accept,
)
.await;
}

#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_vote_with_reject() {
gossipsub_broadcast(
GossipsubBroadcastRequest::ConsensusVote(Arc::new(ConsensusVote::default())),
GossipsubMessageAcceptance::Reject,
)
.await;
}

#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_vote() {
gossipsub_broadcast(GossipsubBroadcastRequest::ConsensusVote(Arc::new(
ConsensusVote::default(),
)))
async fn gossipsub_broadcast_block_with_accept() {
gossipsub_broadcast(
GossipsubBroadcastRequest::NewBlock(Arc::new(Block::default())),
GossipsubMessageAcceptance::Accept,
)
.await;
}

#[tokio::test]
#[instrument]
async fn gossipsub_broadcast_block() {
gossipsub_broadcast(GossipsubBroadcastRequest::NewBlock(Arc::new(
Block::default(),
)))
async fn gossipsub_broadcast_block_with_ignore() {
gossipsub_broadcast(
GossipsubBroadcastRequest::NewBlock(Arc::new(Block::default())),
GossipsubMessageAcceptance::Ignore,
)
.await;
}

/// Reusable helper function for Broadcasting Gossipsub requests
async fn gossipsub_broadcast(broadcast_request: GossipsubBroadcastRequest) {
async fn gossipsub_broadcast(
broadcast_request: GossipsubBroadcastRequest,
acceptance: GossipsubMessageAcceptance,
) {
let mut p2p_config = Config::default_initialized("gossipsub_exchanges_messages");

let selected_topic: GossipTopic = {
Expand All @@ -1206,8 +1244,17 @@ mod tests {
let mut node_a = node_a_data.create_service(p2p_config.clone());

// Node B
let node_b_data = NodeData::random();
p2p_config.bootstrap_nodes = vec![node_a_data.multiaddr];
let mut node_b = build_service_from_config(p2p_config.clone());
let mut node_b = node_b_data.create_service(p2p_config.clone());

// Node C
p2p_config.bootstrap_nodes = vec![node_b_data.multiaddr];
let mut node_c = build_service_from_config(p2p_config.clone());

// Node C does not connecto to Node A
// it should receive the propagated message from Node B if `GossipsubMessageAcceptance` is `Accept`
node_c.swarm.ban_peer_id(node_a.local_peer_id);

loop {
tokio::select! {
Expand All @@ -1226,7 +1273,12 @@ mod tests {
tracing::info!("Node A Event: {:?}", node_a_event);
},
node_b_event = node_b.next_event() => {
if let Some(FuelP2PEvent::GossipsubMessage { topic_hash, message, .. }) = node_b_event.clone() {
if let Some(FuelP2PEvent::GossipsubMessage { topic_hash, message, message_id, peer_id }) = node_b_event.clone() {
// Message Validation must be reported
// If it's `Accept`, Node B will propagate the message to Node C
// If it's `Ignore` or `Reject`, Node C should not receive anything
let msg_acceptance = to_message_acceptance(&acceptance);
let _ = node_b.report_message_validation_result(&message_id, &peer_id, msg_acceptance);
if topic_hash != selected_topic.hash() {
tracing::error!("Wrong topic hash, expected: {} - actual: {}", selected_topic.hash(), topic_hash);
panic!("Wrong Topic");
Expand Down Expand Up @@ -1260,11 +1312,33 @@ mod tests {
let broadcast_request = broadcast_request.clone();
matches!(node_b.publish_message(broadcast_request), Err(PublishError::Duplicate));

break
match acceptance {
GossipsubMessageAcceptance::Reject | GossipsubMessageAcceptance::Ignore => {
break
},
_ => {
// the `exit` should happen in Node C
}
}
}

tracing::info!("Node B Event: {:?}", node_b_event);
}

node_c_event = node_c.next_event() => {
if let Some(FuelP2PEvent::GossipsubMessage { peer_id, .. }) = node_c_event.clone() {
// Node B should be the source propagator
assert!(peer_id == node_b.local_peer_id);
match acceptance {
GossipsubMessageAcceptance::Reject | GossipsubMessageAcceptance::Ignore => {
panic!("Node C should not receive Rejected or Ignored messages")
},
GossipsubMessageAcceptance::Accept => {
break
}
}
}
}
};
}
}
Expand Down
51 changes: 18 additions & 33 deletions crates/services/p2p/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use fuel_core_types::{
BlockHeightHeartbeatData,
GossipData,
GossipsubMessageAcceptance,
GossipsubMessageInfo,
TransactionGossipData,
},
};
Expand Down Expand Up @@ -312,19 +313,15 @@ pub struct SharedState {
}

impl SharedState {
pub fn notify_gossip_transaction_validity<'a, T>(
pub fn notify_gossip_transaction_validity(
&self,
message: &'a T,
message_info: GossipsubMessageInfo,
acceptance: GossipsubMessageAcceptance,
) -> anyhow::Result<()>
where
GossipsubMessageInfo: From<&'a T>,
{
let msg_info = message.into();

) -> anyhow::Result<()> {
self.request_sender
.try_send(TaskRequest::RespondWithGossipsubMessageReport((
msg_info, acceptance,
message_info,
acceptance,
)))?;
Ok(())
}
Expand Down Expand Up @@ -439,6 +436,16 @@ where
))
}

pub(crate) fn to_message_acceptance(
acceptance: &GossipsubMessageAcceptance,
) -> MessageAcceptance {
match acceptance {
GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept,
GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject,
GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore,
}
}

fn report_message<T: NetworkCodec>(
p2p_service: &mut FuelP2PService<T>,
message: GossipsubMessageInfo,
Expand All @@ -450,13 +457,10 @@ fn report_message<T: NetworkCodec>(
} = message;

let msg_id = message_id.into();
let peer_id: Vec<u8> = peer_id.into();

if let Ok(peer_id) = peer_id.try_into() {
let acceptance = match acceptance {
GossipsubMessageAcceptance::Accept => MessageAcceptance::Accept,
GossipsubMessageAcceptance::Reject => MessageAcceptance::Reject,
GossipsubMessageAcceptance::Ignore => MessageAcceptance::Ignore,
};
let acceptance = to_message_acceptance(&acceptance);

match p2p_service.report_message_validation_result(&msg_id, &peer_id, acceptance)
{
Expand All @@ -475,25 +479,6 @@ fn report_message<T: NetworkCodec>(
}
}

/// Lightweight representation of gossipped data that only includes IDs
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct GossipsubMessageInfo {
/// The message id that corresponds to a message payload (typically a unique hash)
pub message_id: Vec<u8>,
/// The ID of the network peer that sent this message
pub peer_id: Vec<u8>,
}

impl<T> From<&GossipData<T>> for GossipsubMessageInfo {
fn from(gossip_data: &GossipData<T>) -> Self {
Self {
message_id: gossip_data.message_id.clone(),
peer_id: gossip_data.peer_id.clone(),
}
}
}

#[cfg(test)]
pub mod tests {
use crate::ports::P2pDb;
Expand Down
3 changes: 2 additions & 1 deletion crates/services/txpool/src/ports.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use fuel_core_types::{
block_importer::ImportResult,
p2p::{
GossipsubMessageAcceptance,
GossipsubMessageInfo,
NetworkData,
},
},
Expand All @@ -36,7 +37,7 @@ pub trait PeerToPeer: Send + Sync {
// Report the validity of a transaction received from the network.
fn notify_gossip_transaction_validity(
&self,
message: &Self::GossipedTransaction,
message_info: GossipsubMessageInfo,
validity: GossipsubMessageAcceptance,
) -> anyhow::Result<()>;
}
Expand Down
27 changes: 23 additions & 4 deletions crates/services/txpool/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ use fuel_core_types::{
block_importer::ImportResult,
p2p::{
GossipData,
GossipsubMessageAcceptance,
GossipsubMessageInfo,
TransactionGossipData,
},
txpool::{
Expand Down Expand Up @@ -110,7 +112,7 @@ pub struct Task<P2P, DB> {
#[async_trait::async_trait]
impl<P2P, DB> RunnableService for Task<P2P, DB>
where
P2P: Send + Sync,
P2P: PeerToPeer<GossipedTransaction = TransactionGossipData> + Send + Sync,
DB: TxPoolDb,
{
const NAME: &'static str = "TxPool";
Expand All @@ -130,7 +132,7 @@ where
#[async_trait::async_trait]
impl<P2P, DB> RunnableTask for Task<P2P, DB>
where
P2P: Send + Sync,
P2P: PeerToPeer<GossipedTransaction = TransactionGossipData> + Send + Sync,
DB: TxPoolDb,
{
async fn run(&mut self, watcher: &mut StateWatcher) -> anyhow::Result<bool> {
Expand All @@ -140,16 +142,33 @@ where
should_continue = false;
}
new_transaction = self.gossiped_tx_stream.next() => {
if let Some(GossipData { data: Some(tx), .. }) = new_transaction {
if let Some(GossipData { data: Some(tx), message_id, peer_id }) = new_transaction {
let id = tx.id();
let txs = vec!(Arc::new(tx));
tracing::info_span!("Received tx via gossip", %id)
let mut result = tracing::info_span!("Received tx via gossip", %id)
.in_scope(|| {
self.shared.txpool.lock().insert(
&self.shared.tx_status_sender,
&txs
)
});

if let Some(acceptance) = match result.pop() {
Some(Ok(_)) => {
Some(GossipsubMessageAcceptance::Accept)
},
Some(Err(_)) => {
Some(GossipsubMessageAcceptance::Reject)
}
_ => None
} {
let message_info = GossipsubMessageInfo {
message_id,
peer_id,
};
let _ = self.shared.p2p.notify_gossip_transaction_validity(message_info, acceptance);
}

should_continue = true;
} else {
should_continue = false;
Expand Down
2 changes: 1 addition & 1 deletion crates/services/txpool/src/service/test_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mockall::mock! {

fn notify_gossip_transaction_validity(
&self,
message: &GossipedTransaction,
message_info: GossipsubMessageInfo,
validity: GossipsubMessageAcceptance,
) -> anyhow::Result<()>;
}
Expand Down
Loading

0 comments on commit 1542d93

Please sign in to comment.