Skip to content

Commit

Permalink
feat(thingbuf): add no_std compatible StaticThingBuf (#1)
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Nov 17, 2021
1 parent e47cd7d commit 3b23f85
Show file tree
Hide file tree
Showing 8 changed files with 376 additions and 198 deletions.
227 changes: 104 additions & 123 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
use core::{fmt, marker::PhantomData, mem::MaybeUninit};
#![cfg_attr(not(feature = "std"), no_std)]

use core::{fmt, mem::MaybeUninit, ops::Index};

#[cfg(feature = "alloc")]
extern crate alloc;

macro_rules! test_println {
($($arg:tt)*) => {
if cfg!(test) {
if std::thread::panicking() {
if crate::util::panicking() {
// getting the thread ID while panicking doesn't seem to play super nicely with loom's
// mock lazy_static...
println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*))
} else {
#[cfg(test)]
println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*))
}
}
Expand All @@ -31,37 +32,37 @@ macro_rules! test_dbg {
}

mod loom;
#[cfg(test)]
mod tests;
mod util;

use crate::loom::{
atomic::{AtomicUsize, Ordering},
UnsafeCell,
};

use crate::util::{Backoff, CachePadded};

#[cfg(feature = "alloc")]
mod thingbuf;
#[cfg(feature = "alloc")]
pub use self::thingbuf::ThingBuf;
#[cfg(feature = "alloc")]
mod stringbuf;

#[cfg(feature = "alloc")]
pub use stringbuf::StringBuf;

/// A ringbuf of...things.
///
/// # Examples
///
/// Using a
pub struct ThingBuf<T, S = Box<[Slot<T>]>> {
pub use stringbuf::{StaticStringBuf, StringBuf};

mod static_thingbuf;
pub use self::static_thingbuf::StaticThingBuf;

use crate::{
loom::{
atomic::{AtomicUsize, Ordering},
UnsafeCell,
},
util::{Backoff, CachePadded},
};

#[derive(Debug)]
struct Core {
head: CachePadded<AtomicUsize>,
tail: CachePadded<AtomicUsize>,
gen: usize,
gen_mask: usize,
idx_mask: usize,
capacity: usize,
slots: S,
_t: PhantomData<T>,
}

pub struct Ref<'slot, T> {
Expand All @@ -73,27 +74,28 @@ pub struct Ref<'slot, T> {
pub struct AtCapacity(usize);

pub struct Slot<T> {
value: UnsafeCell<T>,
value: UnsafeCell<MaybeUninit<T>>,
state: AtomicUsize,
}

// === impl ThingBuf ===

impl<T: Default> ThingBuf<T> {
pub fn new(capacity: usize) -> Self {
Self::new_with(capacity, T::default)
impl Core {
#[cfg(not(test))]
const fn new(capacity: usize) -> Self {
let gen = (capacity + 1).next_power_of_two();
let idx_mask = gen - 1;
let gen_mask = !(gen - 1);
Self {
head: CachePadded(AtomicUsize::new(0)),
tail: CachePadded(AtomicUsize::new(0)),
gen,
gen_mask,
idx_mask,
capacity,
}
}
}

impl<T> ThingBuf<T> {
pub fn new_with(capacity: usize, mut initializer: impl FnMut() -> T) -> Self {
assert!(capacity > 0);
let slots = (0..capacity)
.map(|idx| Slot {
state: AtomicUsize::new(idx),
value: UnsafeCell::new(initializer()),
})
.collect();
#[cfg(test)]
fn new(capacity: usize) -> Self {
let gen = (capacity + 1).next_power_of_two();
let idx_mask = gen - 1;
let gen_mask = !(gen - 1);
Expand All @@ -104,13 +106,9 @@ impl<T> ThingBuf<T> {
gen_mask,
idx_mask,
capacity,
slots,
_t: PhantomData,
}
}
}

impl<T, S> ThingBuf<T, S> {
#[inline]
fn idx_gen(&self, val: usize) -> (usize, usize) {
(val & self.idx_mask, val & self.gen_mask)
Expand All @@ -130,46 +128,19 @@ impl<T, S> ThingBuf<T, S> {
}
}

pub fn capacity(&self) -> usize {
self.capacity
}
}

impl<T, S> ThingBuf<T, S>
where
S: AsRef<[Slot<T>]>,
{
pub fn from_array(slots: S) -> Self {
let capacity = slots.as_ref().len();
assert!(capacity > 0);
for (idx, slot) in slots.as_ref().iter().enumerate() {
// Relaxed is fine here, because the slot is not shared yet.
slot.state.store(idx, Ordering::Relaxed);
}
let gen = (capacity + 1).next_power_of_two();
let idx_mask = gen - 1;
let gen_mask = !(gen - 1);
Self {
head: CachePadded(AtomicUsize::new(0)),
tail: CachePadded(AtomicUsize::new(0)),
gen,
gen_mask,
idx_mask,
capacity,
slots,
_t: PhantomData,
}
}

#[inline]
pub fn push_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Result<U, AtCapacity> {
self.push_ref().map(|mut r| r.with_mut(f))
fn capacity(&self) -> usize {
self.capacity
}

pub fn push_ref(&self) -> Result<Ref<'_, T>, AtCapacity> {
fn push_ref<'slots, T, S>(&self, slots: &'slots S) -> Result<Ref<'slots, T>, AtCapacity>
where
T: Default,
S: Index<usize, Output = Slot<T>> + ?Sized,
{
test_println!("push_ref");
let mut backoff = Backoff::new();
let mut tail = self.tail.load(Ordering::Relaxed);
let slots = self.slots.as_ref();

loop {
let (idx, gen) = self.idx_gen(tail);
Expand All @@ -178,7 +149,7 @@ where
let slot = &slots[idx];
let state = slot.state.load(Ordering::Acquire);

if state == tail {
if state == tail || (state == 0 && gen == 0) {
// Move the tail index forward by 1.
let next_tail = self.next(idx, gen);
match self.tail.compare_exchange_weak(
Expand All @@ -188,12 +159,25 @@ where
Ordering::Relaxed,
) {
Ok(_) => {
// We got the slot! It's now okay to write to it
test_println!("claimed tail slot");
if gen == 0 {
slot.value.with_mut(|value| unsafe {
// Safety: we have just claimed exclusive ownership over
// this slot.
(*value).write(T::default());
});
test_println!("-> initialized");
}

return Ok(Ref {
new_state: tail + 1,
slot,
})
});
}
Err(actual) => {
// Someone else took this slot and advanced the tail
// index. Try to claim the new tail.
tail = actual;
backoff.spin();
continue;
Expand All @@ -215,15 +199,13 @@ where
}
}

#[inline]
pub fn pop_with<U>(&self, f: impl FnOnce(&mut T) -> U) -> Option<U> {
self.pop_ref().map(|mut r| r.with_mut(f))
}

pub fn pop_ref(&self) -> Option<Ref<'_, T>> {
fn pop_ref<'slots, T, S>(&self, slots: &'slots S) -> Option<Ref<'slots, T>>
where
S: Index<usize, Output = Slot<T>> + ?Sized,
{
test_println!("pop_ref");
let mut backoff = Backoff::new();
let mut head = self.head.load(Ordering::Relaxed);
let slots = self.slots.as_ref();

loop {
test_dbg!(head);
Expand All @@ -245,10 +227,11 @@ where
Ordering::Relaxed,
) {
Ok(_) => {
test_println!("claimed head slot");
return Some(Ref {
new_state: head.wrapping_add(self.gen),
slot,
})
});
}
Err(actual) => {
head = actual;
Expand All @@ -273,13 +256,24 @@ where
head = self.head.load(Ordering::Relaxed);
}
}
}

impl<T, S> fmt::Debug for ThingBuf<T, S> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ThingBuf")
.field("capacity", &self.capacity())
.finish()
fn len(&self) -> usize {
use std::cmp;
loop {
let tail = self.tail.load(Ordering::SeqCst);
let head = self.head.load(Ordering::SeqCst);

if self.tail.load(Ordering::SeqCst) == tail {
let (head_idx, _) = self.idx_gen(head);
let (tail_idx, _) = self.idx_gen(tail);
return match head_idx.cmp(&tail_idx) {
cmp::Ordering::Less => head_idx - tail_idx,
cmp::Ordering::Greater => self.capacity - head_idx + tail_idx,
_ if tail == head => 0,
_ => self.capacity,
};
}
}
}
}

Expand All @@ -289,16 +283,23 @@ impl<T> Ref<'_, T> {
#[inline]
pub fn with<U>(&self, f: impl FnOnce(&T) -> U) -> U {
self.slot.value.with(|value| unsafe {
// Safety: if a `Ref` exists, we have exclusive ownership of the slot.
f(&*value)
// Safety: if a `Ref` exists, we have exclusive ownership of the
// slot. A `Ref` is only created if the slot has already been
// initialized.
// TODO(eliza): use `MaybeUninit::assume_init_ref` here once it's
// supported by `tracing-appender`'s MSRV.
f(&*(&*value).as_ptr())
})
}

#[inline]
pub fn with_mut<U>(&mut self, f: impl FnOnce(&mut T) -> U) -> U {
self.slot.value.with_mut(|value| unsafe {
// Safety: if a `Ref` exists, we have exclusive ownership of the slot.
f(&mut *value)
// Safety: if a `Ref` exists, we have exclusive ownership of the
// slot.
// TODO(eliza): use `MaybeUninit::assume_init_mut` here once it's
// supported by `tracing-appender`'s MSRV.
f(&mut *(&mut *value).as_mut_ptr())
})
}
}
Expand Down Expand Up @@ -344,42 +345,22 @@ impl<T: fmt::Write> fmt::Write for Ref<'_, T> {

// === impl Slot ===

impl<T: Default> Default for Slot<T> {
fn default() -> Self {
Self::new(T::default())
}
}

impl<T> Slot<T> {
const UNINIT: usize = usize::MAX;

#[cfg(not(test))]
pub const fn new(t: T) -> Self {
const fn empty() -> Self {
Self {
value: UnsafeCell::new(t),
state: AtomicUsize::new(Self::UNINIT),
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
}
}

#[cfg(test)]
pub fn new(t: T) -> Self {
fn empty() -> Self {
Self {
value: UnsafeCell::new(t),
state: AtomicUsize::new(Self::UNINIT),
value: UnsafeCell::new(MaybeUninit::uninit()),
state: AtomicUsize::new(0),
}
}
}

impl<T> Slot<MaybeUninit<T>> {
#[cfg(not(test))]
pub const fn uninit() -> Self {
Self::new(MaybeUninit::uninit())
}

#[cfg(test)]
pub fn uninit() -> Self {
Self::new(MaybeUninit::uninit())
}
}

unsafe impl<T: Sync> Sync for Slot<T> {}
2 changes: 1 addition & 1 deletion src/loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod inner {
pub use loom::sync::atomic::*;
pub use std::sync::atomic::Ordering;
}
pub(crate) use loom::{cell::UnsafeCell, hint, sync, thread};
pub(crate) use loom::{cell::UnsafeCell, hint, thread};

pub(crate) fn model(f: impl Fn() + Sync + Send + 'static) {
let iteration = core::sync::atomic::AtomicUsize::new(0);
Expand Down
Loading

0 comments on commit 3b23f85

Please sign in to comment.