diff --git a/engine/src/p2p/core.rs b/engine/src/p2p/core.rs index 720f3ee7e8..3f6eb774fa 100644 --- a/engine/src/p2p/core.rs +++ b/engine/src/p2p/core.rs @@ -93,8 +93,7 @@ impl std::fmt::Display for PeerInfo { enum RegistrationStatus { /// The node is not yet known to the network (its peer info /// may not be known to the network yet) - /// (Stores future peers to connect to when then node is registered) - Pending(Vec<PeerInfo>), + Pending, /// The node is registered, i.e. its peer info has been /// recorded/updated Registered, @@ -209,6 +208,11 @@ struct P2PContext { /// A handle to the authenticator thread that can be used to make changes to the /// list of allowed peers authenticator: Arc<Authenticator>, + /// Contains all existing ZMQ sockets for "client" connections. Note that a ZMQ socket + /// exists even when there is no internal TCP connection (e.g. before the connection + /// is established for the first time, or when ZMQ is reconnecting). Also, when + /// our own (independent from ZMQ) reconnection mechanism kicks in, the entry is removed + /// (because we don't want ZMQ's socket behaviour). /// NOTE: The mapping is from AccountId because we want to optimise for message /// sending, which uses AccountId active_connections: ActiveConnectionWrapper, @@ -277,7 +281,7 @@ pub(super) fn start( incoming_message_sender, own_peer_info_sender, our_account_id, - status: RegistrationStatus::Pending(vec![]), + status: RegistrationStatus::Pending, }; debug!("Registering peer info for {} peers", current_peers.len()); @@ -437,6 +441,14 @@ impl P2PContext { fn reconnect_to_peer(&mut self, account_id: &AccountId) { if let Some(peer_info) = self.peer_infos.get(account_id) { info!("Reconnecting to peer: {}", peer_info.account_id); + + // It is possible that while we were waiting to reconnect, + // we received a peer info update and created a new "connection". + // This connection might be "healthy", but it is safer/easier to + // remove it and proceed with reconnecting. 
+ if self.active_connections.remove(account_id).is_some() { + debug!("Reconnecting to a peer that's already connected: {}. Existing connection was removed.", account_id); + } self.connect_to_peer(peer_info.clone()); } else { error!("Failed to reconnect to peer {account_id}. (Peer info not found.)"); @@ -452,7 +464,13 @@ impl P2PContext { let connected_socket = socket.connect(peer); - assert!(self.active_connections.insert(account_id, connected_socket).is_none()); + if let Some(old_socket) = self.active_connections.insert(account_id, connected_socket) { + // This should not happen because we always remove existing connection/socket + // prior to connecting, but even if it does, it should be OK to replace the + // connection (this doesn't break any invariants and the new peer info is + // likely to be more up-to-date). + warn!("Replacing existing ZMQ socket: {:?}", old_socket.peer()); + } } fn handle_own_registration(&mut self, own_info: PeerInfo) { @@ -460,8 +478,8 @@ impl P2PContext { self.own_peer_info_sender.send(own_info).unwrap(); - if let RegistrationStatus::Pending(peers) = &mut self.status { - let peers = std::mem::take(peers); + if let RegistrationStatus::Pending = &mut self.status { + let peers: Vec<_> = self.peer_infos.values().cloned().collect(); // Connect to all outstanding peers for peer in peers { self.connect_to_peer(peer) @@ -494,10 +512,10 @@ impl P2PContext { self.x25519_to_account_id.insert(peer.pubkey, peer.account_id.clone()); match &mut self.status { - RegistrationStatus::Pending(peers) => { - // Not ready to start connecting to peers yet + RegistrationStatus::Pending => { + // We will connect to all peers in `self.peer_infos` once we receive our own + // registration info!("Delaying connecting to {}", peer.account_id); - peers.push(peer); }, RegistrationStatus::Registered => { self.connect_to_peer(peer); diff --git a/engine/src/p2p/core/tests.rs b/engine/src/p2p/core/tests.rs index 42c7bf6ae7..0f285ce275 100644 --- 
a/engine/src/p2p/core/tests.rs +++ b/engine/src/p2p/core/tests.rs @@ -5,7 +5,7 @@ use state_chain_runtime::AccountId; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tracing::{info_span, Instrument}; use utilities::{ - testing::{expect_recv_with_custom_timeout, expect_recv_with_timeout}, + testing::{expect_recv_with_timeout, recv_with_custom_timeout}, Port, }; @@ -16,7 +16,15 @@ fn create_node_info(id: AccountId, node_key: &ed25519_dalek::Keypair, port: Port PeerInfo::new(id, pubkey, ip, port) } +use std::time::Duration; + +/// This has to be large enough to account for the possibility of +/// the initial handshake failing and the node having to reconnect +/// after `RECONNECT_INTERVAL` +const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(500); + struct Node { + account_id: AccountId, msg_sender: UnboundedSender<OutgoingMultisigStageMessages>, peer_update_sender: UnboundedSender<PeerUpdate>, _own_peer_info_receiver: UnboundedReceiver<PeerInfo>, @@ -36,11 +44,12 @@ fn spawn_node( let key = P2PKey::new(secret); let (msg_sender, peer_update_sender, msg_receiver, own_peer_info_receiver, fut) = - super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id); + super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id.clone()); tokio::spawn(fut.instrument(info_span!("node", idx = idx))); Node { + account_id, msg_sender, peer_update_sender, _own_peer_info_receiver: own_peer_info_receiver, @@ -120,12 +129,25 @@ async fn connect_two_nodes() { let _ = expect_recv_with_timeout(&mut node2.msg_receiver).await; } -#[tokio::test] -async fn can_connect_after_pubkey_change() { - use std::time::Duration; +async fn send_and_receive_message(from: &Node, to: &mut Node) -> Option<(AccountId, Vec<u8>)> { + println!( + "[{:?}] Sending from {:?} to {:?}", + std::time::Instant::now(), + from.account_id, + to.account_id, + ); + from.msg_sender + .send(OutgoingMultisigStageMessages::Private(vec![( + to.account_id.clone(), + b"test".to_vec(), + )])) + .unwrap(); - const MAX_CONNECTION_DELAY: Duration 
= Duration::from_millis(200); + recv_with_custom_timeout(&mut to.msg_receiver, MAX_CONNECTION_DELAY).await +} +#[tokio::test] +async fn can_connect_after_pubkey_change() { let node_key1 = create_keypair(); let node_key2 = create_keypair(); @@ -135,7 +157,7 @@ async fn can_connect_after_pubkey_change() { let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8090); let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]); - let node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]); + let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]); // Since we no longer buffer messages until nodes connect, we // need to explicitly wait for them to connect (this might take a @@ -143,15 +165,8 @@ async fn can_connect_after_pubkey_change() { tokio::time::sleep(std::time::Duration::from_millis(500)).await; // Check that node 2 can communicate with node 1: - node2 - .msg_sender - .send(OutgoingMultisigStageMessages::Private(vec![( - pi1.account_id.clone(), - b"test".to_vec(), - )])) - .unwrap(); - - let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await; + send_and_receive_message(&node2, &mut node1).await.unwrap(); + send_and_receive_message(&node1, &mut node2).await.unwrap(); // Node 2 disconnects: drop(node2); @@ -159,7 +174,7 @@ async fn can_connect_after_pubkey_change() { // Node 2 connects with a different key: let node_key2b = create_keypair(); let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2b, 8091); - let node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]); + let mut node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]); // Node 1 learn about Node 2's new key: node1.peer_update_sender.send(PeerUpdate::Registered(pi2.clone())).unwrap(); @@ -169,13 +184,34 @@ async fn can_connect_after_pubkey_change() { tokio::time::sleep(std::time::Duration::from_millis(100)).await; // Node 2 should be 
able to send messages again: - node2b - .msg_sender - .send(OutgoingMultisigStageMessages::Private(vec![( - pi1.account_id.clone(), - b"test".to_vec(), - )])) - .unwrap(); + send_and_receive_message(&node2b, &mut node1).await.unwrap(); + send_and_receive_message(&node1, &mut node2b).await.unwrap(); +} + +/// Test the behaviour around receiving own registration: at first, if our node +/// is not registered, we delay connecting to other nodes; once we receive our +/// own registration, we connect to other registered nodes. +#[tokio::test] +async fn connects_after_registration() { + let node_key1 = create_keypair(); + let node_key2 = create_keypair(); + + let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8092); + let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8093); + + // Node 1 doesn't get its own peer info at first and will wait for registration + let node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi2.clone()]); + let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]); + + // For sanity, check that node 1 can't yet communicate with node 2: + assert!(send_and_receive_message(&node1, &mut node2).await.is_none()); + + // Update node 1 with its own peer info + node1.peer_update_sender.send(PeerUpdate::Registered(pi1.clone())).unwrap(); + + // Allow some time for the above command to propagate through the channel + tokio::time::sleep(std::time::Duration::from_millis(100)).await; - let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await; + // It should now be able to communicate with node 2: + assert!(send_and_receive_message(&node1, &mut node2).await.is_some()); }