Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 15 additions & 93 deletions compio-runtime/src/runtime/mod.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
use std::{
any::Any,
cell::{Cell, RefCell},
collections::{HashSet, VecDeque},
collections::HashSet,
future::{Future, ready},
io,
marker::PhantomData,
panic::AssertUnwindSafe,
rc::Rc,
sync::Arc,
task::{Context, Poll},
time::Duration,
};

use async_task::{Runnable, Task};
use async_task::Task;
use compio_buf::IntoInner;
use compio_driver::{
AsRawFd, DriverType, Key, NotifyHandle, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd,
op::Asyncify,
AsRawFd, DriverType, Key, OpCode, Proactor, ProactorBuilder, PushEntry, RawFd, op::Asyncify,
};
use compio_log::{debug, instrument};
use crossbeam_queue::SegQueue;
use futures_util::{FutureExt, future::Either};

pub(crate) mod op;
Expand All @@ -29,76 +24,22 @@ pub(crate) mod time;
mod buffer_pool;
pub use buffer_pool::*;

mod send_wrapper;
use send_wrapper::SendWrapper;
mod scheduler;

#[cfg(feature = "time")]
use crate::runtime::time::{TimerFuture, TimerKey, TimerRuntime};
use crate::{BufResult, affinity::bind_to_cpu_set, runtime::op::OpFuture};
use crate::{
BufResult,
affinity::bind_to_cpu_set,
runtime::{op::OpFuture, scheduler::Scheduler},
};

scoped_tls::scoped_thread_local!(static CURRENT_RUNTIME: Runtime);

/// Type alias for `Task<Result<T, Box<dyn Any + Send>>>`, which resolves to an
/// `Err` when the spawned future panicked.
pub type JoinHandle<T> = Task<Result<T, Box<dyn Any + Send>>>;

struct RunnableQueue {
local_runnables: SendWrapper<RefCell<VecDeque<Runnable>>>,
sync_runnables: SegQueue<Runnable>,
}

impl RunnableQueue {
pub fn new() -> Self {
Self {
local_runnables: SendWrapper::new(RefCell::new(VecDeque::new())),
sync_runnables: SegQueue::new(),
}
}

pub fn schedule(&self, runnable: Runnable, handle: &NotifyHandle) {
if let Some(runnables) = self.local_runnables.get() {
runnables.borrow_mut().push_back(runnable);
#[cfg(feature = "notify-always")]
handle.notify().ok();
} else {
self.sync_runnables.push(runnable);
handle.notify().ok();
}
}

/// SAFETY: call in the main thread
pub unsafe fn run(&self, event_interval: usize) -> bool {
let local_runnables = self.local_runnables.get_unchecked();

for _ in 0..event_interval {
let local_task = local_runnables.borrow_mut().pop_front();

// Perform an empty check as a fast path, since `pop()` is more expensive.
let sync_task = if self.sync_runnables.is_empty() {
None
} else {
self.sync_runnables.pop()
};

match (local_task, sync_task) {
(Some(local), Some(sync)) => {
local.run();
sync.run();
}
(Some(local), None) => {
local.run();
}
(None, Some(sync)) => {
sync.run();
}
(None, None) => break,
}
}

!(local_runnables.borrow().is_empty() && self.sync_runnables.is_empty())
}
}

thread_local! {
static RUNTIME_ID: Cell<u64> = const { Cell::new(0) };
}
Expand All @@ -107,10 +48,9 @@ thread_local! {
/// sent to other threads.
pub struct Runtime {
driver: RefCell<Proactor>,
runnables: Arc<RunnableQueue>,
scheduler: Scheduler,
#[cfg(feature = "time")]
timer_runtime: RefCell<TimerRuntime>,
event_interval: usize,
// Runtime id is used to check if the buffer pool is belonged to this runtime or not.
// Without this, if user enable `io-uring-buf-ring` feature then:
// 1. Create a buffer pool at runtime1
Expand All @@ -119,9 +59,6 @@ pub struct Runtime {
// - buffer pool will return a wrong buffer which the buffer's data is uninit, that will cause
// UB
id: u64,
// Other fields don't make it !Send, but actually `local_runnables` implies it should be !Send,
// otherwise it won't be valid if the runtime is sent to other threads.
_p: PhantomData<Rc<VecDeque<Runnable>>>,
}

impl Runtime {
Expand All @@ -148,12 +85,10 @@ impl Runtime {
}
Ok(Self {
driver: RefCell::new(proactor_builder.build()?),
runnables: Arc::new(RunnableQueue::new()),
scheduler: Scheduler::new(*event_interval),
#[cfg(feature = "time")]
timer_runtime: RefCell::new(TimerRuntime::new()),
event_interval: *event_interval,
id,
_p: PhantomData,
})
}

Expand Down Expand Up @@ -202,22 +137,10 @@ impl Runtime {
///
/// The caller should ensure the captured lifetime long enough.
pub unsafe fn spawn_unchecked<F: Future>(&self, future: F) -> Task<F::Output> {
let schedule = {
// Use `Weak` to break reference cycle.
// `RunnableQueue` -> `Runnable` -> `RunnableQueue`
let runnables = Arc::downgrade(&self.runnables);
let handle = self.driver.borrow().handle();

move |runnable| {
if let Some(runnables) = runnables.upgrade() {
runnables.schedule(runnable, &handle);
}
}
};
let notify = self.driver.borrow().handle();

let (runnable, task) = async_task::spawn_unchecked(future, schedule);
runnable.schedule();
task
// SAFETY: See the safety comment of this method.
unsafe { self.scheduler.spawn_unchecked(future, notify) }
}

/// Low level API to control the runtime.
Expand All @@ -226,8 +149,7 @@ impl Runtime {
///
/// The return value indicates whether there are still tasks in the queue.
pub fn run(&self) -> bool {
// SAFETY: self is !Send + !Sync.
unsafe { self.runnables.run(self.event_interval) }
self.scheduler.run()
}

/// Block on the future till it completes.
Expand Down
45 changes: 45 additions & 0 deletions compio-runtime/src/runtime/scheduler/local_queue.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use std::{cell::UnsafeCell, collections::VecDeque};

/// A queue that is `!Sync` with interior mutability.
pub(crate) struct LocalQueue<T> {
queue: UnsafeCell<VecDeque<T>>,
}

impl<T> LocalQueue<T> {
/// Creates an empty `LocalQueue`.
pub(crate) const fn new() -> Self {
Self {
queue: UnsafeCell::new(VecDeque::new()),
}
}

/// Pushes an item to the back of the queue.
pub(crate) fn push(&self, item: T) {
// SAFETY:
// Exclusive mutable access because:
// - The mutable reference is created and used immediately within this scope.
// - `LocalQueue` is `!Sync`, so no other threads can access it concurrently.
let queue = unsafe { &mut *self.queue.get() };
queue.push_back(item);
}

/// Pops an item from the front of the queue, returning `None` if empty.
pub(crate) fn pop(&self) -> Option<T> {
// SAFETY:
// Exclusive mutable access because:
// - The mutable reference is created and used immediately within this scope.
// - `LocalQueue` is `!Sync`, so no other threads can access it concurrently.
let queue = unsafe { &mut *self.queue.get() };
queue.pop_front()
}

/// Returns `true` if the queue is empty.
pub(crate) fn is_empty(&self) -> bool {
// SAFETY:
// Exclusive mutable access because:
// - The mutable reference is created and used immediately within this scope.
// - `LocalQueue` is `!Sync`, so no other threads can access it concurrently.
let queue = unsafe { &mut *self.queue.get() };
queue.is_empty()
}
}
154 changes: 154 additions & 0 deletions compio-runtime/src/runtime/scheduler/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use crate::runtime::scheduler::{local_queue::LocalQueue, send_wrapper::SendWrapper};
use async_task::{Runnable, Task};
use compio_driver::NotifyHandle;
use crossbeam_queue::SegQueue;
use std::{future::Future, marker::PhantomData, sync::Arc};

mod local_queue;
mod send_wrapper;

/// A task queue consisting of a local queue and a synchronized queue.
struct TaskQueue {
local_queue: SendWrapper<LocalQueue<Runnable>>,
sync_queue: SegQueue<Runnable>,
}

impl TaskQueue {
/// Creates a new `TaskQueue`.
fn new() -> Self {
Self {
local_queue: SendWrapper::new(LocalQueue::new()),
sync_queue: SegQueue::new(),
}
}

/// Pushes a `Runnable` task to the appropriate queue.
///
/// If the current thread is the same as the creator thread, push to the local queue.
/// Otherwise, push to the sync queue.
fn push(&self, runnable: Runnable, notify: &NotifyHandle) {
if let Some(local_queue) = self.local_queue.get() {
local_queue.push(runnable);
#[cfg(feature = "notify-always")]
notify.notify().ok();
} else {
self.sync_queue.push(runnable);
notify.notify().ok();
}
}

/// Pops at most one task from each queue and returns them as `(local_task, sync_task)`.
///
/// # Safety
///
/// Call this method in the same thread as the creator.
unsafe fn pop(&self) -> (Option<Runnable>, Option<Runnable>) {
// SAFETY: See the safety comment of this method.
let local_queue = unsafe { self.local_queue.get_unchecked() };

let local_task = local_queue.pop();

// Perform an empty check as a fast path, since `SegQueue::pop()` is more expensive.
let sync_task = if self.sync_queue.is_empty() {
None
} else {
self.sync_queue.pop()
};

(local_task, sync_task)
}

/// Returns `true` if both queues are empty.
///
/// # Safety
///
/// Call this method in the same thread as the creator.
unsafe fn is_empty(&self) -> bool {
// SAFETY: See the safety comment of this method.
let local_queue = unsafe { self.local_queue.get_unchecked() };
local_queue.is_empty() && self.sync_queue.is_empty()
}
}

/// A scheduler for managing and executing tasks.
pub(crate) struct Scheduler {
task_queue: Arc<TaskQueue>,
event_interval: usize,
// `Scheduler` is `!Send` and `!Sync`.
_local_marker: PhantomData<*const ()>,
}

impl Scheduler {
/// Creates a new `Scheduler`.
pub(crate) fn new(event_interval: usize) -> Self {
Self {
task_queue: Arc::new(TaskQueue::new()),
event_interval,
_local_marker: PhantomData,
}
}

/// Spawns a new asynchronous task, returning a [`Task`] for it.
///
/// # Safety
///
/// The caller should ensure the captured lifetime long enough.
pub(crate) unsafe fn spawn_unchecked<F>(
&self,
future: F,
notify: NotifyHandle,
) -> Task<F::Output>
where
F: Future,
{
let schedule = {
// Use `Weak` to break reference cycle.
// `TaskQueue` -> `Runnable` -> `TaskQueue`
let task_queue = Arc::downgrade(&self.task_queue);

move |runnable| {
if let Some(task_queue) = task_queue.upgrade() {
task_queue.push(runnable, &notify);
}
}
};

let (runnable, task) = async_task::spawn_unchecked(future, schedule);
runnable.schedule();
task
}

/// Run the scheduled tasks.
///
/// The return value indicates whether there are still tasks in the queue.
pub(crate) fn run(&self) -> bool {
for _ in 0..self.event_interval {
// SAFETY:
// `Scheduler` is `!Send` and `!Sync`, so this method is only called
// on `TaskQueue`'s creator thread.
let tasks = unsafe { self.task_queue.pop() };

// Run the tasks, which will poll the futures.
// Since spawned tasks are not required to be `Send`, they must always be polled
// on the same thread. Because `Scheduler` is `!Send` and `!Sync`, this is safe.
match tasks {
(Some(local), Some(sync)) => {
local.run();
sync.run();
}
(Some(local), None) => {
local.run();
}
(None, Some(sync)) => {
sync.run();
}
(None, None) => break,
}
}

// SAFETY:
// `Scheduler` is `!Send` and `!Sync`, so this method is only called
// on `TaskQueue`'s creator thread.
!unsafe { self.task_queue.is_empty() }
}
}
Loading