Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
### 0.7.x

- [x] support cancel coroutine/task

### 0.6.x

- [x] support custom task and coroutine priority.
Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ English | [中文](README_ZH.md)

- [ ] add
performance [benchmark](https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview);
- [ ] cancel coroutine/task;
- [ ] add metrics;
- [ ] add synchronization toolkit;
- [ ] support and compatibility for AF_XDP socket;
Expand Down
1 change: 0 additions & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

- [ ]
增加性能[基准测试](https://github.com/TechEmpower/FrameworkBenchmarks/wiki/Project-Information-Framework-Tests-Overview);
- [ ] 取消协程/任务;
- [ ] 增加性能指标监控;
- [ ] 增加并发工具包;
- [ ] 支持AF_XDP套接字;
Expand Down
2 changes: 1 addition & 1 deletion core/src/co_pool/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl Listener<(), Option<usize>> for CoroutineCreator {
.store(pool.get_running_size().saturating_sub(1), Ordering::Release);
}
}
CoroutineState::Error(_) => {
CoroutineState::Cancelled | CoroutineState::Error(_) => {
if let Some(pool) = CoroutinePool::current() {
//worker协程异常退出,需要先回收再创建
pool.running
Expand Down
56 changes: 55 additions & 1 deletion core/src/co_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ use crate::common::ordered_work_steal::{OrderedLocalQueue, OrderedWorkStealQueue
use crate::common::{get_timeout_time, now, CondvarBlocker};
use crate::coroutine::suspender::Suspender;
use crate::scheduler::{SchedulableCoroutine, Scheduler};
use crate::{error, impl_current_for, impl_display_by_debug, impl_for_named, trace};
use crate::{error, impl_current_for, impl_display_by_debug, impl_for_named, trace, warn};
use dashmap::{DashMap, DashSet};
use once_cell::sync::Lazy;
use std::cell::Cell;
use std::ffi::c_longlong;
use std::io::{Error, ErrorKind};
Expand All @@ -25,6 +26,11 @@ mod state;
/// Creator for coroutine pool.
mod creator;

/// `task_name` -> `co_name`
static RUNNING_TASKS: Lazy<DashMap<&str, &str>> = Lazy::new(DashMap::new);

static CANCEL_TASKS: Lazy<DashSet<&str>> = Lazy::new(DashSet::new);

/// The coroutine pool impls.
#[repr(C)]
#[derive(Debug)]
Expand Down Expand Up @@ -383,7 +389,17 @@ impl<'p> CoroutinePool<'p> {

fn try_run(&self) -> Option<()> {
self.task_queue.pop().map(|task| {
let tname = task.get_name().to_string().leak();
if CANCEL_TASKS.contains(tname) {
_ = CANCEL_TASKS.remove(tname);
warn!("Cancel task:{} successfully !", tname);
return;
}
if let Some(co) = SchedulableCoroutine::current() {
_ = RUNNING_TASKS.insert(tname, co.name());
}
let (task_name, result) = task.run();
_ = RUNNING_TASKS.remove(tname);
let n = task_name.clone().leak();
if self.no_waits.contains(n) {
_ = self.no_waits.remove(n);
Expand All @@ -406,6 +422,44 @@ impl<'p> CoroutinePool<'p> {
}
}

/// Try to cancel a task.
pub fn try_cancel_task(task_name: &str) {
// 检查正在运行的任务是否是要取消的任务
if let Some(info) = RUNNING_TASKS.get(task_name) {
let co_name = *info;
// todo windows support
#[allow(unused_variables)]
if let Some(pthread) = Scheduler::get_scheduling_thread(co_name) {
// 发送SIGVTALRM信号,在运行时取消任务
#[cfg(unix)]
if nix::sys::pthread::pthread_kill(pthread, nix::sys::signal::Signal::SIGVTALRM)
.is_ok()
{
warn!(
"Attempt to cancel task:{} running on coroutine:{} by thread:{}, cancelling...",
task_name, co_name, pthread
);
} else {
error!(
"Attempt to cancel task:{} running on coroutine:{} by thread:{} failed !",
task_name, co_name, pthread
);
}
} else {
// 添加到待取消队列
Scheduler::try_cancel_coroutine(co_name);
warn!(
"Attempt to cancel task:{} running on coroutine:{}, cancelling...",
task_name, co_name
);
}
} else {
// 添加到待取消队列
_ = CANCEL_TASKS.insert(Box::leak(Box::from(task_name)));
warn!("Attempt to cancel task:{}, cancelling...", task_name);
}
}

/// Schedule the tasks.
///
/// Allow multiple threads to concurrently submit task to the pool,
Expand Down
6 changes: 6 additions & 0 deletions core/src/co_pool/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ impl<'t> Task<'t> {
}
}

/// get the task name.
#[must_use]
pub fn get_name(&self) -> &str {
&self.name
}

/// execute the task
///
/// # Errors
Expand Down
4 changes: 3 additions & 1 deletion core/src/common/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,11 @@ pub enum CoroutineState<Y, R> {
Suspend(Y, u64),
///The coroutine enters the syscall.
Syscall(Y, SyscallName, SyscallState),
/// The coroutine cancelled.
Cancelled,
/// The coroutine completed with a return value.
Complete(R),
/// The coroutine completed with a error message.
/// The coroutine completed with an error message.
Error(&'static str),
}

Expand Down
4 changes: 4 additions & 0 deletions core/src/coroutine/korosensei.rs
Original file line number Diff line number Diff line change
Expand Up @@ -445,6 +445,10 @@ where
let current = self.state();
match current {
CoroutineState::Running => {
if Suspender::<Yield, Param>::is_cancel() {
self.cancel()?;
return Ok(CoroutineState::Cancelled);
}
let timestamp = Suspender::<Yield, Param>::timestamp();
self.suspend(y, timestamp)?;
Ok(CoroutineState::Suspend(y, timestamp))
Expand Down
8 changes: 8 additions & 0 deletions core/src/coroutine/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ pub trait Listener<Yield, Return>: Debug {
/// callback when the coroutine enters syscall.
fn on_syscall(&self, local: &CoroutineLocal, old_state: CoroutineState<Yield, Return>) {}

/// Callback when the coroutine is cancelled.
fn on_cancel(&self, local: &CoroutineLocal, old_state: CoroutineState<Yield, Return>) {}

/// Callback when the coroutine is completed.
fn on_complete(
&self,
Expand Down Expand Up @@ -91,6 +94,11 @@ where
old_state: CoroutineState<Yield, Return>
), "on_syscall");

broadcast!(on_cancel(
local: &CoroutineLocal,
old_state: CoroutineState<Yield, Return>
), "on_cancel");

broadcast!(on_complete(
local: &CoroutineLocal,
old_state: CoroutineState<Yield, Return>,
Expand Down
31 changes: 31 additions & 0 deletions core/src/coroutine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,35 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
callback,
)
}

/// handle SIGVTALRM
#[cfg(unix)]
fn setup_sigvtalrm_handler() {
use nix::sys::signal::{sigaction, SaFlags, SigAction, SigHandler, SigSet, Signal};
use std::sync::atomic::{AtomicBool, Ordering};
static CANCEL_HANDLER_INITED: AtomicBool = AtomicBool::new(false);
if CANCEL_HANDLER_INITED
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
extern "C" fn sigvtalrm_handler<Param, Yield>(_: libc::c_int) {
if let Some(suspender) = suspender::Suspender::<Param, Yield>::current() {
suspender.cancel();
}
}
// install SIGVTALRM signal handler
let mut set = SigSet::empty();
set.add(Signal::SIGVTALRM);
let sa = SigAction::new(
SigHandler::Handler(sigvtalrm_handler::<Param, Yield>),
SaFlags::SA_RESTART,
set,
);
unsafe {
_ = sigaction(Signal::SIGVTALRM, &sa).expect("install SIGVTALRM handler failed !");
}
}
}
}

impl<Yield, Return> Coroutine<'_, (), Yield, Return>
Expand Down Expand Up @@ -170,6 +199,8 @@ where
}
Self::init_current(self);
self.running()?;
#[cfg(unix)]
Self::setup_sigvtalrm_handler();
let r = self.raw_resume(arg);
Self::clean_current();
r
Expand Down
19 changes: 19 additions & 0 deletions core/src/coroutine/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ where
)))
}

/// running -> cancel
///
/// # Errors
/// if change state fails.
pub(super) fn cancel(&self) -> std::io::Result<()> {
let current = self.state();
if CoroutineState::Running == current {
let new_state = CoroutineState::Cancelled;
let old_state = self.change_state(new_state);
self.on_cancel(self, old_state);
return Ok(());
}
Err(Error::other(format!(
"{} unexpected {current}->{:?}",
self.name(),
CoroutineState::<Yield, Return>::Cancelled
)))
}

/// running -> complete
///
/// # Errors
Expand Down
37 changes: 37 additions & 0 deletions core/src/coroutine/suspender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ thread_local! {
#[allow(clippy::missing_const_for_thread_local)]
static TIMESTAMP: crossbeam_utils::atomic::AtomicCell<std::collections::VecDeque<u64>> =
const { crossbeam_utils::atomic::AtomicCell::new(std::collections::VecDeque::new()) };

#[allow(clippy::missing_const_for_thread_local)]
static CANCEL: crossbeam_utils::atomic::AtomicCell<std::collections::VecDeque<bool>> =
const { crossbeam_utils::atomic::AtomicCell::new(std::collections::VecDeque::new()) };
}

impl<Param, Yield> Suspender<'_, Param, Yield> {
Expand All @@ -30,6 +34,23 @@ impl<Param, Yield> Suspender<'_, Param, Yield> {
self.suspend_with(arg)
}

/// Cancel the execution of the coroutine.
pub fn cancel(&self) -> ! {
CANCEL.with(|s| unsafe {
s.as_ptr()
.as_mut()
.unwrap_or_else(|| {
panic!(
"thread:{} init CANCEL current failed",
std::thread::current().name().unwrap_or("unknown")
)
})
.push_front(true);
});
_ = self.suspend_with(unsafe { std::mem::zeroed() });
unreachable!()
}

pub(crate) fn timestamp() -> u64 {
TIMESTAMP
.with(|s| unsafe {
Expand All @@ -45,6 +66,22 @@ impl<Param, Yield> Suspender<'_, Param, Yield> {
})
.unwrap_or(0)
}

pub(crate) fn is_cancel() -> bool {
CANCEL
.with(|s| unsafe {
s.as_ptr()
.as_mut()
.unwrap_or_else(|| {
panic!(
"thread:{} get CANCEL current failed",
std::thread::current().name().unwrap_or("unknown")
)
})
.pop_front()
})
.unwrap_or(false)
}
}

#[allow(clippy::must_use_candidate)]
Expand Down
1 change: 1 addition & 0 deletions core/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ impl<Yield, Return> Listener<Yield, Return> for MonitorListener {
}
CoroutineState::Suspend(_, _)
| CoroutineState::Syscall(_, _, _)
| CoroutineState::Cancelled
| CoroutineState::Complete(_)
| CoroutineState::Error(_) => {
if let Some(node) = local.get(NOTIFY_NODE) {
Expand Down
5 changes: 5 additions & 0 deletions core/src/net/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ impl<'e> EventLoop<'e> {
})
}

/// Try to cancel a task from `CoroutinePool`.
pub(super) fn try_cancel_task(name: &str) {
CoroutinePool::try_cancel_task(name);
}

#[allow(trivial_numeric_casts, clippy::cast_possible_truncation)]
fn token(syscall: SyscallName) -> usize {
if let Some(co) = SchedulableCoroutine::current() {
Expand Down
5 changes: 5 additions & 0 deletions core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ impl EventLoops {
)
}

/// Try to cancel a task from event-loop.
pub fn try_cancel_task(name: &str) {
EventLoop::try_cancel_task(name);
}

/// Submit a new coroutine to event-loop.
///
/// Allow multiple threads to concurrently submit coroutine to the pool,
Expand Down
Loading
Loading