Skip to content

Commit

Permalink
Add disconnect/re-connect to base node monitor
Browse files Browse the repository at this point in the history
- This PR disconnects and re-sets a wallet's base node peer if a valid RPC
connection to that peer cannot be obtained, but only for the wallet's base
node monitor service. The spinoff is that all subsequent RPC connections to
the same peer/node will benefit from the comms connection service retrying
to re-establish the peer connection. As the wallet's base node monitor
service runs in a slow loop, it will assist other services that are
dependent on the same base node peer connection; if one service suffers
due to a bad/stale peer connection all services suffer.
- Also implementing sending transaction to all connected peers as per tari-project#3239
- Fixed the base_node_service_config not being initialized with values
from the config file.
  • Loading branch information
hansieodendaal committed Aug 30, 2021
1 parent 8a36fb5 commit ca6d309
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 1 deletion.
20 changes: 19 additions & 1 deletion base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ use crate::{
};
use chrono::Utc;
use log::*;
use std::{convert::TryFrom, sync::Arc, time::Duration};
use std::{
convert::TryFrom,
sync::Arc,
time::{Duration, Instant},
};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::protocol::rpc::RpcError;
use tokio::{sync::RwLock, time};
Expand Down Expand Up @@ -78,6 +82,13 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
},
Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e);
debug!(target: LOG_TARGET, "Reconnect current base node peer...");
// Disconnecting and re-setting the base node peer would revive a stale peer connection - most
// the client can do. As the wallet's base node monitor service runs in a slow loop, it will
// assist other services that are dependent on the same base node peer connection; if one service
// suffers due to a bad/stale peer connection all services suffer.
self.reconnect_current_base_node_peer().await;
time::delay_for(self.interval).await;
continue;
},
Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) |
Expand All @@ -93,6 +104,13 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
);
}

async fn reconnect_current_base_node_peer(&mut self) {
if let Some(peer) = self.wallet_connectivity.get_current_base_node_peer() {
let _ = self.wallet_connectivity.disconnect_base_node(peer.clone()).await;
let _ = self.wallet_connectivity.set_base_node(peer).await;
};
}

async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> {
loop {
let mut client = self
Expand Down
8 changes: 8 additions & 0 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use tokio::sync::{mpsc, oneshot, watch};
pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
DisconnectBaseNode(Box<Peer>),
}

#[derive(Clone)]
Expand Down Expand Up @@ -59,6 +60,13 @@ impl WalletConnectivityHandle {
Ok(())
}

pub async fn disconnect_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.sender
.send(WalletConnectivityRequest::DisconnectBaseNode(Box::new(base_node_peer)))
.await?;
Ok(())
}

/// Obtain a BaseNodeWalletRpcClient.
///
/// This can be relied on to obtain a pooled BaseNodeWalletRpcClient rpc session from a currently selected base
Expand Down
10 changes: 10 additions & 0 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ impl WalletConnectivityService {
ObtainBaseNodeSyncRpcClient(reply) => {
self.handle_pool_request(reply.into()).await;
},

DisconnectBaseNode(peer) => {
self.disconnect_base_node(*peer).await;
},
}
}

Expand Down Expand Up @@ -204,6 +208,12 @@ impl WalletConnectivityService {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
}

async fn disconnect_base_node(&mut self, peer: Peer) {
if let Ok(Some(connection)) = self.connectivity.get_connection(peer.node_id).await {
if connection.clone().disconnect().await.is_ok() {}
};
}

async fn setup_base_node_connection(&mut self) {
self.pools = None;
loop {
Expand Down

0 comments on commit ca6d309

Please sign in to comment.