Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Operation pools POC #209

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ pin-project-lite = "0.2"
socket2 = { version = "0.5", features = ["all"] }

bytes = { version = "1", optional = true }
fixed-vec-deque = { version = "0.1", optional = true }
flume = { version = "0.10", optional = true }
mio = { version = "0.8", features = [
"net",
Expand Down Expand Up @@ -71,10 +72,20 @@ debug = ["tracing"]
legacy = ["mio"]
# iouring support
iouring = []
# iouring support with user provided IO operation limits
iouring-fixed = ["dep:fixed-vec-deque"]
# tokio-compatible(only have effect when legacy is enabled and iouring is not)
tokio-compat = ["tokio"]
# signal enables setting ctrl_c handler
signal = ["ctrlc", "sync"]
signal-termination = ["signal", "ctrlc/termination"]
# by default both iouring and legacy are enabled
default = ["async-cancel", "bytes", "iouring", "legacy", "macros", "utils"]
default = [
"async-cancel",
"bytes",
"iouring",
"iouring-fixed",
"legacy",
"macros",
"utils",
]
59 changes: 58 additions & 1 deletion monoio/src/driver/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/// Monoio Driver.
// #[cfg(unix)]
pub(crate) mod op;
pub(crate) mod oppool;
pub(crate) mod shared_fd;
#[cfg(feature = "sync")]
pub(crate) mod thread;
Expand All @@ -9,6 +10,8 @@ pub(crate) mod thread;
mod legacy;
#[cfg(all(target_os = "linux", feature = "iouring"))]
mod uring;
#[cfg(all(target_os = "linux", feature = "iouring-fixed"))]
mod uring_fixed;

mod util;

Expand All @@ -35,6 +38,8 @@ use self::op::{CompletionMeta, Op, OpAble};
pub use self::uring::IoUringDriver;
#[cfg(all(target_os = "linux", feature = "iouring"))]
use self::uring::UringInner;
#[cfg(all(target_os = "linux", feature = "iouring-fixed"))]
use self::uring_fixed::UringFixedInner;

/// Unpark a runtime of another thread.
pub(crate) mod unpark {
Expand Down Expand Up @@ -91,6 +96,8 @@ scoped_thread_local!(pub(crate) static CURRENT: Inner);
pub(crate) enum Inner {
#[cfg(all(target_os = "linux", feature = "iouring"))]
Uring(std::rc::Rc<std::cell::UnsafeCell<UringInner>>),
#[cfg(all(target_os = "linux", feature = "iouring"))]
UringFixed(std::rc::Rc<std::cell::UnsafeCell<UringFixedInner>>),
#[cfg(feature = "legacy")]
Legacy(std::rc::Rc<std::cell::UnsafeCell<LegacyInner>>),
}
Expand All @@ -102,6 +109,10 @@ impl Inner {
_ => unimplemented!(),
#[cfg(all(target_os = "linux", feature = "iouring"))]
Inner::Uring(this) => UringInner::submit_with_data(this, data),
#[cfg(all(target_os = "linux", feature = "iouring-fixed"))]
Inner::UringFixed(this) => {
panic!("uring fixed supports maybe_submit instead of submit")
}
#[cfg(feature = "legacy")]
Inner::Legacy(this) => LegacyInner::submit_with_data(this, data),
#[cfg(all(
Expand All @@ -114,6 +125,26 @@ impl Inner {
}
}

fn maybe_submit_with<T: OpAble>(&self, data: T) -> Option<io::Result<Op<T>>> {
match self {
#[cfg(windows)]
_ => unimplemented!(),
#[cfg(all(target_os = "linux", feature = "iouring"))]
_ => unimplemented!(),
#[cfg(all(target_os = "linux", feature = "iouring-fixed"))]
Inner::UringFixed(this) => UringInnerFixed::maybe_submit_with_data(this, data),
#[cfg(feature = "legacy")]
_ => unimplemented!(),
#[cfg(all(
not(feature = "legacy"),
not(all(target_os = "linux", feature = "iouring"))
))]
_ => {
util::feature_panic();
}
}
}

#[allow(unused)]
fn poll_op<T: OpAble>(
&self,
Expand Down Expand Up @@ -180,7 +211,12 @@ impl Inner {
}
}

#[cfg(all(target_os = "linux", feature = "iouring", feature = "legacy"))]
#[cfg(all(
target_os = "linux",
feature = "iouring",
feature = "ioring-fixed",
feature = "legacy"
))]
fn is_legacy(&self) -> bool {
matches!(self, Inner::Legacy(..))
}
Expand All @@ -190,6 +226,14 @@ impl Inner {
false
}

#[cfg(all(
target_os = "linux",
feature = "iouring-fixed",
not(feature = "legacy")
))]
fn is_legacy(&self) -> bool {
false
}
#[allow(unused)]
#[cfg(not(all(target_os = "linux", feature = "iouring")))]
fn is_legacy(&self) -> bool {
Expand All @@ -203,6 +247,8 @@ impl Inner {
pub(crate) enum UnparkHandle {
#[cfg(all(target_os = "linux", feature = "iouring"))]
Uring(self::uring::UnparkHandle),
#[cfg(all(target_os = "linux", feature = "iouring-fixed"))]
UringFixed(self::uring::UnparkHandle),
#[cfg(feature = "legacy")]
Legacy(self::legacy::UnparkHandle),
}
Expand All @@ -213,6 +259,8 @@ impl unpark::Unpark for UnparkHandle {
match self {
#[cfg(all(target_os = "linux", feature = "iouring"))]
UnparkHandle::Uring(inner) => inner.unpark(),
#[cfg(all(target_os = "linux", feature = "iouring-fixed"))]
UnparkHandle::UringFixed(inner) => inner.unpark(),
#[cfg(feature = "legacy")]
UnparkHandle::Legacy(inner) => inner.unpark(),
#[cfg(all(
Expand All @@ -233,6 +281,13 @@ impl From<self::uring::UnparkHandle> for UnparkHandle {
}
}

#[cfg(all(feature = "sync", target_os = "linux", feature = "iouring-fixed"))]
impl From<self::uring::UnparkHandle> for UnparkHandle {
fn from(inner: self::uring::UnparkHandle) -> Self {
Self::Uring(inner)
}
}

#[cfg(all(feature = "sync", feature = "legacy"))]
impl From<self::legacy::UnparkHandle> for UnparkHandle {
fn from(inner: self::legacy::UnparkHandle) -> Self {
Expand All @@ -247,6 +302,8 @@ impl UnparkHandle {
CURRENT.with(|inner| match inner {
#[cfg(all(target_os = "linux", feature = "iouring"))]
Inner::Uring(this) => UringInner::unpark(this).into(),
#[cfg(all(target_os = "linux", feature = "iouring-fixed"))]
Inner::Uring(this) => UringInner::unpark(this).into(),
#[cfg(feature = "legacy")]
Inner::Legacy(this) => LegacyInner::unpark(this).into(),
})
Expand Down
10 changes: 10 additions & 0 deletions monoio/src/driver/op.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub(crate) struct CompletionMeta {
pub(crate) result: io::Result<u32>,
#[allow(unused)]
pub(crate) flags: u32,
// relevant to io_uring_fixed runtime
pub(crate) user_data: usize,
}

pub(crate) trait OpAble {
Expand Down Expand Up @@ -99,6 +101,14 @@ impl<T> Op<T> {
driver::CURRENT.with(|this| this.submit_with(data))
}

/// Submit an operation to uring if submission queue is not full.
pub(super) fn maybe_submit_with(data: T) -> io::Result<Op<T>>
where
T: OpAble,
{
driver::CURRENT.with(|this| this.maybe_submit_with(data))
}

/// Try submitting an operation to uring
#[allow(unused)]
pub(super) fn try_submit_with(data: T) -> io::Result<Op<T>>
Expand Down
108 changes: 108 additions & 0 deletions monoio/src/driver/op/accept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

impl Op<Accept> {
/// Accept a connection
pub(crate) fn accept(fd: &SharedFd) -> io::Result<Self> {

Check failure on line 34 in monoio/src/driver/op/accept.rs

View workflow job for this annotation

GitHub Actions / Run cargo test on macos

duplicate definitions with name `accept`
#[cfg(unix)]
let addr = Box::new((
MaybeUninit::uninit(),
Expand Down Expand Up @@ -127,3 +127,111 @@
};
}
}

/// Pool of accept operations
pub struct Accept {

Check failure on line 132 in monoio/src/driver/op/accept.rs

View workflow job for this annotation

GitHub Actions / Run cargo test on macos

the name `Accept` is defined multiple times

Check failure on line 132 in monoio/src/driver/op/accept.rs

View workflow job for this annotation

GitHub Actions / Run cargo test

the name `Accept` is defined multiple times
pub(crate) fd: SharedFd,
#[cfg(unix)]
pub(crate) addr: Box<(MaybeUninit<libc::sockaddr_storage>, libc::socklen_t)>,
#[cfg(windows)]
pub(crate) addr: Box<(MaybeUninit<SOCKADDR_STORAGE>, socklen_t)>,
}

impl Op<Accept> {
/// Accept a connection
pub(crate) fn accept(fd: &SharedFd) -> io::Result<Self> {
#[cfg(unix)]
let addr = Box::new((
MaybeUninit::uninit(),
size_of::<libc::sockaddr_storage>() as libc::socklen_t,
));

#[cfg(windows)]
let addr = Box::new((
MaybeUninit::uninit(),
size_of::<SOCKADDR_STORAGE>() as socklen_t,
));

Op::submit_with(Accept {
fd: fd.clone(),
addr,
})
}
}

impl OpAble for Accept {

Check failure on line 162 in monoio/src/driver/op/accept.rs

View workflow job for this annotation

GitHub Actions / Run cargo test on macos

conflicting implementations of trait `OpAble` for type `accept::Accept`
#[cfg(all(target_os = "linux", feature = "iouring"))]
fn uring_op(&mut self) -> io_uring::squeue::Entry {
opcode::Accept::new(
types::Fd(self.fd.raw_fd()),
self.addr.0.as_mut_ptr() as *mut _,
&mut self.addr.1,
)
.build()
}

#[cfg(feature = "legacy")]
fn legacy_interest(&self) -> Option<(Direction, usize)> {
self.fd.registered_index().map(|idx| (Direction::Read, idx))
}

#[cfg(windows)]
fn legacy_call(&mut self) -> io::Result<u32> {
let fd = self.fd.as_raw_socket();
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;

syscall!(accept(fd, addr, len), PartialEq::eq, INVALID_SOCKET)
}

#[cfg(all(unix, feature = "legacy"))]
fn legacy_call(&mut self) -> io::Result<u32> {
let fd = self.fd.as_raw_fd();
let addr = self.addr.0.as_mut_ptr() as *mut _;
let len = &mut self.addr.1;
// Here I use copied some code from mio because I don't want the conversion.

// On platforms that support it we can use `accept4(2)` to set `NONBLOCK`
// and `CLOEXEC` in the call to accept the connection.
#[cfg(any(
// Android x86's seccomp profile forbids calls to `accept4(2)`
// See https://github.com/tokio-rs/mio/issues/1445 for details
all(
not(target_arch="x86"),
target_os = "android"
),
target_os = "dragonfly",
target_os = "freebsd",
target_os = "illumos",
target_os = "linux",
target_os = "netbsd",
target_os = "openbsd"
))]
return syscall_u32!(accept4(
fd,
addr,
len,
libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK,
));

// But not all platforms have the `accept4(2)` call. Luckily BSD (derived)
// OSes inherit the non-blocking flag from the listener, so we just have to
// set `CLOEXEC`.
#[cfg(any(
all(target_arch = "x86", target_os = "android"),
target_os = "ios",
target_os = "macos",
target_os = "redox"
))]
return {
let stream_fd = syscall_u32!(accept(fd, addr, len))? as i32;
syscall_u32!(fcntl(stream_fd, libc::F_SETFD, libc::FD_CLOEXEC))
.and_then(|_| syscall_u32!(fcntl(stream_fd, libc::F_SETFL, libc::O_NONBLOCK)))
.map_err(|e| {
let _ = syscall_u32!(close(stream_fd));
e
})?;
Ok(stream_fd as _)
};
}
}
21 changes: 21 additions & 0 deletions monoio/src/driver/oppool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use std::{
future::Future,
io,
pin::Pin,
task::{Context, Poll},
};

// pub(crate) mod close;

mod accept;
// mod connect;
// mod fsync;
// mod open;
// mod poll;
// mod read;
// mod recv;
// mod send;
// mod write;

#[cfg(all(target_os = "linux", feature = "splice"))]

Check failure on line 20 in monoio/src/driver/oppool.rs

View workflow job for this annotation

GitHub Actions / Run cargo fmt and cargo clippy

expected item after attributes

Check failure on line 20 in monoio/src/driver/oppool.rs

View workflow job for this annotation

GitHub Actions / Run cargo test on macos

expected item after attributes

Check failure on line 20 in monoio/src/driver/oppool.rs

View workflow job for this annotation

GitHub Actions / Run cargo test

expected item after attributes
// mod splice;
Loading
Loading