diff --git a/Cargo.toml b/Cargo.toml index bbedd3b5..64ecb7c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ libc = "0.2" # Other platform dependencies [target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] -mio = { version = "0.8", features = ["os-ext"] } +polling = "3" libc = "0.2" [features] diff --git a/README.md b/README.md index 3485dd94..c7861290 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![docs.rs](https://img.shields.io/badge/docs.rs-compio-latest)](https://docs.rs/compio) [![Azure DevOps builds](https://strawberry-vs.visualstudio.com/compio/_apis/build/status/Berrysoft.compio?branch=master)](https://strawberry-vs.visualstudio.com/compio/_build) -A thread-per-core Rust runtime with IOCP/io_uring/mio. +A thread-per-core Rust runtime with IOCP/io_uring/polling. The name comes from "completion-based IO". This crate is inspired by [monoio](https://github.com/bytedance/monoio/). diff --git a/src/driver/mio/mod.rs b/src/driver/mio/mod.rs deleted file mode 100644 index fe5a64c8..00000000 --- a/src/driver/mio/mod.rs +++ /dev/null @@ -1,229 +0,0 @@ -#[doc(no_inline)] -pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; -use std::{ - collections::{HashMap, HashSet, VecDeque}, - io, - ops::ControlFlow, - pin::Pin, - time::Duration, -}; - -pub(crate) use libc::{sockaddr_storage, socklen_t}; -use mio::{ - event::{Event, Source}, - unix::SourceFd, - Events, Interest, Poll, Token, -}; -use slab::Slab; - -use crate::driver::Entry; - -pub(crate) mod op; -pub(crate) use crate::driver::unix::RawOp; - -/// Abstraction of operations. -pub trait OpCode { - /// Perform the operation before submit, and return [`Decision`] to - /// indicate whether submitting the operation to mio is required. - fn pre_submit(self: Pin<&mut Self>) -> io::Result; - - /// Perform the operation after received corresponding - /// event. - fn on_event(self: Pin<&mut Self>, event: &Event) -> io::Result>; -} - -/// Result of [`OpCode::pre_submit`]. -pub enum Decision { - /// Instant operation, no need to submit - Completed(usize), - /// Async operation, needs to submit - Wait(WaitArg), -} - -impl Decision { - /// Decide to wait for the given fd with the given interest. - pub fn wait_for(fd: RawFd, interest: Interest) -> Self { - Self::Wait(WaitArg { fd, interest }) - } - - /// Decide to wait for the given fd to be readable. - pub fn wait_readable(fd: RawFd) -> Self { - Self::wait_for(fd, Interest::READABLE) - } - - /// Decide to wait for the given fd to be writable. - pub fn wait_writable(fd: RawFd) -> Self { - Self::wait_for(fd, Interest::WRITABLE) - } -} - -/// Meta of mio operations. -#[derive(Debug, Clone, Copy)] -pub struct WaitArg { - fd: RawFd, - interest: Interest, -} - -/// Low-level driver of mio. -pub(crate) struct Driver { - events: Events, - poll: Poll, - waiting: HashMap, - cancelled: HashSet, - cancel_queue: VecDeque, -} - -/// Entry waiting for events -struct WaitEntry { - arg: WaitArg, - user_data: usize, -} - -impl WaitEntry { - fn new(user_data: usize, arg: WaitArg) -> Self { - Self { arg, user_data } - } -} - -impl Driver { - pub fn new(entries: u32) -> io::Result { - let entries = entries as usize; // for the sake of consistency, use u32 like iour - - Ok(Self { - events: Events::with_capacity(entries), - poll: Poll::new()?, - waiting: HashMap::new(), - cancelled: HashSet::new(), - cancel_queue: VecDeque::new(), - }) - } -} - -impl Driver { - fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> { - if self.cancelled.remove(&user_data) { - self.cancel_queue.push_back(user_data); - } else { - let token = Token(user_data); - - SourceFd(&arg.fd).register(self.poll.registry(), token, arg.interest)?; - - // Only insert the entry after it was registered successfully - self.waiting - .insert(user_data, WaitEntry::new(user_data, arg)); - } - Ok(()) - } - - /// Register all operations in the squeue to mio. - fn submit_squeue( - &mut self, - ops: &mut impl Iterator, - entries: &mut impl Extend, - registry: &mut Slab, - ) -> io::Result { - let mut extended = false; - for user_data in ops { - let op = registry[user_data].as_pin(); - match op.pre_submit() { - Ok(Decision::Wait(arg)) => { - self.submit(user_data, arg)?; - } - Ok(Decision::Completed(res)) => { - entries.extend(Some(Entry::new(user_data, Ok(res)))); - extended = true; - } - Err(err) => { - entries.extend(Some(Entry::new(user_data, Err(err)))); - extended = true; - } - } - } - - Ok(extended) - } - - /// Poll all events from mio, call `perform` on op and push them into - /// cqueue. - fn poll_impl( - &mut self, - timeout: Option, - entries: &mut impl Extend, - registry: &mut Slab, - ) -> io::Result<()> { - self.poll.poll(&mut self.events, timeout)?; - if self.events.is_empty() && timeout.is_some() { - return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); - } - for event in &self.events { - let token = event.token(); - let entry = self - .waiting - .get_mut(&token.0) - .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? - let op = registry[entry.user_data].as_pin(); - let res = match op.on_event(event) { - Ok(ControlFlow::Continue(_)) => continue, - Ok(ControlFlow::Break(res)) => Ok(res), - Err(err) => Err(err), - }; - self.poll - .registry() - .deregister(&mut SourceFd(&entry.arg.fd))?; - let entry = Entry::new(entry.user_data, res); - entries.extend(Some(entry)); - self.waiting.remove(&token.0); - } - Ok(()) - } - - fn poll_cancel(&mut self, entries: &mut impl Extend) -> bool { - let has_cancel = !self.cancel_queue.is_empty(); - if has_cancel { - entries.extend(self.cancel_queue.drain(..).map(|user_data| { - Entry::new( - user_data, - Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)), - ) - })) - } - has_cancel - } - - pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> { - Ok(()) - } - - pub fn cancel(&mut self, user_data: usize, _registry: &mut Slab) { - if let Some(entry) = self.waiting.remove(&user_data) { - self.poll - .registry() - .deregister(&mut SourceFd(&entry.arg.fd)) - .ok(); - self.cancel_queue.push_back(user_data); - } else { - self.cancelled.insert(user_data); - } - } - - pub unsafe fn poll( - &mut self, - timeout: Option, - ops: &mut impl Iterator, - entries: &mut impl Extend, - registry: &mut Slab, - ) -> io::Result<()> { - let mut extended = self.submit_squeue(ops, entries, registry)?; - extended |= self.poll_cancel(entries); - if !extended { - self.poll_impl(timeout, entries, registry)?; - } - Ok(()) - } -} - -impl AsRawFd for Driver { - fn as_raw_fd(&self) -> RawFd { - self.poll.as_raw_fd() - } -} diff --git a/src/driver/mod.rs b/src/driver/mod.rs index afe512a1..c913fa9f 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -17,8 +17,8 @@ cfg_if::cfg_if! { mod iour; pub use iour::*; } else if #[cfg(unix)]{ - mod mio; - pub use self::mio::*; + mod poll; + pub use poll::*; } } @@ -118,7 +118,7 @@ impl Proactor { /// attached to one driver, and could only be attached once, even if you /// `try_clone` it. It will cause unexpected result to attach the handle /// with one driver and push an op to another driver. - /// * io-uring/mio: it will do nothing and return `Ok(())` + /// * io-uring/polling: it will do nothing and return `Ok(())`. pub fn attach(&mut self, fd: RawFd) -> io::Result<()> { self.driver.attach(fd) } diff --git a/src/driver/poll/mod.rs b/src/driver/poll/mod.rs new file mode 100644 index 00000000..8fa2f5bd --- /dev/null +++ b/src/driver/poll/mod.rs @@ -0,0 +1,288 @@ +#[doc(no_inline)] +pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; +use std::{ + collections::{HashMap, HashSet, VecDeque}, + io, + num::NonZeroUsize, + ops::ControlFlow, + os::fd::BorrowedFd, + pin::Pin, + time::Duration, +}; + +pub(crate) use libc::{sockaddr_storage, socklen_t}; +use polling::{Event, Events, Poller}; +use slab::Slab; + +use crate::driver::Entry; + +pub(crate) mod op; +pub(crate) use crate::driver::unix::RawOp; + +/// Abstraction of operations. +pub trait OpCode { + /// Perform the operation before submit, and return [`Decision`] to + /// indicate whether submitting the operation to polling is required. + fn pre_submit(self: Pin<&mut Self>) -> io::Result; + + /// Perform the operation after received corresponding + /// event. + fn on_event(self: Pin<&mut Self>, event: &Event) -> io::Result>; +} + +/// Result of [`OpCode::pre_submit`]. +pub enum Decision { + /// Instant operation, no need to submit + Completed(usize), + /// Async operation, needs to submit + Wait(WaitArg), +} + +impl Decision { + /// Decide to wait for the given fd with the given interest. + pub fn wait_for(fd: RawFd, interest: Interest) -> Self { + Self::Wait(WaitArg { fd, interest }) + } + + /// Decide to wait for the given fd to be readable. + pub fn wait_readable(fd: RawFd) -> Self { + Self::wait_for(fd, Interest::Readable) + } + + /// Decide to wait for the given fd to be writable. + pub fn wait_writable(fd: RawFd) -> Self { + Self::wait_for(fd, Interest::Writable) + } +} + +/// Meta of polling operations. +#[derive(Debug, Clone, Copy)] +pub struct WaitArg { + /// The raw fd of the operation. + pub fd: RawFd, + /// The interest to be registered. + pub interest: Interest, +} + +/// The interest of the operation +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Interest { + /// Represents a read operation. + Readable, + /// Represents a write operation. + Writable, +} + +#[derive(Debug, Default)] +struct FdQueue { + read_queue: VecDeque, + write_queue: VecDeque, +} + +impl FdQueue { + pub fn push_back_interest(&mut self, user_data: usize, interest: Interest) { + match interest { + Interest::Readable => self.read_queue.push_back(user_data), + Interest::Writable => self.write_queue.push_back(user_data), + } + } + + pub fn push_front_interest(&mut self, user_data: usize, interest: Interest) { + match interest { + Interest::Readable => self.read_queue.push_front(user_data), + Interest::Writable => self.write_queue.push_front(user_data), + } + } + + pub fn event(&self, key: usize) -> Event { + let mut event = Event::all(key); + event.readable = !self.read_queue.is_empty(); + event.writable = !self.write_queue.is_empty(); + event + } + + pub fn pop_interest(&mut self, event: &Event) -> (usize, Interest) { + if event.readable { + if let Some(user_data) = self.read_queue.pop_front() { + return (user_data, Interest::Readable); + } + } + if event.writable { + if let Some(user_data) = self.write_queue.pop_front() { + return (user_data, Interest::Writable); + } + } + unreachable!("should receive event when no interest") + } +} + +/// Low-level driver of polling. +pub(crate) struct Driver { + events: Events, + poll: Poller, + registry: HashMap, + cancelled: HashSet, +} + +impl Driver { + pub fn new(entries: u32) -> io::Result { + let entries = entries as usize; // for the sake of consistency, use u32 like iour + let events = if entries == 0 { + Events::new() + } else { + Events::with_capacity(NonZeroUsize::new(entries).unwrap()) + }; + + Ok(Self { + events, + poll: Poller::new()?, + registry: HashMap::new(), + cancelled: HashSet::new(), + }) + } + + fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result { + if self.cancelled.remove(&user_data) { + Ok(false) + } else { + let need_add = !self.registry.contains_key(&arg.fd); + let queue = self.registry.entry(arg.fd).or_default(); + queue.push_back_interest(user_data, arg.interest); + // We use fd as the key. + let event = queue.event(arg.fd as usize); + unsafe { + if need_add { + self.poll.add(arg.fd, event)?; + } else { + let fd = BorrowedFd::borrow_raw(arg.fd); + self.poll.modify(fd, event)?; + } + } + Ok(true) + } + } + + /// Register all operations in the squeue to polling. + fn submit_squeue( + &mut self, + ops: &mut impl Iterator, + entries: &mut impl Extend, + registry: &mut Slab, + ) -> io::Result { + let mut extended = false; + for user_data in ops { + let op = registry[user_data].as_pin(); + match op.pre_submit() { + Ok(Decision::Wait(arg)) => { + let succeeded = self.submit(user_data, arg)?; + if !succeeded { + entries.extend(Some(entry_cancelled(user_data))); + extended = true; + } + } + Ok(Decision::Completed(res)) => { + entries.extend(Some(Entry::new(user_data, Ok(res)))); + extended = true; + } + Err(err) => { + entries.extend(Some(Entry::new(user_data, Err(err)))); + extended = true; + } + } + } + + Ok(extended) + } + + /// Poll all events from polling, call `perform` on op and push them into + /// cqueue. + fn poll_impl( + &mut self, + timeout: Option, + entries: &mut impl Extend, + registry: &mut Slab, + ) -> io::Result<()> { + self.poll.wait(&mut self.events, timeout)?; + if self.events.is_empty() && timeout.is_some() { + return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); + } + for event in self.events.iter() { + let fd = event.key as RawFd; + let queue = self + .registry + .get_mut(&fd) + .expect("the fd should be attached"); + let (user_data, interest) = queue.pop_interest(&event); + if self.cancelled.remove(&user_data) { + entries.extend(Some(entry_cancelled(user_data))); + } else { + let op = registry[user_data].as_pin(); + let res = match op.on_event(&event) { + Ok(ControlFlow::Continue(_)) => { + // The operation should go back to the front. + queue.push_front_interest(user_data, interest); + None + } + Ok(ControlFlow::Break(res)) => Some(Ok(res)), + Err(err) => Some(Err(err)), + }; + if let Some(res) = res { + let entry = Entry::new(user_data, res); + entries.extend(Some(entry)); + } + } + let renew_event = queue.event(fd as _); + unsafe { + let fd = BorrowedFd::borrow_raw(fd); + self.poll.modify(fd, renew_event)?; + } + } + Ok(()) + } + + pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> { + Ok(()) + } + + pub fn cancel(&mut self, user_data: usize, _registry: &mut Slab) { + self.cancelled.insert(user_data); + } + + pub unsafe fn poll( + &mut self, + timeout: Option, + ops: &mut impl Iterator, + entries: &mut impl Extend, + registry: &mut Slab, + ) -> io::Result<()> { + let extended = self.submit_squeue(ops, entries, registry)?; + if !extended { + self.poll_impl(timeout, entries, registry)?; + } + Ok(()) + } +} + +impl AsRawFd for Driver { + fn as_raw_fd(&self) -> RawFd { + self.poll.as_raw_fd() + } +} + +impl Drop for Driver { + fn drop(&mut self) { + for fd in self.registry.keys() { + unsafe { + let fd = BorrowedFd::borrow_raw(*fd); + self.poll.delete(fd).ok(); + } + } + } +} + +fn entry_cancelled(user_data: usize) -> Entry { + Entry::new( + user_data, + Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)), + ) +} diff --git a/src/driver/mio/op.rs b/src/driver/poll/op.rs similarity index 93% rename from src/driver/mio/op.rs rename to src/driver/poll/op.rs index 7956dbf6..680bc9d7 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/poll/op.rs @@ -1,6 +1,6 @@ use std::{io, ops::ControlFlow, pin::Pin}; -use mio::event::Event; +use polling::Event; pub use crate::driver::unix::op::*; use crate::{ @@ -31,7 +31,7 @@ impl OpCode for ReadAt { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); let fd = self.fd; let slice = self.buffer.as_uninit_slice(); @@ -67,7 +67,7 @@ impl OpCode for WriteAt { } fn on_event(self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); let slice = self.buffer.as_slice(); @@ -88,7 +88,7 @@ impl OpCode for Sync { } fn on_event(self: Pin<&mut Self>, _: &Event) -> std::io::Result> { - unreachable!("Sync operation should not be submitted to mio") + unreachable!("Sync operation should not be submitted to polling") } } @@ -104,7 +104,7 @@ impl OpCode for Accept { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); match syscall!(accept( self.fd, @@ -126,7 +126,7 @@ impl OpCode for Connect { } fn on_event(self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); let mut err: libc::c_int = 0; let mut err_len = std::mem::size_of::() as libc::socklen_t; @@ -153,7 +153,7 @@ impl OpCode for RecvImpl { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); self.slices = unsafe { self.buffer.as_io_slices_mut() }; syscall!(break readv(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) @@ -166,7 +166,7 @@ impl OpCode for SendImpl { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); self.slices = unsafe { self.buffer.as_io_slices() }; syscall!(break writev(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) @@ -180,7 +180,7 @@ impl OpCode for RecvFromImpl { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); syscall!(break recvmsg(self.fd, &mut self.msg, 0)) } @@ -193,7 +193,7 @@ impl OpCode for SendToImpl { } fn on_event(self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); syscall!(break sendmsg(self.fd, &self.msg, 0)) } diff --git a/src/driver/unix/mod.rs b/src/driver/unix/mod.rs index eaca327f..d54efd90 100644 --- a/src/driver/unix/mod.rs +++ b/src/driver/unix/mod.rs @@ -1,5 +1,5 @@ //! This mod doesn't actually contain any driver, but meant to provide some -//! common op type and utilities for unix platform (for iour and mio). +//! common op type and utilities for unix platform (for iour and polling). pub(crate) mod op; diff --git a/src/event/pipe.rs b/src/event/pipe.rs index 2987f5e2..29f0d375 100644 --- a/src/event/pipe.rs +++ b/src/event/pipe.rs @@ -1,6 +1,6 @@ use std::{ io, - os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, }; use arrayvec::ArrayVec; @@ -18,10 +18,14 @@ pub struct Event { impl Event { /// Create [`Event`]. pub fn new() -> io::Result { - let (sender, receiver) = mio::unix::pipe::new()?; - sender.set_nonblocking(false)?; - let sender = unsafe { OwnedFd::from_raw_fd(sender.into_raw_fd()) }; - let receiver = unsafe { OwnedFd::from_raw_fd(receiver.into_raw_fd()) }; + let mut fds = [-1, -1]; + syscall!(pipe(fds.as_mut_ptr()))?; + let receiver = unsafe { OwnedFd::from_raw_fd(fds[0]) }; + let sender = unsafe { OwnedFd::from_raw_fd(fds[1]) }; + + syscall!(fcntl(receiver.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?; + syscall!(fcntl(receiver.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK))?; + syscall!(fcntl(sender.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?; Ok(Self { sender, receiver }) } diff --git a/src/lib.rs b/src/lib.rs index 5c4000a2..1933c610 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,7 +123,7 @@ macro_rules! syscall { Ok(res) } }}; - // The below branches are used by mio driver. + // The below branches are used by polling driver. (break $fn: ident ( $($arg: expr),* $(,)* )) => { $crate::syscall!( $fn ( $($arg, )* )).map( |res| ::std::ops::ControlFlow::Break(res as usize) diff --git a/src/op.rs b/src/op.rs index 7c5de9a6..d94a8d37 100644 --- a/src/op.rs +++ b/src/op.rs @@ -128,7 +128,7 @@ impl Sync { /// /// * IOCP: it is synchronized operation, and calls `FlushFileBuffers`. /// * io-uring: `fdatasync` if `datasync` specified, otherwise `fsync`. - /// * mio: it is synchronized `fdatasync` or `fsync`. + /// * polling: it is synchronized `fdatasync` or `fsync`. pub fn new(fd: RawFd, datasync: bool) -> Self { Self { fd, datasync } } diff --git a/tests/driver.rs b/tests/driver.rs index 12561c76..5d21a14d 100644 --- a/tests/driver.rs +++ b/tests/driver.rs @@ -37,3 +37,22 @@ fn timeout() { .unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::TimedOut); } + +#[test] +fn register_multiple() { + const TASK_LEN: usize = 5; + + let mut driver = Proactor::new().unwrap(); + + let file = File::open("Cargo.toml").unwrap(); + driver.attach(file.as_raw_fd()).unwrap(); + + for _i in 0..TASK_LEN { + driver.push(ReadAt::new(file.as_raw_fd(), 0, Vec::with_capacity(1024))); + } + + let mut entries = ArrayVec::::new(); + while entries.len() < TASK_LEN { + driver.poll(None, &mut entries).unwrap(); + } +}