diff --git a/compio-runtime/src/runtime/mod.rs b/compio-runtime/src/runtime/mod.rs index 30517d0c..70d3d653 100644 --- a/compio-runtime/src/runtime/mod.rs +++ b/compio-runtime/src/runtime/mod.rs @@ -1,25 +1,20 @@ use std::{ any::Any, cell::{Cell, RefCell}, - collections::{HashSet, VecDeque}, + collections::HashSet, future::{Future, ready}, io, - marker::PhantomData, panic::AssertUnwindSafe, - rc::Rc, - sync::Arc, task::{Context, Poll}, time::Duration, }; -use async_task::{Runnable, Task}; +use async_task::Task; use compio_buf::IntoInner; use compio_driver::{ - AsRawFd, DriverType, Key, NotifyHandle, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd, - op::Asyncify, + AsRawFd, DriverType, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd, op::Asyncify, }; use compio_log::{debug, instrument}; -use crossbeam_queue::SegQueue; use futures_util::{FutureExt, future::Either}; pub(crate) mod op; @@ -29,12 +24,15 @@ pub(crate) mod time; mod buffer_pool; pub use buffer_pool::*; -mod send_wrapper; -use send_wrapper::SendWrapper; +mod scheduler; #[cfg(feature = "time")] use crate::runtime::time::{TimerFuture, TimerKey, TimerRuntime}; -use crate::{BufResult, affinity::bind_to_cpu_set, runtime::op::OpFuture}; +use crate::{ + BufResult, + affinity::bind_to_cpu_set, + runtime::{op::OpFuture, scheduler::Scheduler}, +}; scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); @@ -42,63 +40,6 @@ scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime); /// `Err` when the spawned future panicked. pub type JoinHandle = Task>>; -struct RunnableQueue { - local_runnables: SendWrapper>>, - sync_runnables: SegQueue, -} - -impl RunnableQueue { - pub fn new() -> Self { - Self { - local_runnables: SendWrapper::new(RefCell::new(VecDeque::new())), - sync_runnables: SegQueue::new(), - } - } - - pub fn schedule(&self, runnable: Runnable, handle: &NotifyHandle) { - if let Some(runnables) = self.local_runnables.get() { - runnables.borrow_mut().push_back(runnable); - #[cfg(feature = "notify-always")] - handle.notify().ok(); - } else { - self.sync_runnables.push(runnable); - handle.notify().ok(); - } - } - - /// SAFETY: call in the main thread - pub unsafe fn run(&self, event_interval: usize) -> bool { - let local_runnables = self.local_runnables.get_unchecked(); - - for _ in 0..event_interval { - let local_task = local_runnables.borrow_mut().pop_front(); - - // Perform an empty check as a fast path, since `pop()` is more expensive. - let sync_task = if self.sync_runnables.is_empty() { - None - } else { - self.sync_runnables.pop() - }; - - match (local_task, sync_task) { - (Some(local), Some(sync)) => { - local.run(); - sync.run(); - } - (Some(local), None) => { - local.run(); - } - (None, Some(sync)) => { - sync.run(); - } - (None, None) => break, - } - } - - !(local_runnables.borrow().is_empty() && self.sync_runnables.is_empty()) - } -} - thread_local! { static RUNTIME_ID: Cell = const { Cell::new(0) }; } @@ -107,10 +48,9 @@ thread_local! { /// sent to other threads. pub struct Runtime { driver: RefCell, - runnables: Arc, + scheduler: Scheduler, #[cfg(feature = "time")] timer_runtime: RefCell, - event_interval: usize, // Runtime id is used to check if the buffer pool is belonged to this runtime or not. // Without this, if user enable `io-uring-buf-ring` feature then: // 1. Create a buffer pool at runtime1 @@ -119,9 +59,6 @@ pub struct Runtime { // - buffer pool will return a wrong buffer which the buffer's data is uninit, that will cause // UB id: u64, - // Other fields don't make it !Send, but actually `local_runnables` implies it should be !Send, - // otherwise it won't be valid if the runtime is sent to other threads. - _p: PhantomData>>, } impl Runtime { @@ -148,12 +85,10 @@ impl Runtime { } Ok(Self { driver: RefCell::new(proactor_builder.build()?), - runnables: Arc::new(RunnableQueue::new()), + scheduler: Scheduler::new(*event_interval), #[cfg(feature = "time")] timer_runtime: RefCell::new(TimerRuntime::new()), - event_interval: *event_interval, id, - _p: PhantomData, }) } @@ -202,22 +137,10 @@ impl Runtime { /// /// The caller should ensure the captured lifetime long enough. pub unsafe fn spawn_unchecked(&self, future: F) -> Task { - let schedule = { - // Use `Weak` to break reference cycle. - // `RunnableQueue` -> `Runnable` -> `RunnableQueue` - let runnables = Arc::downgrade(&self.runnables); - let handle = self.driver.borrow().handle(); - - move |runnable| { - if let Some(runnables) = runnables.upgrade() { - runnables.schedule(runnable, &handle); - } - } - }; + let notify = self.driver.borrow().handle(); - let (runnable, task) = async_task::spawn_unchecked(future, schedule); - runnable.schedule(); - task + // SAFETY: See the safety comment of this method. + unsafe { self.scheduler.spawn_unchecked(future, notify) } } /// Low level API to control the runtime. @@ -226,8 +149,7 @@ impl Runtime { /// /// The return value indicates whether there are still tasks in the queue. pub fn run(&self) -> bool { - // SAFETY: self is !Send + !Sync. - unsafe { self.runnables.run(self.event_interval) } + self.scheduler.run() } /// Block on the future till it completes. diff --git a/compio-runtime/src/runtime/scheduler/local_queue.rs b/compio-runtime/src/runtime/scheduler/local_queue.rs new file mode 100644 index 00000000..6bdb164c --- /dev/null +++ b/compio-runtime/src/runtime/scheduler/local_queue.rs @@ -0,0 +1,45 @@ +use std::{cell::UnsafeCell, collections::VecDeque}; + +/// A queue that is `!Sync` with interior mutability. +pub(crate) struct LocalQueue { + queue: UnsafeCell>, +} + +impl LocalQueue { + /// Creates an empty `LocalQueue`. + pub(crate) const fn new() -> Self { + Self { + queue: UnsafeCell::new(VecDeque::new()), + } + } + + /// Pushes an item to the back of the queue. + pub(crate) fn push(&self, item: T) { + // SAFETY: + // Exclusive mutable access because: + // - The mutable reference is created and used immediately within this scope. + // - `LocalQueue` is `!Sync`, so no other threads can access it concurrently. + let queue = unsafe { &mut *self.queue.get() }; + queue.push_back(item); + } + + /// Pops an item from the front of the queue, returning `None` if empty. + pub(crate) fn pop(&self) -> Option { + // SAFETY: + // Exclusive mutable access because: + // - The mutable reference is created and used immediately within this scope. + // - `LocalQueue` is `!Sync`, so no other threads can access it concurrently. + let queue = unsafe { &mut *self.queue.get() }; + queue.pop_front() + } + + /// Returns `true` if the queue is empty. + pub(crate) fn is_empty(&self) -> bool { + // SAFETY: + // Exclusive mutable access because: + // - The mutable reference is created and used immediately within this scope. + // - `LocalQueue` is `!Sync`, so no other threads can access it concurrently. + let queue = unsafe { &mut *self.queue.get() }; + queue.is_empty() + } +} diff --git a/compio-runtime/src/runtime/scheduler/mod.rs b/compio-runtime/src/runtime/scheduler/mod.rs new file mode 100644 index 00000000..4a03e1bb --- /dev/null +++ b/compio-runtime/src/runtime/scheduler/mod.rs @@ -0,0 +1,154 @@ +use crate::runtime::scheduler::{local_queue::LocalQueue, send_wrapper::SendWrapper}; +use async_task::{Runnable, Task}; +use compio_driver::NotifyHandle; +use crossbeam_queue::SegQueue; +use std::{future::Future, marker::PhantomData, sync::Arc}; + +mod local_queue; +mod send_wrapper; + +/// A task queue consisting of a local queue and a synchronized queue. +struct TaskQueue { + local_queue: SendWrapper>, + sync_queue: SegQueue, +} + +impl TaskQueue { + /// Creates a new `TaskQueue`. + fn new() -> Self { + Self { + local_queue: SendWrapper::new(LocalQueue::new()), + sync_queue: SegQueue::new(), + } + } + + /// Pushes a `Runnable` task to the appropriate queue. + /// + /// If the current thread is the same as the creator thread, push to the local queue. + /// Otherwise, push to the sync queue. + fn push(&self, runnable: Runnable, notify: &NotifyHandle) { + if let Some(local_queue) = self.local_queue.get() { + local_queue.push(runnable); + #[cfg(feature = "notify-always")] + notify.notify().ok(); + } else { + self.sync_queue.push(runnable); + notify.notify().ok(); + } + } + + /// Pops at most one task from each queue and returns them as `(local_task, sync_task)`. + /// + /// # Safety + /// + /// Call this method in the same thread as the creator. + unsafe fn pop(&self) -> (Option, Option) { + // SAFETY: See the safety comment of this method. + let local_queue = unsafe { self.local_queue.get_unchecked() }; + + let local_task = local_queue.pop(); + + // Perform an empty check as a fast path, since `SegQueue::pop()` is more expensive. + let sync_task = if self.sync_queue.is_empty() { + None + } else { + self.sync_queue.pop() + }; + + (local_task, sync_task) + } + + /// Returns `true` if both queues are empty. + /// + /// # Safety + /// + /// Call this method in the same thread as the creator. + unsafe fn is_empty(&self) -> bool { + // SAFETY: See the safety comment of this method. + let local_queue = unsafe { self.local_queue.get_unchecked() }; + local_queue.is_empty() && self.sync_queue.is_empty() + } +} + +/// A scheduler for managing and executing tasks. +pub(crate) struct Scheduler { + task_queue: Arc, + event_interval: usize, + // `Scheduler` is `!Send` and `!Sync`. + _local_marker: PhantomData<*const ()>, +} + +impl Scheduler { + /// Creates a new `Scheduler`. + pub(crate) fn new(event_interval: usize) -> Self { + Self { + task_queue: Arc::new(TaskQueue::new()), + event_interval, + _local_marker: PhantomData, + } + } + + /// Spawns a new asynchronous task, returning a [`Task`] for it. + /// + /// # Safety + /// + /// The caller should ensure the captured lifetime long enough. + pub(crate) unsafe fn spawn_unchecked( + &self, + future: F, + notify: NotifyHandle, + ) -> Task + where + F: Future, + { + let schedule = { + // Use `Weak` to break reference cycle. + // `TaskQueue` -> `Runnable` -> `TaskQueue` + let task_queue = Arc::downgrade(&self.task_queue); + + move |runnable| { + if let Some(task_queue) = task_queue.upgrade() { + task_queue.push(runnable, ¬ify); + } + } + }; + + let (runnable, task) = async_task::spawn_unchecked(future, schedule); + runnable.schedule(); + task + } + + /// Run the scheduled tasks. + /// + /// The return value indicates whether there are still tasks in the queue. + pub(crate) fn run(&self) -> bool { + for _ in 0..self.event_interval { + // SAFETY: + // `Scheduler` is `!Send` and `!Sync`, so this method is only called + // on `TaskQueue`'s creator thread. + let tasks = unsafe { self.task_queue.pop() }; + + // Run the tasks, which will poll the futures. + // Since spawned tasks are not required to be `Send`, they must always be polled + // on the same thread. Because `Scheduler` is `!Send` and `!Sync`, this is safe. + match tasks { + (Some(local), Some(sync)) => { + local.run(); + sync.run(); + } + (Some(local), None) => { + local.run(); + } + (None, Some(sync)) => { + sync.run(); + } + (None, None) => break, + } + } + + // SAFETY: + // `Scheduler` is `!Send` and `!Sync`, so this method is only called + // on `TaskQueue`'s creator thread. + !unsafe { self.task_queue.is_empty() } + } +} diff --git a/compio-runtime/src/runtime/send_wrapper.rs b/compio-runtime/src/runtime/scheduler/send_wrapper.rs similarity index 100% rename from compio-runtime/src/runtime/send_wrapper.rs rename to compio-runtime/src/runtime/scheduler/send_wrapper.rs