Skip to content

Commit

Permalink
test(socket): add socket integration tests (ignored)
Browse files Browse the repository at this point in the history
  • Loading branch information
mempirate committed Jan 24, 2024
1 parent 274c893 commit 022a202
Show file tree
Hide file tree
Showing 6 changed files with 300 additions and 6 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 6 additions & 6 deletions msg-sim/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::{collections::HashMap, io, net::IpAddr, time::Duration};

use protocol::Protocol;
pub use protocol::Protocol;

mod protocol;

Expand All @@ -13,13 +13,13 @@ use dummynet::{PacketFilter, Pipe};
#[allow(unused)]
pub struct SimulationConfig {
/// The latency of the connection.
latency: Option<Duration>,
pub latency: Option<Duration>,
/// The bandwidth in Kbps.
bw: Option<u64>,
pub bw: Option<u64>,
/// The packet loss rate in percent.
plr: Option<f64>,
pub plr: Option<f64>,
/// The supported protocols.
protocols: Vec<Protocol>,
pub protocols: Vec<Protocol>,
}

#[derive(Default)]
Expand All @@ -34,7 +34,7 @@ impl Simulator {
pub fn new() -> Self {
Self {
active_sims: HashMap::new(),
sim_id: 0,
sim_id: 1,
}
}

Expand Down
2 changes: 2 additions & 0 deletions msg-socket/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ rand.workspace = true
parking_lot.workspace = true

[dev-dependencies]
msg-sim.workspace = true

tracing-subscriber = "0.3"
6 changes: 6 additions & 0 deletions msg-socket/tests/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# `msg-socket` tests

## Integration tests
| Test | Description | Status |
| ----------- | ---------------------------- | ------ |
| [`pubsub`](./it/pubsub.rs) | Different messaging patterns with different transports, chaos through simulated network links & random delay injection. ||
3 changes: 3 additions & 0 deletions msg-socket/tests/it/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
mod pubsub;

fn main() {}
282 changes: 282 additions & 0 deletions msg-socket/tests/it/pubsub.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,282 @@
use bytes::Bytes;
use msg_sim::{Protocol, SimulationConfig, Simulator};
use rand::Rng;
use std::{collections::HashSet, net::IpAddr, time::Duration};
use tokio::{sync::mpsc, task::JoinSet};
use tokio_stream::StreamExt;

use msg_socket::{PubSocket, SubSocket};
use msg_transport::{
quic::{self, Quic},
tcp::{self, Tcp},
Transport,
};

const TOPIC: &str = "test";

fn init_simulation(addr: IpAddr, config: SimulationConfig) -> Simulator {
let mut simulator = Simulator::new();
simulator.start(addr, config).unwrap();

simulator
}

/// Single publisher, single subscriber
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn pubsub_channel() {
let _ = tracing_subscriber::fmt::try_init();

let addr = "127.0.0.1".parse().unwrap();

let mut simulator = init_simulation(
addr,
SimulationConfig {
latency: Some(Duration::from_millis(50)),
bw: None,
plr: None,
protocols: vec![Protocol::UDP, Protocol::TCP],
},
);

let result = pubsub_channel_transport(build_tcp).await;

assert!(result.is_ok());

let result = pubsub_channel_transport(build_quic).await;

assert!(result.is_ok());

simulator.stop(addr);
}

async fn pubsub_channel_transport<F: Fn() -> T, T: Transport + Send + Sync + Unpin + 'static>(
new_transport: F,
) -> Result<(), Box<dyn std::error::Error>> {
let mut publisher = PubSocket::new(new_transport());

let mut subscriber = SubSocket::new(new_transport());
subscriber.connect("127.0.0.1:9879").await?;
subscriber.subscribe(TOPIC).await?;

inject_delay(400).await;

publisher.bind("127.0.0.1:9879").await?;

// Spawn a task to keep sending messages until the subscriber receives one (after connection process)
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
publisher
.publish(TOPIC, Bytes::from("WORLD"))
.await
.unwrap();
}
});

let msg = subscriber.next().await.unwrap();
tracing::info!("Received message: {:?}", msg);
assert_eq!(TOPIC, msg.topic());
assert_eq!("WORLD", msg.payload());

Ok(())
}

/// Single publisher, multiple subscribers
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn pubsub_fan_out() {
let _ = tracing_subscriber::fmt::try_init();

let addr = "127.0.0.1".parse().unwrap();

let mut simulator = init_simulation(
addr,
SimulationConfig {
latency: Some(Duration::from_millis(150)),
bw: None,
plr: None,
protocols: vec![Protocol::UDP, Protocol::TCP],
},
);

let result = pubsub_fan_out_transport(build_tcp, 10).await;

assert!(result.is_ok());

let result = pubsub_fan_out_transport(build_quic, 10).await;

assert!(result.is_ok());

simulator.stop(addr);
}

async fn pubsub_fan_out_transport<
F: Fn() -> T + Send + 'static + Copy,
T: Transport + Send + Sync + Unpin + 'static,
>(
new_transport: F,
subscibers: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let mut publisher = PubSocket::new(new_transport());

let mut sub_tasks = JoinSet::new();

let addr = "127.0.0.1:9880";

for i in 0..subscibers {
let cloned = addr.to_string();
sub_tasks.spawn(async move {
let mut subscriber = SubSocket::new(new_transport());
inject_delay((100 * (i + 1)) as u64).await;

subscriber.connect(cloned).await.unwrap();
inject_delay((1000 / (i + 1)) as u64).await;
subscriber.subscribe(TOPIC).await.unwrap();

let msg = subscriber.next().await.unwrap();
tracing::info!("Received message: {:?}", msg);
assert_eq!(TOPIC, msg.topic());
assert_eq!("WORLD", msg.payload());
});
}

inject_delay(400).await;

publisher.bind(addr).await?;

// Spawn a task to keep sending messages until the subscriber receives one (after connection process)
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
publisher
.publish(TOPIC, Bytes::from("WORLD"))
.await
.unwrap();
}
});

for _ in 0..subscibers {
sub_tasks.join_next().await.unwrap().unwrap();
}

Ok(())
}

/// Multiple publishers, single subscriber
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn pubsub_fan_in() {
let _ = tracing_subscriber::fmt::try_init();

let addr = "127.0.0.1".parse().unwrap();

let mut simulator = init_simulation(
addr,
SimulationConfig {
latency: Some(Duration::from_millis(150)),
bw: None,
plr: None,
protocols: vec![Protocol::UDP, Protocol::TCP],
},
);

let result = pubsub_fan_in_transport(build_tcp, 20).await;

assert!(result.is_ok());

let result = pubsub_fan_in_transport(build_quic, 20).await;

assert!(result.is_ok());

simulator.stop(addr);
}

async fn pubsub_fan_in_transport<
F: Fn() -> T + Send + 'static + Copy,
T: Transport + Send + Sync + Unpin + 'static,
>(
new_transport: F,
publishers: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let mut sub_tasks = JoinSet::new();

let (tx, mut rx) = mpsc::channel(publishers);

for i in 0..publishers {
let tx = tx.clone();
sub_tasks.spawn(async move {
let mut publisher = PubSocket::new(new_transport());
inject_delay((100 * (i + 1)) as u64).await;

publisher.bind("127.0.0.1:0").await.unwrap();

let addr = publisher.local_addr().unwrap();
tx.send(addr).await.unwrap();

// Spawn a task to keep sending messages until the subscriber receives one (after connection process)
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_millis(500)).await;
publisher
.publish(TOPIC, Bytes::from("WORLD"))
.await
.unwrap();
}
});
});
}

drop(tx);

let mut subscriber = SubSocket::new(new_transport());

let mut addrs = HashSet::with_capacity(publishers);

while let Some(addr) = rx.recv().await {
addrs.insert(addr);
}

for addr in &addrs {
inject_delay(500).await;
subscriber.connect(addr).await.unwrap();
subscriber.subscribe(TOPIC).await.unwrap();
}

loop {
if addrs.is_empty() {
break;
}

let msg = subscriber.next().await.unwrap();
tracing::info!("Received message: {:?}", msg);
assert_eq!(TOPIC, msg.topic());
assert_eq!("WORLD", msg.payload());

addrs.remove(&msg.source());
}

for _ in 0..publishers {
sub_tasks.join_next().await.unwrap().unwrap();
}

Ok(())
}

fn build_tcp() -> Tcp {
Tcp::default()
}

fn build_quic() -> Quic {
Quic::default()
}

fn random_delay(upper_ms: u64) -> Duration {
let mut rng = rand::thread_rng();
let delay_ms = rng.gen_range(0..upper_ms);
Duration::from_millis(delay_ms)
}

async fn inject_delay(upper_ms: u64) {
tokio::time::sleep(random_delay(upper_ms)).await;
}

0 comments on commit 022a202

Please sign in to comment.