From ed20c44b70604007e88b767197d80328478d4faf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Fri, 22 Sep 2023 12:35:19 +0800 Subject: [PATCH 1/5] Try to use polling instead of mio. --- Cargo.toml | 2 +- README.md | 2 +- src/driver/mod.rs | 6 +- src/driver/{mio => poll}/mod.rs | 115 +++++++++++++------------------- src/driver/{mio => poll}/op.rs | 20 +++--- src/driver/unix/mod.rs | 2 +- src/event/pipe.rs | 14 ++-- src/lib.rs | 2 +- src/op.rs | 2 +- 9 files changed, 74 insertions(+), 91 deletions(-) rename src/driver/{mio => poll}/mod.rs (63%) rename src/driver/{mio => poll}/op.rs (93%) diff --git a/Cargo.toml b/Cargo.toml index bbedd3b5..64ecb7c2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -75,7 +75,7 @@ libc = "0.2" # Other platform dependencies [target.'cfg(all(not(target_os = "linux"), unix))'.dependencies] -mio = { version = "0.8", features = ["os-ext"] } +polling = "3" libc = "0.2" [features] diff --git a/README.md b/README.md index 3485dd94..c7861290 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ [![docs.rs](https://img.shields.io/badge/docs.rs-compio-latest)](https://docs.rs/compio) [![Azure DevOps builds](https://strawberry-vs.visualstudio.com/compio/_apis/build/status/Berrysoft.compio?branch=master)](https://strawberry-vs.visualstudio.com/compio/_build) -A thread-per-core Rust runtime with IOCP/io_uring/mio. +A thread-per-core Rust runtime with IOCP/io_uring/polling. The name comes from "completion-based IO". This crate is inspired by [monoio](https://github.com/bytedance/monoio/). diff --git a/src/driver/mod.rs b/src/driver/mod.rs index afe512a1..6bb6e852 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -17,8 +17,8 @@ cfg_if::cfg_if! { mod iour; pub use iour::*; } else if #[cfg(unix)]{ - mod mio; - pub use self::mio::*; + mod poll; + pub use poll::*; } } @@ -118,7 +118,7 @@ impl Proactor { /// attached to one driver, and could only be attached once, even if you /// `try_clone` it. It will cause unexpected result to attach the handle /// with one driver and push an op to another driver. - /// * io-uring/mio: it will do nothing and return `Ok(())` + /// * io-uring/polling: it will do nothing and return `Ok(())` pub fn attach(&mut self, fd: RawFd) -> io::Result<()> { self.driver.attach(fd) } diff --git a/src/driver/mio/mod.rs b/src/driver/poll/mod.rs similarity index 63% rename from src/driver/mio/mod.rs rename to src/driver/poll/mod.rs index fe5a64c8..4d80ba0f 100644 --- a/src/driver/mio/mod.rs +++ b/src/driver/poll/mod.rs @@ -1,19 +1,16 @@ #[doc(no_inline)] pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::{ - collections::{HashMap, HashSet, VecDeque}, + collections::{HashSet, VecDeque}, io, + num::NonZeroUsize, ops::ControlFlow, pin::Pin, time::Duration, }; pub(crate) use libc::{sockaddr_storage, socklen_t}; -use mio::{ - event::{Event, Source}, - unix::SourceFd, - Events, Interest, Poll, Token, -}; +use polling::{Event, Events, Poller}; use slab::Slab; use crate::driver::Entry; @@ -24,7 +21,7 @@ pub(crate) use crate::driver::unix::RawOp; /// Abstraction of operations. pub trait OpCode { /// Perform the operation before submit, and return [`Decision`] to - /// indicate whether submitting the operation to mio is required. + /// indicate whether submitting the operation to polling is required. fn pre_submit(self: Pin<&mut Self>) -> io::Result; /// Perform the operation after received corresponding @@ -42,57 +39,53 @@ pub enum Decision { impl Decision { /// Decide to wait for the given fd with the given interest. - pub fn wait_for(fd: RawFd, interest: Interest) -> Self { - Self::Wait(WaitArg { fd, interest }) + pub fn wait_for(fd: RawFd, readable: bool, writable: bool) -> Self { + Self::Wait(WaitArg { + fd, + readable, + writable, + }) } /// Decide to wait for the given fd to be readable. pub fn wait_readable(fd: RawFd) -> Self { - Self::wait_for(fd, Interest::READABLE) + Self::wait_for(fd, true, false) } /// Decide to wait for the given fd to be writable. pub fn wait_writable(fd: RawFd) -> Self { - Self::wait_for(fd, Interest::WRITABLE) + Self::wait_for(fd, false, true) } } -/// Meta of mio operations. +/// Meta of polling operations. #[derive(Debug, Clone, Copy)] pub struct WaitArg { fd: RawFd, - interest: Interest, + readable: bool, + writable: bool, } -/// Low-level driver of mio. +/// Low-level driver of polling. pub(crate) struct Driver { events: Events, - poll: Poll, - waiting: HashMap, + poll: Poller, cancelled: HashSet, cancel_queue: VecDeque, } -/// Entry waiting for events -struct WaitEntry { - arg: WaitArg, - user_data: usize, -} - -impl WaitEntry { - fn new(user_data: usize, arg: WaitArg) -> Self { - Self { arg, user_data } - } -} - impl Driver { pub fn new(entries: u32) -> io::Result { let entries = entries as usize; // for the sake of consistency, use u32 like iour + let events = if entries == 0 { + Events::new() + } else { + Events::with_capacity(NonZeroUsize::new(entries).unwrap()) + }; Ok(Self { - events: Events::with_capacity(entries), - poll: Poll::new()?, - waiting: HashMap::new(), + events, + poll: Poller::new()?, cancelled: HashSet::new(), cancel_queue: VecDeque::new(), }) @@ -104,18 +97,17 @@ impl Driver { if self.cancelled.remove(&user_data) { self.cancel_queue.push_back(user_data); } else { - let token = Token(user_data); - - SourceFd(&arg.fd).register(self.poll.registry(), token, arg.interest)?; - - // Only insert the entry after it was registered successfully - self.waiting - .insert(user_data, WaitEntry::new(user_data, arg)); + let mut event = Event::none(user_data); + event.readable = arg.readable; + event.writable = arg.writable; + unsafe { + self.poll.add(arg.fd, event)?; + } } Ok(()) } - /// Register all operations in the squeue to mio. + /// Register all operations in the squeue to polling. fn submit_squeue( &mut self, ops: &mut impl Iterator, @@ -143,7 +135,7 @@ impl Driver { Ok(extended) } - /// Poll all events from mio, call `perform` on op and push them into + /// Poll all events from polling, call `perform` on op and push them into /// cqueue. fn poll_impl( &mut self, @@ -151,28 +143,23 @@ impl Driver { entries: &mut impl Extend, registry: &mut Slab, ) -> io::Result<()> { - self.poll.poll(&mut self.events, timeout)?; + self.poll.wait(&mut self.events, timeout)?; if self.events.is_empty() && timeout.is_some() { return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); } - for event in &self.events { - let token = event.token(); - let entry = self - .waiting - .get_mut(&token.0) - .expect("Unknown token returned by mio"); // XXX: Should this be silently ignored? - let op = registry[entry.user_data].as_pin(); - let res = match op.on_event(event) { - Ok(ControlFlow::Continue(_)) => continue, - Ok(ControlFlow::Break(res)) => Ok(res), - Err(err) => Err(err), - }; - self.poll - .registry() - .deregister(&mut SourceFd(&entry.arg.fd))?; - let entry = Entry::new(entry.user_data, res); - entries.extend(Some(entry)); - self.waiting.remove(&token.0); + for event in self.events.iter() { + if self.cancelled.remove(&event.key) { + self.cancel_queue.push_back(event.key); + } else { + let op = registry[event.key].as_pin(); + let res = match op.on_event(&event) { + Ok(ControlFlow::Continue(_)) => continue, + Ok(ControlFlow::Break(res)) => Ok(res), + Err(err) => Err(err), + }; + let entry = Entry::new(event.key, res); + entries.extend(Some(entry)); + } } Ok(()) } @@ -195,15 +182,7 @@ impl Driver { } pub fn cancel(&mut self, user_data: usize, _registry: &mut Slab) { - if let Some(entry) = self.waiting.remove(&user_data) { - self.poll - .registry() - .deregister(&mut SourceFd(&entry.arg.fd)) - .ok(); - self.cancel_queue.push_back(user_data); - } else { - self.cancelled.insert(user_data); - } + self.cancelled.insert(user_data); } pub unsafe fn poll( diff --git a/src/driver/mio/op.rs b/src/driver/poll/op.rs similarity index 93% rename from src/driver/mio/op.rs rename to src/driver/poll/op.rs index 7956dbf6..680bc9d7 100644 --- a/src/driver/mio/op.rs +++ b/src/driver/poll/op.rs @@ -1,6 +1,6 @@ use std::{io, ops::ControlFlow, pin::Pin}; -use mio::event::Event; +use polling::Event; pub use crate::driver::unix::op::*; use crate::{ @@ -31,7 +31,7 @@ impl OpCode for ReadAt { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); let fd = self.fd; let slice = self.buffer.as_uninit_slice(); @@ -67,7 +67,7 @@ impl OpCode for WriteAt { } fn on_event(self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); let slice = self.buffer.as_slice(); @@ -88,7 +88,7 @@ impl OpCode for Sync { } fn on_event(self: Pin<&mut Self>, _: &Event) -> std::io::Result> { - unreachable!("Sync operation should not be submitted to mio") + unreachable!("Sync operation should not be submitted to polling") } } @@ -104,7 +104,7 @@ impl OpCode for Accept { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); match syscall!(accept( self.fd, @@ -126,7 +126,7 @@ impl OpCode for Connect { } fn on_event(self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); let mut err: libc::c_int = 0; let mut err_len = std::mem::size_of::() as libc::socklen_t; @@ -153,7 +153,7 @@ impl OpCode for RecvImpl { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); self.slices = unsafe { self.buffer.as_io_slices_mut() }; syscall!(break readv(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) @@ -166,7 +166,7 @@ impl OpCode for SendImpl { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); self.slices = unsafe { self.buffer.as_io_slices() }; syscall!(break writev(self.fd, self.slices.as_ptr() as _, self.slices.len() as _,)) @@ -180,7 +180,7 @@ impl OpCode for RecvFromImpl { } fn on_event(mut self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_readable()); + debug_assert!(event.readable); syscall!(break recvmsg(self.fd, &mut self.msg, 0)) } @@ -193,7 +193,7 @@ impl OpCode for SendToImpl { } fn on_event(self: Pin<&mut Self>, event: &Event) -> std::io::Result> { - debug_assert!(event.is_writable()); + debug_assert!(event.writable); syscall!(break sendmsg(self.fd, &self.msg, 0)) } diff --git a/src/driver/unix/mod.rs b/src/driver/unix/mod.rs index eaca327f..d54efd90 100644 --- a/src/driver/unix/mod.rs +++ b/src/driver/unix/mod.rs @@ -1,5 +1,5 @@ //! This mod doesn't actually contain any driver, but meant to provide some -//! common op type and utilities for unix platform (for iour and mio). +//! common op type and utilities for unix platform (for iour and polling). pub(crate) mod op; diff --git a/src/event/pipe.rs b/src/event/pipe.rs index 2987f5e2..29f0d375 100644 --- a/src/event/pipe.rs +++ b/src/event/pipe.rs @@ -1,6 +1,6 @@ use std::{ io, - os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}, + os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd}, }; use arrayvec::ArrayVec; @@ -18,10 +18,14 @@ pub struct Event { impl Event { /// Create [`Event`]. pub fn new() -> io::Result { - let (sender, receiver) = mio::unix::pipe::new()?; - sender.set_nonblocking(false)?; - let sender = unsafe { OwnedFd::from_raw_fd(sender.into_raw_fd()) }; - let receiver = unsafe { OwnedFd::from_raw_fd(receiver.into_raw_fd()) }; + let mut fds = [-1, -1]; + syscall!(pipe(fds.as_mut_ptr()))?; + let receiver = unsafe { OwnedFd::from_raw_fd(fds[0]) }; + let sender = unsafe { OwnedFd::from_raw_fd(fds[1]) }; + + syscall!(fcntl(receiver.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?; + syscall!(fcntl(receiver.as_raw_fd(), libc::F_SETFL, libc::O_NONBLOCK))?; + syscall!(fcntl(sender.as_raw_fd(), libc::F_SETFD, libc::FD_CLOEXEC))?; Ok(Self { sender, receiver }) } diff --git a/src/lib.rs b/src/lib.rs index 5c4000a2..1933c610 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -123,7 +123,7 @@ macro_rules! syscall { Ok(res) } }}; - // The below branches are used by mio driver. + // The below branches are used by polling driver. (break $fn: ident ( $($arg: expr),* $(,)* )) => { $crate::syscall!( $fn ( $($arg, )* )).map( |res| ::std::ops::ControlFlow::Break(res as usize) diff --git a/src/op.rs b/src/op.rs index 7c5de9a6..d94a8d37 100644 --- a/src/op.rs +++ b/src/op.rs @@ -128,7 +128,7 @@ impl Sync { /// /// * IOCP: it is synchronized operation, and calls `FlushFileBuffers`. /// * io-uring: `fdatasync` if `datasync` specified, otherwise `fsync`. - /// * mio: it is synchronized `fdatasync` or `fsync`. + /// * polling: it is synchronized `fdatasync` or `fsync`. pub fn new(fd: RawFd, datasync: bool) -> Self { Self { fd, datasync } } From 047c3db0b823bb52bec8376bd23c84c670c9f550 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Fri, 22 Sep 2023 12:57:04 +0800 Subject: [PATCH 2/5] Add test for register multiple tasks. --- tests/driver.rs | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/driver.rs b/tests/driver.rs index 12561c76..5d21a14d 100644 --- a/tests/driver.rs +++ b/tests/driver.rs @@ -37,3 +37,22 @@ fn timeout() { .unwrap_err(); assert_eq!(err.kind(), io::ErrorKind::TimedOut); } + +#[test] +fn register_multiple() { + const TASK_LEN: usize = 5; + + let mut driver = Proactor::new().unwrap(); + + let file = File::open("Cargo.toml").unwrap(); + driver.attach(file.as_raw_fd()).unwrap(); + + for _i in 0..TASK_LEN { + driver.push(ReadAt::new(file.as_raw_fd(), 0, Vec::with_capacity(1024))); + } + + let mut entries = ArrayVec::::new(); + while entries.len() < TASK_LEN { + driver.poll(None, &mut entries).unwrap(); + } +} From 1481e0a7615f706628b50dd8b855c6af982b643d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Fri, 22 Sep 2023 17:21:16 +0800 Subject: [PATCH 3/5] Add FdQueue --- src/driver/mod.rs | 3 +- src/driver/poll/mod.rs | 159 +++++++++++++++++++++++++++++------------ 2 files changed, 117 insertions(+), 45 deletions(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 6bb6e852..a4a6acda 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -118,7 +118,8 @@ impl Proactor { /// attached to one driver, and could only be attached once, even if you /// `try_clone` it. It will cause unexpected result to attach the handle /// with one driver and push an op to another driver. - /// * io-uring/polling: it will do nothing and return `Ok(())` + /// * io-uring: it will do nothing and return `Ok(())`. + /// * polling: it will initialize the internal interest queue for the fd. pub fn attach(&mut self, fd: RawFd) -> io::Result<()> { self.driver.attach(fd) } diff --git a/src/driver/poll/mod.rs b/src/driver/poll/mod.rs index 4d80ba0f..96378077 100644 --- a/src/driver/poll/mod.rs +++ b/src/driver/poll/mod.rs @@ -1,10 +1,11 @@ #[doc(no_inline)] pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd}; use std::{ - collections::{HashSet, VecDeque}, + collections::{HashMap, HashSet, VecDeque}, io, num::NonZeroUsize, ops::ControlFlow, + os::fd::BorrowedFd, pin::Pin, time::Duration, }; @@ -39,39 +40,81 @@ pub enum Decision { impl Decision { /// Decide to wait for the given fd with the given interest. - pub fn wait_for(fd: RawFd, readable: bool, writable: bool) -> Self { - Self::Wait(WaitArg { - fd, - readable, - writable, - }) + pub fn wait_for(fd: RawFd, interest: Interest) -> Self { + Self::Wait(WaitArg { fd, interest }) } /// Decide to wait for the given fd to be readable. pub fn wait_readable(fd: RawFd) -> Self { - Self::wait_for(fd, true, false) + Self::wait_for(fd, Interest::Readable) } /// Decide to wait for the given fd to be writable. pub fn wait_writable(fd: RawFd) -> Self { - Self::wait_for(fd, false, true) + Self::wait_for(fd, Interest::Writable) } } /// Meta of polling operations. #[derive(Debug, Clone, Copy)] pub struct WaitArg { - fd: RawFd, - readable: bool, - writable: bool, + /// The raw fd of the operation. + pub fd: RawFd, + /// The interest to be registered. + pub interest: Interest, +} + +/// The interest of the operation +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Interest { + /// Represents a read operation. + Readable, + /// Represents a write operation. + Writable, +} + +#[derive(Debug, Default)] +struct FdQueue { + read_queue: VecDeque, + write_queue: VecDeque, +} + +impl FdQueue { + pub fn push_interest(&mut self, user_data: usize, interest: Interest) { + match interest { + Interest::Readable => self.read_queue.push_back(user_data), + Interest::Writable => self.write_queue.push_back(user_data), + } + } + + pub fn event(&self, key: usize) -> Event { + let mut event = Event::all(key); + event.readable = !self.read_queue.is_empty(); + event.writable = !self.write_queue.is_empty(); + event + } + + pub fn pop_interest(&mut self, event: &Event) -> usize { + if event.readable { + if let Some(user_data) = self.read_queue.pop_front() { + return user_data; + } + } + if event.writable { + if let Some(user_data) = self.write_queue.pop_front() { + return user_data; + } + } + unreachable!("should receive event when no interest") + } } /// Low-level driver of polling. pub(crate) struct Driver { events: Events, poll: Poller, + registry: HashMap, cancelled: HashSet, - cancel_queue: VecDeque, } impl Driver { @@ -86,25 +129,33 @@ impl Driver { Ok(Self { events, poll: Poller::new()?, + registry: HashMap::new(), cancelled: HashSet::new(), - cancel_queue: VecDeque::new(), }) } -} -impl Driver { - fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result<()> { + fn submit(&mut self, user_data: usize, arg: WaitArg) -> io::Result { if self.cancelled.remove(&user_data) { - self.cancel_queue.push_back(user_data); + Ok(false) } else { - let mut event = Event::none(user_data); - event.readable = arg.readable; - event.writable = arg.writable; + let need_add = !self.registry.contains_key(&arg.fd); + let queue = self + .registry + .get_mut(&arg.fd) + .expect("the fd should be attached"); + queue.push_interest(user_data, arg.interest); + // We use fd as the key. + let event = queue.event(arg.fd as usize); unsafe { - self.poll.add(arg.fd, event)?; + if need_add { + self.poll.add(arg.fd, event)?; + } else { + let fd = BorrowedFd::borrow_raw(arg.fd); + self.poll.modify(fd, event)?; + } } + Ok(true) } - Ok(()) } /// Register all operations in the squeue to polling. @@ -119,7 +170,11 @@ impl Driver { let op = registry[user_data].as_pin(); match op.pre_submit() { Ok(Decision::Wait(arg)) => { - self.submit(user_data, arg)?; + let succeeded = self.submit(user_data, arg)?; + if !succeeded { + entries.extend(Some(entry_cancelled(user_data))); + extended = true; + } } Ok(Decision::Completed(res)) => { entries.extend(Some(Entry::new(user_data, Ok(res)))); @@ -148,36 +203,35 @@ impl Driver { return Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)); } for event in self.events.iter() { - if self.cancelled.remove(&event.key) { - self.cancel_queue.push_back(event.key); + let fd = event.key as RawFd; + let queue = self + .registry + .get_mut(&fd) + .expect("the fd should be attached"); + let user_data = queue.pop_interest(&event); + let renew_event = queue.event(fd as _); + unsafe { + let fd = BorrowedFd::borrow_raw(fd); + self.poll.modify(fd, renew_event)?; + } + if self.cancelled.remove(&user_data) { + entries.extend(Some(entry_cancelled(user_data))); } else { - let op = registry[event.key].as_pin(); + let op = registry[user_data].as_pin(); let res = match op.on_event(&event) { Ok(ControlFlow::Continue(_)) => continue, Ok(ControlFlow::Break(res)) => Ok(res), Err(err) => Err(err), }; - let entry = Entry::new(event.key, res); + let entry = Entry::new(user_data, res); entries.extend(Some(entry)); } } Ok(()) } - fn poll_cancel(&mut self, entries: &mut impl Extend) -> bool { - let has_cancel = !self.cancel_queue.is_empty(); - if has_cancel { - entries.extend(self.cancel_queue.drain(..).map(|user_data| { - Entry::new( - user_data, - Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)), - ) - })) - } - has_cancel - } - - pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> { + pub fn attach(&mut self, fd: RawFd) -> io::Result<()> { + self.registry.entry(fd).or_default(); Ok(()) } @@ -192,8 +246,7 @@ impl Driver { entries: &mut impl Extend, registry: &mut Slab, ) -> io::Result<()> { - let mut extended = self.submit_squeue(ops, entries, registry)?; - extended |= self.poll_cancel(entries); + let extended = self.submit_squeue(ops, entries, registry)?; if !extended { self.poll_impl(timeout, entries, registry)?; } @@ -206,3 +259,21 @@ impl AsRawFd for Driver { self.poll.as_raw_fd() } } + +impl Drop for Driver { + fn drop(&mut self) { + for fd in self.registry.keys() { + unsafe { + let fd = BorrowedFd::borrow_raw(*fd); + self.poll.delete(fd).ok(); + } + } + } +} + +fn entry_cancelled(user_data: usize) -> Entry { + Entry::new( + user_data, + Err(io::Error::from_raw_os_error(libc::ETIMEDOUT)), + ) +} From ad28825993b5bcab3fe8f595ae6e37ddb39f8eb7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Fri, 22 Sep 2023 17:39:10 +0800 Subject: [PATCH 4/5] Fix attach. --- src/driver/mod.rs | 3 +-- src/driver/poll/mod.rs | 8 ++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/src/driver/mod.rs b/src/driver/mod.rs index a4a6acda..c913fa9f 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -118,8 +118,7 @@ impl Proactor { /// attached to one driver, and could only be attached once, even if you /// `try_clone` it. It will cause unexpected result to attach the handle /// with one driver and push an op to another driver. - /// * io-uring: it will do nothing and return `Ok(())`. - /// * polling: it will initialize the internal interest queue for the fd. + /// * io-uring/polling: it will do nothing and return `Ok(())`. pub fn attach(&mut self, fd: RawFd) -> io::Result<()> { self.driver.attach(fd) } diff --git a/src/driver/poll/mod.rs b/src/driver/poll/mod.rs index 96378077..3478b0cf 100644 --- a/src/driver/poll/mod.rs +++ b/src/driver/poll/mod.rs @@ -139,10 +139,7 @@ impl Driver { Ok(false) } else { let need_add = !self.registry.contains_key(&arg.fd); - let queue = self - .registry - .get_mut(&arg.fd) - .expect("the fd should be attached"); + let queue = self.registry.entry(arg.fd).or_default(); queue.push_interest(user_data, arg.interest); // We use fd as the key. let event = queue.event(arg.fd as usize); @@ -230,8 +227,7 @@ impl Driver { Ok(()) } - pub fn attach(&mut self, fd: RawFd) -> io::Result<()> { - self.registry.entry(fd).or_default(); + pub fn attach(&mut self, _fd: RawFd) -> io::Result<()> { Ok(()) } From e311230490f457550616e3942569225679bedd19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=8B=E5=AE=87=E9=80=B8?= Date: Fri, 22 Sep 2023 17:54:23 +0800 Subject: [PATCH 5/5] Push back the incompleted operation. --- src/driver/poll/mod.rs | 45 +++++++++++++++++++++++++++--------------- 1 file changed, 29 insertions(+), 16 deletions(-) diff --git a/src/driver/poll/mod.rs b/src/driver/poll/mod.rs index 3478b0cf..8fa2f5bd 100644 --- a/src/driver/poll/mod.rs +++ b/src/driver/poll/mod.rs @@ -80,13 +80,20 @@ struct FdQueue { } impl FdQueue { - pub fn push_interest(&mut self, user_data: usize, interest: Interest) { + pub fn push_back_interest(&mut self, user_data: usize, interest: Interest) { match interest { Interest::Readable => self.read_queue.push_back(user_data), Interest::Writable => self.write_queue.push_back(user_data), } } + pub fn push_front_interest(&mut self, user_data: usize, interest: Interest) { + match interest { + Interest::Readable => self.read_queue.push_front(user_data), + Interest::Writable => self.write_queue.push_front(user_data), + } + } + pub fn event(&self, key: usize) -> Event { let mut event = Event::all(key); event.readable = !self.read_queue.is_empty(); @@ -94,15 +101,15 @@ impl FdQueue { event } - pub fn pop_interest(&mut self, event: &Event) -> usize { + pub fn pop_interest(&mut self, event: &Event) -> (usize, Interest) { if event.readable { if let Some(user_data) = self.read_queue.pop_front() { - return user_data; + return (user_data, Interest::Readable); } } if event.writable { if let Some(user_data) = self.write_queue.pop_front() { - return user_data; + return (user_data, Interest::Writable); } } unreachable!("should receive event when no interest") @@ -140,7 +147,7 @@ impl Driver { } else { let need_add = !self.registry.contains_key(&arg.fd); let queue = self.registry.entry(arg.fd).or_default(); - queue.push_interest(user_data, arg.interest); + queue.push_back_interest(user_data, arg.interest); // We use fd as the key. let event = queue.event(arg.fd as usize); unsafe { @@ -205,23 +212,29 @@ impl Driver { .registry .get_mut(&fd) .expect("the fd should be attached"); - let user_data = queue.pop_interest(&event); - let renew_event = queue.event(fd as _); - unsafe { - let fd = BorrowedFd::borrow_raw(fd); - self.poll.modify(fd, renew_event)?; - } + let (user_data, interest) = queue.pop_interest(&event); if self.cancelled.remove(&user_data) { entries.extend(Some(entry_cancelled(user_data))); } else { let op = registry[user_data].as_pin(); let res = match op.on_event(&event) { - Ok(ControlFlow::Continue(_)) => continue, - Ok(ControlFlow::Break(res)) => Ok(res), - Err(err) => Err(err), + Ok(ControlFlow::Continue(_)) => { + // The operation should go back to the front. + queue.push_front_interest(user_data, interest); + None + } + Ok(ControlFlow::Break(res)) => Some(Ok(res)), + Err(err) => Some(Err(err)), }; - let entry = Entry::new(user_data, res); - entries.extend(Some(entry)); + if let Some(res) = res { + let entry = Entry::new(user_data, res); + entries.extend(Some(entry)); + } + } + let renew_event = queue.event(fd as _); + unsafe { + let fd = BorrowedFd::borrow_raw(fd); + self.poll.modify(fd, renew_event)?; } } Ok(())