diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs
index f4841043..7076182e 100644
--- a/core/src/co_pool/mod.rs
+++ b/core/src/co_pool/mod.rs
@@ -378,7 +378,7 @@ impl<'p> CoroutinePool<'p> {
"The coroutine pool has reached its maximum size !",
));
}
- self.deref().submit_co(f, stack_size, priority).map(|()| {
+ self.deref().submit_co(f, stack_size, priority).map(|_| {
_ = self.running.fetch_add(1, Ordering::Release);
})
}
diff --git a/core/src/coroutine/suspender.rs b/core/src/coroutine/suspender.rs
index 94a0f2e5..1492f43d 100644
--- a/core/src/coroutine/suspender.rs
+++ b/core/src/coroutine/suspender.rs
@@ -34,6 +34,22 @@ impl Suspender<'_, Param, Yield> {
self.suspend_with(arg)
}
+ pub(crate) fn timestamp() -> u64 {
+ TIMESTAMP
+ .with(|s| unsafe {
+ s.as_ptr()
+ .as_mut()
+ .unwrap_or_else(|| {
+ panic!(
+ "thread:{} get TIMESTAMP current failed",
+ std::thread::current().name().unwrap_or("unknown")
+ )
+ })
+ .pop_front()
+ })
+ .unwrap_or(0)
+ }
+
/// Cancel the execution of the coroutine.
pub fn cancel(&self) -> ! {
CANCEL.with(|s| unsafe {
@@ -51,22 +67,6 @@ impl Suspender<'_, Param, Yield> {
unreachable!()
}
- pub(crate) fn timestamp() -> u64 {
- TIMESTAMP
- .with(|s| unsafe {
- s.as_ptr()
- .as_mut()
- .unwrap_or_else(|| {
- panic!(
- "thread:{} get TIMESTAMP current failed",
- std::thread::current().name().unwrap_or("unknown")
- )
- })
- .pop_front()
- })
- .unwrap_or(0)
- }
-
pub(crate) fn is_cancel() -> bool {
CANCEL
.with(|s| unsafe {
diff --git a/core/src/scheduler.rs b/core/src/scheduler.rs
index 524e38ba..3620dbc6 100644
--- a/core/src/scheduler.rs
+++ b/core/src/scheduler.rs
@@ -186,7 +186,7 @@ impl<'s> Scheduler<'s> {
f: impl FnOnce(&Suspender<(), ()>, ()) -> Option + 'static,
stack_size: Option,
priority: Option,
- ) -> std::io::Result<()> {
+ ) -> std::io::Result {
self.submit_raw_co(co!(
Some(format!("{}@{}", self.name(), uuid::Uuid::new_v4())),
f,
@@ -204,12 +204,13 @@ impl<'s> Scheduler<'s> {
///
/// Allow multiple threads to concurrently submit coroutine to the scheduler,
/// but only allow one thread to execute scheduling.
- pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<()> {
+ pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result {
for listener in self.listeners.clone() {
co.add_raw_listener(listener);
}
+ let co_name = co.name().to_string();
self.ready.push(co);
- Ok(())
+ Ok(co_name)
}
/// Resume a coroutine from the syscall table to the ready queue,
diff --git a/core/tests/co_pool.rs b/core/tests/co_pool.rs
index 39f86e65..62c4089f 100644
--- a/core/tests/co_pool.rs
+++ b/core/tests/co_pool.rs
@@ -1,7 +1,6 @@
#[cfg(not(all(unix, feature = "preemptive")))]
#[test]
fn co_pool_basic() -> std::io::Result<()> {
- let task_name = "test_simple";
let mut pool = open_coroutine_core::co_pool::CoroutinePool::default();
pool.set_max_size(1);
assert!(pool.is_empty());
@@ -13,7 +12,7 @@ fn co_pool_basic() -> std::io::Result<()> {
)?;
assert!(!pool.is_empty());
pool.submit_task(
- Some(String::from(task_name)),
+ Some(String::from("test_simple")),
|_| {
println!("2");
Some(2)
@@ -74,3 +73,29 @@ fn co_pool_stop() -> std::io::Result<()> {
)
.map(|_| ())
}
+
+#[cfg(not(all(unix, feature = "preemptive")))]
+#[test]
+fn co_pool_cancel() -> std::io::Result<()> {
+ let mut pool = open_coroutine_core::co_pool::CoroutinePool::default();
+ pool.set_max_size(1);
+ assert!(pool.is_empty());
+ let task_name = pool.submit_task(
+ Some(String::from("test_panic")),
+ |_| panic!("test panic, just ignore it"),
+ None,
+ None,
+ )?;
+ assert!(!pool.is_empty());
+ open_coroutine_core::co_pool::CoroutinePool::try_cancel_task(&task_name);
+ pool.submit_task(
+ Some(String::from("test_simple")),
+ |_| {
+ println!("2");
+ Some(2)
+ },
+ None,
+ None,
+ )?;
+ pool.try_schedule_task()
+}
diff --git a/core/tests/coroutine.rs b/core/tests/coroutine.rs
index 685d01a0..b1a82095 100644
--- a/core/tests/coroutine.rs
+++ b/core/tests/coroutine.rs
@@ -280,3 +280,47 @@ fn coroutine_syscall_not_preemptive() -> std::io::Result<()> {
))
}
}
+
+#[cfg(all(
+ unix,
+ not(feature = "preemptive"),
+ not(target_os = "linux"),
+ not(target_arch = "x86_64")
+))]
+#[test]
+fn coroutine_cancel() -> std::io::Result<()> {
+ use std::os::unix::prelude::JoinHandleExt;
+ let pair = std::sync::Arc::new((std::sync::Mutex::new(true), std::sync::Condvar::new()));
+ let pair2 = pair.clone();
+ let handle = std::thread::Builder::new()
+ .name("cancel".to_string())
+ .spawn(move || {
+ let mut coroutine: Coroutine<(), (), ()> = co!(|_, ()| { loop {} })?;
+ assert_eq!(CoroutineState::Cancelled, coroutine.resume()?);
+ assert_eq!(CoroutineState::Cancelled, coroutine.state());
+ // should execute to here
+ let (lock, cvar) = &*pair2;
+ let mut pending = lock.lock().unwrap();
+ *pending = false;
+ cvar.notify_one();
+ Ok::<(), std::io::Error>(())
+ })?;
+ // wait for the thread to start up
+ std::thread::sleep(std::time::Duration::from_millis(500));
+ nix::sys::pthread::pthread_kill(handle.as_pthread_t(), nix::sys::signal::Signal::SIGVTALRM)?;
+ let (lock, cvar) = &*pair;
+ let result = cvar
+ .wait_timeout_while(
+ lock.lock().unwrap(),
+ std::time::Duration::from_millis(3000),
+ |&mut pending| pending,
+ )
+ .unwrap();
+ if result.1.timed_out() {
+ Err(std::io::Error::other(
+ "The test thread should send signals to coroutines in running state",
+ ))
+ } else {
+ Ok(())
+ }
+}
diff --git a/core/tests/scheduler.rs b/core/tests/scheduler.rs
index 178205cf..76a8d1c3 100644
--- a/core/tests/scheduler.rs
+++ b/core/tests/scheduler.rs
@@ -117,8 +117,8 @@ fn scheduler_listener() -> std::io::Result<()> {
let mut scheduler = Scheduler::default();
scheduler.add_listener(TestListener::default());
- scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?;
- scheduler.submit_co(
+ _ = scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?;
+ _ = scheduler.submit_co(
|_, _| {
println!("2");
None
@@ -129,3 +129,31 @@ fn scheduler_listener() -> std::io::Result<()> {
scheduler.try_schedule()?;
Ok(())
}
+
+#[test]
+fn scheduler_try_cancel_coroutine() -> std::io::Result<()> {
+ let mut scheduler = Scheduler::default();
+ let co_name = scheduler.submit_co(
+ |suspender, _| {
+ println!("[coroutine1] suspend");
+ suspender.suspend();
+ println!("[coroutine1] back");
+ None
+ },
+ None,
+ None,
+ )?;
+ Scheduler::try_cancel_coroutine(&co_name);
+ _ = scheduler.submit_co(
+ |suspender, _| {
+ println!("[coroutine2] suspend");
+ suspender.suspend();
+ println!("[coroutine2] back");
+ None
+ },
+ None,
+ None,
+ )?;
+ scheduler.try_schedule()?;
+ Ok(())
+}
diff --git a/open-coroutine/src/lib.rs b/open-coroutine/src/lib.rs
index de067257..ffd05410 100644
--- a/open-coroutine/src/lib.rs
+++ b/open-coroutine/src/lib.rs
@@ -375,6 +375,14 @@ mod tests {
fn test() {
init(Config::single());
_ = any_join!(task!(|_| 1, ()), task!(|_| 2, ()), task!(|_| 3, ()));
+ task!(
+ |_| {
+ println!("Try cancel!");
+ },
+ (),
+ )
+ .try_cancel()
+ .expect("cancel failed");
let join = task!(
|_| {
println!("Hello, world!");