Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix notifier list #2

Merged
merged 2 commits into from Oct 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/ci.yml
Expand Up @@ -95,6 +95,12 @@ jobs:
env:
MIRIFLAGS: -Zmiri-strict-provenance

- name: Run cargo miri (ignore leaks)
run: cargo miri test --tests
env:
RUSTFLAGS: --cfg tachyonix_ignore_leaks
MIRIFLAGS: -Zmiri-strict-provenance -Zmiri-ignore-leaks

lints:
name: Lints
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Expand Up @@ -31,7 +31,7 @@ waker-fn = "1.1"
[dev-dependencies]
futures-executor = { version = "0.3", default-features = false, features = ["thread-pool"] }
futures-task = { version = "0.3", default-features = false, features = ["std"] }
futures-util = { version = "0.3", default-features = false, features = ["std"] }
futures-util = { version = "0.3", default-features = false, features = ["std", "async-await"] }

[[test]]
name = "integration"
Expand Down
163 changes: 100 additions & 63 deletions src/event.rs
Expand Up @@ -13,16 +13,16 @@
//! suspension, but in the context of async rust where tasks are suspended in
//! user space without kernel calls, they become sizable.
//!
//! This implementation tries to improve on all those aspects. In particular, it
//! never requires allocation. Also, because the waker is cached in a notifier
//! that is re-used by the user, waker cloning is typically only necessary the
//! first time a task needs awaiting. The underlying list of notifiers is
//! typically accessed only once for each time a waiter is blocked and once for
//! notifying, thus reducing the need for synchronization operations. Finally,
//! spurious wake-ups are only generated in very rare circumstances.
//! This implementation tries to improve on all those aspects. In particular,
//! allocated notifiers can be re-used so that allocation is only necessary once
//! for each task. Also, because the last waker is cached in the re-used
//! notifiers, waker cloning is typically only necessary the first time a task
//! needs awaiting. The underlying list of notifiers is typically accessed only
//! once for each time a waiter is blocked and once for notifying, thus reducing
//! the need for synchronization operations. Finally, spurious wake-ups are only
//! generated in very rare circumstances.

use std::future::Future;
use std::marker::PhantomData;
use std::mem;
use std::pin::Pin;
use std::ptr::NonNull;
Expand Down Expand Up @@ -72,11 +72,16 @@ impl Event {

/// Returns a future that can be `await`ed until the provided predicate is
/// satisfied.
pub(super) fn wait_until<'a, F: FnMut() -> Option<T>, T>(
&'a self,
notifier: &'a mut Notifier,
///
/// # Leaks
///
/// The Boxed notifier will be leaked unless it is later retrieved from the
/// returned `WaitUntil` with `into_boxed_notifier`.
pub(super) fn wait_until<F: FnMut() -> Option<T>, T>(
&self,
notifier: Box<Notifier>,
predicate: F,
) -> WaitUntil<'a, F, T> {
) -> WaitUntil<F, T> {
WaitUntil::new(&self.wait_set, notifier, predicate)
}
}
Expand All @@ -86,13 +91,12 @@ unsafe impl Sync for Event {}

/// A waker wrapper that can be inserted in a list.
///
/// The pervasive use of `UnsafeCell` is mainly meant to verify access
/// correctness with Loom. In theory, a notifier always has an exclusive owner
/// or borrower, except in one edge case: the `in_wait_set` flag may be read by
/// the `WaitSet::remove()` method while the notifier is concurrently accessed
/// under a Mutex by one of the `WaitSet` methods. So technically, 2 reference
/// to a `Notifier` *may* exist at the same time, but the only field that may be
/// accessed concurrently is `in_wait_set`, which is atomic.
/// In theory, a notifier always has an exclusive owner or borrower, except in
/// one edge case: the `in_wait_set` flag may be read by the `WaitSet::remove()`
/// method while the notifier is concurrently accessed under a Mutex by one of
/// the `WaitSet` methods. So technically, 2 reference to a `Notifier` *may*
/// exist at the same time, but the only field that may be accessed concurrently
/// is `in_wait_set`, which is atomic.
pub(super) struct Notifier {
/// The cached or current waker.
waker: UnsafeCell<Option<Waker>>,
Expand Down Expand Up @@ -171,35 +175,71 @@ pub(super) struct WaitUntil<'a, F: FnMut() -> Option<T>, T> {
predicate: F,
wait_set: &'a WaitSet,
notifier: NonNull<Notifier>,
_phantom_notifier: PhantomData<&'a mut Notifier>,
}

impl<'a, F: FnMut() -> Option<T>, T> WaitUntil<'a, F, T> {
/// Creates a future associated to the specified event sink that can be
/// Creates a future associated with the specified event sink that can be
/// `await`ed until the specified predicate is satisfied.
///
/// The borrowed notifier may be either a newly created `Notifier` or one
/// that was already used, in which case the cached waker it contains will
/// be re-used if possible.
fn new(wait_set: &'a WaitSet, notifier: &'a mut Notifier, predicate: F) -> Self {
/// If the notifier contains a waker, this waker will be re-used if
/// possible.
fn new(wait_set: &'a WaitSet, notifier: Box<Notifier>, predicate: F) -> Self {
Self {
state: WaitUntilState::Idle,
predicate,
wait_set,
notifier: notifier.into(),
_phantom_notifier: PhantomData,
notifier: unsafe { NonNull::new_unchecked(Box::into_raw(notifier)) },
}
}
}

impl<F: FnMut() -> Option<T>, T> Drop for WaitUntil<'_, F, T> {
fn drop(&mut self) {
match self.state {
// If the future was completed, then the notifier is no longer in
// the wait set, and the boxed notifier was already moved out so
// there is nothing left to do.
WaitUntilState::Completed => return,
// If we are in the `Polled` stated, it means that the future was
// cancelled and its notifier may still be in the wait set: it is
// necessary to cancel the notifier so that another event sink can
// be notified if one is registered, and then to deallocate the
// notifier.
WaitUntilState::Polled => {
// Safety: this notifier and all other notifiers in the wait set are
// guaranteed to be alive since the `WaitUntil` drop handler and the
// `into_boxed_notifier` method ensure that the notifier is removed
// from the wait set before notifier ownership is surrendered to the
// caller. Wakers which notifier is in the wait set are never
// accessed mutably since `WaitUntil::poll` always removes the
// notifier from the wait set before accessing it mutably.
unsafe {
self.wait_set.cancel(self.notifier);
}
}
// If we are in the `Idle` state then the notifier was never
// inserted into the wait set, so all what is left to do is to
// deallocate the notifier.
WaitUntilState::Idle => {}
}

// The future did not run to completion: deallocate the notifier.
//
// Safety: the notifier is no longer in the wait set and was not moved
// out since the future did not run to completion, so we can claim
// unique ownership.
let _ = unsafe { Box::from_raw(self.notifier.as_ptr()) };
}
}

impl<'a, F: FnMut() -> Option<T>, T> Unpin for WaitUntil<'a, F, T> {}

unsafe impl<F: (FnMut() -> Option<T>) + Send, T: Send> Send for WaitUntil<'_, F, T> {}

impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
type Output = T;
type Output = (T, Box<Notifier>);

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
assert!(self.state != WaitUntilState::Completed);

// Remove the notifier if it is in the wait set. In most cases this will
Expand Down Expand Up @@ -235,12 +275,16 @@ impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
unsafe { self.wait_set.remove(self.notifier) };
}

// Anticipate success.
self.state = WaitUntilState::Completed;

// Fast path.
if let Some(v) = (self.predicate)() {
return Poll::Ready(v);
self.state = WaitUntilState::Completed;

// Safety: the notifier is no longer in the wait set (if the state
// was `Polled`) or has never been (if the state was `Idle`), so we
// can claim unique ownership.
let notifier = unsafe { Box::from_raw(self.notifier.as_ptr()) };

return Poll::Ready((v, notifier));
}

// Set or update the notifier.
Expand Down Expand Up @@ -295,7 +339,13 @@ impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
self.wait_set.cancel(self.notifier);
}

return Poll::Ready(v);
self.state = WaitUntilState::Completed;

// Safety: the notifier is not longer in the wait set so we can
// claim unique ownership.
let notifier = unsafe { Box::from_raw(self.notifier.as_ptr()) };

return Poll::Ready((v, notifier));
}

self.state = WaitUntilState::Polled;
Expand All @@ -304,26 +354,6 @@ impl<'a, F: FnMut() -> Option<T>, T> Future for WaitUntil<'a, F, T> {
}
}

impl<F: FnMut() -> Option<T>, T> Drop for WaitUntil<'_, F, T> {
fn drop(&mut self) {
if self.state == WaitUntilState::Polled {
// The future was cancelled: it is necessary to cancel the notifier
// so that another event sink can be notified if one is registered.
//
// Safety: this notifier and all other notifiers in the wait set are
// guaranteed to be alive since the `WaitUntil` drop handler ensures
// that the notifier is removed from the wait set before notifier
// ownership is surrendered to the caller. Wakers which notifier is
// in the wait set are never accessed mutably since
// `WaitUntil::poll` always removes the notifier from the wait set
// before accessing it mutably.
unsafe {
self.wait_set.cancel(self.notifier);
}
}
}
}

/// State of the `WaitUntil` future.
#[derive(PartialEq)]
enum WaitUntilState {
Expand All @@ -347,8 +377,16 @@ impl WaitSet {
/// # Safety
///
/// The specified notifier and all notifiers in the wait set must be alive.
/// The notifier should not be already in the wait set.
unsafe fn insert(&self, notifier: NonNull<Notifier>) {
let mut list = self.list.lock().unwrap();

#[cfg(any(debug_assertions, tachyonix_loom))]
if notifier.as_ref().in_wait_set.load(Ordering::Relaxed) {
drop(list); // avoids poisoning the lock
panic!("the notifier was already in the wait set");
}

// Orderings: Relaxed ordering is sufficient since before this point the
// notifier was not in the list and therefore not shared.
notifier.as_ref().in_wait_set.store(true, Ordering::Relaxed);
Expand Down Expand Up @@ -756,8 +794,7 @@ mod tests {
struct WaitUntilClosure {
event: Arc<Event>,
token_counter: Arc<Counter>,
notifier: Notifier,
wait_until: Option<Box<dyn Future<Output = ()>>>,
wait_until: Option<Box<dyn Future<Output = ((), Box<Notifier>)>>>,
_pin: PhantomPinned,
}

Expand All @@ -768,22 +805,20 @@ mod tests {
let res = Self {
event,
token_counter,
notifier: Notifier::new(),
wait_until: None,
_pin: PhantomPinned,
};
let mut boxed = Box::new(res);
let boxed = Box::new(res);

// Artificially extend the lifetimes of the captured references.
let event_ptr = &*boxed.event as *const Event;
let notifier_mut_ptr = &mut boxed.notifier as *mut Notifier;
let token_counter_ptr = &boxed.token_counter as *const Arc<Counter>;

// Safety: we now commit to never move the closure and to ensure
// that the `WaitUntil` future does not outlive the captured
// references.
let wait_until: Box<dyn Future<Output = ()>> = unsafe {
Box::new((*event_ptr).wait_until(&mut *notifier_mut_ptr, move || {
let wait_until: Box<dyn Future<Output = _>> = unsafe {
Box::new((*event_ptr).wait_until(Box::new(Notifier::new()), move || {
(*token_counter_ptr).try_decrement()
}))
};
Expand All @@ -799,7 +834,9 @@ mod tests {
}

/// Returns a pinned, type-erased `WaitUntil` future.
fn as_pinned_future(self: Pin<&mut Self>) -> Pin<&mut dyn Future<Output = ()>> {
fn as_pinned_future(
self: Pin<&mut Self>,
) -> Pin<&mut dyn Future<Output = ((), Box<Notifier>)>> {
unsafe { self.map_unchecked_mut(|s| s.wait_until.as_mut().unwrap().as_mut()) }
}
}
Expand Down Expand Up @@ -881,7 +918,7 @@ mod tests {

// Return successfully if the predicate was
// checked successfully.
if poll_state == Poll::Ready(()) {
if matches!(poll_state, Poll::Ready(_)) {
return FutureState::Completed;
}

Expand Down
25 changes: 18 additions & 7 deletions src/lib.rs
Expand Up @@ -88,7 +88,7 @@ pub struct Sender<T> {
/// Shared data.
inner: Arc<Inner<T>>,
/// A cached notifier.
notifier: Notifier,
notifier: Option<Box<Notifier>>,
}

impl<T> Sender<T> {
Expand All @@ -107,11 +107,18 @@ impl<T> Sender<T> {
/// Sends a message asynchronously, if necessary waiting until enough
/// capacity becomes available.
pub async fn send(&mut self, message: T) -> Result<(), SendError<T>> {
let mut message = Some(message);
// Unless the previous send future was not polled to completion, there
// should be a cached notifier to take.
let notifier = self
.notifier
.take()
.unwrap_or_else(|| Box::new(Notifier::new()));
sbarral marked this conversation as resolved.
Show resolved Hide resolved

self.inner
let mut message = Some(message);
let notifier = self
.inner
.sender_signal
.wait_until(&mut self.notifier, || {
.wait_until(notifier, || {
match self.inner.queue.push(message.take().unwrap()) {
Ok(()) => Some(()),
Err(PushError::Full(v)) => {
Expand All @@ -129,7 +136,11 @@ impl<T> Sender<T> {
}
}
})
.await;
.await
.1;

// Cache the notifier with its waker.
self.notifier = Some(notifier);

match message {
Some(v) => Err(SendError(v)),
Expand Down Expand Up @@ -176,7 +187,7 @@ impl<T> Clone for Sender<T> {

Self {
inner: self.inner.clone(),
notifier: Notifier::new(),
notifier: Some(Box::new(Notifier::new())),
}
}
}
Expand Down Expand Up @@ -363,7 +374,7 @@ pub fn channel<T>(capacity: usize) -> (Sender<T>, Receiver<T>) {

let sender = Sender {
inner: inner.clone(),
notifier: Notifier::new(),
notifier: Some(Box::new(Notifier::new())),
};
let receiver = Receiver { inner };

Expand Down