Skip to content

Commit

Permalink
improve p2p stack
Browse files Browse the repository at this point in the history
Signed-off-by: ozkanonur <work@onurozkan.dev>
  • Loading branch information
onur-ozkan committed Apr 13, 2023
1 parent 8ce972d commit 0f3196c
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 25 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
## ${next-version} - ${release-date}

**Features:**

**Enhancements/Fixes:**
- p2p stack is improved [#1755](https://github.com/KomodoPlatform/atomicDEX-API/pull/1755)
- - Validate topics if they are mixed or not.
- - Do early return if the message data is not valid (since no point to iterate over and over on the invalid message)
- - Avoid decoding messages that have more than 25 topics


## v1.0.2-beta - 2023-04-11

**Features:**
Expand Down
17 changes: 16 additions & 1 deletion mm2src/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,8 @@ impl Decoder for GossipsubCodec {
type Error = io::Error;

fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
const MAX_TOPICS: usize = 25;

let packet = some_or_return_ok_none!(self.length_codec.decode(src)?);

let rpc = rpc_proto::Rpc::decode(&packet[..])?;
Expand All @@ -228,12 +230,25 @@ impl Decoder for GossipsubCodec {
"sequence number has an incorrect size",
));
}

let topics = publish.topic_ids.into_iter().map(TopicHash::from_raw);
if topics.len() > MAX_TOPICS {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Too many topics provided. Allowed topic count: {}, Received topic count: {}",
MAX_TOPICS,
topics.len()
),
));
}

messages.push(GossipsubMessage {
source: PeerId::from_bytes(&publish.from.unwrap_or_default())
.map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "Invalid Peer Id"))?,
data: publish.data.unwrap_or_default(),
sequence_number: BigEndian::read_u64(&seq_no),
topics: publish.topic_ids.into_iter().map(TopicHash::from_raw).collect(),
topics: topics.collect(),
});
}

Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use secp256k1::{Message as SecpMessage, PublicKey as Secp256k1Pubkey, Secp256k1,
use sha2::{Digest, Sha256};

pub use atomicdex_behaviour::{spawn_gossipsub, AdexBehaviourError, NodeType, WssCerts};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId, TopicHash};
pub use libp2p::identity::error::DecodingError;
pub use libp2p::identity::secp256k1::PublicKey as Libp2pSecpPublic;
pub use libp2p::identity::PublicKey as Libp2pPublic;
Expand Down
45 changes: 38 additions & 7 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ use mm2_err_handle::prelude::*;
use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, AdexCmdTx, AdexEventRx, AdexResponse,
AdexResponseChannel};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic,
MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
use mm2_libp2p::{decode_message, decode_signed, encode_message, DecodingError, GossipsubMessage, Libp2pPublic,
Libp2pSecpPublic, MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR};
use mm2_metrics::{mm_label, mm_timing};
#[cfg(test)] use mocktopus::macros::*;
use parking_lot::Mutex as PaMutex;
Expand All @@ -39,7 +39,8 @@ use std::net::ToSocketAddrs;
use std::sync::Arc;
use wasm_timer::Instant;

use crate::mm2::{lp_ordermatch, lp_stats, lp_swap};
use crate::mm2::lp_ordermatch::{self, new_protocol::OrdermatchMessage};
use crate::mm2::{lp_stats, lp_swap};

pub type P2PRequestResult<T> = Result<T, MmError<P2PRequestError>>;

Expand Down Expand Up @@ -138,36 +139,66 @@ async fn process_p2p_message(
mut message: GossipsubMessage,
i_am_relay: bool,
) {
fn is_valid(topics: &Vec<TopicHash>) -> bool {
if topics.is_empty() {
return false;
}

let first_topic_prefix = &topics[0].as_str().split(TOPIC_SEPARATOR).next().unwrap_or_default();
for item in topics.iter().skip(1) {
if !item.as_str().starts_with(first_topic_prefix) {
return false;
}
}

true
}

let mut to_propagate = false;
let mut orderbook_pairs = vec![];

message.topics.dedup();
drop_mutability!(message);

if !is_valid(&message.topics) {
return;
}

for topic in message.topics {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);
match split.next() {
Some(lp_ordermatch::ORDERBOOK_PREFIX) => {
if let Some(pair) = split.next() {
if decode_signed::<OrdermatchMessage>(&message.data).is_err() {
return;
};

orderbook_pairs.push(pair.to_string());
}
},
Some(lp_swap::SWAP_PREFIX) => {
lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await;
if lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data)
.await
.is_err()
{
return;
}

to_propagate = true;
},
Some(lp_swap::WATCHER_PREFIX) => {
if ctx.is_watcher() {
lp_swap::process_watcher_msg(ctx.clone(), &message.data).await;
if ctx.is_watcher() && lp_swap::process_watcher_msg(ctx.clone(), &message.data).is_err() {
return;
}

to_propagate = true;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
continue;
return;
};

let fut = coin.send_raw_tx_bytes(&message.data);
Expand Down
3 changes: 2 additions & 1 deletion mm2src/mm2_main/src/lp_ordermatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ pub use lp_bot::{start_simple_market_maker_bot, stop_simple_market_maker_bot, St

#[path = "lp_ordermatch/my_orders_storage.rs"]
mod my_orders_storage;
#[path = "lp_ordermatch/new_protocol.rs"] mod new_protocol;
#[path = "lp_ordermatch/new_protocol.rs"]
pub(crate) mod new_protocol;
#[path = "lp_ordermatch/order_requests_tracker.rs"]
mod order_requests_tracker;
#[path = "lp_ordermatch/orderbook_depth.rs"] mod orderbook_depth;
Expand Down
19 changes: 11 additions & 8 deletions mm2src/mm2_main/src/lp_swap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@
// marketmaker
//

use crate::mm2::lp_network::{broadcast_p2p_msg, Libp2pPeerId};
use super::lp_network::P2PRequestResult;
use crate::mm2::lp_network::{broadcast_p2p_msg, Libp2pPeerId, P2PRequestError};
use bitcrypto::{dhash160, sha256};
use coins::eth::Web3RpcError;
use coins::{lp_coinfind, lp_coinfind_or_err, CoinFindError, MmCoinEnum, TradeFee, TransactionEnum};
Expand Down Expand Up @@ -97,7 +98,7 @@ use std::sync::atomic::{AtomicU64, Ordering};
#[path = "lp_swap/recreate_swap_data.rs"] mod recreate_swap_data;
#[path = "lp_swap/saved_swap.rs"] mod saved_swap;
#[path = "lp_swap/swap_lock.rs"] mod swap_lock;
#[path = "lp_swap/swap_watcher.rs"] mod swap_watcher;
#[path = "lp_swap/swap_watcher.rs"] pub(crate) mod swap_watcher;
#[path = "lp_swap/taker_swap.rs"] mod taker_swap;
#[path = "lp_swap/trade_preimage.rs"] mod trade_preimage;

Expand Down Expand Up @@ -236,10 +237,12 @@ pub fn broadcast_p2p_tx_msg(ctx: &MmArc, topic: String, msg: &TransactionEnum, p
broadcast_p2p_msg(ctx, vec![topic], encoded_msg, from);
}

pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) {
pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) -> P2PRequestResult<()> {
let uuid = match Uuid::from_str(topic) {
Ok(u) => u,
Err(_) => return,
Err(e) => {
return MmError::err(P2PRequestError::DecodeError(e.to_string()));
},
};

let msg = match decode_signed::<SwapMsg>(msg) {
Expand All @@ -258,9 +261,7 @@ pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) {
error!("Couldn't deserialize 'SwapStatus': {:?}", swap_status_err);
},
};
// Drop it to avoid dead_code warning
drop(swap_msg_err);
return;
return MmError::err(P2PRequestError::DecodeError(swap_msg_err.to_string()));
},
};

Expand All @@ -280,7 +281,9 @@ pub async fn process_msg(ctx: MmArc, topic: &str, msg: &[u8]) {
} else {
warn!("Received message from unexpected sender for swap {}", uuid);
}
}
};

Ok(())
}

pub fn swap_topic(uuid: &Uuid) -> String { pub_sub_topic(SWAP_PREFIX, &uuid.to_string()) }
Expand Down
15 changes: 8 additions & 7 deletions mm2src/mm2_main/src/lp_swap/swap_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::{broadcast_p2p_tx_msg, get_payment_locktime, lp_coinfind, min_watcher_reward, taker_payment_spend_deadline,
tx_helper_topic, H256Json, SwapsContext, WAIT_CONFIRM_INTERVAL};
use crate::mm2::lp_network::{P2PRequestError, P2PRequestResult};
use crate::mm2::MmError;
use async_trait::async_trait;
use coins::{CanRefundHtlc, ConfirmPaymentInput, FoundSwapTxSpend, MmCoinEnum, RefundPaymentArgs,
Expand Down Expand Up @@ -513,14 +514,12 @@ impl LastState for Stopped {
async fn on_changed(self: Box<Self>, _watcher_ctx: &mut Self::Ctx) -> Self::Result {}
}

pub async fn process_watcher_msg(ctx: MmArc, msg: &[u8]) {
pub fn process_watcher_msg(ctx: MmArc, msg: &[u8]) -> P2PRequestResult<()> {
let msg = match decode_signed::<SwapWatcherMsg>(msg) {
Ok(m) => m,
Err(watcher_msg_err) => {
error!("Couldn't deserialize 'SwapWatcherMsg': {:?}", watcher_msg_err);
// Drop it to avoid dead_code warning
drop(watcher_msg_err);
return;
Err(e) => {
error!("Couldn't deserialize 'SwapWatcherMsg': {:?}", e);
return MmError::err(P2PRequestError::DecodeError(e.to_string()));
},
};

Expand All @@ -530,7 +529,9 @@ pub async fn process_watcher_msg(ctx: MmArc, msg: &[u8]) {
SwapWatcherMsg::TakerSwapWatcherMsg(watcher_data) => {
spawn_taker_swap_watcher(ctx, watcher_data, verified_pubkey.to_bytes())
},
}
};

Ok(())
}

/// Currently, Taker Swap Watcher is supported only.
Expand Down

0 comments on commit 0f3196c

Please sign in to comment.