diff --git a/open-coroutine-core/src/net/event_loop/mod.rs b/open-coroutine-core/src/net/event_loop/mod.rs index aa3b9357..04718a09 100644 --- a/open-coroutine-core/src/net/event_loop/mod.rs +++ b/open-coroutine-core/src/net/event_loop/mod.rs @@ -106,6 +106,8 @@ static EVENT_LOOP_WORKERS: OnceCell]>> = OnceCe static EVENT_LOOP_STARTED: Lazy = Lazy::new(AtomicBool::default); +static EVENT_LOOP_START_COUNT: Lazy = Lazy::new(|| AtomicUsize::new(0)); + static EVENT_LOOP_STOP: Lazy, Condvar)>> = Lazy::new(|| Arc::new((Mutex::new(AtomicUsize::new(0)), Condvar::new()))); @@ -151,6 +153,7 @@ impl EventLoops { .name(format!("open-coroutine-event-loop-{i}")) .spawn(move || { warn!("open-coroutine-event-loop-{i} has started"); + _ = EVENT_LOOP_START_COUNT.fetch_add(1, Ordering::Release); if set_for_current(CoreId { id: i }) { warn!("pin event-loop-{i} thread to CPU core-{i} failed !"); } @@ -185,7 +188,10 @@ impl EventLoops { .wait_timeout_while( lock.lock().unwrap(), Duration::from_millis(30000), - |stopped| stopped.load(Ordering::Acquire) < unsafe { EVENT_LOOPS.len() } - 1, + |stopped| { + stopped.load(Ordering::Acquire) + < EVENT_LOOP_START_COUNT.load(Ordering::Acquire) - 1 + }, ) .unwrap() .1; diff --git a/open-coroutine-core/src/syscall/facade.rs b/open-coroutine-core/src/syscall/facade.rs index daef06d5..a372c69e 100644 --- a/open-coroutine-core/src/syscall/facade.rs +++ b/open-coroutine-core/src/syscall/facade.rs @@ -6,7 +6,7 @@ use crate::syscall::LinuxSyscall; use crate::syscall::UnixSyscall; #[cfg(target_os = "linux")] use libc::epoll_event; -use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; +use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; use once_cell::sync::Lazy; use std::ffi::{c_int, c_void}; @@ -44,17 +44,6 @@ pub extern "C" fn pread( CHAIN.pread(fn_ptr, fd, buf, count, offset) } -#[must_use] -pub extern "C" fn preadv( - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, -) -> ssize_t { - CHAIN.preadv(fn_ptr, fd, iov, iovcnt, offset) -} - #[must_use] pub extern "C" fn recvmsg( fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, @@ -103,17 +92,6 @@ pub extern "C" fn pwrite( CHAIN.pwrite(fn_ptr, fd, buf, count, offset) } -#[must_use] -pub extern "C" fn pwritev( - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, -) -> ssize_t { - CHAIN.pwritev(fn_ptr, fd, iov, iovcnt, offset) -} - #[must_use] pub extern "C" fn sendmsg( fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, diff --git a/open-coroutine-core/src/syscall/io_uring.rs b/open-coroutine-core/src/syscall/io_uring.rs index d787fd55..3a5d0f3c 100644 --- a/open-coroutine-core/src/syscall/io_uring.rs +++ b/open-coroutine-core/src/syscall/io_uring.rs @@ -1,7 +1,7 @@ use crate::syscall::LinuxSyscall; use crate::syscall::UnixSyscall; use libc::epoll_event; -use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; +use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; use std::ffi::{c_int, c_void}; #[derive(Debug, Default)] @@ -46,17 +46,6 @@ impl UnixSyscall for IoUringLinuxSyscall { impl_io_uring!(self, pread, fn_ptr, fd, buf, count, offset) } - extern "C" fn preadv( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - impl_io_uring!(self, preadv, fn_ptr, fd, iov, iovcnt, offset) - } - extern "C" fn recvmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, @@ -110,17 +99,6 @@ impl UnixSyscall for IoUringLinuxSyscall { impl_io_uring!(self, pwrite, fn_ptr, fd, buf, count, offset) } - extern "C" fn pwritev( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - impl_io_uring!(self, pwritev, fn_ptr, fd, iov, iovcnt, offset) - } - extern "C" fn sendmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, diff --git a/open-coroutine-core/src/syscall/mod.rs b/open-coroutine-core/src/syscall/mod.rs index 80c2f590..0a8f26a4 100644 --- a/open-coroutine-core/src/syscall/mod.rs +++ b/open-coroutine-core/src/syscall/mod.rs @@ -1,7 +1,7 @@ #[cfg(target_os = "linux")] use libc::epoll_event; #[cfg(unix)] -use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; +use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; #[cfg(unix)] use std::ffi::{c_int, c_void}; @@ -47,15 +47,6 @@ pub trait UnixSyscall { offset: off_t, ) -> ssize_t; - extern "C" fn preadv( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t; - extern "C" fn recvmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, @@ -103,15 +94,6 @@ pub trait UnixSyscall { offset: off_t, ) -> ssize_t; - extern "C" fn pwritev( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t; - extern "C" fn sendmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, diff --git a/open-coroutine-core/src/syscall/nio.rs b/open-coroutine-core/src/syscall/nio.rs index 88ce03d4..0c293571 100644 --- a/open-coroutine-core/src/syscall/nio.rs +++ b/open-coroutine-core/src/syscall/nio.rs @@ -61,80 +61,6 @@ macro_rules! impl_expected_read_hook { }}; } -macro_rules! impl_expected_batch_read_hook { - ( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $iov:expr, $length:expr, $($arg: expr),* $(,)* ) => {{ - let socket = $socket; - let blocking = $crate::syscall::common::is_blocking(socket); - if blocking { - $crate::syscall::common::set_non_blocking(socket); - } - let mut vec = std::collections::VecDeque::from(unsafe { - Vec::from_raw_parts($iov.cast_mut(), $length as usize, $length as usize) - }); - let mut length = 0; - let mut pices = std::collections::VecDeque::new(); - for iovec in &vec { - length += iovec.iov_len; - pices.push_back(length); - } - let mut received = 0; - let mut r = 0; - while received < length { - // find from-index - let mut from_index = 0; - for (i, v) in pices.iter().enumerate() { - if received < *v { - from_index = i; - break; - } - } - // calculate offset - let current_received_offset = if from_index > 0 { - received.saturating_sub(pices[from_index.saturating_sub(1)]) - } else { - received - }; - // remove already received - for _ in 0..from_index { - _ = vec.pop_front(); - _ = pices.pop_front(); - } - // build syscall args - vec[0] = iovec { - iov_base: (vec[0].iov_base as usize + current_received_offset) as *mut c_void, - iov_len: vec[0].iov_len - current_received_offset, - }; - r = $invoker.$syscall($fn_ptr, $socket, vec.get(0).unwrap(), c_int::try_from(vec.len()).unwrap(), $($arg, )*); - if r != -1 { - $crate::syscall::common::reset_errno(); - received += r as usize; - if received >= length || r == 0 { - r = received as ssize_t; - break; - } - } - let error_kind = std::io::Error::last_os_error().kind(); - if error_kind == std::io::ErrorKind::WouldBlock { - //wait read event - if $crate::net::event_loop::EventLoops::wait_read_event( - socket, - Some(std::time::Duration::from_millis(10)), - ) - .is_err() - { - break; - } - } else if error_kind != std::io::ErrorKind::Interrupted { - break; - } - } - if blocking { - $crate::syscall::common::set_blocking(socket); - } - r - }}; -} - macro_rules! impl_expected_write_hook { ( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $buffer:expr, $length:expr, $($arg: expr),* $(,)* ) => {{ let socket = $socket; @@ -182,80 +108,6 @@ macro_rules! impl_expected_write_hook { }}; } -macro_rules! impl_expected_batch_write_hook { - ( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $iov:expr, $length:expr, $($arg: expr),* $(,)* ) => {{ - let socket = $socket; - let blocking = $crate::syscall::common::is_blocking(socket); - if blocking { - $crate::syscall::common::set_non_blocking(socket); - } - let mut vec = std::collections::VecDeque::from(unsafe { - Vec::from_raw_parts($iov.cast_mut(), $length as usize, $length as usize) - }); - let mut length = 0; - let mut pices = std::collections::VecDeque::new(); - for iovec in &vec { - length += iovec.iov_len; - pices.push_back(length); - } - let mut sent = 0; - let mut r = 0; - while sent < length { - // find from-index - let mut from_index = 0; - for (i, v) in pices.iter().enumerate() { - if sent < *v { - from_index = i; - break; - } - } - // calculate offset - let current_sent_offset = if from_index > 0 { - sent.saturating_sub(pices[from_index.saturating_sub(1)]) - } else { - sent - }; - // remove already sent - for _ in 0..from_index { - _ = vec.pop_front(); - _ = pices.pop_front(); - } - // build syscall args - vec[0] = iovec { - iov_base: (vec[0].iov_base as usize + current_sent_offset) as *mut c_void, - iov_len: vec[0].iov_len - current_sent_offset, - }; - r = $invoker.$syscall($fn_ptr, $socket, vec.get(0).unwrap(), c_int::try_from(vec.len()).unwrap(), $($arg, )*); - if r != -1 { - $crate::syscall::common::reset_errno(); - sent += r as usize; - if sent >= length { - r = sent as ssize_t; - break; - } - } - let error_kind = std::io::Error::last_os_error().kind(); - if error_kind == std::io::ErrorKind::WouldBlock { - //wait write event - if $crate::net::event_loop::EventLoops::wait_write_event( - socket, - Some(std::time::Duration::from_millis(10)), - ) - .is_err() - { - break; - } - } else if error_kind != std::io::ErrorKind::Interrupted { - break; - } - } - if blocking { - $crate::syscall::common::set_blocking(socket); - } - r - }}; -} - impl UnixSyscall for NioLinuxSyscall { extern "C" fn read( &self, @@ -278,17 +130,6 @@ impl UnixSyscall for NioLinuxSyscall { impl_expected_read_hook!(self.inner, pread, fn_ptr, fd, buf, count, offset) } - extern "C" fn preadv( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - impl_expected_batch_read_hook!(self.inner, preadv, fn_ptr, fd, iov, iovcnt, offset) - } - extern "C" fn recvmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, @@ -432,17 +273,6 @@ impl UnixSyscall for NioLinuxSyscall { impl_expected_write_hook!(self.inner, pwrite, fn_ptr, fd, buf, count, offset) } - extern "C" fn pwritev( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - impl_expected_batch_write_hook!(self.inner, pwritev, fn_ptr, fd, iov, iovcnt, offset) - } - extern "C" fn sendmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, diff --git a/open-coroutine-core/src/syscall/raw.rs b/open-coroutine-core/src/syscall/raw.rs index 341a25b6..5469ed8d 100644 --- a/open-coroutine-core/src/syscall/raw.rs +++ b/open-coroutine-core/src/syscall/raw.rs @@ -3,7 +3,7 @@ use crate::syscall::LinuxSyscall; use crate::syscall::UnixSyscall; #[cfg(target_os = "linux")] use libc::epoll_event; -use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; +use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; use std::ffi::{c_int, c_void}; #[derive(Debug, Copy, Clone, Default)] @@ -41,21 +41,6 @@ impl UnixSyscall for RawLinuxSyscall { } } - extern "C" fn preadv( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, iov, iovcnt, offset) - } else { - unsafe { libc::preadv(fd, iov, iovcnt, offset) } - } - } - extern "C" fn recvmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, @@ -127,21 +112,6 @@ impl UnixSyscall for RawLinuxSyscall { } } - extern "C" fn pwritev( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, iov, iovcnt, offset) - } else { - unsafe { libc::pwritev(fd, iov, iovcnt, offset) } - } - } - extern "C" fn sendmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, diff --git a/open-coroutine-core/src/syscall/state.rs b/open-coroutine-core/src/syscall/state.rs index 9712c87e..97155cd9 100644 --- a/open-coroutine-core/src/syscall/state.rs +++ b/open-coroutine-core/src/syscall/state.rs @@ -4,7 +4,7 @@ use crate::syscall::LinuxSyscall; use crate::syscall::UnixSyscall; #[cfg(target_os = "linux")] use libc::epoll_event; -use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; +use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; use std::ffi::{c_int, c_void}; #[derive(Debug, Default)] @@ -56,17 +56,6 @@ impl UnixSyscall for StateLinuxSyscall { syscall_state!(self, pread, fn_ptr, fd, buf, count, offset) } - extern "C" fn preadv( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - syscall_state!(self, preadv, fn_ptr, fd, iov, iovcnt, offset) - } - extern "C" fn recvmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, @@ -120,17 +109,6 @@ impl UnixSyscall for StateLinuxSyscall { syscall_state!(self, pwrite, fn_ptr, fd, buf, count, offset) } - extern "C" fn pwritev( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, - fd: c_int, - iov: *const iovec, - iovcnt: c_int, - offset: off_t, - ) -> ssize_t { - syscall_state!(self, pwritev, fn_ptr, fd, iov, iovcnt, offset) - } - extern "C" fn sendmsg( &self, fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, diff --git a/open-coroutine-core/src/syscall/unix/mod.rs b/open-coroutine-core/src/syscall/unix/mod.rs index b9b28d12..a32963e4 100644 --- a/open-coroutine-core/src/syscall/unix/mod.rs +++ b/open-coroutine-core/src/syscall/unix/mod.rs @@ -6,6 +6,8 @@ pub use connect::connect; pub use listen::listen; pub use nanosleep::nanosleep; pub use poll::poll; +pub use preadv::preadv; +pub use pwritev::pwritev; pub use readv::readv; pub use recv::recv; pub use recvfrom::recvfrom; @@ -480,6 +482,8 @@ mod connect; mod listen; mod nanosleep; mod poll; +mod preadv; +mod pwritev; mod readv; mod recv; mod recvfrom; diff --git a/open-coroutine-core/src/syscall/unix/preadv.rs b/open-coroutine-core/src/syscall/unix/preadv.rs new file mode 100644 index 00000000..91ce1487 --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/preadv.rs @@ -0,0 +1,51 @@ +use libc::{iovec, off_t, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::c_int; + +#[must_use] +pub extern "C" fn preadv( + fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + PreadvSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.preadv(fn_ptr, fd, iov, iovcnt, offset) +} + +trait PreadvSyscall { + extern "C" fn preadv( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + ) -> ssize_t; +} + +impl_facade!(PreadvSyscallFacade, PreadvSyscall, + preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +); + +impl_io_uring!(IoUringPreadvSyscall, PreadvSyscall, + preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +); + +impl_nio_read_iovec!(NioPreadvSyscall, PreadvSyscall, + preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +); + +impl_raw!(RawPreadvSyscall, PreadvSyscall, + preadv(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +); diff --git a/open-coroutine-core/src/syscall/unix/pwritev.rs b/open-coroutine-core/src/syscall/unix/pwritev.rs new file mode 100644 index 00000000..45587c99 --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/pwritev.rs @@ -0,0 +1,51 @@ +use libc::{iovec, off_t, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::c_int; + +#[must_use] +pub extern "C" fn pwritev( + fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + PwritevSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.pwritev(fn_ptr, fd, iov, iovcnt, offset) +} + +trait PwritevSyscall { + extern "C" fn pwritev( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *const iovec, c_int, off_t) -> ssize_t>, + fd: c_int, + iov: *const iovec, + iovcnt: c_int, + offset: off_t, + ) -> ssize_t; +} + +impl_facade!(PwritevSyscallFacade, PwritevSyscall, + pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +); + +impl_io_uring!(IoUringPwritevSyscall, PwritevSyscall, + pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +); + +impl_nio_write_iovec!(NioPwritevSyscall, PwritevSyscall, + pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +); + +impl_raw!(RawPwritevSyscall, PwritevSyscall, + pwritev(fd: c_int, iov: *const iovec, iovcnt: c_int, offset: off_t) -> ssize_t +);