diff --git a/core/src/common/macros.rs b/core/src/common/macros.rs index 4c0eeed8..56d34fb1 100644 --- a/core/src/common/macros.rs +++ b/core/src/common/macros.rs @@ -1,3 +1,5 @@ +/// Check for help information. + /// Constructs an event at the trace level. #[allow(unused_macros)] #[macro_export] @@ -66,8 +68,8 @@ macro_rules! impl_display_by_debug { impl$(<$($generic1 $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? std::fmt::Display for $struct_name$(<$($generic1),+>)? where - $($($generic2 $( : $trait_tt3 $( + $trait_tt4)*)?),+,)? $struct_name$(<$($generic1),+>)?: std::fmt::Debug, + $($($generic2 $( : $trait_tt3 $( + $trait_tt4)*)?),+,)? { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { std::fmt::Debug::fmt(self, f) @@ -83,19 +85,14 @@ macro_rules! impl_display_by_debug { macro_rules! impl_current_for { ( $name:ident, - $struct_name:ident$(<$($generic1:tt $( : $trait_tt1: tt $( + $trait_tt2: tt)*)?),+>)? - $(where $( - $generic2:tt $( : $trait_tt3: tt $( + $trait_tt4: tt)*)? - ),+)? + $struct_name:ident$(<$($generic:tt $( : $trait_tt1: tt $( + $trait_tt2: tt)*)?),+>)? ) => { thread_local! { static $name: std::cell::RefCell> = const { std::cell::RefCell::new(std::collections::VecDeque::new()) }; } - impl$(<$($generic1 $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? $struct_name$(<$($generic1),+>)? - $(where $($generic2 $( : $trait_tt3 $( + $trait_tt4)*)?),+)? - { + impl$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? $struct_name$(<$($generic),+>)? { /// Init the current. pub(crate) fn init_current(current: &Self) { $name.with(|s| { @@ -154,46 +151,65 @@ macro_rules! impl_current_for { /// Check for help information. #[macro_export] macro_rules! impl_for_named { - ($struct_name:ident$(<$($generic1:tt $( : $trait_tt1: tt $( + $trait_tt2: tt)*)?),+>)? - $(where $( - $generic2:tt $( : $trait_tt3: tt $( + $trait_tt4: tt)*)? - ),+)? - ) => { - impl$(<$($generic1 $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? Eq - for $struct_name$(<$($generic1),+>)? - { - } + ($struct_name:ident$(<$($generic:tt $( : $trait_tt1: tt $( + $trait_tt2: tt)*)?),+>)?) => { + $crate::impl_ord_for_named!($struct_name$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)?); + $crate::impl_hash_for_named!($struct_name$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)?); + }; +} - impl$(<$($generic1 $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? PartialEq - for $struct_name$(<$($generic1),+>)? +/// Fast impl `Eq` for `Named` types. +#[macro_export] +macro_rules! impl_eq_for_named { + ($struct_name:ident$(<$($generic:tt $( : $trait_tt1: tt $( + $trait_tt2: tt)*)?),+>)?) => { + impl$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? PartialEq + for $struct_name$(<$($generic),+>)? { fn eq(&self, other: &Self) -> bool { self.name().eq(other.name()) } } - impl$(<$($generic1 $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? Ord - for $struct_name$(<$($generic1),+>)? + impl$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? Eq + for $struct_name$(<$($generic),+>)? { - fn cmp(&self, other: &Self) -> std::cmp::Ordering { - self.name().cmp(other.name()) - } } + }; +} - impl$(<$($generic1 $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? PartialOrd - for $struct_name$(<$($generic1),+>)? +/// Fast impl `Ord` for `Named` types. +#[macro_export] +macro_rules! impl_ord_for_named { + ($struct_name:ident$(<$($generic:tt $( : $trait_tt1: tt $( + $trait_tt2: tt)*)?),+>)?) => { + $crate::impl_eq_for_named!($struct_name$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)?); + + impl$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? PartialOrd + for $struct_name$(<$($generic),+>)? { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } - impl$(<$($generic1 $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? std::hash::Hash - for $struct_name$(<$($generic1),+>)? + impl$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? Ord + for $struct_name$(<$($generic),+>)? + { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + self.name().cmp(other.name()) + } + } + } +} + +/// Fast impl `std::hash::Hash` for `Named` types. +#[macro_export] +macro_rules! impl_hash_for_named { + ($struct_name:ident$(<$($generic:tt $( : $trait_tt1: tt $( + $trait_tt2: tt)*)?),+>)?) => { + impl$(<$($generic $( : $trait_tt1 $( + $trait_tt2)*)?),+>)? std::hash::Hash + for $struct_name$(<$($generic),+>)? { fn hash(&self, state: &mut H) { self.name().hash(state) } } - }; + } } diff --git a/core/src/common/mod.rs b/core/src/common/mod.rs index ef3cdf80..42dfbb44 100644 --- a/core/src/common/mod.rs +++ b/core/src/common/mod.rs @@ -16,9 +16,6 @@ pub(crate) mod macros; /// `BeanFactory` impls. pub mod beans; -/// `TimerList` impls. -pub mod timer; - /// Suppose a thread in a work-stealing scheduler is idle and looking for the next task to run. To /// find an available task, it might do the following: /// diff --git a/core/src/common/timer.rs b/core/src/common/timer.rs deleted file mode 100644 index d3cbec88..00000000 --- a/core/src/common/timer.rs +++ /dev/null @@ -1,210 +0,0 @@ -use crate::impl_display_by_debug; -use std::collections::{BTreeMap, VecDeque}; -use std::ops::{Deref, DerefMut}; -use std::sync::atomic::{AtomicUsize, Ordering}; - -/// A queue for managing multiple entries under a specified timestamp. -#[repr(C)] -#[derive(Debug, Eq, PartialEq)] -pub struct TimerEntry { - timestamp: u64, - inner: VecDeque, -} - -impl Deref for TimerEntry { - type Target = VecDeque; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for TimerEntry { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -impl TimerEntry { - /// Creates an empty deque. - #[must_use] - pub fn new(timestamp: u64) -> Self { - TimerEntry { - timestamp, - inner: VecDeque::new(), - } - } - - /// Get the timestamp. - #[must_use] - pub fn get_timestamp(&self) -> u64 { - self.timestamp - } - - /// Removes and returns the `t` from the deque. - /// Whichever end is closer to the removal point will be moved to make - /// room, and all the affected elements will be moved to new positions. - /// Returns `None` if `t` not found. - pub fn remove(&mut self, t: &T) -> Option - where - T: Ord, - { - let index = self - .inner - .binary_search_by(|x| x.cmp(t)) - .unwrap_or_else(|x| x); - self.inner.remove(index) - } -} - -impl_display_by_debug!(TimerEntry); - -/// A queue for managing multiple `TimerEntry`. -#[repr(C)] -#[derive(educe::Educe)] -#[educe(Debug, Eq, PartialEq)] -pub struct TimerList { - inner: BTreeMap>, - #[educe(PartialEq(ignore))] - total: AtomicUsize, -} - -impl Default for TimerList { - fn default() -> Self { - TimerList { - inner: BTreeMap::default(), - total: AtomicUsize::new(0), - } - } -} - -impl Deref for TimerList { - type Target = BTreeMap>; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for TimerList { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -impl TimerList { - /// Returns the number of elements in the deque. - #[must_use] - pub fn len(&self) -> usize { - if self.inner.is_empty() { - return 0; - } - self.total.load(Ordering::Acquire) - } - - /// Returns the number of entries in the deque. - #[must_use] - pub fn entry_len(&self) -> usize { - self.inner.len() - } - - /// Inserts an element at `timestamp` within the deque, shifting all elements - /// with indices greater than or equal to `timestamp` towards the back. - pub fn insert(&mut self, timestamp: u64, t: T) { - if let Some(entry) = self.inner.get_mut(×tamp) { - entry.push_back(t); - _ = self.total.fetch_add(1, Ordering::Release); - return; - } - let mut entry = TimerEntry::new(timestamp); - entry.push_back(t); - _ = self.total.fetch_add(1, Ordering::Release); - if let Some(mut entry) = self.inner.insert(timestamp, entry) { - // concurrent, just retry - while !entry.is_empty() { - if let Some(e) = entry.pop_front() { - self.insert(timestamp, e); - } - } - } - } - - /// Provides a reference to the front element, or `None` if the deque is empty. - #[must_use] - pub fn front(&self) -> Option<(&u64, &TimerEntry)> { - self.inner.first_key_value() - } - - /// Removes the first element and returns it, or `None` if the deque is empty. - pub fn pop_front(&mut self) -> Option<(u64, TimerEntry)> { - self.inner.pop_first().map(|(timestamp, entry)| { - _ = self.total.fetch_sub(entry.len(), Ordering::Release); - (timestamp, entry) - }) - } - - /// Returns `true` if the deque is empty. - #[must_use] - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Removes and returns the element at `timestamp` from the deque. - /// Whichever end is closer to the removal point will be moved to make - /// room, and all the affected elements will be moved to new positions. - /// Returns `None` if `timestamp` is out of bounds. - pub fn remove_entry(&mut self, timestamp: &u64) -> Option> { - self.inner.remove(timestamp).inspect(|entry| { - _ = self.total.fetch_sub(entry.len(), Ordering::Release); - }) - } - - /// Removes and returns the `t` from the deque. - /// Whichever end is closer to the removal point will be moved to make - /// room, and all the affected elements will be moved to new positions. - /// Returns `None` if `t` not found. - pub fn remove(&mut self, timestamp: &u64, t: &T) -> Option - where - T: Ord, - { - if let Some(entry) = self.inner.get_mut(timestamp) { - let val = entry.remove(t).inspect(|_| { - _ = self.total.fetch_sub(1, Ordering::Release); - }); - if entry.is_empty() { - _ = self.remove_entry(timestamp); - } - return val; - } - None - } -} - -impl_display_by_debug!(TimerList); - -#[cfg(test)] -mod tests { - use crate::common::now; - use crate::common::timer::TimerList; - - #[test] - fn test() { - assert!(now() > 0); - } - - #[test] - fn timer_list() { - let mut list = TimerList::default(); - assert_eq!(list.entry_len(), 0); - list.insert(1, String::from("data is 1")); - list.insert(2, String::from("data is 2")); - list.insert(3, String::from("data is 3")); - assert_eq!(list.entry_len(), 3); - - let mut entry = list.pop_front().unwrap().1; - assert_eq!(entry.len(), 1); - let string = entry.pop_front().unwrap(); - assert_eq!(string, String::from("data is 1")); - assert_eq!(entry.len(), 0); - } -} diff --git a/core/src/scheduler.rs b/core/src/scheduler.rs index 8fb9e2c5..dcd7cc24 100644 --- a/core/src/scheduler.rs +++ b/core/src/scheduler.rs @@ -1,14 +1,13 @@ use crate::common::beans::BeanFactory; use crate::common::constants::{CoroutineState, SyscallState}; use crate::common::ordered_work_steal::{OrderedLocalQueue, OrderedWorkStealQueue}; -use crate::common::timer::TimerList; use crate::common::{get_timeout_time, now}; use crate::coroutine::listener::Listener; use crate::coroutine::suspender::Suspender; use crate::coroutine::Coroutine; use crate::{co, impl_current_for, impl_display_by_debug, impl_for_named}; use dashmap::DashMap; -use std::collections::VecDeque; +use std::collections::{BinaryHeap, VecDeque}; use std::ffi::c_longlong; use std::io::{Error, ErrorKind}; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -23,6 +22,62 @@ pub type SchedulableCoroutine<'s> = Coroutine<'s, (), (), Option>; /// A type for Scheduler. pub type SchedulableSuspender<'s> = Suspender<'s, (), ()>; +#[repr(C)] +#[derive(Debug)] +struct SuspendItem<'s> { + timestamp: u64, + coroutine: SchedulableCoroutine<'s>, +} + +impl PartialEq for SuspendItem<'_> { + fn eq(&self, other: &Self) -> bool { + self.timestamp.eq(&other.timestamp) + } +} + +impl Eq for SuspendItem<'_> {} + +impl PartialOrd for SuspendItem<'_> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for SuspendItem<'_> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // BinaryHeap defaults to a large top heap, but we need a small top heap + other.timestamp.cmp(&self.timestamp) + } +} + +#[repr(C)] +#[derive(Debug)] +struct SyscallSuspendItem<'s> { + timestamp: u64, + co_name: &'s str, +} + +impl PartialEq for SyscallSuspendItem<'_> { + fn eq(&self, other: &Self) -> bool { + self.timestamp.eq(&other.timestamp) + } +} + +impl Eq for SyscallSuspendItem<'_> {} + +impl PartialOrd for SyscallSuspendItem<'_> { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for SyscallSuspendItem<'_> { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + // BinaryHeap defaults to a large top heap, but we need a small top heap + other.timestamp.cmp(&self.timestamp) + } +} + /// The scheduler impls. #[repr(C)] #[derive(Debug)] @@ -31,9 +86,9 @@ pub struct Scheduler<'s> { stack_size: AtomicUsize, listeners: VecDeque<&'s dyn Listener<(), Option>>, ready: OrderedLocalQueue<'s, SchedulableCoroutine<'s>>, - suspend: TimerList>, + suspend: BinaryHeap>, syscall: DashMap<&'s str, SchedulableCoroutine<'s>>, - syscall_suspend: TimerList<&'s str>, + syscall_suspend: BinaryHeap>, results: DashMap<&'s str, Result, &'s str>>, } @@ -90,9 +145,9 @@ impl<'s> Scheduler<'s> { crate::common::constants::COROUTINE_GLOBAL_QUEUE_BEAN, ) .local_queue(), - suspend: TimerList::default(), + suspend: BinaryHeap::default(), syscall: DashMap::default(), - syscall_suspend: TimerList::default(), + syscall_suspend: BinaryHeap::default(), results: DashMap::default(), } } @@ -222,13 +277,17 @@ impl<'s> Scheduler<'s> { //如果已包含,说明当前系统调用还有上层父系统调用,因此直接忽略插入结果 _ = self.syscall.insert(co_name, coroutine); if let SyscallState::Suspend(timestamp) = state { - self.syscall_suspend.insert(timestamp, co_name); + self.syscall_suspend + .push(SyscallSuspendItem { timestamp, co_name }); } } CoroutineState::Suspend((), timestamp) => { if timestamp > now() { //挂起协程到时间轮 - self.suspend.insert(timestamp, coroutine); + self.suspend.push(SuspendItem { + timestamp, + coroutine, + }); } else { //放入就绪队列尾部 self.ready.push(coroutine); @@ -263,39 +322,25 @@ impl<'s> Scheduler<'s> { fn check_ready(&mut self) -> std::io::Result<()> { // Check if the elements in the suspend queue are ready - for _ in 0..self.suspend.entry_len() { - if let Some((exec_time, _)) = self.suspend.front() { - if now() < *exec_time { - break; - } - if let Some((_, mut entry)) = self.suspend.pop_front() { - while let Some(coroutine) = entry.pop_front() { - coroutine.ready()?; - self.ready.push(coroutine); - } + if let Some(item) = self.suspend.peek() { + if now() >= item.timestamp { + if let Some(item) = self.suspend.pop() { + item.coroutine.ready()?; + self.ready.push(item.coroutine); } } } // Check if the elements in the syscall suspend queue are ready - for _ in 0..self.syscall_suspend.entry_len() { - if let Some((exec_time, _)) = self.syscall_suspend.front() { - if now() < *exec_time { - break; - } - if let Some((_, mut entry)) = self.syscall_suspend.pop_front() { - while let Some(co_name) = entry.pop_front() { - if let Some((_, co)) = self.syscall.remove(&co_name) { - match co.state() { - CoroutineState::SystemCall( - val, - syscall, - SyscallState::Suspend(_), - ) => { - co.syscall(val, syscall, SyscallState::Timeout)?; - self.ready.push(co); - } - _ => unreachable!("check_ready should never execute to here"), + if let Some(item) = self.syscall_suspend.peek() { + if now() >= item.timestamp { + if let Some(item) = self.syscall_suspend.pop() { + if let Some((_, co)) = self.syscall.remove(item.co_name) { + match co.state() { + CoroutineState::SystemCall(val, syscall, SyscallState::Suspend(_)) => { + co.syscall(val, syscall, SyscallState::Timeout)?; + self.ready.push(co); } + _ => unreachable!("check_ready should never execute to here"), } } } @@ -304,3 +349,23 @@ impl<'s> Scheduler<'s> { Ok(()) } } + +#[cfg(test)] +mod tests { + use crate::scheduler::SyscallSuspendItem; + use std::collections::BinaryHeap; + + #[test] + fn test_small_heap() { + let mut heap = BinaryHeap::default(); + for timestamp in (0..10).rev() { + heap.push(SyscallSuspendItem { + timestamp, + co_name: "test", + }); + } + for timestamp in 0..10 { + assert_eq!(timestamp, heap.pop().unwrap().timestamp); + } + } +}