diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index dd1f3719..1e13d861 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -82,9 +82,9 @@ jobs:
aarch64-apple-darwin,
x86_64-pc-windows-gnu,
-# i686-pc-windows-gnu,
+ i686-pc-windows-gnu,
x86_64-pc-windows-msvc,
-# i686-pc-windows-msvc,
+ i686-pc-windows-msvc,
]
channel: [ 1.81.0, nightly-2024-08-02 ]
include:
@@ -114,9 +114,9 @@ jobs:
- target: x86_64-pc-windows-gnu
os: windows-latest
-# - target: i686-pc-windows-gnu
-# os: windows-latest
+ - target: i686-pc-windows-gnu
+ os: windows-latest
- target: x86_64-pc-windows-msvc
os: windows-latest
-# - target: i686-pc-windows-msvc
-# os: windows-latest
+ - target: i686-pc-windows-msvc
+ os: windows-latest
diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml
index 857908eb..ffd9a223 100644
--- a/.github/workflows/coverage.yml
+++ b/.github/workflows/coverage.yml
@@ -31,9 +31,9 @@ jobs:
- name: Install cargo-llvm-cov
uses: taiki-e/install-action@cargo-llvm-cov
- name: Generate code coverage
- run: sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && sudo -u runner /home/runner/.cargo/bin/cargo llvm-cov --release --all --lcov --output-path lcov.info"
+ run: bash -c "ulimit -Sl 512 && ulimit -Hl 512 && /home/runner/.cargo/bin/cargo llvm-cov --release --all --lcov --output-path lcov.info"
- name: Generate code coverage with all features
- run: sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && sudo -u runner /home/runner/.cargo/bin/cargo llvm-cov --all-features --release --all --lcov --output-path lcov-all-features.info"
+ run: bash -c "ulimit -Sl 512 && ulimit -Hl 512 && /home/runner/.cargo/bin/cargo llvm-cov --all-features --release --all --lcov --output-path lcov-all-features.info"
- name: Upload coverage to Codecov
run: |
bash <(curl -s https://codecov.io/bash) -f lcov.info -t ${{ env.CODECOV_TOKEN }}
diff --git a/core/src/coroutine/korosensei.rs b/core/src/coroutine/korosensei.rs
index 78ba9e70..85e4bf63 100644
--- a/core/src/coroutine/korosensei.rs
+++ b/core/src/coroutine/korosensei.rs
@@ -2,10 +2,9 @@ use crate::catch;
use crate::common::constants::CoroutineState;
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
-use crate::coroutine::stack_pool::{MemoryPool, PooledStack};
use crate::coroutine::suspender::Suspender;
use crate::coroutine::StackInfo;
-use corosensei::stack::Stack;
+use corosensei::stack::{DefaultStack, Stack};
use corosensei::trap::TrapHandlerRegs;
use corosensei::CoroutineResult;
use std::cell::{Cell, RefCell, UnsafeCell};
@@ -27,7 +26,7 @@ cfg_if::cfg_if! {
#[repr(C)]
pub struct Coroutine<'c, Param, Yield, Return> {
pub(crate) name: String,
- inner: corosensei::Coroutine, PooledStack>,
+ inner: corosensei::Coroutine, DefaultStack>,
pub(crate) state: Cell>,
stack_infos: UnsafeCell>,
pub(crate) listeners: VecDeque<&'c dyn Listener>,
@@ -326,17 +325,13 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
stack_size: usize,
callback: F,
) -> std::io::Result {
- let stack_pool = MemoryPool::get_instance();
if let Some(co) = Self::current() {
let remaining_stack = unsafe { co.remaining_stack() };
if remaining_stack >= red_zone {
return Ok(callback());
}
- return stack_pool.allocate(stack_size).map(|stack| {
- co.stack_infos_mut().push_back(StackInfo {
- stack_top: stack.base().get(),
- stack_bottom: stack.limit().get(),
- });
+ return DefaultStack::new(stack_size).map(|stack| {
+ co.stack_infos_mut().push_back(StackInfo::from(&stack));
let r = corosensei::on_stack(stack, callback);
_ = co.stack_infos_mut().pop_back();
r
@@ -355,12 +350,9 @@ impl<'c, Param, Yield, Return> Coroutine<'c, Param, Yield, Return> {
return Ok(callback());
}
}
- stack_pool.allocate(stack_size).map(|stack| {
+ DefaultStack::new(stack_size).map(|stack| {
STACK_INFOS.with(|s| {
- s.borrow_mut().push_back(StackInfo {
- stack_top: stack.base().get(),
- stack_bottom: stack.limit().get(),
- });
+ s.borrow_mut().push_back(StackInfo::from(&stack));
});
let r = corosensei::on_stack(stack, callback);
_ = STACK_INFOS.with(|s| s.borrow_mut().pop_back());
@@ -398,7 +390,7 @@ where
F: FnOnce(&Suspender, Param) -> Return + 'static,
{
let stack_size = stack_size.max(crate::common::page_size());
- let stack = MemoryPool::get_instance().allocate(stack_size)?;
+ let stack = DefaultStack::new(stack_size)?;
let stack_infos = UnsafeCell::new(VecDeque::from([StackInfo {
stack_top: stack.base().get(),
stack_bottom: stack.limit().get(),
@@ -469,3 +461,12 @@ where
}
}
}
+
+impl From<&S> for StackInfo {
+ fn from(stack: &S) -> Self {
+ Self {
+ stack_top: stack.base().get(),
+ stack_bottom: stack.limit().get(),
+ }
+ }
+}
diff --git a/core/src/coroutine/mod.rs b/core/src/coroutine/mod.rs
index 71aa8891..0403fd9f 100644
--- a/core/src/coroutine/mod.rs
+++ b/core/src/coroutine/mod.rs
@@ -17,9 +17,6 @@ pub mod local;
/// Coroutine listener abstraction and impl.
pub mod listener;
-/// Reuse stacks.
-pub mod stack_pool;
-
#[cfg(feature = "korosensei")]
pub use korosensei::Coroutine;
#[cfg(feature = "korosensei")]
diff --git a/core/src/coroutine/stack_pool.rs b/core/src/coroutine/stack_pool.rs
deleted file mode 100644
index 464073a8..00000000
--- a/core/src/coroutine/stack_pool.rs
+++ /dev/null
@@ -1,311 +0,0 @@
-use crate::common::now;
-use crate::config::Config;
-use corosensei::stack::{DefaultStack, Stack, StackPointer};
-use once_cell::sync::OnceCell;
-use std::cell::UnsafeCell;
-use std::cmp::Ordering;
-use std::collections::BinaryHeap;
-use std::ops::{Deref, DerefMut};
-use std::rc::Rc;
-use std::sync::atomic::{AtomicU64, AtomicUsize};
-
-pub(crate) struct PooledStack {
- stack_size: usize,
- stack: Rc>,
- create_time: u64,
-}
-
-impl Deref for PooledStack {
- type Target = DefaultStack;
-
- fn deref(&self) -> &DefaultStack {
- unsafe {
- self.stack
- .deref()
- .get()
- .as_ref()
- .expect("PooledStack is not unique")
- }
- }
-}
-
-impl DerefMut for PooledStack {
- fn deref_mut(&mut self) -> &mut Self::Target {
- unsafe {
- self.stack
- .deref()
- .get()
- .as_mut()
- .expect("PooledStack is not unique")
- }
- }
-}
-
-impl Clone for PooledStack {
- fn clone(&self) -> Self {
- Self {
- stack_size: self.stack_size,
- stack: self.stack.clone(),
- create_time: self.create_time,
- }
- }
-}
-
-impl PartialEq for PooledStack {
- fn eq(&self, other: &Self) -> bool {
- Rc::strong_count(&other.stack).eq(&Rc::strong_count(&self.stack))
- }
-}
-
-impl Eq for PooledStack {}
-
-impl PartialOrd for PooledStack {
- fn partial_cmp(&self, other: &Self) -> Option {
- Some(self.cmp(other))
- }
-}
-
-impl Ord for PooledStack {
- fn cmp(&self, other: &Self) -> Ordering {
- // BinaryHeap defaults to a large top heap, but we need a small top heap
- match Rc::strong_count(&other.stack).cmp(&Rc::strong_count(&self.stack)) {
- Ordering::Less => Ordering::Less,
- Ordering::Equal => match other.stack_size.cmp(&self.stack_size) {
- Ordering::Less => Ordering::Less,
- Ordering::Equal => other.create_time.cmp(&self.create_time),
- Ordering::Greater => Ordering::Greater,
- },
- Ordering::Greater => Ordering::Greater,
- }
- }
-}
-
-unsafe impl Stack for PooledStack {
- #[inline]
- fn base(&self) -> StackPointer {
- self.deref().base()
- }
-
- #[inline]
- fn limit(&self) -> StackPointer {
- self.deref().limit()
- }
-
- #[cfg(windows)]
- #[inline]
- fn teb_fields(&self) -> corosensei::stack::StackTebFields {
- self.deref().teb_fields()
- }
-
- #[cfg(windows)]
- #[inline]
- fn update_teb_fields(&mut self, stack_limit: usize, guaranteed_stack_bytes: usize) {
- self.deref_mut()
- .update_teb_fields(stack_limit, guaranteed_stack_bytes);
- }
-}
-
-impl PooledStack {
- pub(crate) fn new(stack_size: usize) -> std::io::Result {
- Ok(Self {
- stack_size,
- stack: Rc::new(UnsafeCell::new(DefaultStack::new(stack_size)?)),
- create_time: now(),
- })
- }
-
- /// This function must be called after a stack has finished running a coroutine
- /// so that the `StackLimit` and `GuaranteedStackBytes` fields from the TEB can
- /// be updated in the stack. This is necessary if the stack is reused for
- /// another coroutine.
- #[inline]
- #[cfg(windows)]
- pub(crate) fn update_stack_teb_fields(&mut self) {
- cfg_if::cfg_if! {
- if #[cfg(target_arch = "x86_64")] {
- type StackWord = u64;
- } else if #[cfg(target_arch = "x86")] {
- type StackWord = u32;
- }
- }
- let base = self.base().get() as *const StackWord;
- unsafe {
- let stack_limit = usize::try_from(*base.sub(1)).expect("stack limit overflow");
- let guaranteed_stack_bytes =
- usize::try_from(*base.sub(2)).expect("guaranteed stack bytes overflow");
- self.update_teb_fields(stack_limit, guaranteed_stack_bytes);
- }
- }
-}
-
-static STACK_POOL: OnceCell = OnceCell::new();
-
-/// A memory pool for reusing stacks.
-#[derive(educe::Educe)]
-#[educe(Debug)]
-pub struct MemoryPool {
- #[educe(Debug(ignore))]
- pool: UnsafeCell>,
- len: AtomicUsize,
- //最小内存数,即核心内存数
- min_count: AtomicUsize,
- //非核心内存的最大存活时间,单位ns
- keep_alive_time: AtomicU64,
-}
-
-unsafe impl Send for MemoryPool {}
-
-unsafe impl Sync for MemoryPool {}
-
-impl Default for MemoryPool {
- fn default() -> Self {
- Self::new(0, 0)
- }
-}
-
-impl MemoryPool {
- /// Init the `MemoryPool`.
- pub fn init(config: &Config) -> Result<(), MemoryPool> {
- STACK_POOL.set(MemoryPool::new(
- config.min_memory_count(),
- config.memory_keep_alive_time(),
- ))
- }
-
- pub(crate) fn get_instance<'m>() -> &'m Self {
- STACK_POOL.get_or_init(MemoryPool::default)
- }
-
- pub(crate) fn new(min_count: usize, keep_alive_time: u64) -> Self {
- Self {
- pool: UnsafeCell::new(BinaryHeap::default()),
- len: AtomicUsize::default(),
- min_count: AtomicUsize::new(min_count),
- keep_alive_time: AtomicU64::new(keep_alive_time),
- }
- }
-
- pub(crate) fn allocate(&self, stack_size: usize) -> std::io::Result {
- let heap = unsafe { self.pool.get().as_mut().expect("MemoryPool is not unique") };
- // find min stack
- let mut not_use = Vec::new();
- while let Some(stack) = heap.peek() {
- if Rc::strong_count(&stack.stack) > 1 {
- // can't use the stack
- break;
- }
- #[allow(unused_mut)]
- if let Some(mut stack) = heap.pop() {
- self.sub_len();
- if stack_size <= stack.stack_size {
- for s in not_use {
- heap.push(s);
- self.add_len();
- }
- heap.push(stack.clone());
- self.add_len();
- #[cfg(windows)]
- stack.update_stack_teb_fields();
- return Ok(stack);
- }
- if self.min_count() < self.len()
- && now() <= stack.create_time.saturating_add(self.keep_alive_time())
- {
- // clean the expired stack
- continue;
- }
- not_use.push(stack);
- }
- }
- let stack = PooledStack::new(stack_size)?;
- heap.push(stack.clone());
- self.add_len();
- Ok(stack)
- }
-
- #[allow(dead_code)]
- pub(crate) fn set_keep_alive_time(&self, keep_alive_time: u64) {
- self.keep_alive_time
- .store(keep_alive_time, std::sync::atomic::Ordering::Release);
- }
-
- pub(crate) fn keep_alive_time(&self) -> u64 {
- self.keep_alive_time
- .load(std::sync::atomic::Ordering::Acquire)
- }
-
- #[allow(dead_code)]
- pub(crate) fn set_min_count(&self, min_count: usize) {
- self.min_count
- .store(min_count, std::sync::atomic::Ordering::Release);
- }
-
- pub(crate) fn min_count(&self) -> usize {
- self.min_count.load(std::sync::atomic::Ordering::Acquire)
- }
-
- pub(crate) fn len(&self) -> usize {
- self.len.load(std::sync::atomic::Ordering::Acquire)
- }
-
- fn add_len(&self) {
- self.len.store(
- self.len().saturating_add(1),
- std::sync::atomic::Ordering::Release,
- );
- }
-
- fn sub_len(&self) {
- self.len.store(
- self.len().saturating_sub(1),
- std::sync::atomic::Ordering::Release,
- );
- }
-
- /// Clean the expired stack.
- #[allow(dead_code)]
- pub(crate) fn clean(&self) {
- let heap = unsafe { self.pool.get().as_mut().expect("MemoryPool is not unique") };
- let mut maybe_free = Vec::new();
- while let Some(stack) = heap.peek() {
- if Rc::strong_count(&stack.stack) > 1 {
- // can't free the stack
- break;
- }
- if let Some(stack) = heap.pop() {
- self.sub_len();
- maybe_free.push(stack);
- }
- }
- for stack in maybe_free {
- if self.min_count() < self.len()
- && now() <= stack.create_time.saturating_add(self.keep_alive_time())
- {
- // free the stack
- continue;
- }
- heap.push(stack);
- self.add_len();
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
- use crate::common::constants::DEFAULT_STACK_SIZE;
-
- #[test]
- fn test_stack_pool() -> std::io::Result<()> {
- let pool = MemoryPool::default();
- let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
- assert_eq!(Rc::strong_count(&stack.stack), 2);
- drop(stack);
- let stack = pool.allocate(DEFAULT_STACK_SIZE)?;
- assert_eq!(Rc::strong_count(&stack.stack), 2);
- assert_eq!(pool.len(), 1);
- _ = pool.allocate(DEFAULT_STACK_SIZE)?;
- assert_eq!(pool.len(), 2);
- Ok(())
- }
-}
diff --git a/core/src/monitor.rs b/core/src/monitor.rs
index df5d09e8..5e6a1e17 100644
--- a/core/src/monitor.rs
+++ b/core/src/monitor.rs
@@ -3,7 +3,6 @@ use crate::common::constants::{CoroutineState, MONITOR_BEAN};
use crate::common::{get_timeout_time, now, CondvarBlocker};
use crate::coroutine::listener::Listener;
use crate::coroutine::local::CoroutineLocal;
-use crate::coroutine::stack_pool::MemoryPool;
use crate::scheduler::SchedulableSuspender;
use crate::{catch, error, impl_current_for, impl_display_by_debug, info};
use nix::sys::pthread::{pthread_kill, pthread_self, Pthread};
@@ -137,7 +136,6 @@ impl Monitor {
);
}
}
- MemoryPool::get_instance().clean();
//monitor线程不执行协程计算任务,每次循环至少wait 1ms
monitor.blocker.clone().block(Duration::from_millis(1));
}
diff --git a/core/src/scheduler.rs b/core/src/scheduler.rs
index 0f325ccc..07f65cca 100644
--- a/core/src/scheduler.rs
+++ b/core/src/scheduler.rs
@@ -176,17 +176,12 @@ impl<'s> Scheduler<'s> {
stack_size: Option,
priority: Option,
) -> std::io::Result<()> {
- let mut co = co!(
+ self.submit_raw_co(co!(
format!("{}@{}", self.name(), uuid::Uuid::new_v4()),
f,
stack_size.unwrap_or(self.stack_size()),
priority
- )?;
- for listener in self.listeners.clone() {
- co.add_raw_listener(listener);
- }
- // let co_name = Box::leak(Box::from(coroutine.get_name()));
- self.submit_raw_co(co)
+ )?)
}
/// Add a listener to this scheduler.
@@ -198,8 +193,11 @@ impl<'s> Scheduler<'s> {
///
/// Allow multiple threads to concurrently submit coroutine to the scheduler,
/// but only allow one thread to execute scheduling.
- pub fn submit_raw_co(&self, coroutine: SchedulableCoroutine<'s>) -> std::io::Result<()> {
- self.ready.push(coroutine);
+ pub fn submit_raw_co(&self, mut co: SchedulableCoroutine<'s>) -> std::io::Result<()> {
+ for listener in self.listeners.clone() {
+ co.add_raw_listener(listener);
+ }
+ self.ready.push(co);
Ok(())
}
diff --git a/core/src/syscall/unix/accept.rs b/core/src/syscall/unix/accept.rs
index 72cd5dc2..a39afead 100644
--- a/core/src/syscall/unix/accept.rs
+++ b/core/src/syscall/unix/accept.rs
@@ -36,7 +36,7 @@ impl_facade!(AcceptSyscallFacade, AcceptSyscall,
accept(fd: c_int, address: *mut sockaddr, address_len: *mut socklen_t) -> c_int
);
-impl_io_uring!(IoUringAcceptSyscall, AcceptSyscall,
+impl_io_uring_read!(IoUringAcceptSyscall, AcceptSyscall,
accept(fd: c_int, address: *mut sockaddr, address_len: *mut socklen_t) -> c_int
);
diff --git a/core/src/syscall/unix/connect.rs b/core/src/syscall/unix/connect.rs
index 3abd7c98..ece25287 100644
--- a/core/src/syscall/unix/connect.rs
+++ b/core/src/syscall/unix/connect.rs
@@ -40,7 +40,7 @@ impl_facade!(ConnectSyscallFacade, ConnectSyscall,
connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int
);
-impl_io_uring!(IoUringConnectSyscall, ConnectSyscall,
+impl_io_uring_write!(IoUringConnectSyscall, ConnectSyscall,
connect(fd: c_int, address: *const sockaddr, len: socklen_t) -> c_int
);
diff --git a/core/src/syscall/unix/mod.rs b/core/src/syscall/unix/mod.rs
index c0f64a1a..9374369a 100644
--- a/core/src/syscall/unix/mod.rs
+++ b/core/src/syscall/unix/mod.rs
@@ -73,7 +73,7 @@ macro_rules! impl_io_uring {
}
if let Some(suspender) = SchedulableSuspender::current() {
suspender.suspend();
- //回来的时候,系统调用已经执行完了
+ //回来的时候,系统调用已经执行完毕
}
if let Some(co) = SchedulableCoroutine::current() {
if let CoroutineState::Syscall((), syscall, SyscallState::Callback) = co.state()
@@ -110,6 +110,172 @@ macro_rules! impl_io_uring {
}
}
+macro_rules! impl_io_uring_read {
+ ( $struct_name:ident, $trait_name: ident, $syscall: ident($fd: ident : $fd_type: ty, $($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
+ #[repr(C)]
+ #[derive(Debug, Default)]
+ #[cfg(all(target_os = "linux", feature = "io_uring"))]
+ struct $struct_name {
+ inner: I,
+ }
+
+ #[cfg(all(target_os = "linux", feature = "io_uring"))]
+ impl $trait_name for $struct_name {
+ extern "C" fn $syscall(
+ &self,
+ fn_ptr: Option<&extern "C" fn($fd_type, $($arg_type),*) -> $result>,
+ $fd: $fd_type,
+ $($arg: $arg_type),*
+ ) -> $result {
+ if let Ok(arc) = $crate::net::EventLoops::$syscall($fd, $($arg, )*) {
+ use $crate::common::constants::{CoroutineState, SyscallState};
+ use $crate::scheduler::{SchedulableCoroutine, SchedulableSuspender};
+
+ if let Some(co) = SchedulableCoroutine::current() {
+ if let CoroutineState::Syscall((), syscall, SyscallState::Executing) = co.state()
+ {
+ let new_state = SyscallState::Suspend(
+ $crate::common::now()
+ .saturating_add($crate::syscall::recv_time_limit($fd))
+ );
+ if co.syscall((), syscall, new_state).is_err() {
+ $crate::error!(
+ "{} change to syscall {} {} failed !",
+ co.name(), syscall, new_state
+ );
+ }
+ }
+ }
+ if let Some(suspender) = SchedulableSuspender::current() {
+ suspender.suspend();
+ //回来的时候,系统调用已经执行完毕或者超时
+ }
+ if let Some(co) = SchedulableCoroutine::current() {
+ if let CoroutineState::Syscall((), syscall, syscall_state) = co.state() {
+ match syscall_state {
+ SyscallState::Timeout => {
+ $crate::syscall::set_errno(libc::ETIMEDOUT);
+ return -1;
+ },
+ SyscallState::Callback => {
+ let new_state = SyscallState::Executing;
+ if co.syscall((), syscall, new_state).is_err() {
+ $crate::error!(
+ "{} change to syscall {} {} failed !",
+ co.name(), syscall, new_state
+ );
+ }
+ },
+ _ => {}
+ }
+ }
+ }
+ let (lock, cvar) = &*arc;
+ let mut syscall_result: $result = cvar
+ .wait_while(lock.lock().expect("lock failed"),
+ |&mut result| result.is_none()
+ )
+ .expect("lock failed")
+ .expect("no syscall result")
+ .try_into()
+ .expect("io_uring syscall result overflow");
+ if syscall_result < 0 {
+ let errno: std::ffi::c_int = (-syscall_result).try_into()
+ .expect("io_uring errno overflow");
+ $crate::syscall::set_errno(errno);
+ syscall_result = -1;
+ }
+ return syscall_result;
+ }
+ self.inner.$syscall(fn_ptr, $fd, $($arg, )*)
+ }
+ }
+ }
+}
+
+macro_rules! impl_io_uring_write {
+ ( $struct_name:ident, $trait_name: ident, $syscall: ident($fd: ident : $fd_type: ty, $($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
+ #[repr(C)]
+ #[derive(Debug, Default)]
+ #[cfg(all(target_os = "linux", feature = "io_uring"))]
+ struct $struct_name {
+ inner: I,
+ }
+
+ #[cfg(all(target_os = "linux", feature = "io_uring"))]
+ impl $trait_name for $struct_name {
+ extern "C" fn $syscall(
+ &self,
+ fn_ptr: Option<&extern "C" fn($fd_type, $($arg_type),*) -> $result>,
+ $fd: $fd_type,
+ $($arg: $arg_type),*
+ ) -> $result {
+ if let Ok(arc) = $crate::net::EventLoops::$syscall($fd, $($arg, )*) {
+ use $crate::common::constants::{CoroutineState, SyscallState};
+ use $crate::scheduler::{SchedulableCoroutine, SchedulableSuspender};
+
+ if let Some(co) = SchedulableCoroutine::current() {
+ if let CoroutineState::Syscall((), syscall, SyscallState::Executing) = co.state()
+ {
+ let new_state = SyscallState::Suspend(
+ $crate::common::now()
+ .saturating_add($crate::syscall::send_time_limit($fd))
+ );
+ if co.syscall((), syscall, new_state).is_err() {
+ $crate::error!(
+ "{} change to syscall {} {} failed !",
+ co.name(), syscall, new_state
+ );
+ }
+ }
+ }
+ if let Some(suspender) = SchedulableSuspender::current() {
+ suspender.suspend();
+ //回来的时候,系统调用已经执行完毕或者超时
+ }
+ if let Some(co) = SchedulableCoroutine::current() {
+ if let CoroutineState::Syscall((), syscall, syscall_state) = co.state() {
+ match syscall_state {
+ SyscallState::Timeout => {
+ $crate::syscall::set_errno(libc::ETIMEDOUT);
+ return -1;
+ },
+ SyscallState::Callback => {
+ let new_state = SyscallState::Executing;
+ if co.syscall((), syscall, new_state).is_err() {
+ $crate::error!(
+ "{} change to syscall {} {} failed !",
+ co.name(), syscall, new_state
+ );
+ }
+ },
+ _ => {}
+ }
+ }
+ }
+ let (lock, cvar) = &*arc;
+ let mut syscall_result: $result = cvar
+ .wait_while(lock.lock().expect("lock failed"),
+ |&mut result| result.is_none()
+ )
+ .expect("lock failed")
+ .expect("no syscall result")
+ .try_into()
+ .expect("io_uring syscall result overflow");
+ if syscall_result < 0 {
+ let errno: std::ffi::c_int = (-syscall_result).try_into()
+ .expect("io_uring errno overflow");
+ $crate::syscall::set_errno(errno);
+ syscall_result = -1;
+ }
+ return syscall_result;
+ }
+ self.inner.$syscall(fn_ptr, $fd, $($arg, )*)
+ }
+ }
+ }
+}
+
macro_rules! impl_nio_read {
( $struct_name:ident, $trait_name: ident, $syscall: ident($fd: ident : $fd_type: ty, $($arg: ident : $arg_type: ty),*) -> $result: ty ) => {
#[repr(C)]
diff --git a/core/src/syscall/unix/pread.rs b/core/src/syscall/unix/pread.rs
index b2614d21..9f14cf2e 100644
--- a/core/src/syscall/unix/pread.rs
+++ b/core/src/syscall/unix/pread.rs
@@ -38,7 +38,7 @@ impl_facade!(PreadSyscallFacade, PreadSyscall,
pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t
);
-impl_io_uring!(IoUringPreadSyscall, PreadSyscall,
+impl_io_uring_read!(IoUringPreadSyscall, PreadSyscall,
pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t
);
diff --git a/core/src/syscall/unix/preadv.rs b/core/src/syscall/unix/preadv.rs
index 91ce1487..339625f4 100644
--- a/core/src/syscall/unix/preadv.rs
+++ b/core/src/syscall/unix/preadv.rs
@@ -38,7 +38,7 @@ impl_facade!(PreadvSyscallFacade, PreadvSyscall,
preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
);
-impl_io_uring!(IoUringPreadvSyscall, PreadvSyscall,
+impl_io_uring_read!(IoUringPreadvSyscall, PreadvSyscall,
preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
);
diff --git a/core/src/syscall/unix/pwrite.rs b/core/src/syscall/unix/pwrite.rs
index ba2a8c97..3673564b 100644
--- a/core/src/syscall/unix/pwrite.rs
+++ b/core/src/syscall/unix/pwrite.rs
@@ -38,7 +38,7 @@ impl_facade!(PwriteSyscallFacade, PwriteSyscall,
pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t
);
-impl_io_uring!(IoUringPwriteSyscall, PwriteSyscall,
+impl_io_uring_write!(IoUringPwriteSyscall, PwriteSyscall,
pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t
);
diff --git a/core/src/syscall/unix/pwritev.rs b/core/src/syscall/unix/pwritev.rs
index 45587c99..d1014894 100644
--- a/core/src/syscall/unix/pwritev.rs
+++ b/core/src/syscall/unix/pwritev.rs
@@ -38,7 +38,7 @@ impl_facade!(PwritevSyscallFacade, PwritevSyscall,
pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
);
-impl_io_uring!(IoUringPwritevSyscall, PwritevSyscall,
+impl_io_uring_write!(IoUringPwritevSyscall, PwritevSyscall,
pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t
);
diff --git a/core/src/syscall/unix/read.rs b/core/src/syscall/unix/read.rs
index a3367028..17975783 100644
--- a/core/src/syscall/unix/read.rs
+++ b/core/src/syscall/unix/read.rs
@@ -36,7 +36,7 @@ impl_facade!(ReadSyscallFacade, ReadSyscall,
read(fd: c_int, buf: *mut c_void, len: size_t) -> ssize_t
);
-impl_io_uring!(IoUringReadSyscall, ReadSyscall,
+impl_io_uring_read!(IoUringReadSyscall, ReadSyscall,
read(fd: c_int, buf: *mut c_void, len: size_t) -> ssize_t
);
diff --git a/core/src/syscall/unix/readv.rs b/core/src/syscall/unix/readv.rs
index 376d1d7b..3913221f 100644
--- a/core/src/syscall/unix/readv.rs
+++ b/core/src/syscall/unix/readv.rs
@@ -36,7 +36,7 @@ impl_facade!(ReadvSyscallFacade, ReadvSyscall,
readv(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t
);
-impl_io_uring!(IoUringReadvSyscall, ReadvSyscall,
+impl_io_uring_read!(IoUringReadvSyscall, ReadvSyscall,
readv(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t
);
diff --git a/core/src/syscall/unix/recv.rs b/core/src/syscall/unix/recv.rs
index 5ae5fe77..b6f6454d 100644
--- a/core/src/syscall/unix/recv.rs
+++ b/core/src/syscall/unix/recv.rs
@@ -38,7 +38,7 @@ impl_facade!(RecvSyscallFacade, RecvSyscall,
recv(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int) -> ssize_t
);
-impl_io_uring!(IoUringRecvSyscall, RecvSyscall,
+impl_io_uring_read!(IoUringRecvSyscall, RecvSyscall,
recv(fd: c_int, buf: *mut c_void, len: size_t, flags: c_int) -> ssize_t
);
diff --git a/core/src/syscall/unix/recvmsg.rs b/core/src/syscall/unix/recvmsg.rs
index 14e07026..72e6cadf 100644
--- a/core/src/syscall/unix/recvmsg.rs
+++ b/core/src/syscall/unix/recvmsg.rs
@@ -40,7 +40,7 @@ impl_facade!(RecvmsgSyscallFacade, RecvmsgSyscall,
recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t
);
-impl_io_uring!(IoUringRecvmsgSyscall, RecvmsgSyscall,
+impl_io_uring_read!(IoUringRecvmsgSyscall, RecvmsgSyscall,
recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t
);
diff --git a/core/src/syscall/unix/send.rs b/core/src/syscall/unix/send.rs
index 941bc0b4..5a7d16cf 100644
--- a/core/src/syscall/unix/send.rs
+++ b/core/src/syscall/unix/send.rs
@@ -38,7 +38,7 @@ impl_facade!(SendSyscallFacade, SendSyscall,
send(fd: c_int, buf: *const c_void, len: size_t, flags: c_int) -> ssize_t
);
-impl_io_uring!(IoUringSendSyscall, SendSyscall,
+impl_io_uring_write!(IoUringSendSyscall, SendSyscall,
send(fd: c_int, buf: *const c_void, len: size_t, flags: c_int) -> ssize_t
);
diff --git a/core/src/syscall/unix/sendmsg.rs b/core/src/syscall/unix/sendmsg.rs
index 679dab71..ff6bae5e 100644
--- a/core/src/syscall/unix/sendmsg.rs
+++ b/core/src/syscall/unix/sendmsg.rs
@@ -40,7 +40,7 @@ impl_facade!(SendmsgSyscallFacade, SendmsgSyscall,
sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t
);
-impl_io_uring!(IoUringSendmsgSyscall, SendmsgSyscall,
+impl_io_uring_write!(IoUringSendmsgSyscall, SendmsgSyscall,
sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t
);
diff --git a/core/src/syscall/unix/sendto.rs b/core/src/syscall/unix/sendto.rs
index 2eaa4e94..1563ece0 100644
--- a/core/src/syscall/unix/sendto.rs
+++ b/core/src/syscall/unix/sendto.rs
@@ -54,7 +54,7 @@ impl_facade!(SendtoSyscallFacade, SendtoSyscall,
addr: *const sockaddr, addrlen: socklen_t) -> ssize_t
);
-impl_io_uring!(IoUringSendtoSyscall, SendtoSyscall,
+impl_io_uring_write!(IoUringSendtoSyscall, SendtoSyscall,
sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int,
addr: *const sockaddr, addrlen: socklen_t) -> ssize_t
);
diff --git a/core/src/syscall/unix/write.rs b/core/src/syscall/unix/write.rs
index 2569872e..a06fde6a 100644
--- a/core/src/syscall/unix/write.rs
+++ b/core/src/syscall/unix/write.rs
@@ -36,7 +36,7 @@ impl_facade!(WriteSyscallFacade, WriteSyscall,
write(fd: c_int, buf: *const c_void, len: size_t) -> ssize_t
);
-impl_io_uring!(IoUringWriteSyscall, WriteSyscall,
+impl_io_uring_write!(IoUringWriteSyscall, WriteSyscall,
write(fd: c_int, buf: *const c_void, len: size_t) -> ssize_t
);
diff --git a/core/src/syscall/unix/writev.rs b/core/src/syscall/unix/writev.rs
index c0e52627..5aeea9f7 100644
--- a/core/src/syscall/unix/writev.rs
+++ b/core/src/syscall/unix/writev.rs
@@ -36,7 +36,7 @@ impl_facade!(WritevSyscallFacade, WritevSyscall,
writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t
);
-impl_io_uring!(IoUringWritevSyscall, WritevSyscall,
+impl_io_uring_write!(IoUringWritevSyscall, WritevSyscall,
writev(fd: c_int, iov: *const iovec, iovcnt: c_int) -> ssize_t
);
diff --git a/hook/src/lib.rs b/hook/src/lib.rs
index f387dec8..2e8fcccc 100644
--- a/hook/src/lib.rs
+++ b/hook/src/lib.rs
@@ -50,7 +50,6 @@
use once_cell::sync::OnceCell;
use open_coroutine_core::co_pool::task::UserTaskFunc;
use open_coroutine_core::config::Config;
-use open_coroutine_core::coroutine::stack_pool::MemoryPool;
use open_coroutine_core::net::join::JoinHandle;
use open_coroutine_core::net::{EventLoops, UserFunc};
use open_coroutine_core::scheduler::SchedulableCoroutine;
@@ -76,9 +75,6 @@ pub mod syscall;
/// Start the framework.
#[no_mangle]
pub extern "C" fn open_coroutine_init(config: Config) -> c_int {
- if MemoryPool::init(&config).is_err() {
- return -1;
- }
EventLoops::init(&config);
_ = HOOK.get_or_init(|| config.hook());
0
diff --git a/open-coroutine/src/lib.rs b/open-coroutine/src/lib.rs
index 062ecbce..91ec2c16 100644
--- a/open-coroutine/src/lib.rs
+++ b/open-coroutine/src/lib.rs
@@ -365,6 +365,7 @@ mod tests {
#[test]
fn test() {
init(Config::single());
+ _ = any_join!(task!(|_| 1, ()), task!(|_| 2, ()), task!(|_| 3, ()));
let join = task!(
|_| {
println!("Hello, world!");