Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/co_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ impl<'p> CoroutinePool<'p> {
"The coroutine pool has reached its maximum size !",
));
}
self.deref().submit_co(f, stack_size, priority).map(|()| {
self.deref().submit_co(f, stack_size, priority).map(|_| {
_ = self.running.fetch_add(1, Ordering::Release);
})
}
Expand Down
32 changes: 16 additions & 16 deletions core/src/coroutine/suspender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,22 @@
self.suspend_with(arg)
}

pub(crate) fn timestamp() -> u64 {
TIMESTAMP
.with(|s| unsafe {
s.as_ptr()
.as_mut()
.unwrap_or_else(|| {
panic!(
"thread:{} get TIMESTAMP current failed",
std::thread::current().name().unwrap_or("unknown")
)

Check warning on line 46 in core/src/coroutine/suspender.rs

View check run for this annotation

Codecov / codecov/patch

core/src/coroutine/suspender.rs#L43-L46

Added lines #L43 - L46 were not covered by tests
})
.pop_front()
})
.unwrap_or(0)
}

/// Cancel the execution of the coroutine.
pub fn cancel(&self) -> ! {
CANCEL.with(|s| unsafe {
Expand All @@ -51,22 +67,6 @@
unreachable!()
}

pub(crate) fn timestamp() -> u64 {
TIMESTAMP
.with(|s| unsafe {
s.as_ptr()
.as_mut()
.unwrap_or_else(|| {
panic!(
"thread:{} get TIMESTAMP current failed",
std::thread::current().name().unwrap_or("unknown")
)
})
.pop_front()
})
.unwrap_or(0)
}

pub(crate) fn is_cancel() -> bool {
CANCEL
.with(|s| unsafe {
Expand Down
7 changes: 4 additions & 3 deletions core/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ impl<'s> Scheduler<'s> {
f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
stack_size: Option<usize>,
priority: Option<c_longlong>,
) -> std::io::Result<()> {
) -> std::io::Result<String> {
self.submit_raw_co(co!(
Some(format!("{}@{}", self.name(), uuid::Uuid::new_v4())),
f,
Expand All @@ -204,12 +204,13 @@ impl<'s> Scheduler<'s> {
///
/// Allow multiple threads to concurrently submit coroutine to the scheduler,
/// but only allow one thread to execute scheduling.
pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<()> {
pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<String> {
for listener in self.listeners.clone() {
co.add_raw_listener(listener);
}
let co_name = co.name().to_string();
self.ready.push(co);
Ok(())
Ok(co_name)
}

/// Resume a coroutine from the syscall table to the ready queue,
Expand Down
29 changes: 27 additions & 2 deletions core/tests/co_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#[cfg(not(all(unix, feature = "preemptive")))]
#[test]
fn co_pool_basic() -> std::io::Result<()> {
let task_name = "test_simple";
let mut pool = open_coroutine_core::co_pool::CoroutinePool::default();
pool.set_max_size(1);
assert!(pool.is_empty());
Expand All @@ -13,7 +12,7 @@ fn co_pool_basic() -> std::io::Result<()> {
)?;
assert!(!pool.is_empty());
pool.submit_task(
Some(String::from(task_name)),
Some(String::from("test_simple")),
|_| {
println!("2");
Some(2)
Expand Down Expand Up @@ -74,3 +73,29 @@ fn co_pool_stop() -> std::io::Result<()> {
)
.map(|_| ())
}

#[cfg(not(all(unix, feature = "preemptive")))]
#[test]
fn co_pool_cancel() -> std::io::Result<()> {
let mut pool = open_coroutine_core::co_pool::CoroutinePool::default();
pool.set_max_size(1);
assert!(pool.is_empty());
let task_name = pool.submit_task(
Some(String::from("test_panic")),
|_| panic!("test panic, just ignore it"),
None,
None,
)?;
assert!(!pool.is_empty());
open_coroutine_core::co_pool::CoroutinePool::try_cancel_task(&task_name);
pool.submit_task(
Some(String::from("test_simple")),
|_| {
println!("2");
Some(2)
},
None,
None,
)?;
pool.try_schedule_task()
}
44 changes: 44 additions & 0 deletions core/tests/coroutine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,3 +280,47 @@ fn coroutine_syscall_not_preemptive() -> std::io::Result<()> {
))
}
}

#[cfg(all(
unix,
not(feature = "preemptive"),
not(target_os = "linux"),
not(target_arch = "x86_64")
))]
#[test]
fn coroutine_cancel() -> std::io::Result<()> {
use std::os::unix::prelude::JoinHandleExt;
let pair = std::sync::Arc::new((std::sync::Mutex::new(true), std::sync::Condvar::new()));
let pair2 = pair.clone();
let handle = std::thread::Builder::new()
.name("cancel".to_string())
.spawn(move || {
let mut coroutine: Coroutine<(), (), ()> = co!(|_, ()| { loop {} })?;
assert_eq!(CoroutineState::Cancelled, coroutine.resume()?);
assert_eq!(CoroutineState::Cancelled, coroutine.state());
// should execute to here
let (lock, cvar) = &*pair2;
let mut pending = lock.lock().unwrap();
*pending = false;
cvar.notify_one();
Ok::<(), std::io::Error>(())
})?;
// wait for the thread to start up
std::thread::sleep(std::time::Duration::from_millis(500));
nix::sys::pthread::pthread_kill(handle.as_pthread_t(), nix::sys::signal::Signal::SIGVTALRM)?;
let (lock, cvar) = &*pair;
let result = cvar
.wait_timeout_while(
lock.lock().unwrap(),
std::time::Duration::from_millis(3000),
|&mut pending| pending,
)
.unwrap();
if result.1.timed_out() {
Err(std::io::Error::other(
"The test thread should send signals to coroutines in running state",
))
} else {
Ok(())
}
}
32 changes: 30 additions & 2 deletions core/tests/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,8 @@ fn scheduler_listener() -> std::io::Result<()> {

let mut scheduler = Scheduler::default();
scheduler.add_listener(TestListener::default());
scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?;
scheduler.submit_co(
_ = scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?;
_ = scheduler.submit_co(
|_, _| {
println!("2");
None
Expand All @@ -129,3 +129,31 @@ fn scheduler_listener() -> std::io::Result<()> {
scheduler.try_schedule()?;
Ok(())
}

#[test]
fn scheduler_try_cancel_coroutine() -> std::io::Result<()> {
let mut scheduler = Scheduler::default();
let co_name = scheduler.submit_co(
|suspender, _| {
println!("[coroutine1] suspend");
suspender.suspend();
println!("[coroutine1] back");
None
},
None,
None,
)?;
Scheduler::try_cancel_coroutine(&co_name);
_ = scheduler.submit_co(
|suspender, _| {
println!("[coroutine2] suspend");
suspender.suspend();
println!("[coroutine2] back");
None
},
None,
None,
)?;
scheduler.try_schedule()?;
Ok(())
}
8 changes: 8 additions & 0 deletions open-coroutine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@
fn test() {
init(Config::single());
_ = any_join!(task!(|_| 1, ()), task!(|_| 2, ()), task!(|_| 3, ()));
task!(
|_| {
println!("Try cancel!");

Check warning on line 380 in open-coroutine/src/lib.rs

View check run for this annotation

Codecov / codecov/patch

open-coroutine/src/lib.rs#L380

Added line #L380 was not covered by tests
},
(),
)
.try_cancel()
.expect("cancel failed");
let join = task!(
|_| {
println!("Hello, world!");
Expand Down
Loading