From 569ed48faf72c531ea1cb13a749f55bdfb3d42ad Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Mon, 17 Mar 2025 00:38:14 +0800 Subject: [PATCH] add cancel tests --- core/src/co_pool/mod.rs | 2 +- core/src/coroutine/suspender.rs | 32 ++++++++++++------------ core/src/scheduler.rs | 7 +++--- core/tests/co_pool.rs | 29 ++++++++++++++++++++-- core/tests/coroutine.rs | 44 +++++++++++++++++++++++++++++++++ core/tests/scheduler.rs | 32 ++++++++++++++++++++++-- open-coroutine/src/lib.rs | 8 ++++++ 7 files changed, 130 insertions(+), 24 deletions(-) diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs index f4841043..7076182e 100644 --- a/core/src/co_pool/mod.rs +++ b/core/src/co_pool/mod.rs @@ -378,7 +378,7 @@ impl<'p> CoroutinePool<'p> { "The coroutine pool has reached its maximum size !", )); } - self.deref().submit_co(f, stack_size, priority).map(|()| { + self.deref().submit_co(f, stack_size, priority).map(|_| { _ = self.running.fetch_add(1, Ordering::Release); }) } diff --git a/core/src/coroutine/suspender.rs b/core/src/coroutine/suspender.rs index 94a0f2e5..1492f43d 100644 --- a/core/src/coroutine/suspender.rs +++ b/core/src/coroutine/suspender.rs @@ -34,6 +34,22 @@ impl Suspender<'_, Param, Yield> { self.suspend_with(arg) } + pub(crate) fn timestamp() -> u64 { + TIMESTAMP + .with(|s| unsafe { + s.as_ptr() + .as_mut() + .unwrap_or_else(|| { + panic!( + "thread:{} get TIMESTAMP current failed", + std::thread::current().name().unwrap_or("unknown") + ) + }) + .pop_front() + }) + .unwrap_or(0) + } + /// Cancel the execution of the coroutine. pub fn cancel(&self) -> ! { CANCEL.with(|s| unsafe { @@ -51,22 +67,6 @@ impl Suspender<'_, Param, Yield> { unreachable!() } - pub(crate) fn timestamp() -> u64 { - TIMESTAMP - .with(|s| unsafe { - s.as_ptr() - .as_mut() - .unwrap_or_else(|| { - panic!( - "thread:{} get TIMESTAMP current failed", - std::thread::current().name().unwrap_or("unknown") - ) - }) - .pop_front() - }) - .unwrap_or(0) - } - pub(crate) fn is_cancel() -> bool { CANCEL .with(|s| unsafe { diff --git a/core/src/scheduler.rs b/core/src/scheduler.rs index 524e38ba..3620dbc6 100644 --- a/core/src/scheduler.rs +++ b/core/src/scheduler.rs @@ -186,7 +186,7 @@ impl<'s> Scheduler<'s> { f: impl FnOnce(&Suspender<(), ()>, ()) -> Option + 'static, stack_size: Option, priority: Option, - ) -> std::io::Result<()> { + ) -> std::io::Result { self.submit_raw_co(co!( Some(format!("{}@{}", self.name(), uuid::Uuid::new_v4())), f, @@ -204,12 +204,13 @@ impl<'s> Scheduler<'s> { /// /// Allow multiple threads to concurrently submit coroutine to the scheduler, /// but only allow one thread to execute scheduling. - pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<()> { + pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result { for listener in self.listeners.clone() { co.add_raw_listener(listener); } + let co_name = co.name().to_string(); self.ready.push(co); - Ok(()) + Ok(co_name) } /// Resume a coroutine from the syscall table to the ready queue, diff --git a/core/tests/co_pool.rs b/core/tests/co_pool.rs index 39f86e65..62c4089f 100644 --- a/core/tests/co_pool.rs +++ b/core/tests/co_pool.rs @@ -1,7 +1,6 @@ #[cfg(not(all(unix, feature = "preemptive")))] #[test] fn co_pool_basic() -> std::io::Result<()> { - let task_name = "test_simple"; let mut pool = open_coroutine_core::co_pool::CoroutinePool::default(); pool.set_max_size(1); assert!(pool.is_empty()); @@ -13,7 +12,7 @@ fn co_pool_basic() -> std::io::Result<()> { )?; assert!(!pool.is_empty()); pool.submit_task( - Some(String::from(task_name)), + Some(String::from("test_simple")), |_| { println!("2"); Some(2) @@ -74,3 +73,29 @@ fn co_pool_stop() -> std::io::Result<()> { ) .map(|_| ()) } + +#[cfg(not(all(unix, feature = "preemptive")))] +#[test] +fn co_pool_cancel() -> std::io::Result<()> { + let mut pool = open_coroutine_core::co_pool::CoroutinePool::default(); + pool.set_max_size(1); + assert!(pool.is_empty()); + let task_name = pool.submit_task( + Some(String::from("test_panic")), + |_| panic!("test panic, just ignore it"), + None, + None, + )?; + assert!(!pool.is_empty()); + open_coroutine_core::co_pool::CoroutinePool::try_cancel_task(&task_name); + pool.submit_task( + Some(String::from("test_simple")), + |_| { + println!("2"); + Some(2) + }, + None, + None, + )?; + pool.try_schedule_task() +} diff --git a/core/tests/coroutine.rs b/core/tests/coroutine.rs index 685d01a0..b1a82095 100644 --- a/core/tests/coroutine.rs +++ b/core/tests/coroutine.rs @@ -280,3 +280,47 @@ fn coroutine_syscall_not_preemptive() -> std::io::Result<()> { )) } } + +#[cfg(all( + unix, + not(feature = "preemptive"), + not(target_os = "linux"), + not(target_arch = "x86_64") +))] +#[test] +fn coroutine_cancel() -> std::io::Result<()> { + use std::os::unix::prelude::JoinHandleExt; + let pair = std::sync::Arc::new((std::sync::Mutex::new(true), std::sync::Condvar::new())); + let pair2 = pair.clone(); + let handle = std::thread::Builder::new() + .name("cancel".to_string()) + .spawn(move || { + let mut coroutine: Coroutine<(), (), ()> = co!(|_, ()| { loop {} })?; + assert_eq!(CoroutineState::Cancelled, coroutine.resume()?); + assert_eq!(CoroutineState::Cancelled, coroutine.state()); + // should execute to here + let (lock, cvar) = &*pair2; + let mut pending = lock.lock().unwrap(); + *pending = false; + cvar.notify_one(); + Ok::<(), std::io::Error>(()) + })?; + // wait for the thread to start up + std::thread::sleep(std::time::Duration::from_millis(500)); + nix::sys::pthread::pthread_kill(handle.as_pthread_t(), nix::sys::signal::Signal::SIGVTALRM)?; + let (lock, cvar) = &*pair; + let result = cvar + .wait_timeout_while( + lock.lock().unwrap(), + std::time::Duration::from_millis(3000), + |&mut pending| pending, + ) + .unwrap(); + if result.1.timed_out() { + Err(std::io::Error::other( + "The test thread should send signals to coroutines in running state", + )) + } else { + Ok(()) + } +} diff --git a/core/tests/scheduler.rs b/core/tests/scheduler.rs index 178205cf..76a8d1c3 100644 --- a/core/tests/scheduler.rs +++ b/core/tests/scheduler.rs @@ -117,8 +117,8 @@ fn scheduler_listener() -> std::io::Result<()> { let mut scheduler = Scheduler::default(); scheduler.add_listener(TestListener::default()); - scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?; - scheduler.submit_co( + _ = scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?; + _ = scheduler.submit_co( |_, _| { println!("2"); None @@ -129,3 +129,31 @@ fn scheduler_listener() -> std::io::Result<()> { scheduler.try_schedule()?; Ok(()) } + +#[test] +fn scheduler_try_cancel_coroutine() -> std::io::Result<()> { + let mut scheduler = Scheduler::default(); + let co_name = scheduler.submit_co( + |suspender, _| { + println!("[coroutine1] suspend"); + suspender.suspend(); + println!("[coroutine1] back"); + None + }, + None, + None, + )?; + Scheduler::try_cancel_coroutine(&co_name); + _ = scheduler.submit_co( + |suspender, _| { + println!("[coroutine2] suspend"); + suspender.suspend(); + println!("[coroutine2] back"); + None + }, + None, + None, + )?; + scheduler.try_schedule()?; + Ok(()) +} diff --git a/open-coroutine/src/lib.rs b/open-coroutine/src/lib.rs index de067257..ffd05410 100644 --- a/open-coroutine/src/lib.rs +++ b/open-coroutine/src/lib.rs @@ -375,6 +375,14 @@ mod tests { fn test() { init(Config::single()); _ = any_join!(task!(|_| 1, ()), task!(|_| 2, ()), task!(|_| 3, ())); + task!( + |_| { + println!("Try cancel!"); + }, + (), + ) + .try_cancel() + .expect("cancel failed"); let join = task!( |_| { println!("Hello, world!");