Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

validate p2p messages and topics #1755

Merged
merged 10 commits into from
Apr 22, 2023
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
**Features:**

**Enhancements/Fixes:**
- p2p stack is improved [#1755](https://github.com/KomodoPlatform/atomicDEX-API/pull/1755)
- - Validate topics if they are mixed or not.
- - Do early return if the message data is not valid (since no point to iterate over and over on the invalid message)
- - Break the loop right after processing any of `SWAP_PREFIX`, `WATCHER_PREFIX`, `TX_HELPER_PREFIX` topic.
- An issue was fixed where we don't have to wait for all EVM nodes to sync the latest account nonce [#1757](https://github.com/KomodoPlatform/atomicDEX-API/pull/1757)


## v1.0.2-beta - 2023-04-11

**Features:**
Expand Down
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ const ABCI_REQUEST_PROVE: bool = false;
/// 0.25 is good average gas price on atom and iris
const DEFAULT_GAS_PRICE: f64 = 0.25;
pub(super) const TIMEOUT_HEIGHT_DELTA: u64 = 100;
pub const GAS_LIMIT_DEFAULT: u64 = 100_000;
pub const GAS_LIMIT_DEFAULT: u64 = 125_000;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you document(either here in the PR comments or in the changelog) why you are changing this value?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because HTLCs requiring more gas than our limit lately. Since gas prices can be updated from the coins configuration, this should never be problem even if we make it 200_000.

pub(crate) const TX_DEFAULT_MEMO: &str = "";

// https://github.com/irisnet/irismod/blob/5016c1be6fdbcffc319943f33713f4a057622f0a/modules/htlc/types/validation.go#L19-L22
Expand Down
2 changes: 1 addition & 1 deletion mm2src/mm2_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use secp256k1::{Message as SecpMessage, PublicKey as Secp256k1Pubkey, Secp256k1,
use sha2::{Digest, Sha256};

pub use atomicdex_behaviour::{spawn_gossipsub, AdexBehaviourError, NodeType, WssCerts};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId};
pub use atomicdex_gossipsub::{GossipsubEvent, GossipsubMessage, MessageId, TopicHash};
pub use libp2p::identity::error::DecodingError;
pub use libp2p::identity::secp256k1::PublicKey as Libp2pSecpPublic;
pub use libp2p::identity::PublicKey as Libp2pPublic;
Expand Down
99 changes: 76 additions & 23 deletions mm2src/mm2_main/src/lp_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use mm2_libp2p::atomicdex_behaviour::{AdexBehaviourCmd, AdexBehaviourEvent, Adex
AdexResponseChannel};
use mm2_libp2p::peers_exchange::PeerAddresses;
use mm2_libp2p::{decode_message, encode_message, DecodingError, GossipsubMessage, Libp2pPublic, Libp2pSecpPublic,
MessageId, NetworkPorts, PeerId, TOPIC_SEPARATOR};
MessageId, NetworkPorts, PeerId, TopicHash, TOPIC_SEPARATOR};
use mm2_metrics::{mm_label, mm_timing};
#[cfg(test)] use mocktopus::macros::*;
use parking_lot::Mutex as PaMutex;
Expand All @@ -39,7 +39,8 @@ use std::net::ToSocketAddrs;
use std::sync::Arc;
use wasm_timer::Instant;

use crate::mm2::{lp_ordermatch, lp_stats, lp_swap};
use crate::mm2::lp_ordermatch;
use crate::mm2::{lp_stats, lp_swap};

pub type P2PRequestResult<T> = Result<T, MmError<P2PRequestError>>;

Expand Down Expand Up @@ -138,36 +139,99 @@ async fn process_p2p_message(
mut message: GossipsubMessage,
i_am_relay: bool,
) {
fn is_valid(topics: &[TopicHash]) -> Result<(), String> {
if topics.is_empty() {
return Err("At least one topic must be provided.".to_string());
}

let first_topic_prefix = topics[0].as_str().split(TOPIC_SEPARATOR).next().unwrap_or_default();
for item in topics.iter().skip(1) {
if !item.as_str().starts_with(first_topic_prefix) {
return Err(format!(
"Topics are invalid, received more than one topic kind. Topics '{:?}",
shamardy marked this conversation as resolved.
Show resolved Hide resolved
topics
));
}
}

Ok(())
}

let mut to_propagate = false;
let mut orderbook_pairs = vec![];

message.topics.dedup();
drop_mutability!(message);

for topic in message.topics {
if let Err(err) = is_valid(&message.topics) {
log::error!("{}", err);
return;
}

let inform_about_break = |used: &str, all: &[TopicHash]| {
log::debug!(
"Topic '{}' proceed and loop is killed. Whole topic list was '{:?}'",
used,
all
);
};
shamardy marked this conversation as resolved.
Show resolved Hide resolved

for topic in message.topics.iter() {
let mut split = topic.as_str().split(TOPIC_SEPARATOR);

match split.next() {
Some(lp_ordermatch::ORDERBOOK_PREFIX) => {
if let Some(pair) = split.next() {
orderbook_pairs.push(pair.to_string());
let fut = lp_ordermatch::handle_orderbook_msg(
ctx.clone(),
&message.topics,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if let Err(e) = fut.await {
if e.get_inner().is_warning() {
log::warn!("{}", e);
} else {
log::error!("{}", e);
}
return;
}

to_propagate = true;
break;
},
Some(lp_swap::SWAP_PREFIX) => {
lp_swap::process_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await;
if let Err(e) =
lp_swap::process_swap_msg(ctx.clone(), split.next().unwrap_or_default(), &message.data).await
{
log::error!("{}", e);
return;
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
rozhkovdmitrii marked this conversation as resolved.
Show resolved Hide resolved
},
Some(lp_swap::WATCHER_PREFIX) => {
if ctx.is_watcher() {
lp_swap::process_watcher_msg(ctx.clone(), &message.data).await;
if let Err(e) = lp_swap::process_watcher_msg(ctx.clone(), &message.data) {
log::error!("{}", e);
return;
}
}

to_propagate = true;

inform_about_break(topic.as_str(), &message.topics);
break;
},
Some(lp_swap::TX_HELPER_PREFIX) => {
if let Some(pair) = split.next() {
if let Ok(Some(coin)) = lp_coinfind(&ctx, pair).await {
if let Err(e) = coin.tx_enum_from_bytes(&message.data) {
log::error!("Message cannot continue the process due to: {:?}", e);
continue;
return;
};

let fut = coin.send_raw_tx_bytes(&message.data);
Expand All @@ -182,25 +246,14 @@ async fn process_p2p_message(
})
}
}

inform_about_break(topic.as_str(), &message.topics);
break;
},
None | Some(_) => (),
}
}

if !orderbook_pairs.is_empty() {
let process_fut = lp_ordermatch::process_msg(
ctx.clone(),
orderbook_pairs,
peer_id.to_string(),
&message.data,
i_am_relay,
);

if process_fut.await {
to_propagate = true;
}
}

if to_propagate && i_am_relay {
propagate_message(&ctx, message_id, peer_id);
}
Expand Down