diff --git a/tokio/tests/rt_metrics.rs b/tokio/tests/rt_metrics.rs index 58869c530ae..7630ee02a19 100644 --- a/tokio/tests/rt_metrics.rs +++ b/tokio/tests/rt_metrics.rs @@ -3,7 +3,7 @@ #![cfg(all(feature = "full", tokio_unstable, not(target_os = "wasi")))] use std::future::Future; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Barrier}; use std::task::Poll; use tokio::macros::support::poll_fn; @@ -504,7 +504,7 @@ fn worker_overflow_count() { } #[test] -fn injection_queue_depth() { +fn injection_queue_depth_current_thread() { use std::thread; let rt = current_thread(); @@ -518,44 +518,36 @@ fn injection_queue_depth() { .unwrap(); assert_eq!(1, metrics.injection_queue_depth()); +} +#[test] +fn injection_queue_depth_multi_thread() { let rt = threaded(); - let handle = rt.handle().clone(); let metrics = rt.metrics(); - // First we need to block the runtime workers - let (tx1, rx1) = std::sync::mpsc::channel(); - let (tx2, rx2) = std::sync::mpsc::channel(); - let (tx3, rx3) = std::sync::mpsc::channel(); - let rx3 = Arc::new(Mutex::new(rx3)); - - rt.spawn(async move { rx1.recv().unwrap() }); - rt.spawn(async move { rx2.recv().unwrap() }); - - // Spawn some more to make sure there are items - for _ in 0..10 { - let rx = rx3.clone(); - rt.spawn(async move { - rx.lock().unwrap().recv().unwrap(); - }); + let barrier1 = Arc::new(Barrier::new(3)); + let barrier2 = Arc::new(Barrier::new(3)); + + // Spawn a task per runtime worker to block it. + for _ in 0..2 { + let barrier1 = barrier1.clone(); + let barrier2 = barrier2.clone(); + rt.spawn( + async move { + barrier1.wait(); + barrier2.wait(); + } + ); } - thread::spawn(move || { - handle.spawn(async {}); - }) - .join() - .unwrap(); - - let n = metrics.injection_queue_depth(); - assert!(1 <= n, "{}", n); - assert!(15 >= n, "{}", n); + barrier1.wait(); - for _ in 0..10 { - tx3.send(()).unwrap(); + for i in 0..10 { + assert_eq!(i, metrics.injection_queue_depth()); + rt.spawn(async {}); } - tx1.send(()).unwrap(); - tx2.send(()).unwrap(); + barrier2.wait(); } #[test]