Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[r2r] Tendermint multiple rpcs optimization #1568

Merged
merged 36 commits into from
Dec 26, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
19dfcd6
branch from dev
laruh Dec 5, 2022
014bc59
trait RpcCommonOps
laruh Dec 5, 2022
040f25f
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 5, 2022
c6e6b10
import HttpTransportNode after dev merge
laruh Dec 5, 2022
847072d
wip
laruh Dec 6, 2022
99fcd04
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 6, 2022
7f70b63
small changes in errors
laruh Dec 6, 2022
0f26cf9
wasm tests
laruh Dec 6, 2022
5aa99fe
use String in RpcClientInitError
laruh Dec 7, 2022
6e68b6e
iterate_over_urls func
laruh Dec 7, 2022
557adc1
wip
laruh Dec 7, 2022
a58b40f
use iterate_over_urls func in get_rpc_client func. need to handle err…
laruh Dec 7, 2022
476f8d7
wip
laruh Dec 7, 2022
1b77448
wip
laruh Dec 8, 2022
0252557
leave WrongRpcClient in TendermintCoinRpcError
laruh Dec 8, 2022
f3c9753
add RpcClientError for match
laruh Dec 8, 2022
5c684bb
remove some notes
laruh Dec 8, 2022
0632c4c
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 12, 2022
f7a0808
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 12, 2022
e7c23df
fmt
laruh Dec 12, 2022
619b607
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 12, 2022
af6a7e7
save dev state, some notes added
laruh Dec 12, 2022
9f86b4a
wip
laruh Dec 14, 2022
d02353e
wip
laruh Dec 14, 2022
dfc4e3e
wip
laruh Dec 14, 2022
26acb77
impl RpcCommonOps for TendermintCoin, use just AsyncMutex
laruh Dec 14, 2022
28229b5
remove RpcCommonError and RpcClientEnum, use types in trait instead
laruh Dec 14, 2022
f137824
use retain to filter valid urls
laruh Dec 14, 2022
a1532d3
add some notes
laruh Dec 15, 2022
8c71642
return error if we have one or more invalid urls
laruh Dec 15, 2022
238cead
remove pub and dead_code
laruh Dec 15, 2022
3918e1c
use rotate_right
laruh Dec 16, 2022
a25ffaa
Merge remote-tracking branch 'origin/dev' into tendermint-multiple-rp…
laruh Dec 16, 2022
d3b46fe
add rotate_left, into_iter
laruh Dec 16, 2022
5edff34
use slice::join, return MmResult
laruh Dec 19, 2022
b561c11
add check rpc_urls.is_empty
laruh Dec 22, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion mm2src/coins/eth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ pub use rlp;

#[cfg(test)] mod eth_tests;
#[cfg(target_arch = "wasm32")] mod eth_wasm_tests;
mod web3_transport;
pub mod web3_transport;
shamardy marked this conversation as resolved.
Show resolved Hide resolved

#[path = "eth/v2_activation.rs"] pub mod v2_activation;
use v2_activation::build_address_and_priv_key_policy;
Expand Down
33 changes: 33 additions & 0 deletions mm2src/coins/lp_coins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
#[macro_use] extern crate serde_json;
#[macro_use] extern crate ser_error_derive;

use crate::eth::web3_transport::http_transport::HttpTransportNode;
use crate::tendermint::rpc::HttpClient;
shamardy marked this conversation as resolved.
Show resolved Hide resolved
use async_trait::async_trait;
use base58::FromBase58Error;
use common::executor::{abortable_queue::{AbortableQueue, WeakSpawner},
Expand Down Expand Up @@ -71,6 +73,11 @@ use std::sync::Arc;
use std::time::Duration;
use utxo_signer::with_key_pair::UtxoSignWithKeyPairError;

#[cfg(not(target_arch = "wasm32"))]
mod z_coin_grpc {
tonic::include_proto!("cash.z.wallet.sdk.rpc");
}

cfg_native! {
use crate::lightning::LightningCoin;
use crate::lightning::ln_conf::PlatformCoinConfirmationTargets;
Expand All @@ -81,6 +88,8 @@ cfg_native! {
use std::io;
use zcash_primitives::transaction::Transaction as ZTransaction;
use z_coin::ZcoinProtocolInfo;
use tonic::transport::Channel as TonicChannel;
use z_coin_grpc::compact_tx_streamer_client::CompactTxStreamerClient;
}

cfg_wasm32! {
Expand Down Expand Up @@ -3511,3 +3520,27 @@ where
b.block_height.cmp(&a.block_height)
}
}

#[derive(Display, Debug)]
pub enum RpcCommonError {
FindClientError(String),
WrongRpcClient,
}

#[derive(Debug)]
pub enum RpcClientEnum {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to avoid adding such common RPC enum?
In my opinion, RpcClientEnum could be used as a return type of MmCoinEnum::get_rpc_client. As I can see, currently, RpcCommonOps::get_rpc_client is used when the RPC client type is known.
https://github.com/KomodoPlatform/atomicDEX-API/blob/af6a7e7ad7f5ea329e081a63b5df732224b50c7c/mm2src/coins/tendermint/tendermint_coin.rs#L439-L445

In conclusion, I believe that we could refactor this way:

/// Use trait in the case, when we have to send requests to rpc client.
#[async_trait]
pub trait RpcCommonOps {
    type RpcClient;
    type Error;

    /// Returns an alive RPC client or returns an error if no RPC endpoint is currently available.
    async fn get_rpc_client(&self) -> Result<Self::RpcClient, Self::Error>;
}

Please note that RpcCommonOps::iterate_over_urls is going to be used within RpcCommonOps::get_rpc_client method implementation only, so we can remove it from from the public RpcCommonOps interface.

Copy link
Member Author

@laruh laruh Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually thank you for this hint!
I removed enums due to adding types in trait.
So I can just implement this. it's like you suggested below.

impl RpcCommonOps for TendermintCoin {
    type RpcClient = HttpClient;
    type Error = TendermintCoinRpcError;

    async fn get_live_client(&self) -> Result<Self::RpcClient, Self::Error> {

please note, that we use get_live_client function like one line only in rpc_client method from trait TendermintCommons.
But I can not remove rpc_client from trait and use just get_live_client instead, bcz we need this function regarding trait CoinCapabilities: TendermintCommons + CoinWithTxHistoryV2 + MmCoin + MarketCoinOps {}.
This explanation is just in case there is a question whether rpc_client should be removed.

#[cfg(not(target_arch = "wasm32"))]
ZcoinRpcClient(CompactTxStreamerClient<TonicChannel>),
TendermintHttpClient(HttpClient),
Web3HttpTransportNode(HttpTransportNode),
}

/// Use trait in the case, when we have to send requests to rpc client.
#[async_trait]
pub trait RpcCommonOps {
/// get alive client from custom `RpcClient` structure, that contains one rpc client and vector of rpc urls.
async fn get_rpc_client(&self) -> Result<RpcClientEnum, RpcCommonError>;

/// if rpc client isn't available anymore, iterate over rpc urls, to get the first available client.
async fn iterate_over_urls(&self) -> Result<RpcClientEnum, RpcCommonError>;
shamardy marked this conversation as resolved.
Show resolved Hide resolved
}
2 changes: 1 addition & 1 deletion mm2src/coins/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// https://docs.cosmos.network/

mod iris;
mod rpc;
pub mod rpc;
shamardy marked this conversation as resolved.
Show resolved Hide resolved
mod tendermint_coin;
mod tendermint_token;
pub mod tendermint_tx_history_v2;
Expand Down
148 changes: 107 additions & 41 deletions mm2src/coins/tendermint/tendermint_coin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,21 @@ use crate::{big_decimal_from_sat_unsigned, BalanceError, BalanceFut, BigDecimal,
CoinBalance, CoinFutSpawner, FeeApproxStage, FoundSwapTxSpend, HistorySyncState, MarketCoinOps, MmCoin,
NegotiateSwapContractAddrErr, PaymentInstructions, PaymentInstructionsErr, PrivKeyBuildPolicy,
PrivKeyPolicyNotAllowed, RawTransactionError, RawTransactionFut, RawTransactionRequest, RawTransactionRes,
SearchForSwapTxSpendInput, SendMakerPaymentArgs, SendMakerRefundsPaymentArgs,
SendMakerSpendsTakerPaymentArgs, SendTakerPaymentArgs, SendTakerRefundsPaymentArgs,
SendTakerSpendsMakerPaymentArgs, SignatureError, SignatureResult, SwapOps, TradeFee, TradePreimageError,
TradePreimageFut, TradePreimageResult, TradePreimageValue, TransactionDetails, TransactionEnum,
TransactionErr, TransactionFut, TransactionType, TxFeeDetails, TxMarshalingErr,
UnexpectedDerivationMethod, ValidateAddressResult, ValidateFeeArgs, ValidateInstructionsErr,
ValidateOtherPubKeyErr, ValidatePaymentFut, ValidatePaymentInput, VerificationError, VerificationResult,
WatcherOps, WatcherSearchForSwapTxSpendInput, WatcherValidatePaymentInput, WatcherValidateTakerFeeInput,
WithdrawError, WithdrawFut, WithdrawRequest};
use async_std::prelude::FutureExt as AsyncStdFutureExt;
RpcClientEnum, RpcCommonError, RpcCommonOps, SearchForSwapTxSpendInput, SendMakerPaymentArgs,
SendMakerRefundsPaymentArgs, SendMakerSpendsTakerPaymentArgs, SendTakerPaymentArgs,
SendTakerRefundsPaymentArgs, SendTakerSpendsMakerPaymentArgs, SignatureError, SignatureResult, SwapOps,
TradeFee, TradePreimageError, TradePreimageFut, TradePreimageResult, TradePreimageValue,
TransactionDetails, TransactionEnum, TransactionErr, TransactionFut, TransactionType, TxFeeDetails,
TxMarshalingErr, UnexpectedDerivationMethod, ValidateAddressResult, ValidateFeeArgs,
ValidateInstructionsErr, ValidateOtherPubKeyErr, ValidatePaymentFut, ValidatePaymentInput,
VerificationError, VerificationResult, WatcherOps, WatcherSearchForSwapTxSpendInput,
WatcherValidatePaymentInput, WatcherValidateTakerFeeInput, WithdrawError, WithdrawFut, WithdrawRequest};
use async_trait::async_trait;
use bitcrypto::{dhash160, sha256};
use common::executor::{abortable_queue::AbortableQueue, AbortableSystem};
use common::executor::{AbortedError, Timer};
use common::log::warn;
use common::{get_utc_timestamp, log, now_ms, Future01CompatExt, DEX_FEE_ADDR_PUBKEY};
use common::{get_utc_timestamp, now_ms, Future01CompatExt, DEX_FEE_ADDR_PUBKEY};
use cosmrs::bank::MsgSend;
use cosmrs::crypto::secp256k1::SigningKey;
use cosmrs::proto::cosmos::auth::v1beta1::{BaseAccount, QueryAccountRequest, QueryAccountResponse};
Expand Down Expand Up @@ -60,7 +59,6 @@ use std::convert::TryFrom;
use std::ops::Deref;
use std::str::FromStr;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use uuid::Uuid;

// ABCI Request Paths
Expand Down Expand Up @@ -164,9 +162,52 @@ impl TendermintConf {
}
}

struct TendermintRpcClient {
rpc_urls: Vec<String>,
rpc_client: AsyncMutex<HttpClient>,
}

#[async_trait]
impl RpcCommonOps for TendermintRpcClient {
async fn get_rpc_client(&self) -> Result<RpcClientEnum, RpcCommonError> {
let mut rpc_client = self.rpc_client.lock().await;
match rpc_client.perform(HealthRequest).await {
Ok(_) => Ok(RpcClientEnum::TendermintHttpClient(rpc_client.clone())),
// try HealthRequest one more time
Err(_) => match rpc_client.perform(HealthRequest).await {
Ok(_) => Ok(RpcClientEnum::TendermintHttpClient(rpc_client.clone())),
Err(_) => {
let new_client = self.iterate_over_urls().await?;
match new_client {
RpcClientEnum::TendermintHttpClient(client) => {
*rpc_client = client.clone();
Ok(RpcClientEnum::TendermintHttpClient(client))
},
_ => Err(RpcCommonError::WrongRpcClient),
}
},
},
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider the following changes:

#[derive(Clone)]
struct TendermintRpcClient(Arc<AsyncMutex<TendermintRpcClientImpl>>);

struct TendermintRpcClientImpl {
    rpc_clients: Vec<HttpClient>,
    current: usize,
}

#[async_trait]
impl RpcCommonOps for TendermintCoin {
    type RpcClient = TendermintRpcClient;
    type Error = TendermintCoinRpcError;

    async fn get_rpc_client(&self) -> Result<Self::RpcClient, Self::Error> {
        let mut rpc /*: AsyncGuard<TendermintRpcClientImpl>*/ = self.rpc_client.lock().await;

        match rpc.perform(HealthRequest).await {
            // As I suggested above.
            Ok(_) => return Ok(self.rpc_client.clone()),
            Err(_) => (),
        }

        // Let's imagine there are 3 RPC clients, and we had an active RPC2:
        // | RPC1 | RPC2 | RPC3 |
        //           |
        //         Active
        //
        // Then we should give every RPC client one attempt, but starting with `RPC3`. The order is:
        // RPC3, RPC1, RPC2 (one more time).
        for ...
    }
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, could you tell, why do we need Arc here? As I can see we dont need to clone structure and share its instances.
Should we just use this ?

struct TendermintRpcClient {
    rpc_urls: Vec<String>,
    rpc_client: AsyncMutex<HttpClient>,
    current: AtomicUsize,
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't need to clone, let's avoid using Arc

Copy link
Member Author

@laruh laruh Dec 15, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the code example. I left struct TendermintRpcClient(AsyncMutex<TendermintRpcClientImpl>) and used

type RpcClient = HttpClient;
type Error = TendermintCoinRpcError;

UPD also I implemented pushing current url to the end

}

async fn iterate_over_urls(&self) -> Result<RpcClientEnum, RpcCommonError> {
let urls = &self.rpc_urls;
for url in urls {
let client = HttpClient::new(url.as_str());
if let Ok(client) = client {
if client.perform(HealthRequest).await.is_ok() {
return Ok(RpcClientEnum::TendermintHttpClient(client.clone()));
}
}
}
Err(RpcCommonError::FindClientError(
"All the current rpc nodes are unavailable.".to_string(),
))
}
}

pub struct TendermintCoinImpl {
ticker: String,
rpc_clients: Vec<HttpClient>,
/// As seconds
avg_blocktime: u8,
/// My address
Expand All @@ -183,6 +224,7 @@ pub struct TendermintCoinImpl {
/// or on [`MmArc::stop`].
pub(super) abortable_system: AbortableQueue,
pub(crate) history_sync_state: Mutex<HistorySyncState>,
client_impl: TendermintRpcClient,
}

#[derive(Clone)]
Expand All @@ -206,6 +248,7 @@ pub enum TendermintInitErrorKind {
InvalidPrivKey(String),
CouldNotGenerateAccountId(String),
EmptyRpcUrls,
#[display(fmt = "Fail to init HttpClient during rpc urls iteration {}", _0)]
RpcClientInitError(String),
InvalidChainId(String),
InvalidDenom(String),
Expand All @@ -226,12 +269,18 @@ pub enum TendermintCoinRpcError {
Prost(DecodeError),
InvalidResponse(String),
PerformError(String),
WrongRpcClient,
RpcClientError(RpcCommonError),
}

impl From<DecodeError> for TendermintCoinRpcError {
fn from(err: DecodeError) -> Self { TendermintCoinRpcError::Prost(err) }
}

impl From<RpcCommonError> for TendermintCoinRpcError {
fn from(err: RpcCommonError) -> Self { TendermintCoinRpcError::RpcClientError(err) }
}

impl From<TendermintCoinRpcError> for WithdrawError {
fn from(err: TendermintCoinRpcError) -> Self { WithdrawError::Transport(err.to_string()) }
}
Expand All @@ -242,6 +291,8 @@ impl From<TendermintCoinRpcError> for BalanceError {
TendermintCoinRpcError::InvalidResponse(e) => BalanceError::InvalidResponse(e),
TendermintCoinRpcError::Prost(e) => BalanceError::InvalidResponse(e.to_string()),
TendermintCoinRpcError::PerformError(e) => BalanceError::Transport(e),
TendermintCoinRpcError::WrongRpcClient => BalanceError::Internal("Wrong rpc client type".to_string()),
TendermintCoinRpcError::RpcClientError(e) => BalanceError::Transport(format!("{}", e)),
}
}
}
Expand All @@ -252,6 +303,10 @@ impl From<TendermintCoinRpcError> for ValidatePaymentError {
TendermintCoinRpcError::InvalidResponse(e) => ValidatePaymentError::InvalidRpcResponse(e),
TendermintCoinRpcError::Prost(e) => ValidatePaymentError::InvalidRpcResponse(e.to_string()),
TendermintCoinRpcError::PerformError(e) => ValidatePaymentError::Transport(e),
TendermintCoinRpcError::WrongRpcClient => {
ValidatePaymentError::InternalError("Wrong rpc client type".to_string())
},
TendermintCoinRpcError::RpcClientError(e) => ValidatePaymentError::Transport(format!("{}", e)),
}
}
}
Expand Down Expand Up @@ -381,25 +436,12 @@ impl TendermintCommons for TendermintCoin {
Ok(result)
}

// TODO
// Save one working client to the coin context, only try others once it doesn't
// work anymore.
// Also, try couple times more on health check errors.
async fn rpc_client(&self) -> MmResult<HttpClient, TendermintCoinRpcError> {
for rpc_client in self.rpc_clients.iter() {
match rpc_client.perform(HealthRequest).timeout(Duration::from_secs(3)).await {
Ok(Ok(_)) => return Ok(rpc_client.clone()),
Ok(Err(e)) => log::warn!(
"Recieved error from Tendermint rpc node during health check. Error: {:?}",
e
),
Err(_) => log::warn!("Tendermint rpc node: {:?} got timeout during health check", rpc_client),
};
let client_enum = self.client_impl.get_rpc_client().await?;
match client_enum {
RpcClientEnum::TendermintHttpClient(client) => Ok(client),
_ => MmError::err(TendermintCoinRpcError::WrongRpcClient),
}

MmError::err(TendermintCoinRpcError::PerformError(
"All the current rpc nodes are unavailable.".to_string(),
))
}
}

Expand Down Expand Up @@ -430,17 +472,15 @@ impl TendermintCoin {
}
})?;

let rpc_clients: Result<Vec<HttpClient>, _> = rpc_urls
.iter()
.map(|url| {
HttpClient::new(url.as_str()).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
kind: TendermintInitErrorKind::RpcClientInitError(e.to_string()),
})
})
.collect();
let (rpc_client, rpc_urls) = find_client(rpc_urls).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
kind: TendermintInitErrorKind::RpcClientInitError(e),
})?;

let rpc_clients = rpc_clients?;
let client_impl = TendermintRpcClient {
rpc_urls,
rpc_client: AsyncMutex::new(rpc_client),
};

let chain_id = ChainId::try_from(protocol_info.chain_id).map_to_mm(|e| TendermintInitError {
ticker: ticker.clone(),
Expand Down Expand Up @@ -470,7 +510,6 @@ impl TendermintCoin {

Ok(TendermintCoin(Arc::new(TendermintCoinImpl {
ticker,
rpc_clients,
account_id,
account_prefix: protocol_info.account_prefix,
priv_key: priv_key.to_vec(),
Expand All @@ -483,6 +522,7 @@ impl TendermintCoin {
tokens_info: PaMutex::new(HashMap::new()),
abortable_system,
history_sync_state: Mutex::new(history_sync_state),
client_impl,
})))
}

Expand Down Expand Up @@ -1357,6 +1397,32 @@ impl TendermintCoin {
}
}

fn find_client(rpc_urls: Vec<String>) -> Result<(HttpClient, Vec<String>), String> {
let mut res_urls = rpc_urls.clone();
let mut errors = Vec::new();
let mut clients = Vec::new();
for (i, url) in rpc_urls.iter().enumerate() {
match HttpClient::new(url.as_str()) {
Ok(client) => clients.push(client),
Err(e) => {
errors.push(e);
res_urls.remove(i);
},
}
}
shamardy marked this conversation as resolved.
Show resolved Hide resolved
if let Some(client) = clients.pop() {
if !rpc_urls.is_empty() {
Ok((client, rpc_urls))
} else {
let errors: String = errors.iter().map(|e| format!("{:?}", e)).collect();
Err(errors)
}
} else {
let errors: String = errors.iter().map(|e| format!("{:?}", e)).collect();
sergeyboyko0791 marked this conversation as resolved.
Show resolved Hide resolved
Err(errors)
}
}

#[async_trait]
#[allow(unused_variables)]
impl MmCoin for TendermintCoin {
Expand Down