Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,9 @@ jobs:

- target: x86_64-pc-windows-gnu
os: windows-latest
- target: i686-pc-windows-gnu
os: windows-latest
# - target: i686-pc-windows-gnu
# os: windows-latest
- target: x86_64-pc-windows-msvc
os: windows-latest
- target: i686-pc-windows-msvc
os: windows-latest
# - target: i686-pc-windows-msvc
# os: windows-latest
3 changes: 3 additions & 0 deletions core/src/common/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean";
/// Task global queue bean name.
pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean";

/// Stack pool bean name.
pub const STACK_POOL_BEAN: &str = "stackPoolBean";

/// Monitor bean name.
pub const MONITOR_BEAN: &str = "monitorBean";

Expand Down
12 changes: 7 additions & 5 deletions core/src/coroutine/korosensei.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ use crate::catch;
use crate::common::constants::CoroutineState;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::coroutine::stack_pool::{PooledStack, StackPool};
use crate::coroutine::suspender::Suspender;
use crate::coroutine::StackInfo;
use corosensei::stack::{DefaultStack, Stack};
use corosensei::stack::Stack;
use corosensei::trap::TrapHandlerRegs;
use corosensei::CoroutineResult;
use std::cell::{Cell, RefCell};
Expand All @@ -26,7 +27,7 @@ cfg_if::cfg_if! {
#[repr(C)]
pub struct Coroutine<'c, Param, Yield, Return> {
pub(crate) name: String,
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, DefaultStack>,
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, PooledStack>,
pub(crate) state: Cell<CoroutineState<Yield, Return>>,
pub(crate) stack_size: usize,
pub(crate) stack_infos: RefCell<VecDeque<StackInfo>>,
Expand Down Expand Up @@ -307,12 +308,13 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
stack_size: usize,
callback: F,
) -> std::io::Result<R> {
let stack_pool = StackPool::get_instance();
if let Some(co) = Self::current() {
let remaining_stack = unsafe { co.remaining_stack() };
if remaining_stack >= red_zone {
return Ok(callback());
}
return DefaultStack::new(stack_size).map(|stack| {
return stack_pool.allocate(stack_size).map(|stack| {
co.stack_infos.borrow_mut().push_back(StackInfo {
stack_top: stack.base().get(),
stack_bottom: stack.limit().get(),
Expand All @@ -335,7 +337,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
return Ok(callback());
}
}
DefaultStack::new(stack_size).map(|stack| {
stack_pool.allocate(stack_size).map(|stack| {
STACK_INFOS.with(|s| {
s.borrow_mut().push_back(StackInfo {
stack_top: stack.base().get(),
Expand Down Expand Up @@ -378,7 +380,7 @@ where
F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
{
let stack_size = stack_size.max(crate::common::page_size());
let stack = DefaultStack::new(stack_size)?;
let stack = StackPool::get_instance().allocate(stack_size)?;
let stack_infos = RefCell::new(VecDeque::from([StackInfo {
stack_top: stack.base().get(),
stack_bottom: stack.limit().get(),
Expand Down
2 changes: 2 additions & 0 deletions core/src/coroutine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ pub struct StackInfo {
/// Coroutine state abstraction and impl.
mod state;

pub(crate) mod stack_pool;

impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
/// Get the name of this coroutine.
pub fn name(&self) -> &str {
Expand Down
297 changes: 297 additions & 0 deletions core/src/coroutine/stack_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,297 @@
use crate::common::beans::BeanFactory;
use crate::common::constants::STACK_POOL_BEAN;
use crate::common::now;
use corosensei::stack::{DefaultStack, Stack, StackPointer};
use std::cell::UnsafeCell;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
use std::ops::{Deref, DerefMut};
use std::rc::Rc;
use std::sync::atomic::{AtomicU64, AtomicUsize};

pub(crate) struct PooledStack {
stack_size: usize,
stack: Rc<UnsafeCell<DefaultStack>>,
create_time: u64,
}

impl Deref for PooledStack {
type Target = DefaultStack;

fn deref(&self) -> &DefaultStack {
unsafe {
self.stack
.deref()
.get()
.as_ref()
.expect("PooledStack is not unique")
}
}
}

impl DerefMut for PooledStack {
fn deref_mut(&mut self) -> &mut Self::Target {
unsafe {
self.stack
.deref()
.get()
.as_mut()
.expect("PooledStack is not unique")
}
}
}

impl Clone for PooledStack {
fn clone(&self) -> Self {
Self {
stack_size: self.stack_size,
stack: self.stack.clone(),
create_time: self.create_time,
}
}
}

impl PartialEq<Self> for PooledStack {
fn eq(&self, other: &Self) -> bool {
Rc::strong_count(&other.stack).eq(&Rc::strong_count(&self.stack))
}
}

impl Eq for PooledStack {}

impl PartialOrd<Self> for PooledStack {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl Ord for PooledStack {
fn cmp(&self, other: &Self) -> Ordering {
// BinaryHeap defaults to a large top heap, but we need a small top heap
match Rc::strong_count(&other.stack).cmp(&Rc::strong_count(&self.stack)) {
Ordering::Less => Ordering::Less,
Ordering::Equal => match other.stack_size.cmp(&self.stack_size) {
Ordering::Less => Ordering::Less,
Ordering::Equal => other.create_time.cmp(&self.create_time),
Ordering::Greater => Ordering::Greater,
},
Ordering::Greater => Ordering::Greater,
}
}
}

unsafe impl Stack for PooledStack {
#[inline]
fn base(&self) -> StackPointer {
self.deref().base()
}

#[inline]
fn limit(&self) -> StackPointer {
self.deref().limit()
}

#[cfg(windows)]
#[inline]
fn teb_fields(&self) -> corosensei::stack::StackTebFields {
self.deref().teb_fields()
}

#[cfg(windows)]
#[inline]
fn update_teb_fields(&mut self, stack_limit: usize, guaranteed_stack_bytes: usize) {
self.deref_mut()
.update_teb_fields(stack_limit, guaranteed_stack_bytes);
}
}

impl PooledStack {
pub(crate) fn new(stack_size: usize) -> std::io::Result<Self> {
Ok(Self {
stack_size,
stack: Rc::new(UnsafeCell::new(DefaultStack::new(stack_size)?)),
create_time: now(),
})
}

/// This function must be called after a stack has finished running a coroutine
/// so that the `StackLimit` and `GuaranteedStackBytes` fields from the TEB can
/// be updated in the stack. This is necessary if the stack is reused for
/// another coroutine.
#[inline]
#[cfg(windows)]
pub(crate) fn update_stack_teb_fields(&mut self) {
cfg_if::cfg_if! {
if #[cfg(target_arch = "x86_64")] {
type StackWord = u64;
} else if #[cfg(target_arch = "x86")] {
type StackWord = u32;
}
}
let base = self.base().get() as *const StackWord;
unsafe {
let stack_limit = usize::try_from(*base.sub(1)).expect("stack limit overflow");
let guaranteed_stack_bytes =
usize::try_from(*base.sub(2)).expect("guaranteed stack bytes overflow");
self.update_teb_fields(stack_limit, guaranteed_stack_bytes);
}
}
}

pub(crate) struct StackPool {
pool: UnsafeCell<BinaryHeap<PooledStack>>,
len: AtomicUsize,
//最小内存数,即核心内存数
min_size: AtomicUsize,
//非核心内存的最大存活时间,单位ns
keep_alive_time: AtomicU64,
}

unsafe impl Send for StackPool {}

unsafe impl Sync for StackPool {}

impl Default for StackPool {
fn default() -> Self {
Self::new(0, 10_000_000_000)
}
}

impl StackPool {
pub(crate) fn get_instance<'m>() -> &'m Self {
BeanFactory::get_or_default(STACK_POOL_BEAN)
}

pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
Self {
pool: UnsafeCell::new(BinaryHeap::default()),
len: AtomicUsize::default(),
min_size: AtomicUsize::new(min_size),
keep_alive_time: AtomicU64::new(keep_alive_time),
}
}

pub(crate) fn allocate(&self, stack_size: usize) -> std::io::Result<PooledStack> {
let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
// find min stack
let mut not_use = Vec::new();
while let Some(stack) = heap.peek() {
if Rc::strong_count(&stack.stack) > 1 {
// can't use the stack
break;
}
#[allow(unused_mut)]
if let Some(mut stack) = heap.pop() {
self.sub_len();
if stack_size <= stack.stack_size {
for s in not_use {
heap.push(s);
self.add_len();
}
heap.push(stack.clone());
self.add_len();
#[cfg(windows)]
stack.update_stack_teb_fields();
return Ok(stack);
}
if self.min_size() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// clean the expired stack
continue;
}
not_use.push(stack);
}
}
let stack = PooledStack::new(stack_size)?;
heap.push(stack.clone());
self.add_len();
Ok(stack)
}

#[allow(dead_code)]
pub(crate) fn set_keep_alive_time(&self, keep_alive_time: u64) {
self.keep_alive_time
.store(keep_alive_time, std::sync::atomic::Ordering::Release);
}

pub(crate) fn keep_alive_time(&self) -> u64 {
self.keep_alive_time
.load(std::sync::atomic::Ordering::Acquire)
}

#[allow(dead_code)]
pub(crate) fn set_min_size(&self, min_size: usize) {
self.min_size
.store(min_size, std::sync::atomic::Ordering::Release);
}

pub(crate) fn min_size(&self) -> usize {
self.min_size.load(std::sync::atomic::Ordering::Acquire)
}

pub(crate) fn len(&self) -> usize {
self.len.load(std::sync::atomic::Ordering::Acquire)
}

fn add_len(&self) {
self.len.store(
self.len().saturating_add(1),
std::sync::atomic::Ordering::Release,
);
}

fn sub_len(&self) {
self.len.store(
self.len().saturating_sub(1),
std::sync::atomic::Ordering::Release,
);
}

/// Clean the expired stack.
#[allow(dead_code)]
pub(crate) fn clean(&self) {
let heap = unsafe { self.pool.get().as_mut().expect("StackPool is not unique") };
let mut maybe_free = Vec::new();
while let Some(stack) = heap.peek() {
if Rc::strong_count(&stack.stack) > 1 {
// can't free the stack
break;
}
if let Some(stack) = heap.pop() {
self.sub_len();
maybe_free.push(stack);
}
}
for stack in maybe_free {
if self.min_size() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// free the stack
continue;
}
heap.push(stack);
self.add_len();
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::common::constants::DEFAULT_STACK_SIZE;

#[test]
fn test_stack_pool() -> std::io::Result<()> {
let pool = StackPool::default();
let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
assert_eq!(Rc::strong_count(&stack.stack), 2);
drop(stack);
let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
assert_eq!(Rc::strong_count(&stack.stack), 2);
assert_eq!(pool.len(), 1);
_ = pool.allocate(DEFAULT_STACK_SIZE)?;
assert_eq!(pool.len(), 2);
Ok(())
}
}
Loading
Loading