Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ jobs:
aarch64-apple-darwin,

x86_64-pc-windows-gnu,
i686-pc-windows-gnu,
# i686-pc-windows-gnu,
x86_64-pc-windows-msvc,
i686-pc-windows-msvc,
# i686-pc-windows-msvc,
]
channel: [ 1.81.0, nightly-2024-08-02 ]
include:
Expand Down
3 changes: 0 additions & 3 deletions core/src/common/constants.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ pub const COROUTINE_GLOBAL_QUEUE_BEAN: &str = "coroutineGlobalQueueBean";
/// Task global queue bean name.
pub const TASK_GLOBAL_QUEUE_BEAN: &str = "taskGlobalQueueBean";

/// Stack pool bean name.
pub const STACK_POOL_BEAN: &str = "stackPoolBean";

/// Monitor bean name.
pub const MONITOR_BEAN: &str = "monitorBean";

Expand Down
40 changes: 38 additions & 2 deletions core/src/net/config.rs → core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,27 @@ pub struct Config {
min_size: usize,
max_size: usize,
keep_alive_time: u64,
min_memory_count: usize,
memory_keep_alive_time: u64,
hook: bool,
}

impl Config {
#[must_use]
pub fn single() -> Self {
Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, true)
Self::new(1, DEFAULT_STACK_SIZE, 0, 65536, 0, 0, 10_000_000_000, true)
}

#[allow(clippy::too_many_arguments)]
#[must_use]
pub fn new(
event_loop_size: usize,
stack_size: usize,
min_size: usize,
max_size: usize,
keep_alive_time: u64,
min_memory_count: usize,
memory_keep_alive_time: u64,
hook: bool,
) -> Self {
Self {
Expand All @@ -32,6 +37,8 @@ impl Config {
min_size,
max_size,
keep_alive_time,
min_memory_count,
memory_keep_alive_time,
hook,
}
}
Expand Down Expand Up @@ -61,6 +68,16 @@ impl Config {
self.keep_alive_time
}

#[must_use]
pub fn min_memory_count(&self) -> usize {
self.min_memory_count
}

#[must_use]
pub fn memory_keep_alive_time(&self) -> u64 {
self.memory_keep_alive_time
}

#[must_use]
pub fn hook(&self) -> bool {
self.hook
Expand Down Expand Up @@ -101,6 +118,16 @@ impl Config {
self
}

pub fn set_min_memory_count(&mut self, min_memory_count: usize) -> &mut Self {
self.min_memory_count = min_memory_count;
self
}

pub fn set_memory_keep_alive_time(&mut self, memory_keep_alive_time: u64) -> &mut Self {
self.memory_keep_alive_time = memory_keep_alive_time;
self
}

pub fn set_hook(&mut self, hook: bool) -> &mut Self {
self.hook = hook;
self
Expand All @@ -109,6 +136,15 @@ impl Config {

impl Default for Config {
fn default() -> Self {
Self::new(cpu_count(), DEFAULT_STACK_SIZE, 0, 65536, 0, true)
Self::new(
cpu_count(),
DEFAULT_STACK_SIZE,
0,
65536,
0,
0,
10_000_000_000,
true,
)
}
}
8 changes: 3 additions & 5 deletions core/src/coroutine/korosensei.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::catch;
use crate::common::constants::CoroutineState;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::coroutine::stack_pool::{PooledStack, StackPool};
use crate::coroutine::stack_pool::{MemoryPool, PooledStack};
use crate::coroutine::suspender::Suspender;
use crate::coroutine::StackInfo;
use corosensei::stack::Stack;
Expand All @@ -29,7 +29,6 @@ pub struct Coroutine<'c, Param, Yield, Return> {
pub(crate) name: String,
inner: corosensei::Coroutine<Param, Yield, Result<Return, &'static str>, PooledStack>,
pub(crate) state: Cell<CoroutineState<Yield, Return>>,
pub(crate) stack_size: usize,
pub(crate) stack_infos: RefCell<VecDeque<StackInfo>>,
pub(crate) listeners: VecDeque<&'c dyn Listener<Yield, Return>>,
pub(crate) local: CoroutineLocal<'c>,
Expand Down Expand Up @@ -308,7 +307,7 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
stack_size: usize,
callback: F,
) -> std::io::Result<R> {
let stack_pool = StackPool::get_instance();
let stack_pool = MemoryPool::get_instance();
if let Some(co) = Self::current() {
let remaining_stack = unsafe { co.remaining_stack() };
if remaining_stack >= red_zone {
Expand Down Expand Up @@ -380,7 +379,7 @@ where
F: FnOnce(&Suspender<Param, Yield>, Param) -> Return + 'static,
{
let stack_size = stack_size.max(crate::common::page_size());
let stack = StackPool::get_instance().allocate(stack_size)?;
let stack = MemoryPool::get_instance().allocate(stack_size)?;
let stack_infos = RefCell::new(VecDeque::from([StackInfo {
stack_top: stack.base().get(),
stack_bottom: stack.limit().get(),
Expand All @@ -403,7 +402,6 @@ where
let mut co = Coroutine {
name,
inner,
stack_size,
stack_infos,
state: Cell::new(CoroutineState::Ready),
listeners: VecDeque::default(),
Expand Down
12 changes: 7 additions & 5 deletions core/src/coroutine/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::common::constants::CoroutineState;
use crate::common::ordered_work_steal::Ordered;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::{impl_current_for, impl_display_by_debug, impl_for_named};
Expand All @@ -16,10 +17,11 @@ pub mod local;
/// Coroutine listener abstraction and impl.
pub mod listener;

use crate::common::ordered_work_steal::Ordered;
/// Reuse stacks.
pub mod stack_pool;

#[cfg(feature = "korosensei")]
pub use korosensei::Coroutine;

#[cfg(feature = "korosensei")]
mod korosensei;

Expand Down Expand Up @@ -76,8 +78,6 @@ pub struct StackInfo {
/// Coroutine state abstraction and impl.
mod state;

pub(crate) mod stack_pool;

impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
/// Get the name of this coroutine.
pub fn name(&self) -> &str {
Expand Down Expand Up @@ -201,8 +201,10 @@ where
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Coroutine")
.field("name", &self.name())
.field("status", &self.state())
.field("state", &self.state())
.field("stack_infos", &self.stack_infos)
.field("local", &self.local)
.field("priority", &self.priority)
.finish()
}
}
Expand Down
52 changes: 33 additions & 19 deletions core/src/coroutine/stack_pool.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::common::beans::BeanFactory;
use crate::common::constants::STACK_POOL_BEAN;
use crate::common::now;
use crate::config::Config;
use corosensei::stack::{DefaultStack, Stack, StackPointer};
use once_cell::sync::OnceCell;
use std::cell::UnsafeCell;
use std::cmp::Ordering;
use std::collections::BinaryHeap;
Expand Down Expand Up @@ -138,35 +138,49 @@ impl PooledStack {
}
}

pub(crate) struct StackPool {
static STACK_POOL: OnceCell<MemoryPool> = OnceCell::new();

/// A memory pool for reusing stacks.
#[derive(educe::Educe)]
#[educe(Debug)]
pub struct MemoryPool {
#[educe(Debug(ignore))]
pool: UnsafeCell<BinaryHeap<PooledStack>>,
len: AtomicUsize,
//最小内存数,即核心内存数
min_size: AtomicUsize,
min_count: AtomicUsize,
//非核心内存的最大存活时间,单位ns
keep_alive_time: AtomicU64,
}

unsafe impl Send for StackPool {}
unsafe impl Send for MemoryPool {}

unsafe impl Sync for StackPool {}
unsafe impl Sync for MemoryPool {}

impl Default for StackPool {
impl Default for MemoryPool {
fn default() -> Self {
Self::new(0, 10_000_000_000)
}
}

impl StackPool {
impl MemoryPool {
/// Init the `MemoryPool`.
pub fn init(config: &Config) -> Result<(), MemoryPool> {
STACK_POOL.set(MemoryPool::new(
config.min_memory_count(),
config.memory_keep_alive_time(),
))
}

pub(crate) fn get_instance<'m>() -> &'m Self {
BeanFactory::get_or_default(STACK_POOL_BEAN)
STACK_POOL.get_or_init(MemoryPool::default)
}

pub(crate) fn new(min_size: usize, keep_alive_time: u64) -> Self {
pub(crate) fn new(min_count: usize, keep_alive_time: u64) -> Self {
Self {
pool: UnsafeCell::new(BinaryHeap::default()),
len: AtomicUsize::default(),
min_size: AtomicUsize::new(min_size),
min_count: AtomicUsize::new(min_count),
keep_alive_time: AtomicU64::new(keep_alive_time),
}
}
Expand Down Expand Up @@ -194,7 +208,7 @@ impl StackPool {
stack.update_stack_teb_fields();
return Ok(stack);
}
if self.min_size() < self.len()
if self.min_count() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// clean the expired stack
Expand All @@ -221,13 +235,13 @@ impl StackPool {
}

#[allow(dead_code)]
pub(crate) fn set_min_size(&self, min_size: usize) {
self.min_size
.store(min_size, std::sync::atomic::Ordering::Release);
pub(crate) fn set_min_count(&self, min_count: usize) {
self.min_count
.store(min_count, std::sync::atomic::Ordering::Release);
}

pub(crate) fn min_size(&self) -> usize {
self.min_size.load(std::sync::atomic::Ordering::Acquire)
pub(crate) fn min_count(&self) -> usize {
self.min_count.load(std::sync::atomic::Ordering::Acquire)
}

pub(crate) fn len(&self) -> usize {
Expand Down Expand Up @@ -264,7 +278,7 @@ impl StackPool {
}
}
for stack in maybe_free {
if self.min_size() < self.len()
if self.min_count() < self.len()
&& now() <= stack.create_time.saturating_add(self.keep_alive_time())
{
// free the stack
Expand All @@ -283,7 +297,7 @@ mod tests {

#[test]
fn test_stack_pool() -> std::io::Result<()> {
let pool = StackPool::default();
let pool = MemoryPool::default();
let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
assert_eq!(Rc::strong_count(&stack.stack), 2);
drop(stack);
Expand Down
4 changes: 4 additions & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@
/// Common traits and impl.
pub mod common;

/// Configuration for `EventLoops`.
#[allow(missing_docs)]
pub mod config;

/// Coroutine impls.
pub mod coroutine;

Expand Down
4 changes: 2 additions & 2 deletions core/src/monitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::common::constants::{CoroutineState, MONITOR_BEAN};
use crate::common::{get_timeout_time, now, CondvarBlocker};
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
use crate::coroutine::stack_pool::StackPool;
use crate::coroutine::stack_pool::MemoryPool;
use crate::scheduler::SchedulableSuspender;
use crate::{catch, error, impl_current_for, impl_display_by_debug, info};
use nix::sys::pthread::{pthread_kill, pthread_self, Pthread};
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Monitor {
);
}
}
StackPool::get_instance().clean();
MemoryPool::get_instance().clean();
//monitor线程不执行协程计算任务,每次循环至少wait 1ms
monitor.blocker.clone().block(Duration::from_millis(1));
}
Expand Down
6 changes: 1 addition & 5 deletions core/src/net/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::Config;
use crate::coroutine::suspender::Suspender;
use crate::net::config::Config;
use crate::net::event_loop::EventLoop;
use crate::net::join::JoinHandle;
use crate::{error, info};
Expand Down Expand Up @@ -30,10 +30,6 @@ mod operator;
#[allow(missing_docs)]
pub mod event_loop;

/// Configuration for `EventLoops`.
#[allow(missing_docs)]
pub mod config;

/// Task join abstraction and impl.
pub mod join;

Expand Down
6 changes: 5 additions & 1 deletion hook/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@

use once_cell::sync::OnceCell;
use open_coroutine_core::co_pool::task::UserTaskFunc;
use open_coroutine_core::net::config::Config;
use open_coroutine_core::config::Config;
use open_coroutine_core::coroutine::stack_pool::MemoryPool;
use open_coroutine_core::net::join::JoinHandle;
use open_coroutine_core::net::{EventLoops, UserFunc};
use open_coroutine_core::scheduler::SchedulableCoroutine;
Expand All @@ -75,6 +76,9 @@ pub mod syscall;
/// Start the framework.
#[no_mangle]
pub extern "C" fn open_coroutine_init(config: Config) -> c_int {
if MemoryPool::init(&config).is_err() {
return -1;
}
EventLoops::init(&config);
_ = HOOK.get_or_init(|| config.hook());
0
Expand Down
Loading
Loading