Fix: ensure existing p2p connection is removed before reconnecting (#4045)

* refactor: simplify initial connection to peers

`peer_infos` has the most up-to-date info on current nodes,
and can't contain duplicates, so we use that instead of maintaining
a separate list of peers to connect to. (A sketch of this connect/reconnect flow follows the change list below.)

* fix: allow overwriting existing p2p connection

* chore: more accurate comment

* refactor: send_and_receive helper in p2p tests

* chore: use different ports in p2p tests
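
To make the change list above concrete, here is a minimal, hypothetical sketch of the remove-before-reconnect pattern this commit describes. It is illustrative only: `P2PContextSketch`, the `String`-valued "sockets", and the stand-in `AccountId`/`PeerInfo` types are simplifications, not the engine's actual `P2PContext`, `ActiveConnectionWrapper`, or ZMQ handling.

use std::collections::HashMap;

type AccountId = String;

#[derive(Clone)]
struct PeerInfo {
    account_id: AccountId,
    address: String,
}

struct P2PContextSketch {
    // Stand-in for the engine's `ActiveConnectionWrapper`: one "socket" per peer.
    active_connections: HashMap<AccountId, String>,
    // Latest known peer info, keyed by AccountId (duplicate-free by construction).
    peer_infos: HashMap<AccountId, PeerInfo>,
}

impl P2PContextSketch {
    fn connect_to_peer(&mut self, peer: PeerInfo) {
        // In the real engine this creates and connects a ZMQ socket; here we just
        // record the address. Replacing an existing entry is tolerated but logged.
        if let Some(old) = self
            .active_connections
            .insert(peer.account_id.clone(), peer.address)
        {
            eprintln!("replacing existing connection to {old}");
        }
    }

    fn reconnect_to_peer(&mut self, account_id: &AccountId) {
        if let Some(peer) = self.peer_infos.get(account_id).cloned() {
            // A "connection" may have been created while we were waiting to
            // reconnect (e.g. after a peer info update); remove it first so we
            // never hold two sockets for the same peer.
            self.active_connections.remove(account_id);
            self.connect_to_peer(peer);
        }
    }
}

fn main() {
    let id: AccountId = "node-2".to_string();
    let peer = PeerInfo { account_id: id.clone(), address: "tcp://127.0.0.1:8090".to_string() };

    let mut ctx = P2PContextSketch {
        active_connections: HashMap::new(),
        peer_infos: HashMap::from([(id.clone(), peer.clone())]),
    };

    ctx.connect_to_peer(peer);
    // Later, our own reconnection logic kicks in: the stale entry is removed
    // before connecting again, so we never accumulate duplicate sockets.
    ctx.reconnect_to_peer(&id);
    assert_eq!(ctx.active_connections.len(), 1);
}

This mirrors the core.rs diff below: `reconnect_to_peer` now removes any existing entry before calling `connect_to_peer`, and the old `assert!(...is_none())` in `connect_to_peer` is relaxed to a `warn!` when a socket is unexpectedly replaced.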
msgmaxim authored and dandanlen committed Oct 9, 2023
1 parent d422fb7 commit 0b2d8c7
Showing 2 changed files with 88 additions and 34 deletions.
36 changes: 27 additions & 9 deletions engine/src/p2p/core.rs
@@ -93,8 +93,7 @@ impl std::fmt::Display for PeerInfo {
enum RegistrationStatus {
/// The node is not yet known to the network (its peer info
/// may not be known to the network yet)
/// (Stores future peers to connect to when then node is registered)
Pending(Vec<PeerInfo>),
Pending,
/// The node is registered, i.e. its peer info has been
/// recorded/updated
Registered,
@@ -209,6 +208,11 @@ struct P2PContext {
/// A handle to the authenticator thread that can be used to make changes to the
/// list of allowed peers
authenticator: Arc<Authenticator>,
/// Contains all existing ZMQ sockets for "client" connections. Note that a ZMQ socket
/// exists even when there is no underlying TCP connection (e.g. before the connection
/// is established for the first time, or when ZMQ is reconnecting). Also, when
/// our own (independent from ZMQ) reconnection mechanism kicks in, the entry is removed
/// (because we don't want ZMQ's socket behaviour).
/// NOTE: The mapping is from AccountId because we want to optimise for message
/// sending, which uses AccountId
active_connections: ActiveConnectionWrapper,
@@ -277,7 +281,7 @@ pub(super) fn start(
incoming_message_sender,
own_peer_info_sender,
our_account_id,
status: RegistrationStatus::Pending(vec![]),
status: RegistrationStatus::Pending,
};

debug!("Registering peer info for {} peers", current_peers.len());
@@ -437,6 +441,14 @@ impl P2PContext {
fn reconnect_to_peer(&mut self, account_id: &AccountId) {
if let Some(peer_info) = self.peer_infos.get(account_id) {
info!("Reconnecting to peer: {}", peer_info.account_id);

// It is possible that while we were waiting to reconnect,
// we received a peer info update and created a new "connection".
// This connection might be "healthy", but it is safer/easier to
// remove it and proceed with reconnecting.
if self.active_connections.remove(account_id).is_some() {
debug!("Reconnecting to a peer that's already connected: {}. Existing connection was removed.", account_id);
}
self.connect_to_peer(peer_info.clone());
} else {
error!("Failed to reconnect to peer {account_id}. (Peer info not found.)");
@@ -452,16 +464,22 @@ impl P2PContext {

let connected_socket = socket.connect(peer);

assert!(self.active_connections.insert(account_id, connected_socket).is_none());
if let Some(old_socket) = self.active_connections.insert(account_id, connected_socket) {
// This should not happen because we always remove existing connection/socket
// prior to connecting, but even if it does, it should be OK to replace the
// connection (this doesn't break any invariants and the new peer info is
// likely to be more up-to-date).
warn!("Replacing existing ZMQ socket: {:?}", old_socket.peer());
}
}

fn handle_own_registration(&mut self, own_info: PeerInfo) {
debug!("Received own node's registration. Starting to connect to peers.");

self.own_peer_info_sender.send(own_info).unwrap();

if let RegistrationStatus::Pending(peers) = &mut self.status {
let peers = std::mem::take(peers);
if let RegistrationStatus::Pending = &mut self.status {
let peers: Vec<_> = self.peer_infos.values().cloned().collect();
// Connect to all outstanding peers
for peer in peers {
self.connect_to_peer(peer)
@@ -494,10 +512,10 @@ impl P2PContext {
self.x25519_to_account_id.insert(peer.pubkey, peer.account_id.clone());

match &mut self.status {
RegistrationStatus::Pending(peers) => {
// Not ready to start connecting to peers yet
RegistrationStatus::Pending => {
// We will connect to all peers in `self.peer_infos` once we receive our own
// registration
info!("Delaying connecting to {}", peer.account_id);
peers.push(peer);
},
RegistrationStatus::Registered => {
self.connect_to_peer(peer);
86 changes: 61 additions & 25 deletions engine/src/p2p/core/tests.rs
@@ -5,7 +5,7 @@ use state_chain_runtime::AccountId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{info_span, Instrument};
use utilities::{
testing::{expect_recv_with_custom_timeout, expect_recv_with_timeout},
testing::{expect_recv_with_timeout, recv_with_custom_timeout},
Port,
};

@@ -16,7 +16,15 @@ fn create_node_info(id: AccountId, node_key: &ed25519_dalek::Keypair, port: Port
PeerInfo::new(id, pubkey, ip, port)
}

use std::time::Duration;

/// This has to be large enough to account for the possibility of
/// the initial handshake failing and the node having to reconnect
/// after `RECONNECT_INTERVAL`
const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(500);

struct Node {
account_id: AccountId,
msg_sender: UnboundedSender<OutgoingMultisigStageMessages>,
peer_update_sender: UnboundedSender<PeerUpdate>,
_own_peer_info_receiver: UnboundedReceiver<PeerInfo>,
@@ -36,11 +44,12 @@ fn spawn_node(

let key = P2PKey::new(secret);
let (msg_sender, peer_update_sender, msg_receiver, own_peer_info_receiver, fut) =
super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id);
super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id.clone());

tokio::spawn(fut.instrument(info_span!("node", idx = idx)));

Node {
account_id,
msg_sender,
peer_update_sender,
_own_peer_info_receiver: own_peer_info_receiver,
@@ -120,12 +129,25 @@ async fn connect_two_nodes() {
let _ = expect_recv_with_timeout(&mut node2.msg_receiver).await;
}

#[tokio::test]
async fn can_connect_after_pubkey_change() {
use std::time::Duration;
async fn send_and_receive_message(from: &Node, to: &mut Node) -> Option<(AccountId, Vec<u8>)> {
println!(
"[{:?}] Sending from {:?} to {:?}",
std::time::Instant::now(),
from.account_id,
to.account_id,
);
from.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
to.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();

const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(200);
recv_with_custom_timeout(&mut to.msg_receiver, MAX_CONNECTION_DELAY).await
}

#[tokio::test]
async fn can_connect_after_pubkey_change() {
let node_key1 = create_keypair();
let node_key2 = create_keypair();

@@ -135,31 +157,24 @@ async fn can_connect_after_pubkey_change() {
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8090);

let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]);
let node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Since we no longer buffer messages until nodes connect, we
// need to explicitly wait for them to connect (this might take a
// while since one of them is likely to fail on the first try)
tokio::time::sleep(std::time::Duration::from_millis(500)).await;

// Check that node 2 can communicate with node 1:
node2
.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
pi1.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();

let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await;
send_and_receive_message(&node2, &mut node1).await.unwrap();
send_and_receive_message(&node1, &mut node2).await.unwrap();

// Node 2 disconnects:
drop(node2);

// Node 2 connects with a different key:
let node_key2b = create_keypair();
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2b, 8091);
let node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
let mut node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Node 1 learns about Node 2's new key:
node1.peer_update_sender.send(PeerUpdate::Registered(pi2.clone())).unwrap();
@@ -169,13 +184,34 @@
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// Node 2 should be able to send messages again:
node2b
.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
pi1.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();
send_and_receive_message(&node2b, &mut node1).await.unwrap();
send_and_receive_message(&node1, &mut node2b).await.unwrap();
}

/// Test the behaviour around receiving own registration: at first, if our node
/// is not registered, we delay connecting to other nodes; once we receive our
/// own registration, we connect to other registered nodes.
#[tokio::test]
async fn connects_after_registration() {
let node_key1 = create_keypair();
let node_key2 = create_keypair();

let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8092);
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8093);

// Node 1 doesn't get its own peer info at first and will wait for registration
let node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi2.clone()]);
let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// For sanity, check that node 1 can't yet communicate with node 2:
assert!(send_and_receive_message(&node1, &mut node2).await.is_none());

// Update node 1 with its own peer info
node1.peer_update_sender.send(PeerUpdate::Registered(pi1.clone())).unwrap();

// Allow some time for the above command to propagate through the channel
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await;
// It should now be able to communicate with node 2:
assert!(send_and_receive_message(&node1, &mut node2).await.is_some());
}
