diff --git a/monoio/Cargo.toml b/monoio/Cargo.toml index 8f002e8e..493bfbdc 100644 --- a/monoio/Cargo.toml +++ b/monoio/Cargo.toml @@ -21,6 +21,7 @@ pin-project-lite = "0.2" socket2 = { version = "0.5", features = ["all"] } bytes = { version = "1", optional = true } +fixed-vec-deque = { version = "0.1", optional = true } flume = { version = "0.10", optional = true } mio = { version = "0.8", features = [ "net", @@ -71,10 +72,20 @@ debug = ["tracing"] legacy = ["mio"] # iouring support iouring = [] +# iouring support with user provided IO operation limits +iouring-fixed = ["dep:fixed-vec-deque"] # tokio-compatible(only have effect when legacy is enabled and iouring is not) tokio-compat = ["tokio"] # signal enables setting ctrl_c handler signal = ["ctrlc", "sync"] signal-termination = ["signal", "ctrlc/termination"] # by default both iouring and legacy are enabled -default = ["async-cancel", "bytes", "iouring", "legacy", "macros", "utils"] +default = [ + "async-cancel", + "bytes", + "iouring", + "iouring-fixed", + "legacy", + "macros", + "utils", +] diff --git a/monoio/src/driver/mod.rs b/monoio/src/driver/mod.rs index cf443303..5387c2b3 100644 --- a/monoio/src/driver/mod.rs +++ b/monoio/src/driver/mod.rs @@ -1,6 +1,7 @@ /// Monoio Driver. // #[cfg(unix)] pub(crate) mod op; +pub(crate) mod oppool; pub(crate) mod shared_fd; #[cfg(feature = "sync")] pub(crate) mod thread; @@ -9,6 +10,8 @@ pub(crate) mod thread; mod legacy; #[cfg(all(target_os = "linux", feature = "iouring"))] mod uring; +#[cfg(all(target_os = "linux", feature = "iouring-fixed"))] +mod uring_fixed; mod util; @@ -35,6 +38,8 @@ use self::op::{CompletionMeta, Op, OpAble}; pub use self::uring::IoUringDriver; #[cfg(all(target_os = "linux", feature = "iouring"))] use self::uring::UringInner; +#[cfg(all(target_os = "linux", feature = "iouring-fixed"))] +use self::uring_fixed::UringFixedInner; /// Unpark a runtime of another thread. pub(crate) mod unpark { @@ -91,6 +96,8 @@ scoped_thread_local!(pub(crate) static CURRENT: Inner); pub(crate) enum Inner { #[cfg(all(target_os = "linux", feature = "iouring"))] Uring(std::rc::Rc>), + #[cfg(all(target_os = "linux", feature = "iouring"))] + UringFixed(std::rc::Rc>), #[cfg(feature = "legacy")] Legacy(std::rc::Rc>), } @@ -102,6 +109,10 @@ impl Inner { _ => unimplemented!(), #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::submit_with_data(this, data), + #[cfg(all(target_os = "linux", feature = "iouring-fixed"))] + Inner::UringFixed(this) => { + panic!("uring fixed supports maybe_submit instead of submit") + } #[cfg(feature = "legacy")] Inner::Legacy(this) => LegacyInner::submit_with_data(this, data), #[cfg(all( @@ -114,6 +125,26 @@ impl Inner { } } + fn maybe_submit_with(&self, data: T) -> Option>> { + match self { + #[cfg(windows)] + _ => unimplemented!(), + #[cfg(all(target_os = "linux", feature = "iouring"))] + _ => unimplemented!(), + #[cfg(all(target_os = "linux", feature = "iouring-fixed"))] + Inner::UringFixed(this) => UringInnerFixed::maybe_submit_with_data(this, data), + #[cfg(feature = "legacy")] + _ => unimplemented!(), + #[cfg(all( + not(feature = "legacy"), + not(all(target_os = "linux", feature = "iouring")) + ))] + _ => { + util::feature_panic(); + } + } + } + #[allow(unused)] fn poll_op( &self, @@ -180,7 +211,12 @@ impl Inner { } } - #[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))] + #[cfg(all( + target_os = "linux", + feature = "iouring", + feature = "ioring-fixed", + feature = "legacy" + ))] fn is_legacy(&self) -> bool { matches!(self, Inner::Legacy(..)) } @@ -190,6 +226,14 @@ impl Inner { false } + #[cfg(all( + target_os = "linux", + feature = "iouring-fixed", + not(feature = "legacy") + ))] + fn is_legacy(&self) -> bool { + false + } #[allow(unused)] #[cfg(not(all(target_os = "linux", feature = "iouring")))] fn is_legacy(&self) -> bool { @@ -203,6 +247,8 @@ impl Inner { pub(crate) enum UnparkHandle { #[cfg(all(target_os = "linux", feature = "iouring"))] Uring(self::uring::UnparkHandle), + #[cfg(all(target_os = "linux", feature = "iouring-fixed"))] + UringFixed(self::uring::UnparkHandle), #[cfg(feature = "legacy")] Legacy(self::legacy::UnparkHandle), } @@ -213,6 +259,8 @@ impl unpark::Unpark for UnparkHandle { match self { #[cfg(all(target_os = "linux", feature = "iouring"))] UnparkHandle::Uring(inner) => inner.unpark(), + #[cfg(all(target_os = "linux", feature = "iouring-fixed"))] + UnparkHandle::UringFixed(inner) => inner.unpark(), #[cfg(feature = "legacy")] UnparkHandle::Legacy(inner) => inner.unpark(), #[cfg(all( @@ -233,6 +281,13 @@ impl From for UnparkHandle { } } +#[cfg(all(feature = "sync", target_os = "linux", feature = "iouring-fixed"))] +impl From for UnparkHandle { + fn from(inner: self::uring::UnparkHandle) -> Self { + Self::Uring(inner) + } +} + #[cfg(all(feature = "sync", feature = "legacy"))] impl From for UnparkHandle { fn from(inner: self::legacy::UnparkHandle) -> Self { @@ -247,6 +302,8 @@ impl UnparkHandle { CURRENT.with(|inner| match inner { #[cfg(all(target_os = "linux", feature = "iouring"))] Inner::Uring(this) => UringInner::unpark(this).into(), + #[cfg(all(target_os = "linux", feature = "iouring-fixed"))] + Inner::Uring(this) => UringInner::unpark(this).into(), #[cfg(feature = "legacy")] Inner::Legacy(this) => LegacyInner::unpark(this).into(), }) diff --git a/monoio/src/driver/op.rs b/monoio/src/driver/op.rs index fa8a8c31..0541c269 100644 --- a/monoio/src/driver/op.rs +++ b/monoio/src/driver/op.rs @@ -47,6 +47,8 @@ pub(crate) struct CompletionMeta { pub(crate) result: io::Result, #[allow(unused)] pub(crate) flags: u32, + // relevant to io_uring_fixed runtime + pub(crate) user_data: usize, } pub(crate) trait OpAble { @@ -99,6 +101,14 @@ impl Op { driver::CURRENT.with(|this| this.submit_with(data)) } + /// Submit an operation to uring if submission queue is not full. + pub(super) fn maybe_submit_with(data: T) -> io::Result> + where + T: OpAble, + { + driver::CURRENT.with(|this| this.maybe_submit_with(data)) + } + /// Try submitting an operation to uring #[allow(unused)] pub(super) fn try_submit_with(data: T) -> io::Result> diff --git a/monoio/src/driver/op/accept.rs b/monoio/src/driver/op/accept.rs index 4e708e49..65f9fb00 100644 --- a/monoio/src/driver/op/accept.rs +++ b/monoio/src/driver/op/accept.rs @@ -127,3 +127,111 @@ impl OpAble for Accept { }; } } + +/// Pool of accept operations +pub struct Accept { + pub(crate) fd: SharedFd, + #[cfg(unix)] + pub(crate) addr: Box<(MaybeUninit, libc::socklen_t)>, + #[cfg(windows)] + pub(crate) addr: Box<(MaybeUninit, socklen_t)>, +} + +impl Op { + /// Accept a connection + pub(crate) fn accept(fd: &SharedFd) -> io::Result { + #[cfg(unix)] + let addr = Box::new(( + MaybeUninit::uninit(), + size_of::() as libc::socklen_t, + )); + + #[cfg(windows)] + let addr = Box::new(( + MaybeUninit::uninit(), + size_of::() as socklen_t, + )); + + Op::submit_with(Accept { + fd: fd.clone(), + addr, + }) + } +} + +impl OpAble for Accept { + #[cfg(all(target_os = "linux", feature = "iouring"))] + fn uring_op(&mut self) -> io_uring::squeue::Entry { + opcode::Accept::new( + types::Fd(self.fd.raw_fd()), + self.addr.0.as_mut_ptr() as *mut _, + &mut self.addr.1, + ) + .build() + } + + #[cfg(feature = "legacy")] + fn legacy_interest(&self) -> Option<(Direction, usize)> { + self.fd.registered_index().map(|idx| (Direction::Read, idx)) + } + + #[cfg(windows)] + fn legacy_call(&mut self) -> io::Result { + let fd = self.fd.as_raw_socket(); + let addr = self.addr.0.as_mut_ptr() as *mut _; + let len = &mut self.addr.1; + + syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET) + } + + #[cfg(all(unix, feature = "legacy"))] + fn legacy_call(&mut self) -> io::Result { + let fd = self.fd.as_raw_fd(); + let addr = self.addr.0.as_mut_ptr() as *mut _; + let len = &mut self.addr.1; + // Here I use copied some code from mio because I don't want the conversion. + + // On platforms that support it we can use `accept4(2)` to set `NONBLOCK` + // and `CLOEXEC` in the call to accept the connection. + #[cfg(any( + // Android x86's seccomp profile forbids calls to `accept4(2)` + // See https://github.com/tokio-rs/mio/issues/1445 for details + all( + not(target_arch="x86"), + target_os = "android" + ), + target_os = "dragonfly", + target_os = "freebsd", + target_os = "illumos", + target_os = "linux", + target_os = "netbsd", + target_os = "openbsd" + ))] + return syscall_u32!(accept4( + fd, + addr, + len, + libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK, + )); + + // But not all platforms have the `accept4(2)` call. Luckily BSD (derived) + // OSes inherit the non-blocking flag from the listener, so we just have to + // set `CLOEXEC`. + #[cfg(any( + all(target_arch = "x86", target_os = "android"), + target_os = "ios", + target_os = "macos", + target_os = "redox" + ))] + return { + let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32; + syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC)) + .and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK))) + .map_err(|e| { + let _ = syscall_u32!(close(stream_fd)); + e + })?; + Ok(stream_fd as _) + }; + } +} diff --git a/monoio/src/driver/oppool.rs b/monoio/src/driver/oppool.rs new file mode 100644 index 00000000..84083233 --- /dev/null +++ b/monoio/src/driver/oppool.rs @@ -0,0 +1,21 @@ +use std::{ + future::Future, + io, + pin::Pin, + task::{Context, Poll}, +}; + +// pub(crate) mod close; + +mod accept; +// mod connect; +// mod fsync; +// mod open; +// mod poll; +// mod read; +// mod recv; +// mod send; +// mod write; + +#[cfg(all(target_os = "linux", feature = "splice"))] +// mod splice; diff --git a/monoio/src/driver/oppool/accept.rs b/monoio/src/driver/oppool/accept.rs new file mode 100644 index 00000000..7ee1e999 --- /dev/null +++ b/monoio/src/driver/oppool/accept.rs @@ -0,0 +1,93 @@ +#[cfg(windows)] +use { + crate::syscall, + std::os::windows::prelude::AsRawSocket, + windows_sys::Win32::Networking::WinSock::{ + accept, socklen_t, INVALID_SOCKET, SOCKADDR_STORAGE, + }, +}; + +use crate::driver::op::Accept; + +/// Accept pool +pub(crate) struct AcceptPool { + #[cfg(unix)] + free_sockaddrs: [Option, libc::socklen_t)>>; N], + // #[cfg(windows)] + // addrs: [Box<(MaybeUninit, socklen_t)>; N], + // index of the last free element or usize::MAX when free list is empty + // can't be more then isize::MAX due to std::alloc::Layout allocation limit + last_free_index: Wrapping, + // Requested Accepts - both submitted and not + accepts: [Option; N], + last_unqueued_index: Wrapping, + last_queued_index: Wrapping, + completions: [CompletionMeta; N], +} + +impl AcceptPool { + /// Accept a connection if free sockaddr storage is available + /// + /// User can provide + pub fn maybe_accept(&mut self, fd: &SharedFd, user_data: usize) -> Option> { + #[cfg(unix)] + let addr = self.acquire_sockaddr(); + + // #[cfg(windows)] + // let addr = Box::new(( + // MaybeUninit::uninit(), + // size_of::() as socklen_t, + // )); + + Op::maybe_submit_with(Accept { + fd: fd.clone(), + addr, + }) + } + + pub(crate) fn new() -> Self { + let free_sockaddrs = { + let mut list: [MaybeUninit< + Option, libc::socklen_t)>>, + >; N] = MaybeUninit::uninit_array(); + for elem in &mut list { + let maybe_uninit_buf = + Box::<(MaybeUninit, libc::socklen_t)>::new_zeroed()?; + // SAFETY: + // just allocated + let buf = unsafe { maybe_uninit_buf.assume_init() }; + _ = elem.write(Some(buf)); + } + // SAFETY: + // array was previously initialized + unsafe { MaybeUninit::array_assume_init(list) } + }; + Ok(Self { + free_sockaddrs, + last_free_index: Wrapping(N) - Wrapping(1), + }) + } + + /// Acquire sockaddr_storage from the pool + fn acquire_sockaddr( + &mut self, + ) -> Option, libc::socklen_t)>> { + if self.last_free_index.0 < self.free_sockaddrs.len() { + #[expect(clippy::indexing_slicing, reason = "safe indexing due to len check")] + let maybe_sockaddr = self.free_sockaddrs[self.last_free_index.0].take(); + self.last_free_index -= Wrapping(1); + maybe_sockaddr + } else { + None + } + } + + /// Release sockaddr_storage to the pool + fn release_sockaddr(&mut self, sockaddr_storage: Box<(MaybeUninit)>) { + let new_free_index = self.last_free_index + Wrapping(1); + *self + .free_list + .get_mut(new_free_index.0) + .expect("pool ops fit the backing array") = Some(sockaddr_storage); + } +} diff --git a/monoio/src/driver/uring_fixed/lifecycle.rs b/monoio/src/driver/uring_fixed/lifecycle.rs new file mode 100644 index 00000000..f6025790 --- /dev/null +++ b/monoio/src/driver/uring_fixed/lifecycle.rs @@ -0,0 +1,94 @@ +//! Uring state lifecycle. +//! Partly borrow from tokio-uring. + +use std::{ + io, + task::{Context, Poll, Waker}, +}; + +use crate::{driver::op::CompletionMeta, utils::slab::Ref}; + +pub(crate) enum InProgressLifecycle { + /// The operation has been queued by the driver but has not submitted to uring yet + Enqueued, + /// The operation has been submitted to uring and is currently in-flight + Submitted, + + /// The submitter is waiting for the completion of the operation + Waiting(Waker), + + /// The submitter no longer has interest in the operation result. The state + /// must be passed to the driver and held until the operation completes. + Ignored(Box), +} + +impl<'a> Ref<'a, Lifecycle> { + pub(crate) fn complete(mut self, result: io::Result, flags: u32) { + let ref_mut = &mut *self; + match ref_mut { + Lifecycle::Enqueued => { + unreachable!("Can't complete enqueued operation") + } + Lifecycle::Submitted => { + *ref_mut = Lifecycle::Completed(result, flags); + } + Lifecycle::Waiting(_) => { + let old = std::mem::replace(ref_mut, Lifecycle::Completed(result, flags)); + match old { + Lifecycle::Waiting(waker) => { + waker.wake(); + } + _ => unsafe { std::hint::unreachable_unchecked() }, + } + } + Lifecycle::Ignored(..) => { + self.remove(); + } + Lifecycle::Completed(..) => unsafe { std::hint::unreachable_unchecked() }, + } + } + + #[allow(clippy::needless_pass_by_ref_mut)] + pub(crate) fn poll_op(mut self, cx: &mut Context<'_>) -> Poll { + let ref_mut = &mut *self; + match ref_mut { + Lifecycle::Submitted => { + *ref_mut = Lifecycle::Waiting(cx.waker().clone()); + return Poll::Pending; + } + Lifecycle::Waiting(waker) => { + if !waker.will_wake(cx.waker()) { + *ref_mut = Lifecycle::Waiting(cx.waker().clone()); + } + return Poll::Pending; + } + _ => {} + } + + match self.remove() { + Lifecycle::Completed(result, flags) => Poll::Ready(CompletionMeta { result, flags }), + _ => unsafe { std::hint::unreachable_unchecked() }, + } + } + + // return if the op must has been finished + pub(crate) fn drop_op(mut self, data: &mut Option) -> bool { + let ref_mut = &mut *self; + match ref_mut { + Lifecycle::Submitted | Lifecycle::Waiting(_) => { + if let Some(data) = data.take() { + *ref_mut = Lifecycle::Ignored(Box::new(data)); + } else { + *ref_mut = Lifecycle::Ignored(Box::new(())); // () is a ZST, so it does not + // allocate + }; + return false; + } + Lifecycle::Completed(..) => { + self.remove(); + } + Lifecycle::Ignored(..) => unsafe { std::hint::unreachable_unchecked() }, + } + true + } +} diff --git a/monoio/src/driver/uring_fixed/mod.rs b/monoio/src/driver/uring_fixed/mod.rs new file mode 100644 index 00000000..b0816ba6 --- /dev/null +++ b/monoio/src/driver/uring_fixed/mod.rs @@ -0,0 +1,568 @@ +//! Monoio Uring Fixed Driver. +//! +//! Fixed Driver requires user provided IO operation limits + +use std::{ + cell::UnsafeCell, + io, + mem::ManuallyDrop, + os::unix::prelude::{AsRawFd, RawFd}, + rc::Rc, + task::{Context, Poll}, + time::Duration, +}; + +use io_uring::{cqueue, opcode, types::Timespec, IoUring}; +use lifecycle::InProgressLifecycle; + +use super::{ + op::{CompletionMeta, Op, OpAble}, + util::timespec, + Driver, Inner, CURRENT, +}; +use crate::{oppool::accept::AcceptPool, utils::slab::Slab}; + +mod lifecycle; +#[cfg(feature = "sync")] +mod waker; +#[cfg(feature = "sync")] +pub(crate) use waker::UnparkHandle; + +#[allow(unused)] +pub(crate) const CANCEL_USERDATA: u64 = u64::MAX; +pub(crate) const TIMEOUT_USERDATA: u64 = u64::MAX - 1; +#[allow(unused)] +pub(crate) const EVENTFD_USERDATA: u64 = u64::MAX - 2; + +pub(crate) const MIN_REVERSED_USERDATA: u64 = u64::MAX - 2; + +/// Driver with uring fixed. +pub struct IoUringFixedDriver { + inner: Rc>, + + // Used as timeout buffer + timespec: *mut Timespec, + + // Used as read eventfd buffer + #[cfg(feature = "sync")] + eventfd_read_dst: *mut u8, + + // Used for drop + #[cfg(feature = "sync")] + thread_id: usize, +} + +pub(crate) struct UringFixedInner { + /// In-flight operations + ops: InProgressOps, + /// Pools with preallocated operations data + op_pools: OpPools, + + /// IoUring bindings + uring: ManuallyDrop, + + /// Shared waker + #[cfg(feature = "sync")] + shared_waker: std::sync::Arc, + + // Mark if eventfd is in the ring + #[cfg(feature = "sync")] + eventfd_installed: bool, + + // Waker receiver + #[cfg(feature = "sync")] + waker_receiver: flume::Receiver, + + // Uring support ext_arg + ext_arg: bool, +} + +// When dropping the driver, all in-flight operations must have completed. This +// type wraps the slab and ensures that, on drop, the slab is empty. +struct InProgressOps { + in_progress: [Option; N], + in_progress_head: usize, + completed: [Option<(io::Result, u32, usize)>; N], +} + +struct OpPools { + accept_pool: AcceptPool, +} + +impl IoUringFixedDriver { + const DEFAULT_ENTRIES: u32 = 1024; + + // pub(crate) fn new(b: &io_uring::Builder) -> io::Result { + // Self::new_with_entries(b, Self::DEFAULT_ENTRIES) + // } + + #[cfg(not(feature = "sync"))] + pub(crate) fn new_with_entries( + urb: &io_uring::Builder, + entries: u32, + ) -> io::Result { + let uring = ManuallyDrop::new(urb.build(entries)?); + + let inner = Rc::new(UnsafeCell::new(UringFixedInner { + ops: InProgressOps::new(), + ext_arg: uring.params().is_feature_ext_arg(), + uring, + })); + + Ok(IoUringFixedDriver { + inner, + timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec, + }) + } + + #[cfg(feature = "sync")] + pub(crate) fn new_with_entries( + urb: &io_uring::Builder, + entries: u32, + ) -> io::Result { + let uring = ManuallyDrop::new(urb.build(entries)?); + + // Create eventfd and register it to the ring. + let waker = { + let fd = crate::syscall!(eventfd(0, libc::EFD_CLOEXEC))?; + unsafe { + use std::os::unix::io::FromRawFd; + std::fs::File::from_raw_fd(fd) + } + }; + + let (waker_sender, waker_receiver) = flume::unbounded::(); + + let inner = Rc::new(UnsafeCell::new(UringFixedInner { + ops: InProgressOps::new(), + ext_arg: uring.params().is_feature_ext_arg(), + uring, + shared_waker: std::sync::Arc::new(waker::EventWaker::new(waker)), + eventfd_installed: false, + waker_receiver, + })); + + let thread_id = crate::builder::BUILD_THREAD_ID.with(|id| *id); + let driver = IoUringFixedDriver { + inner, + timespec: Box::leak(Box::new(Timespec::new())) as *mut Timespec, + eventfd_read_dst: Box::leak(Box::new([0_u8; 8])) as *mut u8, + thread_id, + }; + + // Register unpark handle + super::thread::register_unpark_handle(thread_id, driver.unpark().into()); + super::thread::register_waker_sender(thread_id, waker_sender); + Ok(driver) + } + + #[allow(unused)] + fn num_operations(&self) -> usize { + let inner = self.inner.get(); + unsafe { (*inner).ops.slab.len() } + } + + // Flush to make enough space + fn flush_space(inner: &mut UringFixedInner, need: usize) -> io::Result<()> { + let sq = inner.uring.submission(); + debug_assert!(sq.capacity() >= need); + if sq.len() + need > sq.capacity() { + drop(sq); + inner.submit()?; + } + Ok(()) + } + + #[cfg(feature = "sync")] + fn install_eventfd(&self, inner: &mut UringFixedInner, fd: RawFd) { + let entry = opcode::Read::new(io_uring::types::Fd(fd), self.eventfd_read_dst, 8) + .build() + .user_data(EVENTFD_USERDATA); + + let mut sq = inner.uring.submission(); + let _ = unsafe { sq.push(&entry) }; + inner.eventfd_installed = true; + } + + fn install_timeout(&self, inner: &mut UringFixedInner, duration: Duration) { + let timespec = timespec(duration); + unsafe { + std::ptr::replace(self.timespec, timespec); + } + let entry = opcode::Timeout::new(self.timespec as *const Timespec) + .build() + .user_data(TIMEOUT_USERDATA); + + let mut sq = inner.uring.submission(); + let _ = unsafe { sq.push(&entry) }; + } + + fn inner_park(&self, timeout: Option) -> io::Result<()> { + let inner = unsafe { &mut *self.inner.get() }; + + #[allow(unused_mut)] + let mut need_wait = true; + + #[cfg(feature = "sync")] + { + // Process foreign wakers + while let Ok(w) = inner.waker_receiver.try_recv() { + w.wake(); + need_wait = false; + } + + // Set status as not awake if we are going to sleep + if need_wait { + inner + .shared_waker + .awake + .store(false, std::sync::atomic::Ordering::Release); + } + + // Process foreign wakers left + while let Ok(w) = inner.waker_receiver.try_recv() { + w.wake(); + need_wait = false; + } + } + + if need_wait { + // Install timeout and eventfd for unpark if sync is enabled + + // 1. alloc spaces + let mut space = 0; + #[cfg(feature = "sync")] + if !inner.eventfd_installed { + space += 1; + } + if timeout.is_some() { + space += 1; + } + if space != 0 { + Self::flush_space(inner, space)?; + } + + // 2. install eventfd and timeout + #[cfg(feature = "sync")] + if !inner.eventfd_installed { + self.install_eventfd(inner, inner.shared_waker.as_raw_fd()); + } + if let Some(duration) = timeout { + match inner.ext_arg { + // Submit and Wait with timeout in an TimeoutOp way. + // Better compatibility(5.4+). + false => { + self.install_timeout(inner, duration); + inner.uring.submit_and_wait(1)?; + } + // Submit and Wait with enter args. + // Better performance(5.11+). + true => { + let timespec = timespec(duration); + let args = io_uring::types::SubmitArgs::new().timespec(×pec); + if let Err(e) = inner.uring.submitter().submit_with_args(1, &args) { + if e.raw_os_error() != Some(libc::ETIME) { + return Err(e); + } + } + } + } + } else { + // Submit and Wait without timeout + inner.uring.submit_and_wait(1)?; + } + } else { + // Submit only + inner.uring.submit()?; + } + + // Set status as awake + #[cfg(feature = "sync")] + inner + .shared_waker + .awake + .store(true, std::sync::atomic::Ordering::Release); + + // Process CQ + inner.tick(); + + Ok(()) + } +} + +impl Driver for IoUringFixedDriver { + /// Enter the driver context. This enables using uring types. + fn with(&self, f: impl FnOnce() -> R) -> R { + // TODO(ihciah): remove clone + let inner = Inner::Uring(self.inner.clone()); + CURRENT.set(&inner, f) + } + + fn submit(&self) -> io::Result<()> { + let inner = unsafe { &mut *self.inner.get() }; + inner.submit()?; + inner.tick(); + Ok(()) + } + + fn park(&self) -> io::Result<()> { + self.inner_park(None) + } + + fn park_timeout(&self, duration: Duration) -> io::Result<()> { + self.inner_park(Some(duration)) + } + + #[cfg(feature = "sync")] + type Unpark = waker::UnparkHandle; + + #[cfg(feature = "sync")] + fn unpark(&self) -> Self::Unpark { + UringFixedInner::unpark(&self.inner) + } +} + +impl UringFixedInner { + fn tick(&mut self) { + let mut cq = self.uring.completion(); + cq.sync(); + + for cqe in cq { + if cqe.user_data() >= MIN_REVERSED_USERDATA { + #[cfg(feature = "sync")] + if cqe.user_data() == EVENTFD_USERDATA { + self.eventfd_installed = false; + } + continue; + } + let index = cqe.user_data() as _; + self.ops.complete(index, resultify(&cqe), cqe.flags()); + } + } + + fn submit(&mut self) -> io::Result<()> { + loop { + match self.uring.submit() { + Ok(_) => { + self.uring.submission().sync(); + return Ok(()); + } + Err(ref e) + if e.kind() == io::ErrorKind::Other + || e.kind() == io::ErrorKind::ResourceBusy => + { + self.tick(); + } + Err(e) => { + return Err(e); + } + } + } + } + + fn new_op(data: T, inner: &mut UringFixedInner, driver: Inner) -> Op { + Op { + driver, + index: inner.ops.insert(), + data: Some(data), + } + } + + pub(crate) fn submit_with_data( + this: &Rc>, + data: T, + ) -> io::Result> + where + T: OpAble, + { + let inner = unsafe { &mut *this.get() }; + // If the submission queue is full, flush it to the kernel + if inner.uring.submission().is_full() { + inner.submit()?; + } + + // Create the operation + let mut op = Self::new_op(data, inner, Inner::Uring(this.clone())); + + // Configure the SQE + let data_mut = unsafe { op.data.as_mut().unwrap_unchecked() }; + let sqe = OpAble::uring_op(data_mut).user_data(op.index as _); + + { + let mut sq = inner.uring.submission(); + + // Push the new operation + if unsafe { sq.push(&sqe).is_err() } { + unimplemented!("when is this hit?"); + } + } + + // Submit the new operation. At this point, the operation has been + // pushed onto the queue and the tail pointer has been updated, so + // the submission entry is visible to the kernel. If there is an + // error here (probably EAGAIN), we still return the operation. A + // future `io_uring_enter` will fully submit the event. + + // CHIHAI: We are not going to do syscall now. If we are waiting + // for IO, we will submit on `park`. + // let _ = inner.submit(); + Ok(op) + } + + pub(crate) fn maybe_submit_with_data( + this: &Rc>, + data: T, + ) -> Option>> + where + T: OpAble, + { + let inner = unsafe { &mut *this.get() }; + // If the submission queue is full, skip submission + if inner.uring.submission().is_full() { + return None; + } + + // Create the operation + let mut op = Self::new_op(data, inner, Inner::Uring(this.clone())); + + // Configure the SQE + let data_mut = unsafe { op.data.as_mut().unwrap_unchecked() }; + let sqe = OpAble::uring_op(data_mut).user_data(op.index as _); + + { + let mut sq = inner.uring.submission(); + + // Push the new operation + if unsafe { sq.push(&sqe).is_err() } { + unimplemented!("when is this hit?"); + } + } + + Ok(op) + } + + pub(crate) fn poll_op( + this: &Rc>, + index: usize, + cx: &mut Context<'_>, + ) -> Poll { + let inner = unsafe { &mut *this.get() }; + let lifecycle = unsafe { inner.ops.slab.get(index).unwrap_unchecked() }; + lifecycle.poll_op(cx) + } + + pub(crate) fn drop_op( + this: &Rc>, + index: usize, + data: &mut Option, + ) { + let inner = unsafe { &mut *this.get() }; + if index == usize::MAX { + // already finished + return; + } + if let Some(lifecycle) = inner.ops.slab.get(index) { + let _must_finished = lifecycle.drop_op(data); + #[cfg(feature = "async-cancel")] + if !_must_finished { + unsafe { + let cancel = opcode::AsyncCancel::new(index as u64) + .build() + .user_data(u64::MAX); + + // Try push cancel, if failed, will submit and re-push. + if inner.uring.submission().push(&cancel).is_err() { + let _ = inner.submit(); + let _ = inner.uring.submission().push(&cancel); + } + } + } + } + } + + pub(crate) unsafe fn cancel_op(this: &Rc>, index: usize) { + let inner = &mut *this.get(); + let cancel = opcode::AsyncCancel::new(index as u64) + .build() + .user_data(u64::MAX); + if inner.uring.submission().push(&cancel).is_err() { + let _ = inner.submit(); + let _ = inner.uring.submission().push(&cancel); + } + } + + #[cfg(feature = "sync")] + pub(crate) fn unpark(this: &Rc>) -> waker::UnparkHandle { + let inner = unsafe { &*this.get() }; + let weak = std::sync::Arc::downgrade(&inner.shared_waker); + waker::UnparkHandle(weak) + } +} + +impl AsRawFd for IoUringFixedDriver { + fn as_raw_fd(&self) -> RawFd { + unsafe { (*self.inner.get()).uring.as_raw_fd() } + } +} + +impl Drop for IoUringFixedDriver { + fn drop(&mut self) { + trace!("MONOIO DEBUG[IoUringFixedDriver]: drop"); + + // Dealloc leaked memory + unsafe { std::ptr::drop_in_place(self.timespec) }; + + #[cfg(feature = "sync")] + unsafe { + std::ptr::drop_in_place(self.eventfd_read_dst) + }; + + // Deregister thread id + #[cfg(feature = "sync")] + { + use crate::driver::thread::{unregister_unpark_handle, unregister_waker_sender}; + unregister_unpark_handle(self.thread_id); + unregister_waker_sender(self.thread_id); + } + } +} + +impl Drop for UringFixedInner { + fn drop(&mut self) { + unsafe { + ManuallyDrop::drop(&mut self.uring); + } + } +} + +impl InProgressOps { + const fn new() -> Self { + unimplemented!() + } + + // Insert a new operation + pub(crate) fn insert(&mut self) -> usize { + unimplemented!() + } + + fn complete(&mut self, index: usize, result: io::Result, flags: u32) { + let lifecycle = unsafe { self.slab.get(index).unwrap_unchecked() }; + lifecycle.complete(result, flags); + } +} + +#[inline] +fn resultify(cqe: &cqueue::Entry) -> io::Result { + let res = cqe.result(); + + if res >= 0 { + Ok(res as u32) + } else { + Err(io::Error::from_raw_os_error(-res)) + } +} + +impl OpPools { + pub async submit() { + } +} diff --git a/monoio/src/driver/uring_fixed/waker.rs b/monoio/src/driver/uring_fixed/waker.rs new file mode 100644 index 00000000..83c56b16 --- /dev/null +++ b/monoio/src/driver/uring_fixed/waker.rs @@ -0,0 +1,57 @@ +//! Custom thread waker based on eventfd. + +use std::os::unix::prelude::{AsRawFd, RawFd}; + +use crate::driver::unpark::Unpark; + +pub(crate) struct EventWaker { + // RawFd + raw: RawFd, + // File hold the ownership of fd, only useful when drop + _file: std::fs::File, + // Atomic awake status + pub(crate) awake: std::sync::atomic::AtomicBool, +} + +impl EventWaker { + pub(crate) fn new(file: std::fs::File) -> Self { + Self { + raw: file.as_raw_fd(), + _file: file, + awake: std::sync::atomic::AtomicBool::new(true), + } + } + + pub(crate) fn wake(&self) -> std::io::Result<()> { + // Skip wake if already awake + if self.awake.load(std::sync::atomic::Ordering::Acquire) { + return Ok(()); + } + // Write data into EventFd to wake the executor. + let buf = 0x1u64.to_ne_bytes(); + unsafe { + // SAFETY: Writing number to eventfd is thread safe. + libc::write(self.raw, buf.as_ptr().cast(), buf.len()); + Ok(()) + } + } +} + +impl AsRawFd for EventWaker { + fn as_raw_fd(&self) -> RawFd { + self.raw + } +} + +#[derive(Clone)] +pub struct UnparkHandle(pub(crate) std::sync::Weak); + +impl Unpark for UnparkHandle { + fn unpark(&self) -> std::io::Result<()> { + if let Some(w) = self.0.upgrade() { + w.wake() + } else { + Ok(()) + } + } +} diff --git a/monoio/src/net/tcp/static_size_listener.rs b/monoio/src/net/tcp/static_size_listener.rs new file mode 100644 index 00000000..e69de29b