Fix: ensure existing p2p connection is removed before reconnecting #4045

Merged: 5 commits, Sep 26, 2023

36 changes: 27 additions & 9 deletions engine/src/p2p/core.rs
@@ -93,8 +93,7 @@ impl std::fmt::Display for PeerInfo {
enum RegistrationStatus {
/// The node is not yet known to the network (its peer info
/// may not be known to the network yet)
- /// (Stores future peers to connect to when then node is registered)
- Pending(Vec<PeerInfo>),
+ Pending,
/// The node is registered, i.e. its peer info has been
/// recorded/updated
Registered,
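For context, a minimal sketch of what this variant change implies, with simplified stand-in types (assumptions, not the engine's real definitions): `Pending` no longer buffers peers itself, because the `peer_infos` map the context already maintains records every peer we have heard about, so storing them again inside the variant duplicated that state.

```rust
use std::collections::BTreeMap;

// Stand-ins for the engine's types; names and fields are illustrative.
type AccountId = [u8; 32];

#[derive(Clone)]
struct PeerInfo {
    account_id: AccountId,
    // address, x25519 pubkey, port, etc. elided
}

enum RegistrationStatus {
    /// No payload anymore: peers seen while pending live in `peer_infos`.
    Pending,
    /// The node's peer info has been recorded/updated.
    Registered,
}

struct P2PContext {
    status: RegistrationStatus,
    /// Single source of truth for known peers, connected or not.
    peer_infos: BTreeMap<AccountId, PeerInfo>,
}
```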
@@ -209,6 +208,11 @@ struct P2PContext {
/// A handle to the authenticator thread that can be used to make changes to the
/// list of allowed peers
authenticator: Arc<Authenticator>,
+ /// Contains all existing ZMQ sockets for "client" connections. Note that a ZMQ socket
+ /// exists even when there is no internal TCP connection (e.g. before the connection
+ /// is established for the first time, or when ZMQ is reconnecting). Also, when
+ /// our own (independent from ZMQ) reconnection mechanism kicks in, the entry is removed
+ /// (because we don't want ZMQ's socket behaviour).
/// NOTE: The mapping is from AccountId because we want to optimise for message
/// sending, which uses AccountId
active_connections: ActiveConnectionWrapper,
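The note about a socket existing without a TCP connection reflects standard ZeroMQ behaviour: `connect` returns immediately and the library dials (and re-dials) TCP in the background. A sketch using the `zmq` crate, which is an assumption here (the engine wraps its own socket type, and that wrapper is not shown in this diff):

```rust
fn main() {
    let ctx = zmq::Context::new();
    let socket = ctx.socket(zmq::DEALER).unwrap();

    // This succeeds even though nothing is listening on the port:
    // ZMQ queues the connect and retries TCP internally. This is why
    // an `active_connections` entry can exist with no live TCP
    // connection behind it.
    socket.connect("tcp://127.0.0.1:9").unwrap();
}
```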
@@ -277,7 +281,7 @@ pub(super) fn start(
incoming_message_sender,
own_peer_info_sender,
our_account_id,
- status: RegistrationStatus::Pending(vec![]),
+ status: RegistrationStatus::Pending,
};

debug!("Registering peer info for {} peers", current_peers.len());
@@ -437,6 +441,14 @@ impl P2PContext {
fn reconnect_to_peer(&mut self, account_id: &AccountId) {
if let Some(peer_info) = self.peer_infos.get(account_id) {
info!("Reconnecting to peer: {}", peer_info.account_id);

+ // It is possible that while we were waiting to reconnect,
+ // we received a peer info update and created a new "connection".
+ // This connection might be "healthy", but it is safer/easier to
+ // remove it and proceed with reconnecting.
+ if self.active_connections.remove(account_id).is_some() {
+ debug!("Reconnecting to a peer that's already connected: {}. Existing connection was removed.", account_id);
+ }
self.connect_to_peer(peer_info.clone());
} else {
error!("Failed to reconnect to peer {account_id}. (Peer info not found.)");
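The heart of the fix as a freestanding sketch, with simplified stand-ins for the engine's types (assumptions, not its real API): any surviving entry for the peer is removed, so its socket is dropped and closed before a new connection is made, and the old socket can no longer race with the new one.

```rust
use std::collections::BTreeMap;

struct ConnectedSocket; // stand-in for the engine's ZMQ socket wrapper

#[derive(Default)]
struct Reconnector {
    active_connections: BTreeMap<String, ConnectedSocket>,
}

impl Reconnector {
    fn connect_to_peer(&mut self, account_id: String) {
        // The real code creates and connects a ZMQ socket here.
        self.active_connections.insert(account_id, ConnectedSocket);
    }

    fn reconnect_to_peer(&mut self, account_id: &str) {
        // Remove first: dropping the old socket closes it *before*
        // the replacement is created, which is the invariant this
        // change introduces.
        if self.active_connections.remove(account_id).is_some() {
            println!("removed existing connection to {account_id}");
        }
        self.connect_to_peer(account_id.to_owned());
    }
}
```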
@@ -452,16 +464,22 @@

let connected_socket = socket.connect(peer);

- assert!(self.active_connections.insert(account_id, connected_socket).is_none());
+ if let Some(old_socket) = self.active_connections.insert(account_id, connected_socket) {
+ // This should not happen because we always remove the existing connection/socket
+ // prior to connecting, but even if it does, it should be OK to replace the
+ // connection (this doesn't break any invariants, and the new peer info is
+ // likely to be more up-to-date).
+ warn!("Replacing existing ZMQ socket: {:?}", old_socket.peer());
+ }
}

fn handle_own_registration(&mut self, own_info: PeerInfo) {
debug!("Received own node's registration. Starting to connect to peers.");

self.own_peer_info_sender.send(own_info).unwrap();

- if let RegistrationStatus::Pending(peers) = &mut self.status {
- let peers = std::mem::take(peers);
+ if let RegistrationStatus::Pending = &mut self.status {
+ let peers: Vec<_> = self.peer_infos.values().cloned().collect();
// Connect to all outstanding peers
for peer in peers {
self.connect_to_peer(peer)
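The defensive branch that replaces the old assertion leans on a standard-library guarantee: map `insert` returns the previous value when the key was already occupied, so the stale socket can be detected, logged, and dropped without a separate lookup. A self-contained demonstration:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut conns: BTreeMap<&str, &str> = BTreeMap::new();

    // Fresh key: `insert` returns None (what the old assert checked).
    assert!(conns.insert("alice", "socket-1").is_none());

    // Occupied key: `insert` returns the displaced value, which can be
    // logged (as the new code does with `warn!`) and is then dropped.
    let old = conns.insert("alice", "socket-2");
    assert_eq!(old, Some("socket-1"));
}
```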
@@ -494,10 +512,10 @@ impl P2PContext {
self.x25519_to_account_id.insert(peer.pubkey, peer.account_id.clone());

match &mut self.status {
- RegistrationStatus::Pending(peers) => {
- // Not ready to start connecting to peers yet
+ RegistrationStatus::Pending => {
+ // We will connect to all peers in `self.peer_infos` once we receive our own
+ // registration
info!("Delaying connecting to {}", peer.account_id);
- peers.push(peer);
},
RegistrationStatus::Registered => {
self.connect_to_peer(peer);
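Putting the two halves of the registration state machine together, a sketch of the new flow, reusing the stand-in types from the earlier sketch (still assumptions, not the engine's real code): while `Pending`, a peer update only records the info; receiving our own registration flips the status and connects to everything recorded so far.

```rust
impl P2PContext {
    fn handle_peer_update(&mut self, peer: PeerInfo) {
        self.peer_infos.insert(peer.account_id, peer.clone());
        match self.status {
            // While pending, recording the info is enough; connections
            // are made once our own registration arrives.
            RegistrationStatus::Pending => {}
            RegistrationStatus::Registered => self.connect_to_peer(peer),
        }
    }

    fn handle_own_registration(&mut self) {
        if matches!(self.status, RegistrationStatus::Pending) {
            // Connect to every peer we heard about while pending.
            let peers: Vec<_> = self.peer_infos.values().cloned().collect();
            for peer in peers {
                self.connect_to_peer(peer);
            }
        }
        self.status = RegistrationStatus::Registered;
    }

    fn connect_to_peer(&mut self, _peer: PeerInfo) {
        // Elided: create the ZMQ socket and record it in the
        // active-connections map.
    }
}
```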
86 changes: 61 additions & 25 deletions engine/src/p2p/core/tests.rs
@@ -5,7 +5,7 @@ use state_chain_runtime::AccountId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{info_span, Instrument};
use utilities::{
- testing::{expect_recv_with_custom_timeout, expect_recv_with_timeout},
+ testing::{expect_recv_with_timeout, recv_with_custom_timeout},
Port,
};

@@ -16,7 +16,15 @@ fn create_node_info(id: AccountId, node_key: &ed25519_dalek::Keypair, port: Port
PeerInfo::new(id, pubkey, ip, port)
}

+ use std::time::Duration;
+
+ /// This has to be large enough to account for the possibility of
+ /// the initial handshake failing and the node having to reconnect
+ /// after `RECONNECT_INTERVAL`
+ const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(500);
+
struct Node {
+ account_id: AccountId,
msg_sender: UnboundedSender<OutgoingMultisigStageMessages>,
peer_update_sender: UnboundedSender<PeerUpdate>,
_own_peer_info_receiver: UnboundedReceiver<PeerInfo>,
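A quick sanity check on that bound (the 500 ms figure comes from the diff; `RECONNECT_INTERVAL`'s real value is not shown, so the number below is a placeholder): if the first handshake fails, a message can only get through after a full reconnect cycle, so the timeout must cover at least one reconnect interval plus the second attempt.

```rust
use std::time::Duration;

fn main() {
    // Placeholder: the engine defines RECONNECT_INTERVAL elsewhere.
    let reconnect_interval = Duration::from_millis(250);
    // Generous budget for the second connection attempt itself.
    let second_attempt = Duration::from_millis(100);

    let max_connection_delay = Duration::from_millis(500);

    // Worst case the test must tolerate: fail, wait, retry, deliver.
    assert!(max_connection_delay >= reconnect_interval + second_attempt);
}
```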
@@ -36,11 +44,12 @@ fn spawn_node(

let key = P2PKey::new(secret);
let (msg_sender, peer_update_sender, msg_receiver, own_peer_info_receiver, fut) =
- super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id);
+ super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id.clone());

tokio::spawn(fut.instrument(info_span!("node", idx = idx)));

Node {
+ account_id,
msg_sender,
peer_update_sender,
_own_peer_info_receiver: own_peer_info_receiver,
@@ -120,12 +129,25 @@ async fn connect_two_nodes() {
let _ = expect_recv_with_timeout(&mut node2.msg_receiver).await;
}

- #[tokio::test]
- async fn can_connect_after_pubkey_change() {
- use std::time::Duration;
+ async fn send_and_receive_message(from: &Node, to: &mut Node) -> Option<(AccountId, Vec<u8>)> {
+ println!(
+ "[{:?}] Sending from {:?} to {:?}",
+ std::time::Instant::now(),
+ from.account_id,
+ to.account_id,
+ );
+ from.msg_sender
+ .send(OutgoingMultisigStageMessages::Private(vec![(
+ to.account_id.clone(),
+ b"test".to_vec(),
+ )]))
+ .unwrap();

- const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(200);
+ recv_with_custom_timeout(&mut to.msg_receiver, MAX_CONNECTION_DELAY).await
+ }

+ #[tokio::test]
+ async fn can_connect_after_pubkey_change() {
let node_key1 = create_keypair();
let node_key2 = create_keypair();

@@ -135,38 +157,52 @@ async fn can_connect_after_pubkey_change() {
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8090);

let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]);
- let node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
+ let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Check that node 2 can communicate with node 1:
- node2
- .msg_sender
- .send(OutgoingMultisigStageMessages::Private(vec![(
- pi1.account_id.clone(),
- b"test".to_vec(),
- )]))
- .unwrap();
-
- let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await;
+ send_and_receive_message(&node2, &mut node1).await.unwrap();
+ send_and_receive_message(&node1, &mut node2).await.unwrap();

// Node 2 disconnects:
drop(node2);

// Node 2 connects with a different key:
let node_key2b = create_keypair();
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2b, 8091);
- let node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
+ let mut node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Node 1 learns about Node 2's new key:
node1.peer_update_sender.send(PeerUpdate::Registered(pi2.clone())).unwrap();

// Node 2 should be able to send messages again:
- node2b
- .msg_sender
- .send(OutgoingMultisigStageMessages::Private(vec![(
- pi1.account_id.clone(),
- b"test".to_vec(),
- )]))
- .unwrap();
+ send_and_receive_message(&node2b, &mut node1).await.unwrap();
+ send_and_receive_message(&node1, &mut node2b).await.unwrap();
+ }
+
+ /// Test the behaviour around receiving own registration: at first, if our node
+ /// is not registered, we delay connecting to other nodes; once we receive our
+ /// own registration, we connect to other registered nodes.
+ #[tokio::test]
+ async fn connects_after_registration() {
+ let node_key1 = create_keypair();
+ let node_key2 = create_keypair();
+
+ let pi1 = create_node_info(AccountId::new([1; 32]), &node_key1, 8092);
+ let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8093);
+
+ // Node 1 doesn't get its own peer info at first and will wait for registration
+ let node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi2.clone()]);
+ let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
+
+ // For sanity, check that node 1 can't yet communicate with node 2:
+ assert!(send_and_receive_message(&node1, &mut node2).await.is_none());
+
+ // Update node 1 with its own peer info
+ node1.peer_update_sender.send(PeerUpdate::Registered(pi1.clone())).unwrap();
+
+ // Allow some time for the above command to propagate through the channel
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await;

- let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await;
+ // It should now be able to communicate with node 2:
+ assert!(send_and_receive_message(&node1, &mut node2).await.is_some());
}
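The negative assertion in `connects_after_registration` works because the helper returns `None` on timeout rather than panicking. Assuming `recv_with_custom_timeout` wraps something like `tokio::time::timeout` (the utility's implementation is not part of this diff), its behaviour can be reproduced with plain tokio:

```rust
use std::time::Duration;
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (_tx, mut rx) = mpsc::unbounded_channel::<Vec<u8>>();

    // Nothing is ever sent, so instead of hanging this resolves to
    // None after the timeout -- the shape of the "can't communicate
    // yet" check in the test above.
    let received = tokio::time::timeout(Duration::from_millis(500), rx.recv())
        .await // Err(Elapsed) on timeout
        .ok()
        .flatten();
    assert!(received.is_none());
}
```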