diff --git a/open-coroutine-core/src/syscall/facade.rs b/open-coroutine-core/src/syscall/facade.rs index a372c69e..e5996dab 100644 --- a/open-coroutine-core/src/syscall/facade.rs +++ b/open-coroutine-core/src/syscall/facade.rs @@ -21,39 +21,6 @@ cfg_if::cfg_if! { } } -/// read - -#[must_use] -pub extern "C" fn read( - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, -) -> ssize_t { - CHAIN.read(fn_ptr, fd, buf, count) -} - -#[must_use] -pub extern "C" fn pread( - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, -) -> ssize_t { - CHAIN.pread(fn_ptr, fd, buf, count, offset) -} - -#[must_use] -pub extern "C" fn recvmsg( - fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *mut msghdr, - flags: c_int, -) -> ssize_t { - CHAIN.recvmsg(fn_ptr, fd, msg, flags) -} - /// write #[must_use] diff --git a/open-coroutine-core/src/syscall/io_uring.rs b/open-coroutine-core/src/syscall/io_uring.rs index 3a5d0f3c..bf0d5f7b 100644 --- a/open-coroutine-core/src/syscall/io_uring.rs +++ b/open-coroutine-core/src/syscall/io_uring.rs @@ -25,37 +25,6 @@ macro_rules! impl_io_uring { } impl UnixSyscall for IoUringLinuxSyscall { - extern "C" fn read( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - ) -> ssize_t { - impl_io_uring!(self, read, fn_ptr, fd, buf, count) - } - - extern "C" fn pread( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - impl_io_uring!(self, pread, fn_ptr, fd, buf, count, offset) - } - - extern "C" fn recvmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *mut msghdr, - flags: c_int, - ) -> ssize_t { - impl_io_uring!(self, recvmsg, fn_ptr, fd, msg, flags) - } - extern "C" fn sendto( &self, fn_ptr: Option< diff --git a/open-coroutine-core/src/syscall/mod.rs b/open-coroutine-core/src/syscall/mod.rs index 0a8f26a4..2978af25 100644 --- a/open-coroutine-core/src/syscall/mod.rs +++ b/open-coroutine-core/src/syscall/mod.rs @@ -28,33 +28,6 @@ pub use facade::*; #[cfg(unix)] pub trait UnixSyscall { - /// read - - extern "C" fn read( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - ) -> ssize_t; - - extern "C" fn pread( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, - ) -> ssize_t; - - extern "C" fn recvmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *mut msghdr, - flags: c_int, - ) -> ssize_t; - /// write extern "C" fn sendto( diff --git a/open-coroutine-core/src/syscall/nio.rs b/open-coroutine-core/src/syscall/nio.rs index 0c293571..7137779b 100644 --- a/open-coroutine-core/src/syscall/nio.rs +++ b/open-coroutine-core/src/syscall/nio.rs @@ -14,53 +14,6 @@ pub struct NioLinuxSyscall { inner: I, } -macro_rules! impl_expected_read_hook { - ( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $buffer:expr, $length:expr, $($arg: expr),* $(,)* ) => {{ - let socket = $socket; - let blocking = $crate::syscall::common::is_blocking(socket); - if blocking { - $crate::syscall::common::set_non_blocking(socket); - } - let mut received = 0; - let mut r = 0; - while received < $length { - r = $invoker.$syscall( - $fn_ptr, - $socket, - ($buffer as usize + received) as *mut c_void, - $length - received, - $($arg, )* - ); - if r != -1 { - $crate::syscall::common::reset_errno(); - received += r as size_t; - if received >= $length || r == 0 { - r = received as ssize_t; - break; - } - } - let error_kind = std::io::Error::last_os_error().kind(); - if error_kind == std::io::ErrorKind::WouldBlock { - //wait read event - if $crate::net::event_loop::EventLoops::wait_read_event( - socket, - Some(std::time::Duration::from_millis(10)), - ) - .is_err() - { - break; - } - } else if error_kind != std::io::ErrorKind::Interrupted { - break; - } - } - if blocking { - $crate::syscall::common::set_blocking(socket); - } - r - }}; -} - macro_rules! impl_expected_write_hook { ( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $buffer:expr, $length:expr, $($arg: expr),* $(,)* ) => {{ let socket = $socket; @@ -109,125 +62,6 @@ macro_rules! impl_expected_write_hook { } impl UnixSyscall for NioLinuxSyscall { - extern "C" fn read( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - ) -> ssize_t { - impl_expected_read_hook!(self.inner, read, fn_ptr, fd, buf, count,) - } - - extern "C" fn pread( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - impl_expected_read_hook!(self.inner, pread, fn_ptr, fd, buf, count, offset) - } - - extern "C" fn recvmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *mut msghdr, - flags: c_int, - ) -> ssize_t { - let blocking = is_blocking(fd); - if blocking { - set_non_blocking(fd); - } - let msghdr = unsafe { *msg }; - let mut vec = std::collections::VecDeque::from(unsafe { - Vec::from_raw_parts( - msghdr.msg_iov, - msghdr.msg_iovlen as usize, - msghdr.msg_iovlen as usize, - ) - }); - let mut length = 0; - let mut pices = std::collections::VecDeque::new(); - for iovec in &vec { - length += iovec.iov_len; - pices.push_back(length); - } - let mut received = 0; - let mut r = 0; - while received < length { - // find from-index - let mut from_index = 0; - for (i, v) in pices.iter().enumerate() { - if received < *v { - from_index = i; - break; - } - } - // calculate offset - let current_received_offset = if from_index > 0 { - received.saturating_sub(pices[from_index.saturating_sub(1)]) - } else { - received - }; - // remove already received - for _ in 0..from_index { - _ = vec.pop_front(); - _ = pices.pop_front(); - } - // build syscall args - vec[0] = iovec { - iov_base: (vec[0].iov_base as usize + current_received_offset) as *mut c_void, - iov_len: vec[0].iov_len - current_received_offset, - }; - cfg_if::cfg_if! { - if #[cfg(any( - target_os = "linux", - target_os = "l4re", - target_os = "android", - target_os = "emscripten" - ))] { - let len = vec.len(); - } else { - let len = c_int::try_from(vec.len()).unwrap(); - } - } - let mut new_msg = msghdr { - msg_name: msghdr.msg_name, - msg_namelen: msghdr.msg_namelen, - msg_iov: vec.get_mut(0).unwrap(), - msg_iovlen: len, - msg_control: msghdr.msg_control, - msg_controllen: msghdr.msg_controllen, - msg_flags: msghdr.msg_flags, - }; - r = self.inner.recvmsg(fn_ptr, fd, &mut new_msg, flags); - if r != -1 { - reset_errno(); - received += r as usize; - if received >= length || r == 0 { - r = received as ssize_t; - break; - } - } - let error_kind = std::io::Error::last_os_error().kind(); - if error_kind == std::io::ErrorKind::WouldBlock { - //wait read event - if EventLoops::wait_read_event(fd, Some(Duration::from_millis(10))).is_err() { - break; - } - } else if error_kind != std::io::ErrorKind::Interrupted { - break; - } - } - if blocking { - set_blocking(fd); - } - r - } - extern "C" fn sendto( &self, fn_ptr: Option< diff --git a/open-coroutine-core/src/syscall/raw.rs b/open-coroutine-core/src/syscall/raw.rs index 5469ed8d..2276142e 100644 --- a/open-coroutine-core/src/syscall/raw.rs +++ b/open-coroutine-core/src/syscall/raw.rs @@ -10,51 +10,6 @@ use std::ffi::{c_int, c_void}; pub struct RawLinuxSyscall {} impl UnixSyscall for RawLinuxSyscall { - /// read - - extern "C" fn read( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, buf, count) - } else { - unsafe { libc::read(fd, buf, count) } - } - } - - extern "C" fn pread( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, buf, count, offset) - } else { - unsafe { libc::pread(fd, buf, count, offset) } - } - } - - extern "C" fn recvmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *mut msghdr, - flags: c_int, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, msg, flags) - } else { - unsafe { libc::recvmsg(fd, msg, flags) } - } - } - /// write extern "C" fn sendto( diff --git a/open-coroutine-core/src/syscall/state.rs b/open-coroutine-core/src/syscall/state.rs index 97155cd9..09514764 100644 --- a/open-coroutine-core/src/syscall/state.rs +++ b/open-coroutine-core/src/syscall/state.rs @@ -35,37 +35,6 @@ macro_rules! syscall_state { } impl UnixSyscall for StateLinuxSyscall { - extern "C" fn read( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - ) -> ssize_t { - syscall_state!(self, read, fn_ptr, fd, buf, count) - } - - extern "C" fn pread( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *mut c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - syscall_state!(self, pread, fn_ptr, fd, buf, count, offset) - } - - extern "C" fn recvmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *mut msghdr, - flags: c_int, - ) -> ssize_t { - syscall_state!(self, recvmsg, fn_ptr, fd, msg, flags) - } - extern "C" fn sendto( &self, fn_ptr: Option< diff --git a/open-coroutine-core/src/syscall/unix/mod.rs b/open-coroutine-core/src/syscall/unix/mod.rs index a32963e4..3b1ff34a 100644 --- a/open-coroutine-core/src/syscall/unix/mod.rs +++ b/open-coroutine-core/src/syscall/unix/mod.rs @@ -6,11 +6,14 @@ pub use connect::connect; pub use listen::listen; pub use nanosleep::nanosleep; pub use poll::poll; +pub use pread::pread; pub use preadv::preadv; pub use pwritev::pwritev; +pub use read::read; pub use readv::readv; pub use recv::recv; pub use recvfrom::recvfrom; +pub use recvmsg::recvmsg; pub use select::select; pub use send::send; pub use shutdown::shutdown; @@ -482,11 +485,14 @@ mod connect; mod listen; mod nanosleep; mod poll; +mod pread; mod preadv; mod pwritev; +mod read; mod readv; mod recv; mod recvfrom; +mod recvmsg; mod select; mod send; mod shutdown; diff --git a/open-coroutine-core/src/syscall/unix/pread.rs b/open-coroutine-core/src/syscall/unix/pread.rs new file mode 100644 index 00000000..b2614d21 --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/pread.rs @@ -0,0 +1,51 @@ +use libc::{off_t, size_t, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::{c_int, c_void}; + +#[must_use] +pub extern "C" fn pread( + fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, + fd: c_int, + buf: *mut c_void, + len: size_t, + offset: off_t, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + PreadSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.pread(fn_ptr, fd, buf, len, offset) +} + +trait PreadSyscall { + extern "C" fn pread( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t, off_t) -> ssize_t>, + fd: c_int, + buf: *mut c_void, + len: size_t, + offset: off_t, + ) -> ssize_t; +} + +impl_facade!(PreadSyscallFacade, PreadSyscall, + pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t +); + +impl_io_uring!(IoUringPreadSyscall, PreadSyscall, + pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t +); + +impl_nio_read_buf!(NioPreadSyscall, PreadSyscall, + pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t +); + +impl_raw!(RawPreadSyscall, PreadSyscall, + pread(fd: c_int, buf: *mut c_void, len: size_t, offset: off_t) -> ssize_t +); diff --git a/open-coroutine-core/src/syscall/unix/read.rs b/open-coroutine-core/src/syscall/unix/read.rs new file mode 100644 index 00000000..a3367028 --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/read.rs @@ -0,0 +1,49 @@ +use libc::{size_t, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::{c_int, c_void}; + +#[must_use] +pub extern "C" fn read( + fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, + fd: c_int, + buf: *mut c_void, + len: size_t, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + ReadSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.read(fn_ptr, fd, buf, len) +} + +trait ReadSyscall { + extern "C" fn read( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *mut c_void, size_t) -> ssize_t>, + fd: c_int, + buf: *mut c_void, + len: size_t, + ) -> ssize_t; +} + +impl_facade!(ReadSyscallFacade, ReadSyscall, + read(fd: c_int, buf: *mut c_void, len: size_t) -> ssize_t +); + +impl_io_uring!(IoUringReadSyscall, ReadSyscall, + read(fd: c_int, buf: *mut c_void, len: size_t) -> ssize_t +); + +impl_nio_read_buf!(NioReadSyscall, ReadSyscall, + read(fd: c_int, buf: *mut c_void, len: size_t, ) -> ssize_t +); + +impl_raw!(RawReadSyscall, ReadSyscall, + read(fd: c_int, buf: *mut c_void, len: size_t) -> ssize_t +); diff --git a/open-coroutine-core/src/syscall/unix/recvmsg.rs b/open-coroutine-core/src/syscall/unix/recvmsg.rs new file mode 100644 index 00000000..084c3854 --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/recvmsg.rs @@ -0,0 +1,165 @@ +use crate::net::event_loop::EventLoops; +use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking}; +use libc::{msghdr, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::{c_int, c_void}; +use std::io::{Error, ErrorKind}; +use std::time::Duration; + +#[must_use] +pub extern "C" fn recvmsg( + fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, + fd: c_int, + msg: *mut msghdr, + flags: c_int, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + RecvmsgSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.recvmsg(fn_ptr, fd, msg, flags) +} + +trait RecvmsgSyscall { + extern "C" fn recvmsg( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, + fd: c_int, + msg: *mut msghdr, + flags: c_int, + ) -> ssize_t; +} + +impl_facade!(RecvmsgSyscallFacade, RecvmsgSyscall, + recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t +); + +impl_io_uring!(IoUringRecvmsgSyscall, RecvmsgSyscall, + recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t +); + +#[derive(Debug, Default)] +struct NioRecvmsgSyscall { + inner: I, +} + +impl RecvmsgSyscall for NioRecvmsgSyscall { + extern "C" fn recvmsg( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *mut msghdr, c_int) -> ssize_t>, + fd: c_int, + msg: *mut msghdr, + flags: c_int, + ) -> ssize_t { + let blocking = is_blocking(fd); + if blocking { + set_non_blocking(fd); + } + let msghdr = unsafe { *msg }; + let vec = unsafe { + Vec::from_raw_parts( + msghdr.msg_iov, + msghdr.msg_iovlen as usize, + msghdr.msg_iovlen as usize, + ) + }; + let mut length = 0; + let mut received = 0usize; + let mut r = 0; + let mut index = 0; + for iovec in &vec { + let mut offset = received.saturating_sub(length); + length += iovec.iov_len; + if received > length { + index += 1; + continue; + } + let mut iov = Vec::new(); + for i in vec.iter().skip(index) { + iov.push(*i); + } + cfg_if::cfg_if! { + if #[cfg(any( + target_os = "linux", + target_os = "l4re", + target_os = "android", + target_os = "emscripten" + ))] { + let msg_iovlen = vec.len(); + } else { + let msg_iovlen = c_int::try_from(iov.len()).unwrap_or_else(|_| { + panic!("{} msghdr.msg_iovlen overflow", crate::constants::Syscall::recvmsg) + }); + } + } + while received < length { + if 0 != offset { + iov[0] = libc::iovec { + iov_base: (iov[0].iov_base as usize + offset) as *mut c_void, + iov_len: iov[0].iov_len - offset, + }; + } + let mut arg = msghdr { + msg_name: msghdr.msg_name, + msg_namelen: msghdr.msg_namelen, + msg_iov: iov.as_mut_ptr(), + msg_iovlen, + msg_control: msghdr.msg_control, + msg_controllen: msghdr.msg_controllen, + msg_flags: msghdr.msg_flags, + }; + r = self.inner.recvmsg(fn_ptr, fd, &mut arg, flags); + if r == 0 { + std::mem::forget(vec); + if blocking { + set_blocking(fd); + } + return r; + } else if r != -1 { + reset_errno(); + received += r as usize; + if received >= length { + r = received as ssize_t; + break; + } + offset = received.saturating_sub(length); + } + let error_kind = Error::last_os_error().kind(); + if error_kind == ErrorKind::WouldBlock { + //wait read event + if EventLoops::wait_read_event(fd, Some(Duration::from_millis(10))).is_err() { + std::mem::forget(vec); + if blocking { + set_blocking(fd); + } + return r; + } + } else if error_kind != ErrorKind::Interrupted { + std::mem::forget(vec); + if blocking { + set_blocking(fd); + } + return r; + } + } + if received >= length { + index += 1; + } + } + std::mem::forget(vec); + if blocking { + set_blocking(fd); + } + r + } +} + +impl_raw!(RawRecvmsgSyscall, RecvmsgSyscall, + recvmsg(fd: c_int, msg: *mut msghdr, flags: c_int) -> ssize_t +);