From 5ac25260f908efefe159dac6c1b35682ef85ab21 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Fri, 29 Nov 2024 16:06:42 +0800 Subject: [PATCH 1/3] use OrderedLocalQueue --- core/src/co_pool/mod.rs | 18 ++++++---- core/src/co_pool/task.rs | 13 +++++++ core/src/common/mod.rs | 28 +++++++-------- core/src/common/ordered_work_steal.rs | 51 ++++++++++++++------------- core/src/common/work_steal.rs | 46 ++++++++++++------------ core/src/coroutine/korosensei.rs | 20 +++++++---- core/src/coroutine/mod.rs | 38 ++++++++++++++++---- core/src/net/event_loop.rs | 6 ++-- core/src/net/mod.rs | 18 ++++++---- core/src/scheduler.rs | 21 ++++++----- core/tests/co_pool.rs | 7 +++- core/tests/scheduler.rs | 11 ++++-- hook/src/lib.rs | 9 +++-- open-coroutine/src/lib.rs | 22 ++++++++++-- 14 files changed, 202 insertions(+), 106 deletions(-) diff --git a/core/src/co_pool/mod.rs b/core/src/co_pool/mod.rs index 716ef662..a0a80eb1 100644 --- a/core/src/co_pool/mod.rs +++ b/core/src/co_pool/mod.rs @@ -2,13 +2,14 @@ use crate::co_pool::creator::CoroutineCreator; use crate::co_pool::task::Task; use crate::common::beans::BeanFactory; use crate::common::constants::PoolState; -use crate::common::work_steal::{LocalQueue, WorkStealQueue}; +use crate::common::ordered_work_steal::{OrderedLocalQueue, OrderedWorkStealQueue}; use crate::common::{get_timeout_time, now, CondvarBlocker}; use crate::coroutine::suspender::Suspender; use crate::scheduler::{SchedulableCoroutine, Scheduler}; use crate::{impl_current_for, impl_display_by_debug, impl_for_named, trace}; use dashmap::DashMap; use std::cell::Cell; +use std::ffi::c_longlong; use std::io::{Error, ErrorKind}; use std::ops::{Deref, DerefMut}; use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; @@ -31,7 +32,7 @@ pub struct CoroutinePool<'p> { //协程池状态 state: Cell, //任务队列 - task_queue: LocalQueue<'p, Task<'p>>, + task_queue: OrderedLocalQueue<'p, Task<'p>>, //工作协程组 workers: Scheduler<'p>, //当前协程数 @@ -128,7 +129,7 @@ impl<'p> CoroutinePool<'p> { pop_fail_times: AtomicUsize::new(0), min_size: AtomicUsize::new(min_size), max_size: AtomicUsize::new(max_size), - task_queue: BeanFactory::get_or_default::>>( + task_queue: BeanFactory::get_or_default::>>( crate::common::constants::TASK_GLOBAL_QUEUE_BEAN, ) .local_queue(), @@ -210,6 +211,7 @@ impl<'p> CoroutinePool<'p> { name: Option, func: impl FnOnce(Option) -> Option + 'p, param: Option, + priority: Option, ) -> std::io::Result { match self.state() { PoolState::Running => {} @@ -221,7 +223,7 @@ impl<'p> CoroutinePool<'p> { } } let name = name.unwrap_or(format!("{}@{}", self.name(), uuid::Uuid::new_v4())); - self.submit_raw_task(Task::new(name.clone(), func, param)); + self.submit_raw_task(Task::new(name.clone(), func, param, priority)); Ok(name) } @@ -230,7 +232,7 @@ impl<'p> CoroutinePool<'p> { /// Allow multiple threads to concurrently submit task to the pool, /// but only allow one thread to execute scheduling. pub(crate) fn submit_raw_task(&self, task: Task<'p>) { - self.task_queue.push_back(task); + self.task_queue.push(task); self.blocker.notify(); } @@ -338,6 +340,7 @@ impl<'p> CoroutinePool<'p> { } }, None, + None, ) } @@ -349,6 +352,7 @@ impl<'p> CoroutinePool<'p> { &self, f: impl FnOnce(&Suspender<(), ()>, ()) -> Option + 'static, stack_size: Option, + priority: Option, ) -> std::io::Result<()> { if self.get_running_size() >= self.get_max_size() { trace!( @@ -360,7 +364,7 @@ impl<'p> CoroutinePool<'p> { "The coroutine pool has reached its maximum size !", )); } - self.deref().submit_co(f, stack_size).map(|()| { + self.deref().submit_co(f, stack_size, priority).map(|()| { _ = self.running.fetch_add(1, Ordering::Release); }) } @@ -370,7 +374,7 @@ impl<'p> CoroutinePool<'p> { } fn try_run(&self) -> Option<()> { - self.task_queue.pop_front().map(|task| { + self.task_queue.pop().map(|task| { let (task_name, result) = task.run(); assert!( self.results.insert(task_name.clone(), result).is_none(), diff --git a/core/src/co_pool/task.rs b/core/src/co_pool/task.rs index ef374143..12fffa45 100644 --- a/core/src/co_pool/task.rs +++ b/core/src/co_pool/task.rs @@ -1,4 +1,6 @@ use crate::catch; +use crate::common::ordered_work_steal::Ordered; +use std::ffi::c_longlong; /// 做C兼容时会用到 pub type UserTaskFunc = extern "C" fn(usize) -> usize; @@ -12,6 +14,7 @@ pub struct Task<'t> { #[educe(Debug(ignore))] func: Box) -> Option + 't>, param: Option, + priority: Option, } impl<'t> Task<'t> { @@ -20,11 +23,13 @@ impl<'t> Task<'t> { name: String, func: impl FnOnce(Option) -> Option + 't, param: Option, + priority: Option, ) -> Self { Task { name, func: Box::new(func), param, + priority, } } @@ -44,6 +49,12 @@ impl<'t> Task<'t> { } } +impl Ordered for Task<'_> { + fn priority(&self) -> Option { + self.priority + } +} + #[cfg(test)] mod tests { use crate::co_pool::task::Task; @@ -57,6 +68,7 @@ mod tests { p }, None, + None, ); assert_eq!((String::from("test"), Ok(None)), task.run()); } @@ -69,6 +81,7 @@ mod tests { panic!("test panic, just ignore it"); }, None, + None, ); assert_eq!( (String::from("test"), Err("test panic, just ignore it")), diff --git a/core/src/common/mod.rs b/core/src/common/mod.rs index 19f62213..ef3cdf80 100644 --- a/core/src/common/mod.rs +++ b/core/src/common/mod.rs @@ -38,19 +38,19 @@ pub mod timer; /// queue.push(7); /// /// let local0 = queue.local_queue(); -/// local0.push_back(2); -/// local0.push_back(3); -/// local0.push_back(4); -/// local0.push_back(5); +/// local0.push(2); +/// local0.push(3); +/// local0.push(4); +/// local0.push(5); /// /// let local1 = queue.local_queue(); -/// local1.push_back(0); -/// local1.push_back(1); +/// local1.push(0); +/// local1.push(1); /// for i in 0..8 { -/// assert_eq!(local1.pop_front(), Some(i)); +/// assert_eq!(local1.pop(), Some(i)); /// } -/// assert_eq!(local0.pop_front(), None); -/// assert_eq!(local1.pop_front(), None); +/// assert_eq!(local0.pop(), None); +/// assert_eq!(local1.pop(), None); /// assert_eq!(queue.pop(), None); /// ``` /// @@ -83,16 +83,16 @@ pub mod work_steal; /// local1.push_with_priority(i, i); /// } /// for i in 0..2 { -/// assert_eq!(local1.pop_front(), Some(i)); +/// assert_eq!(local1.pop(), Some(i)); /// } /// for i in (2..6).rev() { -/// assert_eq!(local1.pop_front(), Some(i)); +/// assert_eq!(local1.pop(), Some(i)); /// } /// for i in 6..8 { -/// assert_eq!(local1.pop_front(), Some(i)); +/// assert_eq!(local1.pop(), Some(i)); /// } -/// assert_eq!(local0.pop_front(), None); -/// assert_eq!(local1.pop_front(), None); +/// assert_eq!(local0.pop(), None); +/// assert_eq!(local1.pop(), None); /// assert_eq!(queue.pop(), None); /// ``` /// diff --git a/core/src/common/ordered_work_steal.rs b/core/src/common/ordered_work_steal.rs index a7493dc4..c9293d3f 100644 --- a/core/src/common/ordered_work_steal.rs +++ b/core/src/common/ordered_work_steal.rs @@ -7,16 +7,19 @@ use std::ffi::c_longlong; use std::fmt::Debug; use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering}; +/// The highest precedence. +pub const HIGHEST_PRECEDENCE: c_longlong = c_longlong::MIN; + +/// The lowest precedence. +pub const LOWEST_PRECEDENCE: c_longlong = c_longlong::MAX; + +/// The default precedence. +pub const DEFAULT_PRECEDENCE: c_longlong = 0; + /// Ordered trait for user's datastructures. pub trait Ordered { - /// The highest precedence. - const HIGHEST_PRECEDENCE: c_longlong = c_longlong::MIN; - /// The lowest precedence. - const LOWEST_PRECEDENCE: c_longlong = c_longlong::MAX; - /// The default precedence. - const DEFAULT_PRECEDENCE: c_longlong = 0; /// Get the priority of the element. - fn priority(&self) -> c_longlong; + fn priority(&self) -> Option; } /// Work stealing global queue, shared by multiple threads. @@ -48,7 +51,7 @@ impl Drop for OrderedWorkStealQueue { impl OrderedWorkStealQueue { /// Push an element to the global queue. pub fn push(&self, item: T) { - self.push_with_priority(item.priority(), item); + self.push_with_priority(item.priority().unwrap_or(DEFAULT_PRECEDENCE), item); } } @@ -159,7 +162,7 @@ impl OrderedLocalQueue<'_, T> { /// If the queue is full, first push half to global, /// then push the item to global. pub fn push(&self, item: T) { - self.push_with_priority(item.priority(), item); + self.push_with_priority(item.priority().unwrap_or(DEFAULT_PRECEDENCE), item); } } @@ -196,10 +199,10 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { /// local.push_with_priority(i, i); /// } /// assert!(local.is_full()); - /// assert_eq!(local.pop_front(), Some(0)); + /// assert_eq!(local.pop(), Some(0)); /// assert_eq!(local.len(), 1); - /// assert_eq!(local.pop_front(), Some(1)); - /// assert_eq!(local.pop_front(), None); + /// assert_eq!(local.pop(), Some(1)); + /// assert_eq!(local.pop(), None); /// assert!(local.is_empty()); /// ``` pub fn is_full(&self) -> bool { @@ -253,9 +256,9 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { /// local.push_with_priority(i, i); /// } /// for i in 0..4 { - /// assert_eq!(local.pop_front(), Some(i)); + /// assert_eq!(local.pop(), Some(i)); /// } - /// assert_eq!(local.pop_front(), None); + /// assert_eq!(local.pop(), None); /// ``` pub fn push_with_priority(&self, priority: c_longlong, item: T) { if self.is_full() { @@ -314,9 +317,9 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { /// } /// let local = queue.local_queue(); /// for i in 0..4 { - /// assert_eq!(local.pop_front(), Some(i)); + /// assert_eq!(local.pop(), Some(i)); /// } - /// assert_eq!(local.pop_front(), None); + /// assert_eq!(local.pop(), None); /// assert_eq!(queue.pop(), None); /// ``` /// @@ -336,23 +339,23 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { /// } /// assert_eq!(local1.len(), 2); /// for i in 0..2 { - /// assert_eq!(local1.pop_front(), Some(i)); + /// assert_eq!(local1.pop(), Some(i)); /// } /// for i in (2..6).rev() { - /// assert_eq!(local1.pop_front(), Some(i)); + /// assert_eq!(local1.pop(), Some(i)); /// } - /// assert_eq!(local0.pop_front(), None); - /// assert_eq!(local1.pop_front(), None); + /// assert_eq!(local0.pop(), None); + /// assert_eq!(local1.pop(), None); /// assert_eq!(queue.pop(), None); /// ``` - pub fn pop_front(&self) -> Option { + pub fn pop(&self) -> Option { //每从本地弹出61次,就从全局队列弹出 if self.tick() % 61 == 0 { if let Some(val) = self.shared.pop() { return Some(val); } } - if let Some(val) = self.pop() { + if let Some(val) = self.pop_local() { return Some(val); } if self.try_lock() { @@ -398,7 +401,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { .is_ok() { self.release_lock(); - return self.pop(); + return self.pop_local(); } } } @@ -409,7 +412,7 @@ impl<'l, T: Debug> OrderedLocalQueue<'l, T> { self.shared.pop() } - fn pop(&self) -> Option { + fn pop_local(&self) -> Option { //从本地队列弹出元素 for entry in self.queue { if let Some(val) = entry.value().pop() { diff --git a/core/src/common/work_steal.rs b/core/src/common/work_steal.rs index a2590b34..3477e299 100644 --- a/core/src/common/work_steal.rs +++ b/core/src/common/work_steal.rs @@ -149,13 +149,13 @@ impl<'l, T: Debug> LocalQueue<'l, T> { /// let local = queue.local_queue(); /// assert!(local.is_empty()); /// for i in 0..2 { - /// local.push_back(i); + /// local.push(i); /// } /// assert!(local.is_full()); - /// assert_eq!(local.pop_front(), Some(0)); + /// assert_eq!(local.pop(), Some(0)); /// assert_eq!(local.len(), 1); - /// assert_eq!(local.pop_front(), Some(1)); - /// assert_eq!(local.pop_front(), None); + /// assert_eq!(local.pop(), Some(1)); + /// assert_eq!(local.pop(), None); /// assert!(local.is_empty()); /// ``` pub fn is_full(&self) -> bool { @@ -202,15 +202,15 @@ impl<'l, T: Debug> LocalQueue<'l, T> { /// let queue = WorkStealQueue::new(1, 2); /// let local = queue.local_queue(); /// for i in 0..4 { - /// local.push_back(i); + /// local.push(i); /// } - /// assert_eq!(local.pop_front(), Some(1)); - /// assert_eq!(local.pop_front(), Some(3)); - /// assert_eq!(local.pop_front(), Some(0)); - /// assert_eq!(local.pop_front(), Some(2)); - /// assert_eq!(local.pop_front(), None); + /// assert_eq!(local.pop(), Some(1)); + /// assert_eq!(local.pop(), Some(3)); + /// assert_eq!(local.pop(), Some(0)); + /// assert_eq!(local.pop(), Some(2)); + /// assert_eq!(local.pop(), None); /// ``` - pub fn push_back(&self, item: T) { + pub fn push(&self, item: T) { if let Err(item) = self.queue.push(item) { //把本地队列的一半放到全局队列 let count = self.len() / 2; @@ -248,9 +248,9 @@ impl<'l, T: Debug> LocalQueue<'l, T> { /// } /// let local = queue.local_queue(); /// for i in 0..4 { - /// assert_eq!(local.pop_front(), Some(i)); + /// assert_eq!(local.pop(), Some(i)); /// } - /// assert_eq!(local.pop_front(), None); + /// assert_eq!(local.pop(), None); /// assert_eq!(queue.pop(), None); /// ``` /// @@ -260,23 +260,23 @@ impl<'l, T: Debug> LocalQueue<'l, T> { /// /// let queue = WorkStealQueue::new(2, 64); /// let local0 = queue.local_queue(); - /// local0.push_back(2); - /// local0.push_back(3); - /// local0.push_back(4); - /// local0.push_back(5); + /// local0.push(2); + /// local0.push(3); + /// local0.push(4); + /// local0.push(5); /// assert_eq!(local0.len(), 4); /// let local1 = queue.local_queue(); - /// local1.push_back(0); - /// local1.push_back(1); + /// local1.push(0); + /// local1.push(1); /// assert_eq!(local1.len(), 2); /// for i in 0..6 { - /// assert_eq!(local1.pop_front(), Some(i)); + /// assert_eq!(local1.pop(), Some(i)); /// } - /// assert_eq!(local0.pop_front(), None); - /// assert_eq!(local1.pop_front(), None); + /// assert_eq!(local0.pop(), None); + /// assert_eq!(local1.pop(), None); /// assert_eq!(queue.pop(), None); /// ``` - pub fn pop_front(&self) -> Option { + pub fn pop(&self) -> Option { //每从本地弹出61次,就从全局队列弹出 if self.tick() % 61 == 0 { if let Some(val) = self.shared.pop() { diff --git a/core/src/coroutine/korosensei.rs b/core/src/coroutine/korosensei.rs index 7697b4e6..31e9886b 100644 --- a/core/src/coroutine/korosensei.rs +++ b/core/src/coroutine/korosensei.rs @@ -9,6 +9,7 @@ use corosensei::trap::TrapHandlerRegs; use corosensei::CoroutineResult; use std::cell::{Cell, RefCell}; use std::collections::VecDeque; +use std::ffi::c_longlong; use std::fmt::Debug; use std::io::{Error, ErrorKind}; @@ -31,6 +32,7 @@ pub struct Coroutine<'c, Param, Yield, Return> { pub(crate) stack_infos: RefCell>, pub(crate) listeners: VecDeque<&'c dyn Listener>, pub(crate) local: CoroutineLocal<'c>, + pub(crate) priority: Option, } impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> { @@ -97,11 +99,11 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> { target_arch = "x86_64", ))] { let TrapHandlerRegs { rip, rsp, rbp, rdi, rsi } = regs; - context.uc_mcontext.gregs[usize::try_from(libc::REG_RIP).expect("overflow")] = std::ffi::c_longlong::try_from(rip).expect("overflow"); - context.uc_mcontext.gregs[usize::try_from(libc::REG_RSP).expect("overflow")] = std::ffi::c_longlong::try_from(rsp).expect("overflow"); - context.uc_mcontext.gregs[usize::try_from(libc::REG_RBP).expect("overflow")] = std::ffi::c_longlong::try_from(rbp).expect("overflow"); - context.uc_mcontext.gregs[usize::try_from(libc::REG_RDI).expect("overflow")] = std::ffi::c_longlong::try_from(rdi).expect("overflow"); - context.uc_mcontext.gregs[usize::try_from(libc::REG_RSI).expect("overflow")] = std::ffi::c_longlong::try_from(rsi).expect("overflow"); + context.uc_mcontext.gregs[usize::try_from(libc::REG_RIP).expect("overflow")] = c_longlong::try_from(rip).expect("overflow"); + context.uc_mcontext.gregs[usize::try_from(libc::REG_RSP).expect("overflow")] = c_longlong::try_from(rsp).expect("overflow"); + context.uc_mcontext.gregs[usize::try_from(libc::REG_RBP).expect("overflow")] = c_longlong::try_from(rbp).expect("overflow"); + context.uc_mcontext.gregs[usize::try_from(libc::REG_RDI).expect("overflow")] = c_longlong::try_from(rdi).expect("overflow"); + context.uc_mcontext.gregs[usize::try_from(libc::REG_RSI).expect("overflow")] = c_longlong::try_from(rsi).expect("overflow"); } else if #[cfg(all( any(target_os = "linux", target_os = "android"), target_arch = "x86", @@ -345,7 +347,12 @@ where /// ///# Errors /// if stack allocate failed. - pub fn new(name: String, f: F, stack_size: usize) -> std::io::Result + pub fn new( + name: String, + f: F, + stack_size: usize, + priority: Option, + ) -> std::io::Result where F: FnOnce(&Suspender, Param) -> Return + 'static, { @@ -378,6 +385,7 @@ where state: Cell::new(CoroutineState::Ready), listeners: VecDeque::default(), local: CoroutineLocal::default(), + priority, }; #[cfg(all(unix, feature = "preemptive"))] co.add_listener(crate::monitor::MonitorListener::default()); diff --git a/core/src/coroutine/mod.rs b/core/src/coroutine/mod.rs index 950d4c8e..125f007e 100644 --- a/core/src/coroutine/mod.rs +++ b/core/src/coroutine/mod.rs @@ -3,6 +3,7 @@ use crate::coroutine::listener::Listener; use crate::coroutine::local::CoroutineLocal; use crate::{impl_current_for, impl_display_by_debug, impl_for_named}; use std::collections::VecDeque; +use std::ffi::c_longlong; use std::fmt::{Debug, Formatter}; use std::ops::Deref; @@ -15,29 +16,48 @@ pub mod local; /// Coroutine listener abstraction and impl. pub mod listener; +use crate::common::ordered_work_steal::Ordered; #[cfg(feature = "korosensei")] pub use korosensei::Coroutine; + #[cfg(feature = "korosensei")] mod korosensei; /// Create a new coroutine. #[macro_export] macro_rules! co { - ($f:expr, $size:literal $(,)?) => { - $crate::coroutine::Coroutine::new(uuid::Uuid::new_v4().to_string(), $f, $size) + ($name:expr, $f:expr, $size:expr, $priority:expr $(,)?) => { + $crate::coroutine::Coroutine::new($name, $f, $size, $priority) }; - ($f:expr $(,)?) => { + ($f:expr, $size:literal, $priority:literal $(,)?) => { $crate::coroutine::Coroutine::new( uuid::Uuid::new_v4().to_string(), $f, - $crate::common::constants::DEFAULT_STACK_SIZE, + $size, + Some($priority), ) }; ($name:expr, $f:expr, $size:expr $(,)?) => { - $crate::coroutine::Coroutine::new($name, $f, $size) + $crate::coroutine::Coroutine::new($name, $f, $size, None) + }; + ($f:expr, $size:literal $(,)?) => { + $crate::coroutine::Coroutine::new(uuid::Uuid::new_v4().to_string(), $f, $size, None) }; ($name:expr, $f:expr $(,)?) => { - $crate::coroutine::Coroutine::new($name, $f, $crate::common::constants::DEFAULT_STACK_SIZE) + $crate::coroutine::Coroutine::new( + $name, + $f, + $crate::common::constants::DEFAULT_STACK_SIZE, + None, + ) + }; + ($f:expr $(,)?) => { + $crate::coroutine::Coroutine::new( + uuid::Uuid::new_v4().to_string(), + $f, + $crate::common::constants::DEFAULT_STACK_SIZE, + None, + ) }; } @@ -193,6 +213,12 @@ impl<'c, Param, Yield, Return> Deref for Coroutine<'c, Param, Yield, Return> { } } +impl Ordered for Coroutine<'_, Param, Yield, Return> { + fn priority(&self) -> Option { + self.priority + } +} + impl_display_by_debug!( Coroutine<'c, Param, Yield, Return> where diff --git a/core/src/net/event_loop.rs b/core/src/net/event_loop.rs index a5eb1215..acf1d374 100644 --- a/core/src/net/event_loop.rs +++ b/core/src/net/event_loop.rs @@ -456,7 +456,7 @@ mod tests { fn test_simple() -> std::io::Result<()> { let mut event_loop = EventLoop::default(); event_loop.set_max_size(1); - _ = event_loop.submit_task(None, |_| panic!("test panic, just ignore it"), None)?; + _ = event_loop.submit_task(None, |_| panic!("test panic, just ignore it"), None, None)?; _ = event_loop.submit_task( None, |_| { @@ -464,6 +464,7 @@ mod tests { Some(2) }, None, + None, )?; event_loop.stop_sync(Duration::from_secs(3)) } @@ -473,7 +474,7 @@ mod tests { fn test_simple_auto() -> std::io::Result<()> { let event_loop = EventLoop::default().start()?; event_loop.set_max_size(1); - _ = event_loop.submit_task(None, |_| panic!("test panic, just ignore it"), None)?; + _ = event_loop.submit_task(None, |_| panic!("test panic, just ignore it"), None, None)?; _ = event_loop.submit_task( None, |_| { @@ -481,6 +482,7 @@ mod tests { Some(2) }, None, + None, )?; event_loop.stop(Duration::from_secs(3)) } diff --git a/core/src/net/mod.rs b/core/src/net/mod.rs index 662bfd48..9bbcc28a 100644 --- a/core/src/net/mod.rs +++ b/core/src/net/mod.rs @@ -5,7 +5,7 @@ use crate::net::join::JoinHandle; use crate::{error, info}; use once_cell::sync::OnceCell; use std::collections::VecDeque; -use std::ffi::c_int; +use std::ffi::{c_int, c_longlong}; use std::io::{Error, ErrorKind}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Condvar, Mutex}; @@ -14,7 +14,7 @@ use std::time::Duration; cfg_if::cfg_if! { if #[cfg(all(target_os = "linux", feature = "io_uring"))] { use libc::{epoll_event, iovec, msghdr, off_t, size_t, sockaddr, socklen_t}; - use std::ffi::{c_longlong, c_void}; + use std::ffi::c_void; } } @@ -133,12 +133,15 @@ impl EventLoops { name: Option, func: impl FnOnce(Option) -> Option + 'static, param: Option, + priority: Option, ) -> JoinHandle { let event_loop = Self::round_robin(); - event_loop.submit_task(name, func, param).map_or_else( - |_| JoinHandle::err(event_loop), - |n| JoinHandle::new(event_loop, n.as_str()), - ) + event_loop + .submit_task(name, func, param, priority) + .map_or_else( + |_| JoinHandle::err(event_loop), + |n| JoinHandle::new(event_loop, n.as_str()), + ) } /// Submit a new coroutine to event-loop. @@ -148,8 +151,9 @@ impl EventLoops { pub fn submit_co( f: impl FnOnce(&Suspender<(), ()>, ()) -> Option + 'static, stack_size: Option, + priority: Option, ) -> std::io::Result<()> { - Self::round_robin().submit_co(f, stack_size) + Self::round_robin().submit_co(f, stack_size, priority) } /// Waiting for read or write events to occur. diff --git a/core/src/scheduler.rs b/core/src/scheduler.rs index 2b619ad1..8fb9e2c5 100644 --- a/core/src/scheduler.rs +++ b/core/src/scheduler.rs @@ -1,7 +1,7 @@ use crate::common::beans::BeanFactory; use crate::common::constants::{CoroutineState, SyscallState}; +use crate::common::ordered_work_steal::{OrderedLocalQueue, OrderedWorkStealQueue}; use crate::common::timer::TimerList; -use crate::common::work_steal::{LocalQueue, WorkStealQueue}; use crate::common::{get_timeout_time, now}; use crate::coroutine::listener::Listener; use crate::coroutine::suspender::Suspender; @@ -9,6 +9,7 @@ use crate::coroutine::Coroutine; use crate::{co, impl_current_for, impl_display_by_debug, impl_for_named}; use dashmap::DashMap; use std::collections::VecDeque; +use std::ffi::c_longlong; use std::io::{Error, ErrorKind}; use std::sync::atomic::{AtomicUsize, Ordering}; use std::time::Duration; @@ -29,7 +30,7 @@ pub struct Scheduler<'s> { name: String, stack_size: AtomicUsize, listeners: VecDeque<&'s dyn Listener<(), Option>>, - ready: LocalQueue<'s, SchedulableCoroutine<'s>>, + ready: OrderedLocalQueue<'s, SchedulableCoroutine<'s>>, suspend: TimerList>, syscall: DashMap<&'s str, SchedulableCoroutine<'s>>, syscall_suspend: TimerList<&'s str>, @@ -85,7 +86,7 @@ impl<'s> Scheduler<'s> { name, stack_size: AtomicUsize::new(stack_size), listeners: VecDeque::new(), - ready: BeanFactory::get_or_default::>( + ready: BeanFactory::get_or_default::>( crate::common::constants::COROUTINE_GLOBAL_QUEUE_BEAN, ) .local_queue(), @@ -118,11 +119,13 @@ impl<'s> Scheduler<'s> { &self, f: impl FnOnce(&Suspender<(), ()>, ()) -> Option + 'static, stack_size: Option, + priority: Option, ) -> std::io::Result<()> { let mut co = co!( format!("{}@{}", self.name(), uuid::Uuid::new_v4()), f, stack_size.unwrap_or(self.stack_size()), + priority )?; for listener in self.listeners.clone() { co.add_raw_listener(listener); @@ -141,7 +144,7 @@ impl<'s> Scheduler<'s> { /// Allow multiple threads to concurrently submit coroutine to the scheduler, /// but only allow one thread to execute scheduling. pub fn submit_raw_co(&self, coroutine: SchedulableCoroutine<'s>) -> std::io::Result<()> { - self.ready.push_back(coroutine); + self.ready.push(coroutine); Ok(()) } @@ -161,7 +164,7 @@ impl<'s> Scheduler<'s> { } _ => unreachable!("try_resume unexpect CoroutineState"), } - self.ready.push_back(co); + self.ready.push(co); } } @@ -211,7 +214,7 @@ impl<'s> Scheduler<'s> { } self.check_ready()?; // schedule coroutines - if let Some(mut coroutine) = self.ready.pop_front() { + if let Some(mut coroutine) = self.ready.pop() { match coroutine.resume()? { CoroutineState::SystemCall((), _, state) => { //挂起协程到系统调用表 @@ -228,7 +231,7 @@ impl<'s> Scheduler<'s> { self.suspend.insert(timestamp, coroutine); } else { //放入就绪队列尾部 - self.ready.push_back(coroutine); + self.ready.push(coroutine); } } CoroutineState::Complete(result) => { @@ -268,7 +271,7 @@ impl<'s> Scheduler<'s> { if let Some((_, mut entry)) = self.suspend.pop_front() { while let Some(coroutine) = entry.pop_front() { coroutine.ready()?; - self.ready.push_back(coroutine); + self.ready.push(coroutine); } } } @@ -289,7 +292,7 @@ impl<'s> Scheduler<'s> { SyscallState::Suspend(_), ) => { co.syscall(val, syscall, SyscallState::Timeout)?; - self.ready.push_back(co); + self.ready.push(co); } _ => unreachable!("check_ready should never execute to here"), } diff --git a/core/tests/co_pool.rs b/core/tests/co_pool.rs index 081c2307..39f86e65 100644 --- a/core/tests/co_pool.rs +++ b/core/tests/co_pool.rs @@ -9,6 +9,7 @@ fn co_pool_basic() -> std::io::Result<()> { Some(String::from("test_panic")), |_| panic!("test panic, just ignore it"), None, + None, )?; assert!(!pool.is_empty()); pool.submit_task( @@ -18,6 +19,7 @@ fn co_pool_basic() -> std::io::Result<()> { Some(2) }, None, + None, )?; pool.try_schedule_task() } @@ -39,6 +41,7 @@ fn co_pool_suspend() -> std::io::Result<()> { param }, None, + None, )?; _ = pool.submit_task( None, @@ -47,6 +50,7 @@ fn co_pool_suspend() -> std::io::Result<()> { Some(1) }, None, + None, )?; pool.try_schedule_task()?; std::thread::sleep(std::time::Duration::from_millis(200)); @@ -58,7 +62,7 @@ fn co_pool_suspend() -> std::io::Result<()> { fn co_pool_stop() -> std::io::Result<()> { let pool = open_coroutine_core::co_pool::CoroutinePool::default(); pool.set_max_size(1); - _ = pool.submit_task(None, |_| panic!("test panic, just ignore it"), None)?; + _ = pool.submit_task(None, |_| panic!("test panic, just ignore it"), None, None)?; pool.submit_task( None, |_| { @@ -66,6 +70,7 @@ fn co_pool_stop() -> std::io::Result<()> { Some(2) }, None, + None, ) .map(|_| ()) } diff --git a/core/tests/scheduler.rs b/core/tests/scheduler.rs index 89677cd2..a74f34b3 100644 --- a/core/tests/scheduler.rs +++ b/core/tests/scheduler.rs @@ -10,6 +10,7 @@ fn scheduler_basic() -> std::io::Result<()> { None }, None, + None, )?; _ = scheduler.submit_co( |_, _| { @@ -17,6 +18,7 @@ fn scheduler_basic() -> std::io::Result<()> { None }, None, + None, )?; scheduler.try_schedule() } @@ -25,13 +27,14 @@ fn scheduler_basic() -> std::io::Result<()> { #[test] fn scheduler_backtrace() -> std::io::Result<()> { let mut scheduler = Scheduler::default(); - _ = scheduler.submit_co(|_, _| None, None)?; + _ = scheduler.submit_co(|_, _| None, None, None)?; _ = scheduler.submit_co( |_, _| { println!("{:?}", backtrace::Backtrace::new()); None }, None, + None, )?; scheduler.try_schedule() } @@ -47,6 +50,7 @@ fn scheduler_suspend() -> std::io::Result<()> { None }, None, + None, )?; _ = scheduler.submit_co( |suspender, _| { @@ -56,6 +60,7 @@ fn scheduler_suspend() -> std::io::Result<()> { None }, None, + None, )?; scheduler.try_schedule() } @@ -71,6 +76,7 @@ fn scheduler_delay() -> std::io::Result<()> { None }, None, + None, )?; scheduler.try_schedule()?; std::thread::sleep(Duration::from_millis(100)); @@ -111,13 +117,14 @@ fn scheduler_listener() -> std::io::Result<()> { let mut scheduler = Scheduler::default(); scheduler.add_listener(TestListener::default()); - scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None)?; + scheduler.submit_co(|_, _| panic!("test panic, just ignore it"), None, None)?; scheduler.submit_co( |_, _| { println!("2"); None }, None, + None, )?; scheduler.try_schedule() } diff --git a/hook/src/lib.rs b/hook/src/lib.rs index 7c1c7d4d..b1701189 100644 --- a/hook/src/lib.rs +++ b/hook/src/lib.rs @@ -91,8 +91,13 @@ pub extern "C" fn open_coroutine_stop(secs: c_uint) -> c_int { ///创建任务 #[no_mangle] -pub extern "C" fn task_crate(f: UserTaskFunc, param: usize) -> JoinHandle { - EventLoops::submit_task(None, move |p| Some(f(p.unwrap_or(0))), Some(param)) +pub extern "C" fn task_crate(f: UserTaskFunc, param: usize, priority: c_longlong) -> JoinHandle { + EventLoops::submit_task( + None, + move |p| Some(f(p.unwrap_or(0))), + Some(param), + Some(priority), + ) } ///等待任务完成 diff --git a/open-coroutine/src/lib.rs b/open-coroutine/src/lib.rs index 7aca001e..e991516e 100644 --- a/open-coroutine/src/lib.rs +++ b/open-coroutine/src/lib.rs @@ -75,7 +75,11 @@ extern "C" { #[allow(improper_ctypes)] extern "C" { - fn task_crate(f: UserTaskFunc, param: usize) -> open_coroutine_core::net::join::JoinHandle; + fn task_crate( + f: UserTaskFunc, + param: usize, + priority: c_longlong, + ) -> open_coroutine_core::net::join::JoinHandle; fn task_join(handle: &open_coroutine_core::net::join::JoinHandle) -> c_longlong; @@ -104,13 +108,24 @@ pub fn shutdown() { /// Create a task. #[macro_export] macro_rules! task { + ( $f: expr , $param:expr , $priority: expr $(,)? ) => { + $crate::task($f, $param, $priority) + }; ( $f: expr , $param:expr $(,)? ) => { - $crate::task($f, $param) + $crate::task( + $f, + $param, + open_coroutine_core::common::ordered_work_steal::DEFAULT_PRECEDENCE, + ) }; } /// Create a task. -pub fn task R>(f: F, param: P) -> JoinHandle { +pub fn task R>( + f: F, + param: P, + priority: c_longlong, +) -> JoinHandle { extern "C" fn task_main R>(input: usize) -> usize { unsafe { let ptr = &mut *((input as *mut c_void).cast::<(F, P)>()); @@ -133,6 +148,7 @@ pub fn task R>(f: F, param: P) -> JoinHa task_crate( task_main::, std::ptr::from_mut(inner).cast::() as usize, + priority, ) .into() } From 12fc8ada6bfeabea454157a17e9b6fa08c582f2d Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Fri, 29 Nov 2024 16:11:45 +0800 Subject: [PATCH 2/3] update readme --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index c96f1bcb..87a48809 100644 --- a/README.md +++ b/README.md @@ -201,6 +201,7 @@ nanosleep hooked ### 0.6.x +- [x] support custom task and coroutine priority. - [x] support scalable stack ### 0.5.x From 32c366b167d1c45f74c242898a0f79c338d77955 Mon Sep 17 00:00:00 2001 From: dragon-zhang Date: Fri, 29 Nov 2024 16:42:54 +0800 Subject: [PATCH 3/3] update workflow --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 212307dd..5950f529 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -53,7 +53,7 @@ jobs: PROJECT_DIR: ${{ github.workspace }} run: sh .github/workflows/ci.sh - name: Run preemptive tests - if: always() + if: ${{ contains(fromJSON('["x86_64-unknown-linux-gnu", "i686-unknown-linux-gnu", "x86_64-apple-darwin", "aarch64-apple-darwin"]'), matrix.target) }} env: CHANNEL: ${{ matrix.channel }} CROSS: ${{ !startsWith(matrix.target, 'x86_64') && contains(matrix.target, 'linux') && '1' || '0' }}