Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ jobs:
PROJECT_DIR: ${{ github.workspace }}
run: sh .github/workflows/ci.sh
- name: Run preemptive tests
if: always()
if: ${{ contains(fromJSON('["x86_64-unknown-linux-gnu", "i686-unknown-linux-gnu", "x86_64-apple-darwin", "aarch64-apple-darwin"]'), matrix.target) }}
env:
CHANNEL: ${{ matrix.channel }}
CROSS: ${{ !startsWith(matrix.target, 'x86_64') && contains(matrix.target, 'linux') && '1' || '0' }}
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,7 @@ nanosleep hooked

### 0.6.x

- [x] support custom task and coroutine priority.
- [x] support scalable stack

### 0.5.x
Expand Down
18 changes: 11 additions & 7 deletions core/src/co_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ use crate::co_pool::creator::CoroutineCreator;
use crate::co_pool::task::Task;
use crate::common::beans::BeanFactory;
use crate::common::constants::PoolState;
use crate::common::work_steal::{LocalQueue, WorkStealQueue};
use crate::common::ordered_work_steal::{OrderedLocalQueue, OrderedWorkStealQueue};
use crate::common::{get_timeout_time, now, CondvarBlocker};
use crate::coroutine::suspender::Suspender;
use crate::scheduler::{SchedulableCoroutine, Scheduler};
use crate::{impl_current_for, impl_display_by_debug, impl_for_named, trace};
use dashmap::DashMap;
use std::cell::Cell;
use std::ffi::c_longlong;
use std::io::{Error, ErrorKind};
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
Expand All @@ -31,7 +32,7 @@ pub struct CoroutinePool<'p> {
//协程池状态
state: Cell<PoolState>,
//任务队列
task_queue: LocalQueue<'p, Task<'p>>,
task_queue: OrderedLocalQueue<'p, Task<'p>>,
//工作协程组
workers: Scheduler<'p>,
//当前协程数
Expand Down Expand Up @@ -128,7 +129,7 @@ impl<'p> CoroutinePool<'p> {
pop_fail_times: AtomicUsize::new(0),
min_size: AtomicUsize::new(min_size),
max_size: AtomicUsize::new(max_size),
task_queue: BeanFactory::get_or_default::<WorkStealQueue<Task<'p>>>(
task_queue: BeanFactory::get_or_default::<OrderedWorkStealQueue<Task<'p>>>(
crate::common::constants::TASK_GLOBAL_QUEUE_BEAN,
)
.local_queue(),
Expand Down Expand Up @@ -210,6 +211,7 @@ impl<'p> CoroutinePool<'p> {
name: Option<String>,
func: impl FnOnce(Option<usize>) -> Option<usize> + 'p,
param: Option<usize>,
priority: Option<c_longlong>,
) -> std::io::Result<String> {
match self.state() {
PoolState::Running => {}
Expand All @@ -221,7 +223,7 @@ impl<'p> CoroutinePool<'p> {
}
}
let name = name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4()));
self.submit_raw_task(Task::new(name.clone(), func, param));
self.submit_raw_task(Task::new(name.clone(), func, param, priority));
Ok(name)
}

Expand All @@ -230,7 +232,7 @@ impl<'p> CoroutinePool<'p> {
/// Allow multiple threads to concurrently submit task to the pool,
/// but only allow one thread to execute scheduling.
pub(crate) fn submit_raw_task(&self, task: Task<'p>) {
self.task_queue.push_back(task);
self.task_queue.push(task);
self.blocker.notify();
}

Expand Down Expand Up @@ -338,6 +340,7 @@ impl<'p> CoroutinePool<'p> {
}
},
None,
None,
)
}

Expand All @@ -349,6 +352,7 @@ impl<'p> CoroutinePool<'p> {
&self,
f: impl FnOnce(&Suspender<(), ()>, ()) -> Option<usize> + 'static,
stack_size: Option<usize>,
priority: Option<c_longlong>,
) -> std::io::Result<()> {
if self.get_running_size() >= self.get_max_size() {
trace!(
Expand All @@ -360,7 +364,7 @@ impl<'p> CoroutinePool<'p> {
"The coroutine pool has reached its maximum size !",
));
}
self.deref().submit_co(f, stack_size).map(|()| {
self.deref().submit_co(f, stack_size, priority).map(|()| {
_ = self.running.fetch_add(1, Ordering::Release);
})
}
Expand All @@ -370,7 +374,7 @@ impl<'p> CoroutinePool<'p> {
}

fn try_run(&self) -> Option<()> {
self.task_queue.pop_front().map(|task| {
self.task_queue.pop().map(|task| {
let (task_name, result) = task.run();
assert!(
self.results.insert(task_name.clone(), result).is_none(),
Expand Down
13 changes: 13 additions & 0 deletions core/src/co_pool/task.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use crate::catch;
use crate::common::ordered_work_steal::Ordered;
use std::ffi::c_longlong;

/// 做C兼容时会用到
pub type UserTaskFunc = extern "C" fn(usize) -> usize;
Expand All @@ -12,6 +14,7 @@ pub struct Task<'t> {
#[educe(Debug(ignore))]
func: Box<dyn FnOnce(Option<usize>) -> Option<usize> + 't>,
param: Option<usize>,
priority: Option<c_longlong>,
}

impl<'t> Task<'t> {
Expand All @@ -20,11 +23,13 @@ impl<'t> Task<'t> {
name: String,
func: impl FnOnce(Option<usize>) -> Option<usize> + 't,
param: Option<usize>,
priority: Option<c_longlong>,
) -> Self {
Task {
name,
func: Box::new(func),
param,
priority,
}
}

Expand All @@ -44,6 +49,12 @@ impl<'t> Task<'t> {
}
}

impl Ordered for Task<'_> {
fn priority(&self) -> Option<c_longlong> {
self.priority
}
}

#[cfg(test)]
mod tests {
use crate::co_pool::task::Task;
Expand All @@ -57,6 +68,7 @@ mod tests {
p
},
None,
None,
);
assert_eq!((String::from("test"), Ok(None)), task.run());
}
Expand All @@ -69,6 +81,7 @@ mod tests {
panic!("test panic, just ignore it");
},
None,
None,
);
assert_eq!(
(String::from("test"), Err("test panic, just ignore it")),
Expand Down
28 changes: 14 additions & 14 deletions core/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,19 @@ pub mod timer;
/// queue.push(7);
///
/// let local0 = queue.local_queue();
/// local0.push_back(2);
/// local0.push_back(3);
/// local0.push_back(4);
/// local0.push_back(5);
/// local0.push(2);
/// local0.push(3);
/// local0.push(4);
/// local0.push(5);
///
/// let local1 = queue.local_queue();
/// local1.push_back(0);
/// local1.push_back(1);
/// local1.push(0);
/// local1.push(1);
/// for i in 0..8 {
/// assert_eq!(local1.pop_front(), Some(i));
/// assert_eq!(local1.pop(), Some(i));
/// }
/// assert_eq!(local0.pop_front(), None);
/// assert_eq!(local1.pop_front(), None);
/// assert_eq!(local0.pop(), None);
/// assert_eq!(local1.pop(), None);
/// assert_eq!(queue.pop(), None);
/// ```
///
Expand Down Expand Up @@ -83,16 +83,16 @@ pub mod work_steal;
/// local1.push_with_priority(i, i);
/// }
/// for i in 0..2 {
/// assert_eq!(local1.pop_front(), Some(i));
/// assert_eq!(local1.pop(), Some(i));
/// }
/// for i in (2..6).rev() {
/// assert_eq!(local1.pop_front(), Some(i));
/// assert_eq!(local1.pop(), Some(i));
/// }
/// for i in 6..8 {
/// assert_eq!(local1.pop_front(), Some(i));
/// assert_eq!(local1.pop(), Some(i));
/// }
/// assert_eq!(local0.pop_front(), None);
/// assert_eq!(local1.pop_front(), None);
/// assert_eq!(local0.pop(), None);
/// assert_eq!(local1.pop(), None);
/// assert_eq!(queue.pop(), None);
/// ```
///
Expand Down
51 changes: 27 additions & 24 deletions core/src/common/ordered_work_steal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ use std::ffi::c_longlong;
use std::fmt::Debug;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};

/// The highest precedence.
pub const HIGHEST_PRECEDENCE: c_longlong = c_longlong::MIN;

/// The lowest precedence.
pub const LOWEST_PRECEDENCE: c_longlong = c_longlong::MAX;

/// The default precedence.
pub const DEFAULT_PRECEDENCE: c_longlong = 0;

/// Ordered trait for user's datastructures.
pub trait Ordered {
/// The highest precedence.
const HIGHEST_PRECEDENCE: c_longlong = c_longlong::MIN;
/// The lowest precedence.
const LOWEST_PRECEDENCE: c_longlong = c_longlong::MAX;
/// The default precedence.
const DEFAULT_PRECEDENCE: c_longlong = 0;
/// Get the priority of the element.
fn priority(&self) -> c_longlong;
fn priority(&self) -> Option<c_longlong>;
}

/// Work stealing global queue, shared by multiple threads.
Expand Down Expand Up @@ -48,7 +51,7 @@ impl<T: Debug> Drop for OrderedWorkStealQueue<T> {
impl<T: Debug + Ordered> OrderedWorkStealQueue<T> {
/// Push an element to the global queue.
pub fn push(&self, item: T) {
self.push_with_priority(item.priority(), item);
self.push_with_priority(item.priority().unwrap_or(DEFAULT_PRECEDENCE), item);
}
}

Expand Down Expand Up @@ -159,7 +162,7 @@ impl<T: Debug + Ordered> OrderedLocalQueue<'_, T> {
/// If the queue is full, first push half to global,
/// then push the item to global.
pub fn push(&self, item: T) {
self.push_with_priority(item.priority(), item);
self.push_with_priority(item.priority().unwrap_or(DEFAULT_PRECEDENCE), item);
}
}

Expand Down Expand Up @@ -196,10 +199,10 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
/// local.push_with_priority(i, i);
/// }
/// assert!(local.is_full());
/// assert_eq!(local.pop_front(), Some(0));
/// assert_eq!(local.pop(), Some(0));
/// assert_eq!(local.len(), 1);
/// assert_eq!(local.pop_front(), Some(1));
/// assert_eq!(local.pop_front(), None);
/// assert_eq!(local.pop(), Some(1));
/// assert_eq!(local.pop(), None);
/// assert!(local.is_empty());
/// ```
pub fn is_full(&self) -> bool {
Expand Down Expand Up @@ -253,9 +256,9 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
/// local.push_with_priority(i, i);
/// }
/// for i in 0..4 {
/// assert_eq!(local.pop_front(), Some(i));
/// assert_eq!(local.pop(), Some(i));
/// }
/// assert_eq!(local.pop_front(), None);
/// assert_eq!(local.pop(), None);
/// ```
pub fn push_with_priority(&self, priority: c_longlong, item: T) {
if self.is_full() {
Expand Down Expand Up @@ -314,9 +317,9 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
/// }
/// let local = queue.local_queue();
/// for i in 0..4 {
/// assert_eq!(local.pop_front(), Some(i));
/// assert_eq!(local.pop(), Some(i));
/// }
/// assert_eq!(local.pop_front(), None);
/// assert_eq!(local.pop(), None);
/// assert_eq!(queue.pop(), None);
/// ```
///
Expand All @@ -336,23 +339,23 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
/// }
/// assert_eq!(local1.len(), 2);
/// for i in 0..2 {
/// assert_eq!(local1.pop_front(), Some(i));
/// assert_eq!(local1.pop(), Some(i));
/// }
/// for i in (2..6).rev() {
/// assert_eq!(local1.pop_front(), Some(i));
/// assert_eq!(local1.pop(), Some(i));
/// }
/// assert_eq!(local0.pop_front(), None);
/// assert_eq!(local1.pop_front(), None);
/// assert_eq!(local0.pop(), None);
/// assert_eq!(local1.pop(), None);
/// assert_eq!(queue.pop(), None);
/// ```
pub fn pop_front(&self) -> Option<T> {
pub fn pop(&self) -> Option<T> {
//每从本地弹出61次,就从全局队列弹出
if self.tick() % 61 == 0 {
if let Some(val) = self.shared.pop() {
return Some(val);
}
}
if let Some(val) = self.pop() {
if let Some(val) = self.pop_local() {
return Some(val);
}
if self.try_lock() {
Expand Down Expand Up @@ -398,7 +401,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
.is_ok()
{
self.release_lock();
return self.pop();
return self.pop_local();
}
}
}
Expand All @@ -409,7 +412,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> {
self.shared.pop()
}

fn pop(&self) -> Option<T> {
fn pop_local(&self) -> Option<T> {
//从本地队列弹出元素
for entry in self.queue {
if let Some(val) = entry.value().pop() {
Expand Down
Loading
Loading