Skip to content

Commit

Permalink
refactor: send_and_receive helper in p2p tests
Browse files Browse the repository at this point in the history
  • Loading branch information
msgmaxim committed Sep 26, 2023
1 parent 89b5e42 commit d95333a
Showing 1 changed file with 37 additions and 59 deletions.
96 changes: 37 additions & 59 deletions engine/src/p2p/core/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use state_chain_runtime::AccountId;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tracing::{info_span, Instrument};
use utilities::{
testing::{
expect_recv_with_custom_timeout, expect_recv_with_timeout, recv_with_custom_timeout,
},
testing::{expect_recv_with_timeout, recv_with_custom_timeout},
Port,
};

Expand All @@ -18,7 +16,15 @@ fn create_node_info(id: AccountId, node_key: &ed25519_dalek::Keypair, port: Port
PeerInfo::new(id, pubkey, ip, port)
}

use std::time::Duration;

/// This has to be large enough to account for the possibility of
/// the initial handshake failing and the node having to reconnect
/// after `RECONNECT_INTERVAL`
const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(500);

struct Node {
account_id: AccountId,
msg_sender: UnboundedSender<OutgoingMultisigStageMessages>,
peer_update_sender: UnboundedSender<PeerUpdate>,
_own_peer_info_receiver: UnboundedReceiver<PeerInfo>,
Expand All @@ -38,11 +44,12 @@ fn spawn_node(

let key = P2PKey::new(secret);
let (msg_sender, peer_update_sender, msg_receiver, own_peer_info_receiver, fut) =
super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id);
super::start(&key, our_peer_info.port, peer_infos.to_vec(), account_id.clone());

tokio::spawn(fut.instrument(info_span!("node", idx = idx)));

Node {
account_id,
msg_sender,
peer_update_sender,
_own_peer_info_receiver: own_peer_info_receiver,
Expand Down Expand Up @@ -122,12 +129,25 @@ async fn connect_two_nodes() {
let _ = expect_recv_with_timeout(&mut node2.msg_receiver).await;
}

#[tokio::test]
async fn can_connect_after_pubkey_change() {
use std::time::Duration;
async fn send_and_receive_message(from: &Node, to: &mut Node) -> Option<(AccountId, Vec<u8>)> {
println!(
"[{:?}] Sending from {:?} to {:?}",
std::time::Instant::now(),
from.account_id,
to.account_id,
);
from.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
to.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();

const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(200);
recv_with_custom_timeout(&mut to.msg_receiver, MAX_CONNECTION_DELAY).await
}

#[tokio::test]
async fn can_connect_after_pubkey_change() {
let node_key1 = create_keypair();
let node_key2 = create_keypair();

Expand All @@ -137,51 +157,33 @@ async fn can_connect_after_pubkey_change() {
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2, 8090);

let mut node1 = spawn_node(&node_key1, 0, pi1.clone(), &[pi1.clone(), pi2.clone()]);
let node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Check that node 2 can communicate with node 1:
node2
.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
pi1.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();

let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await;
send_and_receive_message(&node2, &mut node1).await.unwrap();
send_and_receive_message(&node1, &mut node2).await.unwrap();

// Node 2 disconnects:
drop(node2);

// Node 2 connects with a different key:
let node_key2b = create_keypair();
let pi2 = create_node_info(AccountId::new([2; 32]), &node_key2b, 8091);
let node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);
let mut node2b = spawn_node(&node_key2b, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// Node 1 learn about Node 2's new key:
node1.peer_update_sender.send(PeerUpdate::Registered(pi2.clone())).unwrap();

// Node 2 should be able to send messages again:
node2b
.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
pi1.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();

let _ = expect_recv_with_custom_timeout(&mut node1.msg_receiver, MAX_CONNECTION_DELAY).await;
send_and_receive_message(&node2b, &mut node1).await.unwrap();
send_and_receive_message(&node1, &mut node2b).await.unwrap();
}

/// That the behaviour around receiving own registration: at first, if our node
/// Test the behaviour around receiving own registration: at first, if our node
/// is not registered, we delay connecting to other nodes; once we receive our
/// own registration, we connect to other registered nodes.
#[tokio::test]
async fn connects_after_registration() {
use std::time::Duration;

const MAX_CONNECTION_DELAY: Duration = Duration::from_millis(200);

let node_key1 = create_keypair();
let node_key2 = create_keypair();

Expand All @@ -193,19 +195,7 @@ async fn connects_after_registration() {
let mut node2 = spawn_node(&node_key2, 1, pi2.clone(), &[pi1.clone(), pi2.clone()]);

// For sanity, check that node 1 can't yet communicate with node 2:
{
node1
.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
pi2.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();

assert!(recv_with_custom_timeout(&mut node2.msg_receiver, MAX_CONNECTION_DELAY)
.await
.is_none());
}
assert!(send_and_receive_message(&node1, &mut node2).await.is_none());

// Update node 1 with its own peer info
node1.peer_update_sender.send(PeerUpdate::Registered(pi1.clone())).unwrap();
Expand All @@ -214,17 +204,5 @@ async fn connects_after_registration() {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;

// It should now be able to communicate with node 2:
{
node1
.msg_sender
.send(OutgoingMultisigStageMessages::Private(vec![(
pi2.account_id.clone(),
b"test".to_vec(),
)]))
.unwrap();

assert!(recv_with_custom_timeout(&mut node2.msg_receiver, MAX_CONNECTION_DELAY)
.await
.is_some());
}
assert!(send_and_receive_message(&node1, &mut node2).await.is_some());
}

0 comments on commit d95333a

Please sign in to comment.