diff --git a/TODO.md b/TODO.md index 700933d5..794dd45f 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,3 @@ -syscall mod is split into more modules based on system call names - EventLoops does not perform load balancing refactor CI \ No newline at end of file diff --git a/open-coroutine-core/src/net/event_loop/mod.rs b/open-coroutine-core/src/net/event_loop/mod.rs index 04718a09..e288ad48 100644 --- a/open-coroutine-core/src/net/event_loop/mod.rs +++ b/open-coroutine-core/src/net/event_loop/mod.rs @@ -181,27 +181,30 @@ impl EventLoops { pub fn stop() { warn!("open-coroutine is exiting..."); - EVENT_LOOP_STARTED.store(false, Ordering::Release); - // wait for the event-loops to stop - let (lock, cvar) = EVENT_LOOP_STOP.as_ref(); - let result = cvar - .wait_timeout_while( - lock.lock().unwrap(), - Duration::from_millis(30000), - |stopped| { - stopped.load(Ordering::Acquire) - < EVENT_LOOP_START_COUNT.load(Ordering::Acquire) - 1 - }, - ) - .unwrap() - .1; + if EVENT_LOOP_STARTED + .compare_exchange(true, false, Ordering::Acquire, Ordering::Relaxed) + .is_ok() + { + // wait for the event-loops to stop + let (lock, cvar) = EVENT_LOOP_STOP.as_ref(); + let result = cvar + .wait_timeout_while( + lock.lock().unwrap(), + Duration::from_millis(30000), + |stopped| { + stopped.load(Ordering::Acquire) + < EVENT_LOOP_START_COUNT.load(Ordering::Acquire) - 1 + }, + ) + .unwrap() + .1; + if result.timed_out() { + crate::error!("open-coroutine didn't exit successfully within 30 seconds !"); + } + } #[cfg(all(unix, feature = "preemptive-schedule"))] crate::monitor::Monitor::stop(); - if result.timed_out() { - crate::error!("open-coroutine didn't exit successfully within 30 seconds !"); - } else { - crate::info!("open-coroutine exit successfully !"); - } + crate::info!("open-coroutine exit successfully !"); } pub fn submit_co( diff --git a/open-coroutine-core/src/syscall/facade.rs b/open-coroutine-core/src/syscall/facade.rs deleted file mode 100644 index e5996dab..00000000 --- a/open-coroutine-core/src/syscall/facade.rs +++ /dev/null @@ -1,84 +0,0 @@ -use crate::syscall::nio::NioLinuxSyscall; -use crate::syscall::raw::RawLinuxSyscall; -use crate::syscall::state::StateLinuxSyscall; -#[cfg(target_os = "linux")] -use crate::syscall::LinuxSyscall; -use crate::syscall::UnixSyscall; -#[cfg(target_os = "linux")] -use libc::epoll_event; -use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; -use once_cell::sync::Lazy; -use std::ffi::{c_int, c_void}; - -cfg_if::cfg_if! { - if #[cfg(all(target_os = "linux", feature = "io_uring"))] { - use crate::syscall::io_uring::IoUringLinuxSyscall; - static CHAIN: Lazy>>> = - Lazy::new(StateLinuxSyscall::default); - } else { - static CHAIN: Lazy>> = - Lazy::new(StateLinuxSyscall::default); - } -} - -/// write - -#[must_use] -pub extern "C" fn sendto( - fn_ptr: Option< - &extern "C" fn(c_int, *const c_void, size_t, c_int, *const sockaddr, socklen_t) -> ssize_t, - >, - socket: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, -) -> ssize_t { - CHAIN.sendto(fn_ptr, socket, buf, len, flags, addr, addrlen) -} - -#[must_use] -pub extern "C" fn write( - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, -) -> ssize_t { - CHAIN.write(fn_ptr, fd, buf, count) -} - -#[must_use] -pub extern "C" fn pwrite( - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, -) -> ssize_t { - CHAIN.pwrite(fn_ptr, fd, buf, count, offset) -} - -#[must_use] -pub extern "C" fn sendmsg( - fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *const msghdr, - flags: c_int, -) -> ssize_t { - CHAIN.sendmsg(fn_ptr, fd, msg, flags) -} - -/// poll - -#[cfg(target_os = "linux")] -#[must_use] -pub extern "C" fn epoll_ctl( - fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *mut epoll_event) -> c_int>, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut epoll_event, -) -> c_int { - CHAIN.epoll_ctl(fn_ptr, epfd, op, fd, event) -} diff --git a/open-coroutine-core/src/syscall/io_uring.rs b/open-coroutine-core/src/syscall/io_uring.rs deleted file mode 100644 index bf0d5f7b..00000000 --- a/open-coroutine-core/src/syscall/io_uring.rs +++ /dev/null @@ -1,93 +0,0 @@ -use crate::syscall::LinuxSyscall; -use crate::syscall::UnixSyscall; -use libc::epoll_event; -use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; -use std::ffi::{c_int, c_void}; - -#[derive(Debug, Default)] -pub struct IoUringLinuxSyscall { - inner: I, -} - -macro_rules! unsupported { - ( $invoker: expr, $syscall:ident, $fn_ptr:expr, $($arg: expr),* $(,)* ) => {{ - $invoker.inner.$syscall($fn_ptr, $($arg, )*) - }}; -} - -macro_rules! impl_io_uring { - ( $invoker: expr, $syscall:ident, $fn_ptr:expr, $($arg: expr),* $(,)* ) => {{ - if let Ok(result) = $crate::net::event_loop::EventLoops::$syscall($($arg, )*) { - return result; - } - unsupported!($invoker, $syscall, $fn_ptr, $($arg, )*) - }}; -} - -impl UnixSyscall for IoUringLinuxSyscall { - extern "C" fn sendto( - &self, - fn_ptr: Option< - &extern "C" fn( - c_int, - *const c_void, - size_t, - c_int, - *const sockaddr, - socklen_t, - ) -> ssize_t, - >, - socket: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, - ) -> ssize_t { - impl_io_uring!(self, sendto, fn_ptr, socket, buf, len, flags, addr, addrlen) - } - - extern "C" fn write( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - ) -> ssize_t { - impl_io_uring!(self, write, fn_ptr, fd, buf, count) - } - - extern "C" fn pwrite( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - impl_io_uring!(self, pwrite, fn_ptr, fd, buf, count, offset) - } - - extern "C" fn sendmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *const msghdr, - flags: c_int, - ) -> ssize_t { - impl_io_uring!(self, sendmsg, fn_ptr, fd, msg, flags) - } -} - -impl LinuxSyscall for IoUringLinuxSyscall { - extern "C" fn epoll_ctl( - &self, - fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *mut epoll_event) -> c_int>, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut epoll_event, - ) -> c_int { - impl_io_uring!(self, epoll_ctl, fn_ptr, epfd, op, fd, event) - } -} diff --git a/open-coroutine-core/src/syscall/mod.rs b/open-coroutine-core/src/syscall/mod.rs index 2978af25..a0676c19 100644 --- a/open-coroutine-core/src/syscall/mod.rs +++ b/open-coroutine-core/src/syscall/mod.rs @@ -1,96 +1,6 @@ -#[cfg(target_os = "linux")] -use libc::epoll_event; -#[cfg(unix)] -use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; -#[cfg(unix)] -use std::ffi::{c_int, c_void}; - #[cfg(unix)] pub mod common; -#[cfg(unix)] -pub mod raw; - -#[cfg(unix)] -pub mod nio; - -#[allow(unused_variables)] -#[cfg(all(target_os = "linux", feature = "io_uring"))] -pub mod io_uring; - -#[cfg(unix)] -pub mod state; - -#[cfg(unix)] -mod facade; -#[cfg(unix)] -pub use facade::*; - -#[cfg(unix)] -pub trait UnixSyscall { - /// write - - extern "C" fn sendto( - &self, - fn_ptr: Option< - &extern "C" fn( - c_int, - *const c_void, - size_t, - c_int, - *const sockaddr, - socklen_t, - ) -> ssize_t, - >, - fd: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, - ) -> ssize_t; - - extern "C" fn write( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - ) -> ssize_t; - - extern "C" fn pwrite( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, - ) -> ssize_t; - - extern "C" fn sendmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *const msghdr, - flags: c_int, - ) -> ssize_t; -} - -#[cfg(target_os = "linux")] -pub trait LinuxSyscall: UnixSyscall { - /// poll - - extern "C" fn epoll_ctl( - &self, - fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *mut epoll_event) -> c_int>, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut epoll_event, - ) -> c_int; -} - -#[allow(unused_imports)] #[cfg(unix)] pub use unix::*; diff --git a/open-coroutine-core/src/syscall/nio.rs b/open-coroutine-core/src/syscall/nio.rs deleted file mode 100644 index 7137779b..00000000 --- a/open-coroutine-core/src/syscall/nio.rs +++ /dev/null @@ -1,221 +0,0 @@ -use crate::net::event_loop::EventLoops; -use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking}; -#[cfg(target_os = "linux")] -use crate::syscall::LinuxSyscall; -use crate::syscall::UnixSyscall; -#[cfg(target_os = "linux")] -use libc::epoll_event; -use libc::{iovec, msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; -use std::ffi::{c_int, c_void}; -use std::time::Duration; - -#[derive(Debug, Default)] -pub struct NioLinuxSyscall { - inner: I, -} - -macro_rules! impl_expected_write_hook { - ( $invoker: expr, $syscall: ident, $fn_ptr: expr, $socket:expr, $buffer:expr, $length:expr, $($arg: expr),* $(,)* ) => {{ - let socket = $socket; - let blocking = $crate::syscall::common::is_blocking(socket); - if blocking { - $crate::syscall::common::set_non_blocking(socket); - } - let mut sent = 0; - let mut r = 0; - while sent < $length { - r = $invoker.$syscall( - $fn_ptr, - $socket, - ($buffer as usize + sent) as *const c_void, - $length - sent, - $($arg, )* - ); - if r != -1 { - $crate::syscall::common::reset_errno(); - sent += r as size_t; - if sent >= $length { - r = sent as ssize_t; - break; - } - } - let error_kind = std::io::Error::last_os_error().kind(); - if error_kind == std::io::ErrorKind::WouldBlock { - //wait write event - if $crate::net::event_loop::EventLoops::wait_write_event( - socket, - Some(std::time::Duration::from_millis(10)), - ) - .is_err() - { - break; - } - } else if error_kind != std::io::ErrorKind::Interrupted { - break; - } - } - if blocking { - $crate::syscall::common::set_blocking(socket); - } - r - }}; -} - -impl UnixSyscall for NioLinuxSyscall { - extern "C" fn sendto( - &self, - fn_ptr: Option< - &extern "C" fn( - c_int, - *const c_void, - size_t, - c_int, - *const sockaddr, - socklen_t, - ) -> ssize_t, - >, - socket: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, - ) -> ssize_t { - impl_expected_write_hook!( - self.inner, sendto, fn_ptr, socket, buf, len, flags, addr, addrlen - ) - } - - extern "C" fn write( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - ) -> ssize_t { - impl_expected_write_hook!(self.inner, write, fn_ptr, fd, buf, count,) - } - - extern "C" fn pwrite( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - impl_expected_write_hook!(self.inner, pwrite, fn_ptr, fd, buf, count, offset) - } - - extern "C" fn sendmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *const msghdr, - flags: c_int, - ) -> ssize_t { - let blocking = is_blocking(fd); - if blocking { - set_non_blocking(fd); - } - let msghdr = unsafe { *msg }; - let mut vec = std::collections::VecDeque::from(unsafe { - Vec::from_raw_parts( - msghdr.msg_iov, - msghdr.msg_iovlen as usize, - msghdr.msg_iovlen as usize, - ) - }); - let mut length = 0; - let mut pices = std::collections::VecDeque::new(); - for iovec in &vec { - length += iovec.iov_len; - pices.push_back(length); - } - let mut sent = 0; - let mut r = 0; - while sent < length { - // find from-index - let mut from_index = 0; - for (i, v) in pices.iter().enumerate() { - if sent < *v { - from_index = i; - break; - } - } - // calculate offset - let current_sent_offset = if from_index > 0 { - sent.saturating_sub(pices[from_index.saturating_sub(1)]) - } else { - sent - }; - // remove already sent - for _ in 0..from_index { - _ = vec.pop_front(); - _ = pices.pop_front(); - } - // build syscall args - vec[0] = iovec { - iov_base: (vec[0].iov_base as usize + current_sent_offset) as *mut c_void, - iov_len: vec[0].iov_len - current_sent_offset, - }; - cfg_if::cfg_if! { - if #[cfg(any( - target_os = "linux", - target_os = "l4re", - target_os = "android", - target_os = "emscripten" - ))] { - let len = vec.len(); - } else { - let len = c_int::try_from(vec.len()).unwrap(); - } - } - let new_msg = msghdr { - msg_name: msghdr.msg_name, - msg_namelen: msghdr.msg_namelen, - msg_iov: vec.get_mut(0).unwrap(), - msg_iovlen: len, - msg_control: msghdr.msg_control, - msg_controllen: msghdr.msg_controllen, - msg_flags: msghdr.msg_flags, - }; - r = self.inner.sendmsg(fn_ptr, fd, &new_msg, flags); - if r != -1 { - reset_errno(); - sent += r as usize; - if sent >= length { - r = sent as ssize_t; - break; - } - } - let error_kind = std::io::Error::last_os_error().kind(); - if error_kind == std::io::ErrorKind::WouldBlock { - //wait write event - if EventLoops::wait_write_event(fd, Some(Duration::from_millis(10))).is_err() { - break; - } - } else if error_kind != std::io::ErrorKind::Interrupted { - break; - } - } - if blocking { - set_blocking(fd); - } - r - } -} - -#[cfg(target_os = "linux")] -impl LinuxSyscall for NioLinuxSyscall { - extern "C" fn epoll_ctl( - &self, - fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *mut epoll_event) -> c_int>, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut epoll_event, - ) -> c_int { - self.inner.epoll_ctl(fn_ptr, epfd, op, fd, event) - } -} diff --git a/open-coroutine-core/src/syscall/raw.rs b/open-coroutine-core/src/syscall/raw.rs deleted file mode 100644 index 2276142e..00000000 --- a/open-coroutine-core/src/syscall/raw.rs +++ /dev/null @@ -1,103 +0,0 @@ -#[cfg(target_os = "linux")] -use crate::syscall::LinuxSyscall; -use crate::syscall::UnixSyscall; -#[cfg(target_os = "linux")] -use libc::epoll_event; -use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; -use std::ffi::{c_int, c_void}; - -#[derive(Debug, Copy, Clone, Default)] -pub struct RawLinuxSyscall {} - -impl UnixSyscall for RawLinuxSyscall { - /// write - - extern "C" fn sendto( - &self, - fn_ptr: Option< - &extern "C" fn( - c_int, - *const c_void, - size_t, - c_int, - *const sockaddr, - socklen_t, - ) -> ssize_t, - >, - socket: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(socket, buf, len, flags, addr, addrlen) - } else { - unsafe { libc::sendto(socket, buf, len, flags, addr, addrlen) } - } - } - - extern "C" fn write( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, buf, count) - } else { - unsafe { libc::write(fd, buf, count) } - } - } - - extern "C" fn pwrite( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, buf, count, offset) - } else { - unsafe { libc::pwrite(fd, buf, count, offset) } - } - } - - extern "C" fn sendmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *const msghdr, - flags: c_int, - ) -> ssize_t { - if let Some(f) = fn_ptr { - (f)(fd, msg, flags) - } else { - unsafe { libc::sendmsg(fd, msg, flags) } - } - } -} - -#[cfg(target_os = "linux")] -impl LinuxSyscall for RawLinuxSyscall { - /// poll - - extern "C" fn epoll_ctl( - &self, - fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *mut epoll_event) -> c_int>, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut epoll_event, - ) -> c_int { - if let Some(f) = fn_ptr { - (f)(epfd, op, fd, event) - } else { - unsafe { libc::epoll_ctl(epfd, op, fd, event) } - } - } -} diff --git a/open-coroutine-core/src/syscall/state.rs b/open-coroutine-core/src/syscall/state.rs deleted file mode 100644 index 09514764..00000000 --- a/open-coroutine-core/src/syscall/state.rs +++ /dev/null @@ -1,104 +0,0 @@ -use crate::common::{Current, Named}; -#[cfg(target_os = "linux")] -use crate::syscall::LinuxSyscall; -use crate::syscall::UnixSyscall; -#[cfg(target_os = "linux")] -use libc::epoll_event; -use libc::{msghdr, off_t, size_t, sockaddr, socklen_t, ssize_t}; -use std::ffi::{c_int, c_void}; - -#[derive(Debug, Default)] -pub struct StateLinuxSyscall { - inner: I, -} - -macro_rules! syscall_state { - ( $invoker: expr , $syscall: ident, $($arg: expr),* $(,)* ) => {{ - let syscall = $crate::constants::Syscall::$syscall; - $crate::info!("{} hooked", syscall); - if let Some(co) = $crate::scheduler::SchedulableCoroutine::current() { - if co - .syscall((), syscall, $crate::constants::SyscallState::Executing) - .is_err() - { - $crate::error!("{} change to syscall state failed !", co.get_name()); - } - } - let r = $invoker.inner.$syscall($($arg, )*); - if let Some(co) = $crate::scheduler::SchedulableCoroutine::current() { - if co.running().is_err() { - $crate::error!("{} change to running state failed !", co.get_name()); - } - } - return r; - }}; -} - -impl UnixSyscall for StateLinuxSyscall { - extern "C" fn sendto( - &self, - fn_ptr: Option< - &extern "C" fn( - c_int, - *const c_void, - size_t, - c_int, - *const sockaddr, - socklen_t, - ) -> ssize_t, - >, - socket: c_int, - buf: *const c_void, - len: size_t, - flags: c_int, - addr: *const sockaddr, - addrlen: socklen_t, - ) -> ssize_t { - syscall_state!(self, sendto, fn_ptr, socket, buf, len, flags, addr, addrlen) - } - - extern "C" fn write( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - ) -> ssize_t { - syscall_state!(self, write, fn_ptr, fd, buf, count) - } - - extern "C" fn pwrite( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, - fd: c_int, - buf: *const c_void, - count: size_t, - offset: off_t, - ) -> ssize_t { - syscall_state!(self, pwrite, fn_ptr, fd, buf, count, offset) - } - - extern "C" fn sendmsg( - &self, - fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, - fd: c_int, - msg: *const msghdr, - flags: c_int, - ) -> ssize_t { - syscall_state!(self, sendmsg, fn_ptr, fd, msg, flags) - } -} - -#[cfg(target_os = "linux")] -impl LinuxSyscall for StateLinuxSyscall { - extern "C" fn epoll_ctl( - &self, - fn_ptr: Option<&extern "C" fn(c_int, c_int, c_int, *mut epoll_event) -> c_int>, - epfd: c_int, - op: c_int, - fd: c_int, - event: *mut epoll_event, - ) -> c_int { - syscall_state!(self, epoll_ctl, fn_ptr, epfd, op, fd, event) - } -} diff --git a/open-coroutine-core/src/syscall/unix/mod.rs b/open-coroutine-core/src/syscall/unix/mod.rs index 3b1ff34a..05f9f664 100644 --- a/open-coroutine-core/src/syscall/unix/mod.rs +++ b/open-coroutine-core/src/syscall/unix/mod.rs @@ -8,6 +8,7 @@ pub use nanosleep::nanosleep; pub use poll::poll; pub use pread::pread; pub use preadv::preadv; +pub use pwrite::pwrite; pub use pwritev::pwritev; pub use read::read; pub use readv::readv; @@ -16,10 +17,13 @@ pub use recvfrom::recvfrom; pub use recvmsg::recvmsg; pub use select::select; pub use send::send; +pub use sendmsg::sendmsg; +pub use sendto::sendto; pub use shutdown::shutdown; pub use sleep::sleep; pub use socket::socket; pub use usleep::usleep; +pub use write::write; pub use writev::writev; macro_rules! impl_facade { @@ -487,6 +491,7 @@ mod nanosleep; mod poll; mod pread; mod preadv; +mod pwrite; mod pwritev; mod read; mod readv; @@ -495,8 +500,11 @@ mod recvfrom; mod recvmsg; mod select; mod send; +mod sendmsg; +mod sendto; mod shutdown; mod sleep; mod socket; mod usleep; +mod write; mod writev; diff --git a/open-coroutine-core/src/syscall/unix/pwrite.rs b/open-coroutine-core/src/syscall/unix/pwrite.rs new file mode 100644 index 00000000..ba2a8c97 --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/pwrite.rs @@ -0,0 +1,51 @@ +use libc::{off_t, size_t, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::{c_int, c_void}; + +#[must_use] +pub extern "C" fn pwrite( + fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, + fd: c_int, + buf: *const c_void, + count: size_t, + offset: off_t, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + PwriteSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.pwrite(fn_ptr, fd, buf, count, offset) +} + +trait PwriteSyscall { + extern "C" fn pwrite( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t, off_t) -> ssize_t>, + fd: c_int, + buf: *const c_void, + count: size_t, + offset: off_t, + ) -> ssize_t; +} + +impl_facade!(PwriteSyscallFacade, PwriteSyscall, + pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t +); + +impl_io_uring!(IoUringPwriteSyscall, PwriteSyscall, + pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t +); + +impl_nio_write_buf!(NioPwriteSyscall, PwriteSyscall, + pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t +); + +impl_raw!(RawPwriteSyscall, PwriteSyscall, + pwrite(fd: c_int, buf: *const c_void, len: size_t, offset: off_t) -> ssize_t +); diff --git a/open-coroutine-core/src/syscall/unix/sendmsg.rs b/open-coroutine-core/src/syscall/unix/sendmsg.rs new file mode 100644 index 00000000..aa568cbb --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/sendmsg.rs @@ -0,0 +1,159 @@ +use crate::net::event_loop::EventLoops; +use crate::syscall::common::{is_blocking, reset_errno, set_blocking, set_non_blocking}; +use libc::{msghdr, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::{c_int, c_void}; +use std::io::{Error, ErrorKind}; +use std::time::Duration; + +#[must_use] +pub extern "C" fn sendmsg( + fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, + fd: c_int, + msg: *const msghdr, + flags: c_int, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + SendmsgSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.sendmsg(fn_ptr, fd, msg, flags) +} + +trait SendmsgSyscall { + extern "C" fn sendmsg( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, + fd: c_int, + msg: *const msghdr, + flags: c_int, + ) -> ssize_t; +} + +impl_facade!(SendmsgSyscallFacade, SendmsgSyscall, + sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t +); + +impl_io_uring!(IoUringSendmsgSyscall, SendmsgSyscall, + sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t +); + +#[derive(Debug, Default)] +struct NioSendmsgSyscall { + inner: I, +} + +impl SendmsgSyscall for NioSendmsgSyscall { + extern "C" fn sendmsg( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *const msghdr, c_int) -> ssize_t>, + fd: c_int, + msg: *const msghdr, + flags: c_int, + ) -> ssize_t { + let blocking = is_blocking(fd); + if blocking { + set_non_blocking(fd); + } + let msghdr = unsafe { *msg }; + let vec = unsafe { + Vec::from_raw_parts( + msghdr.msg_iov, + msghdr.msg_iovlen as usize, + msghdr.msg_iovlen as usize, + ) + }; + let mut length = 0; + let mut sent = 0usize; + let mut r = 0; + let mut index = 0; + for iovec in &vec { + let mut offset = sent.saturating_sub(length); + length += iovec.iov_len; + if sent > length { + index += 1; + continue; + } + let mut iov = Vec::new(); + for i in vec.iter().skip(index) { + iov.push(*i); + } + cfg_if::cfg_if! { + if #[cfg(any( + target_os = "linux", + target_os = "l4re", + target_os = "android", + target_os = "emscripten" + ))] { + let msg_iovlen = vec.len(); + } else { + let msg_iovlen = c_int::try_from(iov.len()).unwrap_or_else(|_| { + panic!("{} msghdr.msg_iovlen overflow", crate::constants::Syscall::recvmsg) + }); + } + } + while sent < length { + if 0 != offset { + iov[0] = libc::iovec { + iov_base: (iov[0].iov_base as usize + offset) as *mut c_void, + iov_len: iov[0].iov_len - offset, + }; + } + let arg = msghdr { + msg_name: msghdr.msg_name, + msg_namelen: msghdr.msg_namelen, + msg_iov: iov.as_mut_ptr(), + msg_iovlen, + msg_control: msghdr.msg_control, + msg_controllen: msghdr.msg_controllen, + msg_flags: msghdr.msg_flags, + }; + r = self.inner.sendmsg(fn_ptr, fd, &arg, flags); + if r != -1 { + reset_errno(); + sent += r as usize; + if sent >= length { + r = sent as ssize_t; + break; + } + offset = sent.saturating_sub(length); + } + let error_kind = Error::last_os_error().kind(); + if error_kind == ErrorKind::WouldBlock { + //wait write event + if EventLoops::wait_write_event(fd, Some(Duration::from_millis(10))).is_err() { + std::mem::forget(vec); + if blocking { + set_blocking(fd); + } + return r; + } + } else if error_kind != ErrorKind::Interrupted { + std::mem::forget(vec); + if blocking { + set_blocking(fd); + } + return r; + } + } + if sent >= length { + index += 1; + } + } + std::mem::forget(vec); + if blocking { + set_blocking(fd); + } + r + } +} + +impl_raw!(RawSendmsgSyscall, SendmsgSyscall, + sendmsg(fd: c_int, msg: *const msghdr, flags: c_int) -> ssize_t +); diff --git a/open-coroutine-core/src/syscall/unix/sendto.rs b/open-coroutine-core/src/syscall/unix/sendto.rs new file mode 100644 index 00000000..2eaa4e94 --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/sendto.rs @@ -0,0 +1,70 @@ +use libc::{size_t, sockaddr, socklen_t, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::{c_int, c_void}; + +#[must_use] +pub extern "C" fn sendto( + fn_ptr: Option< + &extern "C" fn(c_int, *const c_void, size_t, c_int, *const sockaddr, socklen_t) -> ssize_t, + >, + fd: c_int, + buf: *const c_void, + len: size_t, + flags: c_int, + addr: *const sockaddr, + addrlen: socklen_t, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + SendtoSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.sendto(fn_ptr, fd, buf, len, flags, addr, addrlen) +} + +trait SendtoSyscall { + extern "C" fn sendto( + &self, + fn_ptr: Option< + &extern "C" fn( + c_int, + *const c_void, + size_t, + c_int, + *const sockaddr, + socklen_t, + ) -> ssize_t, + >, + fd: c_int, + buf: *const c_void, + len: size_t, + flags: c_int, + addr: *const sockaddr, + addrlen: socklen_t, + ) -> ssize_t; +} + +impl_facade!(SendtoSyscallFacade, SendtoSyscall, + sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, + addr: *const sockaddr, addrlen: socklen_t) -> ssize_t +); + +impl_io_uring!(IoUringSendtoSyscall, SendtoSyscall, + sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, + addr: *const sockaddr, addrlen: socklen_t) -> ssize_t +); + +impl_nio_write_buf!(NioSendtoSyscall, SendtoSyscall, + sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, + addr: *const sockaddr, addrlen: socklen_t) -> ssize_t +); + +impl_raw!(RawSendtoSyscall, SendtoSyscall, + sendto(fd: c_int, buf: *const c_void, len: size_t, flags: c_int, + addr: *const sockaddr, addrlen: socklen_t) -> ssize_t +); diff --git a/open-coroutine-core/src/syscall/unix/write.rs b/open-coroutine-core/src/syscall/unix/write.rs new file mode 100644 index 00000000..2569872e --- /dev/null +++ b/open-coroutine-core/src/syscall/unix/write.rs @@ -0,0 +1,49 @@ +use libc::{size_t, ssize_t}; +use once_cell::sync::Lazy; +use std::ffi::{c_int, c_void}; + +#[must_use] +pub extern "C" fn write( + fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, + fd: c_int, + buf: *const c_void, + len: size_t, +) -> ssize_t { + cfg_if::cfg_if! { + if #[cfg(all(target_os = "linux", feature = "io_uring"))] { + static CHAIN: Lazy< + WriteSyscallFacade>> + > = Lazy::new(Default::default); + } else { + static CHAIN: Lazy>> = + Lazy::new(Default::default); + } + } + CHAIN.write(fn_ptr, fd, buf, len) +} + +trait WriteSyscall { + extern "C" fn write( + &self, + fn_ptr: Option<&extern "C" fn(c_int, *const c_void, size_t) -> ssize_t>, + fd: c_int, + buf: *const c_void, + len: size_t, + ) -> ssize_t; +} + +impl_facade!(WriteSyscallFacade, WriteSyscall, + write(fd: c_int, buf: *const c_void, len: size_t) -> ssize_t +); + +impl_io_uring!(IoUringWriteSyscall, WriteSyscall, + write(fd: c_int, buf: *const c_void, len: size_t) -> ssize_t +); + +impl_nio_write_buf!(NioWriteSyscall, WriteSyscall, + write(fd: c_int, buf: *const c_void, len: size_t, ) -> ssize_t +); + +impl_raw!(RawWriteSyscall, WriteSyscall, + write(fd: c_int, buf: *const c_void, len: size_t) -> ssize_t +);