rt: move scheduler ctxs to runtime::context (tokio-rs#5727)
This commit eliminates the `current_thread::CURRENT` and `multi_thread::CURRENT`
thread-local variables in favor of using `runtime::context`. This is another step
towards reducing the total number of thread-local variables used by Tokio.
carllerche committed May 27, 2023
1 parent d274ef3 commit 9f9db7d
Showing 8 changed files with 199 additions and 136 deletions.
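
To make the shape of the change easier to follow, here is a minimal, self-contained sketch of the pattern this commit moves toward: a single `thread_local!` struct owning all per-thread state, exposed through accessor functions, instead of one scoped thread-local per scheduler module. The names `set_scheduler` and `with_scheduler` mirror the functions added in `runtime/context.rs` below, but the types are simplified stand-ins, not Tokio's actual code.

```rust
use std::cell::RefCell;

// One thread-local struct holds all per-thread state, instead of a separate
// scoped thread-local per scheduler implementation.
struct ThreadContext {
    scheduler: RefCell<Option<String>>,
}

thread_local! {
    static CONTEXT: ThreadContext = ThreadContext {
        scheduler: RefCell::new(None),
    };
}

// Stand-ins for `context::set_scheduler` / `context::with_scheduler`.
fn set_scheduler<R>(name: &str, f: impl FnOnce() -> R) -> R {
    CONTEXT.with(|c| {
        let prev = c.scheduler.replace(Some(name.to_string()));
        let ret = f();
        // Restore the previous value. The real code uses a drop guard so the
        // restore also happens if `f` panics; this sketch does not.
        *c.scheduler.borrow_mut() = prev;
        ret
    })
}

fn with_scheduler<R>(f: impl FnOnce(Option<&str>) -> R) -> R {
    CONTEXT.with(|c| f(c.scheduler.borrow().as_deref()))
}

fn main() {
    with_scheduler(|s| assert!(s.is_none()));
    set_scheduler("current_thread", || {
        with_scheduler(|s| assert_eq!(s, Some("current_thread")));
    });
    with_scheduler(|s| assert!(s.is_none()));
}
```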
4 changes: 0 additions & 4 deletions tokio/src/macros/mod.rs
@@ -23,10 +23,6 @@ cfg_trace! {
mod trace;
}

#[macro_use]
#[cfg(feature = "rt")]
pub(crate) mod scoped_tls;

cfg_macros! {
#[macro_use]
mod select;
77 changes: 0 additions & 77 deletions tokio/src/macros/scoped_tls.rs

This file was deleted.

21 changes: 21 additions & 0 deletions tokio/src/runtime/context.rs
@@ -7,6 +7,9 @@ use std::cell::Cell;
use crate::util::rand::{FastRand, RngSeed};

cfg_rt! {
mod scoped;
use scoped::Scoped;

use crate::runtime::{scheduler, task::Id, Defer};

use std::cell::RefCell;
@@ -27,6 +30,10 @@ struct Context {
#[cfg(feature = "rt")]
handle: RefCell<Option<scheduler::Handle>>,

/// Handle to the scheduler's internal "context"
#[cfg(feature = "rt")]
scheduler: Scoped<scheduler::Context>,

#[cfg(feature = "rt")]
current_task_id: Cell<Option<Id>>,

@@ -70,6 +77,11 @@ tokio_thread_local! {
/// accessing drivers, etc...
#[cfg(feature = "rt")]
handle: RefCell::new(None),

/// Tracks the current scheduler internal context
#[cfg(feature = "rt")]
scheduler: Scoped::new(),

#[cfg(feature = "rt")]
current_task_id: Cell::new(None),

@@ -287,6 +299,15 @@ cfg_rt! {
})
}

pub(super) fn set_scheduler<R>(v: &scheduler::Context, f: impl FnOnce() -> R) -> R {
CONTEXT.with(|c| c.scheduler.set(v, f))
}

#[track_caller]
pub(super) fn with_scheduler<R>(f: impl FnOnce(Option<&scheduler::Context>) -> R) -> R {
CONTEXT.with(|c| c.scheduler.with(f))
}

impl Context {
fn set_current(&self, handle: &scheduler::Handle) -> SetCurrentGuard {
let rng_seed = handle.seed_generator().next_seed();
58 changes: 58 additions & 0 deletions tokio/src/runtime/context/scoped.rs
@@ -0,0 +1,58 @@
use std::cell::Cell;
use std::ptr;

/// Scoped thread-local storage
pub(super) struct Scoped<T> {
pub(super) inner: Cell<*const T>,
}

unsafe impl<T> Sync for Scoped<T> {}

impl<T> Scoped<T> {
pub(super) fn new() -> Scoped<T> {
Scoped {
inner: Cell::new(ptr::null()),
}
}

/// Inserts a value into the scoped cell for the duration of the closure
pub(super) fn set<F, R>(&self, t: &T, f: F) -> R
where
F: FnOnce() -> R,
{
struct Reset<'a, T> {
cell: &'a Cell<*const T>,
prev: *const T,
}

impl<T> Drop for Reset<'_, T> {
fn drop(&mut self) {
self.cell.set(self.prev);
}
}

let prev = self.inner.get();
self.inner.set(t as *const _);

let _reset = Reset {
cell: &self.inner,
prev,
};

f()
}

/// Gets the value out of the scoped cell.
pub(super) fn with<F, R>(&self, f: F) -> R
where
F: FnOnce(Option<&T>) -> R,
{
let val = self.inner.get();

if val.is_null() {
f(None)
} else {
unsafe { f(Some(&*(val as *const T))) }
}
}
}
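
For illustration only (not part of the diff), here is a usage sketch of the `Scoped` cell defined above: a value is visible only inside `set`, nested calls shadow the outer value, and the `Reset` drop guard restores the previous pointer even if the closure panics. The `demo` function is hypothetical and assumes access to the module-private `Scoped` type.

```rust
fn demo(scoped: &Scoped<u32>) {
    // Outside any `set` call the cell is empty.
    scoped.with(|v| assert!(v.is_none()));

    scoped.set(&1, || {
        scoped.with(|v| assert_eq!(v, Some(&1)));

        // A nested `set` shadows the outer value for the inner closure only.
        scoped.set(&2, || {
            scoped.with(|v| assert_eq!(v, Some(&2)));
        });

        // The `Reset` guard has restored the outer value.
        scoped.with(|v| assert_eq!(v, Some(&1)));
    });

    scoped.with(|v| assert!(v.is_none()));
}
```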
115 changes: 71 additions & 44 deletions tokio/src/runtime/scheduler/current_thread.rs
@@ -86,7 +86,9 @@ struct Shared {
}

/// Thread-local context.
struct Context {
///
/// pub(crate) to store in `runtime::context`.
pub(crate) struct Context {
/// Scheduler handle
handle: Arc<Handle>,

@@ -100,9 +102,6 @@ type Notified = task::Notified<Arc<Handle>>;
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

// Tracks the current CurrentThread.
scoped_thread_local!(static CURRENT: Context);

impl CurrentThread {
pub(crate) fn new(
driver: Driver,
@@ -185,10 +184,10 @@ impl CurrentThread {
let core = self.core.take()?;

Some(CoreGuard {
context: Context {
context: scheduler::Context::CurrentThread(Context {
handle: handle.clone(),
core: RefCell::new(Some(core)),
},
}),
scheduler: self,
})
}
@@ -205,39 +204,58 @@ impl CurrentThread {
None => panic!("Oh no! We never placed the Core back, this is a bug!"),
};

core.enter(|mut core, _context| {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
handle.shared.owned.close_and_shutdown_all();
// Check that the thread-local is not being destroyed
let tls_available = context::with_current(|_| ()).is_ok();

// Drain local queue
// We already shut down every task, so we just need to drop the task.
while let Some(task) = core.next_local_task(handle) {
drop(task);
}
if tls_available {
core.enter(|core, _context| {
let core = shutdown2(core, handle);
(core, ())
});
} else {
// Shutdown without setting the context. `tokio::spawn` calls will
// fail, but those will fail either way because the thread-local is
// not available anymore.
let context = core.context.expect_current_thread();
let core = context.core.borrow_mut().take().unwrap();

let core = shutdown2(core, handle);
*context.core.borrow_mut() = Some(core);
}
}
}

// Close the injection queue
handle.shared.inject.close();
fn shutdown2(mut core: Box<Core>, handle: &Handle) -> Box<Core> {
// Drain the OwnedTasks collection. This call also closes the
// collection, ensuring that no tasks are ever pushed after this
// call returns.
handle.shared.owned.close_and_shutdown_all();

// Drain remote queue
while let Some(task) = handle.shared.inject.pop() {
drop(task);
}
// Drain local queue
// We already shut down every task, so we just need to drop the task.
while let Some(task) = core.next_local_task(handle) {
drop(task);
}

assert!(handle.shared.owned.is_empty());
// Close the injection queue
handle.shared.inject.close();

// Submit metrics
core.submit_metrics(handle);
// Drain remote queue
while let Some(task) = handle.shared.inject.pop() {
drop(task);
}

// Shutdown the resource drivers
if let Some(driver) = core.driver.as_mut() {
driver.shutdown(&handle.driver);
}
assert!(handle.shared.owned.is_empty());

(core, ())
});
// Submit metrics
core.submit_metrics(handle);

// Shutdown the resource drivers
if let Some(driver) = core.driver.as_mut() {
driver.shutdown(&handle.driver);
}

core
}

impl fmt::Debug for CurrentThread {
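
The `tls_available` check in the shutdown path above relies on the fact that accessing a thread-local during or after its destruction fails; in standard-library terms this is the `LocalKey::try_with` pattern, which `context::with_current` wraps. A minimal stand-alone illustration (not Tokio code; the names are hypothetical):

```rust
use std::cell::RefCell;

thread_local! {
    // A value with a destructor, so it is torn down at thread exit.
    static SLOT: RefCell<Option<String>> = RefCell::new(None);
}

// Returns false when the slot has already been (or is being) destroyed,
// e.g. when called from another thread-local's destructor during thread exit.
fn tls_available() -> bool {
    SLOT.try_with(|_| ()).is_ok()
}

fn main() {
    // On a live thread the slot is still accessible.
    assert!(tls_available());
}
```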
@@ -425,10 +443,10 @@ impl Handle {
let mut traces = vec![];

// todo: how to make this work outside of a runtime context?
CURRENT.with(|maybe_context| {
context::with_scheduler(|maybe_context| {
// drain the local queue
let context = if let Some(context) = maybe_context {
context
context.expect_current_thread()
} else {
return;
};
@@ -522,8 +540,10 @@ impl Schedule for Arc<Handle> {
}

fn schedule(&self, task: task::Notified<Self>) {
CURRENT.with(|maybe_cx| match maybe_cx {
Some(cx) if Arc::ptr_eq(self, &cx.handle) => {
use scheduler::Context::CurrentThread;

context::with_scheduler(|maybe_cx| match maybe_cx {
Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
let mut core = cx.core.borrow_mut();

// If `None`, the runtime is shutting down, so there is no need
@@ -552,11 +572,14 @@
// Do nothing
}
UnhandledPanic::ShutdownRuntime => {
use scheduler::Context::CurrentThread;

// This hook is only called from within the runtime, so
// `CURRENT` should match with `&self`, i.e. there is no
// opportunity for a nested scheduler to be called.
CURRENT.with(|maybe_cx| match maybe_cx {
Some(cx) if Arc::ptr_eq(self, &cx.handle) => {
// `context::with_scheduler` should match with `&self`, i.e.
// there is no opportunity for a nested scheduler to be
// called.
context::with_scheduler(|maybe_cx| match maybe_cx {
Some(CurrentThread(cx)) if Arc::ptr_eq(self, &cx.handle) => {
let mut core = cx.core.borrow_mut();

// If `None`, the runtime is shutting down, so there is no need to signal shutdown
@@ -590,7 +613,7 @@ impl Wake for Handle {
/// Used to ensure we always place the `Core` value back into its slot in
/// `CurrentThread`, even if the future panics.
struct CoreGuard<'a> {
context: Context,
context: scheduler::Context,
scheduler: &'a CurrentThread,
}

@@ -672,21 +695,25 @@ impl CoreGuard<'_> {
where
F: FnOnce(Box<Core>, &Context) -> (Box<Core>, R),
{
let context = self.context.expect_current_thread();

// Remove `core` from `context` to pass into the closure.
let core = self.context.core.borrow_mut().take().expect("core missing");
let core = context.core.borrow_mut().take().expect("core missing");

// Call the closure and place `core` back
let (core, ret) = CURRENT.set(&self.context, || f(core, &self.context));
let (core, ret) = context::set_scheduler(&self.context, || f(core, context));

*self.context.core.borrow_mut() = Some(core);
*context.core.borrow_mut() = Some(core);

ret
}
}

impl Drop for CoreGuard<'_> {
fn drop(&mut self) {
if let Some(core) = self.context.core.borrow_mut().take() {
let context = self.context.expect_current_thread();

if let Some(core) = context.core.borrow_mut().take() {
// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
self.scheduler.core.set(core);
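
The hunks above repeatedly match on `scheduler::Context::CurrentThread(..)` and call `expect_current_thread()`. The real enum lives in `runtime/scheduler` and is not shown in this diff; the following self-contained sketch only suggests the shape of such a wrapper, with stand-in type names, and may differ from the actual definition.

```rust
// Stand-ins for the real scheduler context types.
struct CurrentThreadContext;
struct MultiThreadContext;

// Assumed shape of the wrapper the call sites above pattern-match on.
enum SchedulerContext {
    CurrentThread(CurrentThreadContext),
    MultiThread(MultiThreadContext),
}

impl SchedulerContext {
    // Mirrors the `expect_current_thread()` accessor used in the hunks above.
    fn expect_current_thread(&self) -> &CurrentThreadContext {
        match self {
            SchedulerContext::CurrentThread(cx) => cx,
            _ => panic!("expected a `CurrentThread` scheduler context"),
        }
    }
}

fn main() {
    let cx = SchedulerContext::CurrentThread(CurrentThreadContext);
    let _ctx: &CurrentThreadContext = cx.expect_current_thread();
    // Calling `expect_current_thread` on the other variant would panic.
    let _other = SchedulerContext::MultiThread(MultiThreadContext);
}
```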

