Skip to content

Commit

Permalink
Add disconnect/re-connect to base node monitor
Browse files Browse the repository at this point in the history
- This PR disconnects and re-sets a wallet's base node peer if a valid RPC
connection to that peer cannot be obtained, but only for the wallet's base
node monitor service. The spinoff is that all subsequent RPC connections to
the same peer/node will benefit from the comms connection service retrying
to re-establish the peer connection. As the wallet's base node monitor
service runs in a slow loop, it will assist other services that are
dependent on the same base node peer connection; if one service suffers
due to a bad/stale peer connection all services suffer.
- Also implementing sending transaction to all connected peers as per tari-project#3239
- Fixed the base_node_service_config not being initialized with values
from the config file.
  • Loading branch information
hansieodendaal committed Aug 30, 2021
1 parent 5dc10c2 commit 6b128b8
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 4 deletions.
2 changes: 1 addition & 1 deletion base_layer/core/src/mempool/service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ async fn handle_outbound_tx(
exclude_peers: Vec<NodeId>,
) -> Result<(), MempoolServiceError> {
let result = outbound_message_service
.propagate(
.flood(
NodeDestination::Unknown,
OutboundEncryption::ClearText,
exclude_peers,
Expand Down
34 changes: 31 additions & 3 deletions base_layer/wallet/src/base_node_service/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,11 @@ use crate::{
};
use chrono::Utc;
use log::*;
use std::{convert::TryFrom, sync::Arc, time::Duration};
use std::{
convert::TryFrom,
sync::Arc,
time::{Duration, Instant},
};
use tari_common_types::chain_metadata::ChainMetadata;
use tari_comms::{peer_manager::NodeId, protocol::rpc::RpcError};
use tokio::{sync::RwLock, time};
Expand Down Expand Up @@ -79,8 +83,13 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
Err(e @ BaseNodeMonitorError::RpcFailed(_)) => {
warn!(target: LOG_TARGET, "Connectivity failure to base node: {}", e);
debug!(target: LOG_TARGET, "Setting as OFFLINE and retrying...",);

self.set_offline().await;
// Disconnecting and re-setting the base node peer would revive a stale peer connection - most
// the client can do. As the wallet's base node monitor service runs in a slow loop, it will
// assist other services that are dependent on the same base node peer connection; if one service
// suffers due to a bad/stale peer connection all services suffer.
self.reconnect_current_base_node_peer().await;
time::delay_for(self.interval).await;
continue;
},
Err(e @ BaseNodeMonitorError::InvalidBaseNodeResponse(_)) |
Expand All @@ -96,8 +105,9 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
);
}

async fn update_connectivity_status(&self) -> NodeId {
async fn update_connectivity_status(&mut self) -> NodeId {
let mut watcher = self.wallet_connectivity.get_connectivity_status_watch();
let mut start = Instant::now();
loop {
use OnlineStatus::*;
match watcher.recv().await.unwrap_or(Offline) {
Expand All @@ -112,9 +122,27 @@ impl<T: WalletBackend + 'static> BaseNodeMonitor<T> {
self.set_offline().await;
},
}
if Instant::now().duration_since(start) > self.interval {
warn!(
target: LOG_TARGET,
"Could not connect to base node in {}s",
Instant::now().duration_since(start).as_secs()
);
// Disconnecting and re-setting the base node peer would revive a stale peer connection; this breaks
// the unending loop as watcher events are not guaranteed.
self.reconnect_current_base_node_peer().await;
start = Instant::now();
}
}
}

async fn reconnect_current_base_node_peer(&mut self) {
if let Some(peer) = self.wallet_connectivity.get_current_base_node_peer() {
let _ = self.wallet_connectivity.disconnect_base_node(peer.clone()).await;
let _ = self.wallet_connectivity.set_base_node(peer).await;
};
}

async fn monitor_node(&mut self) -> Result<(), BaseNodeMonitorError> {
loop {
let peer_node_id = self.update_connectivity_status().await;
Expand Down
8 changes: 8 additions & 0 deletions base_layer/wallet/src/connectivity_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use tokio::sync::watch;
pub enum WalletConnectivityRequest {
ObtainBaseNodeWalletRpcClient(oneshot::Sender<RpcClientLease<BaseNodeWalletRpcClient>>),
ObtainBaseNodeSyncRpcClient(oneshot::Sender<RpcClientLease<BaseNodeSyncRpcClient>>),
DisconnectBaseNode(Box<Peer>),
}

#[derive(Clone)]
Expand Down Expand Up @@ -63,6 +64,13 @@ impl WalletConnectivityHandle {
Ok(())
}

pub async fn disconnect_base_node(&mut self, base_node_peer: Peer) -> Result<(), WalletConnectivityError> {
self.sender
.send(WalletConnectivityRequest::DisconnectBaseNode(Box::new(base_node_peer)))
.await?;
Ok(())
}

/// Obtain a BaseNodeWalletRpcClient.
///
/// This can be relied on to obtain a pooled BaseNodeWalletRpcClient rpc session from a currently selected base
Expand Down
10 changes: 10 additions & 0 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ impl WalletConnectivityService {
ObtainBaseNodeSyncRpcClient(reply) => {
self.handle_pool_request(reply.into()).await;
},

DisconnectBaseNode(peer) => {
self.disconnect_base_node(*peer).await;
},
}
}

Expand Down Expand Up @@ -205,6 +209,12 @@ impl WalletConnectivityService {
self.base_node_watch.borrow().as_ref().map(|p| p.node_id.clone())
}

async fn disconnect_base_node(&mut self, peer: Peer) {
if let Ok(Some(connection)) = self.connectivity.get_connection(peer.node_id).await {
if connection.clone().disconnect().await.is_ok() {}
};
}

async fn setup_base_node_connection(&mut self) {
self.pools = None;
loop {
Expand Down

0 comments on commit 6b128b8

Please sign in to comment.