Skip to content

Commit

Permalink
Improved tx_queue test
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Jul 24, 2020
1 parent 68d7005 commit e0348ea
Showing 1 changed file with 23 additions and 37 deletions.
60 changes: 23 additions & 37 deletions zenoh-protocol/src/session/channel/tx_queue.rs
Expand Up @@ -474,10 +474,9 @@ impl TransmissionQueue {
#[cfg(test)]
mod tests {
use async_std::prelude::*;
use async_std::sync::{Arc, Barrier, Mutex};
use async_std::sync::{Arc, Mutex};
use async_std::task;
use std::convert::TryFrom;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Duration;

use crate::core::{ResKey, ZInt};
Expand All @@ -493,9 +492,9 @@ mod tests {
#[test]
fn tx_queue() {
async fn schedule(
queue: Arc<TransmissionQueue>,
num_msg: usize,
payload_size: usize,
queue: &Arc<TransmissionQueue>
payload_size: usize
) {
// Send reliable messages
let reliable = true;
Expand All @@ -517,15 +516,14 @@ mod tests {

async fn consume(
queue: Arc<TransmissionQueue>,
c_threshold: Arc<AtomicUsize>,
c_barrier: Arc<Barrier>
num_msg: usize
) {
let mut batches: usize = 0;
let mut bytes: usize = 0;
let mut msgs: usize = 0;
let mut fragments: usize = 0;

loop {
while msgs != num_msg {
let (batch, priority) = queue.pull().await;
batches += 1;
bytes += batch.len();
Expand All @@ -546,22 +544,13 @@ mod tests {
}
},
_ => { msgs += 1; }
}
// Synchronize for this test
if msgs == c_threshold.load(Ordering::SeqCst) {
println!(" Received {} messages, {} bytes, {} batches, {} fragments", msgs, bytes, batches, fragments);
let res = c_barrier.wait().timeout(TIMEOUT).await;
assert!(res.is_ok());
// Reset counters
batches = 0;
bytes = 0;
msgs = 0;
fragments = 0;
}
}
}
// Reinsert the batch
queue.push_serialization_batch(batch, priority).await;
}
}

println!("<<< Received {} messages, {} bytes, {} batches, {} fragments", msgs, bytes, batches, fragments);
}

// Queue
Expand All @@ -576,18 +565,6 @@ mod tests {
let queue = Arc::new(TransmissionQueue::new(
batch_size, is_streamed, sn_reliable, sn_best_effort
));

// Synch variables
let barrier = Arc::new(Barrier::new(2));
let threshold = Arc::new(AtomicUsize::new(0));

// Consume task
let c_queue = queue.clone();
let c_barrier = barrier.clone();
let c_threshold = threshold.clone();
task::spawn(async move {
consume(c_queue, c_threshold, c_barrier).await;
});

// Total amount of bytes to send in each test
let bytes: usize = 100_000_000;
Expand All @@ -601,12 +578,21 @@ mod tests {
break
}

let num_msg = max_msgs.min(bytes / ps);
threshold.store(num_msg, Ordering::SeqCst);
// Compute the number of messages to send
let num_msg = max_msgs.min(bytes / ps);

schedule(num_msg, *ps, &queue).await;

let res = barrier.wait().timeout(TIMEOUT).await;
let c_queue = queue.clone();
let t_c = task::spawn(async move {
consume(c_queue, num_msg).await;
});

let c_queue = queue.clone();
let c_ps = *ps;
let t_s = task::spawn(async move {
schedule(c_queue, num_msg, c_ps).await;
});

let res = t_c.join(t_s).timeout(TIMEOUT).await;
assert!(res.is_ok());
}
});
Expand Down

0 comments on commit e0348ea

Please sign in to comment.