Skip to content

Commit

Permalink
Shared rb, heap and static rb trait, fix doc tests
Browse files Browse the repository at this point in the history
  • Loading branch information
agerasev committed Apr 3, 2023
1 parent b7d531f commit d38a509
Show file tree
Hide file tree
Showing 12 changed files with 261 additions and 135 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,6 @@ jobs:
- uses: actions-rs/toolchain@v1
with:
toolchain: stable
- run: cargo check --no-default-features
- run: cargo check --no-default-features --features alloc
- run: cargo test --no-default-features
- run: cargo test --no-default-features --features alloc
- run: cargo test
19 changes: 9 additions & 10 deletions src/consumer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
raw::{RawRb, RawStorage},
raw::{RawBuffer, RawRb},
utils::{slice_assume_init_mut, slice_assume_init_ref},
Observer,
};
Expand Down Expand Up @@ -127,18 +127,17 @@ pub trait Consumer: Observer {
#[cfg_attr(
feature = "alloc",
doc = r##"
```ignore
```
# extern crate ringbuf;
# use ringbuf::HeapRb;
# use ringbuf::{LocalRb, storage::Static, prelude::*};
# fn main() {
let target = HeapRb::<i32>::new(8);
let (mut prod, mut cons) = target.split();
let mut rb = LocalRb::<Static<i32, 8>>::default();
assert_eq!(prod.push_iter(&mut (0..8)), 8);
assert_eq!(rb.push_iter(&mut (0..8)), 8);
assert_eq!(cons.skip(4), 4);
assert_eq!(cons.skip(8), 4);
assert_eq!(cons.skip(8), 0);
assert_eq!(rb.skip(4), 4);
assert_eq!(rb.skip(8), 4);
assert_eq!(rb.skip(8), 0);
# }
```
"##
Expand Down Expand Up @@ -242,7 +241,7 @@ impl<R: Deref> Observer for Wrap<R>
where
R::Target: RawRb + Sized,
{
type Item = <R::Target as RawStorage>::Item;
type Item = <R::Target as RawBuffer>::Item;
type Raw = R::Target;
fn as_raw(&self) -> &Self::Raw {
&self.raw
Expand Down
29 changes: 0 additions & 29 deletions src/init.rs
Original file line number Diff line number Diff line change
@@ -1,30 +1 @@
use crate::{local::LocalRb, storage::Storage, utils::uninit_array};
use core::mem::MaybeUninit;

#[cfg(feature = "alloc")]
use alloc::{collections::TryReserveError, vec::Vec};

impl<T, const N: usize> Default for LocalRb<[MaybeUninit<T>; N]> {
fn default() -> Self {
uninit_array().into_rb()
}
}

#[cfg(feature = "alloc")]
impl<T> LocalRb<Vec<MaybeUninit<T>>> {
/// Creates a new instance of a ring buffer.
///
/// *Panics if allocation failed or `capacity` is zero.*
pub fn new(capacity: usize) -> Self {
Self::try_new(capacity).unwrap()
}

/// Creates a new instance of a ring buffer returning an error if allocation failed.
///
/// *Panics if `capacity` is zero.*
pub fn try_new(capacity: usize) -> Result<Self, TryReserveError> {
let mut data = Vec::new();
data.try_reserve_exact(capacity)?;
Ok(data.into_rb())
}
}
11 changes: 6 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,29 @@ extern crate alloc;
extern crate std;

pub mod consumer;
mod init;
pub mod local;
pub mod observer;
pub mod producer;
pub mod raw;
pub mod ring_buffer;
pub mod shared;
pub mod storage;
pub mod stored;
mod utils;

#[cfg(test)]
mod tests;

pub use consumer::Consumer;
pub use local::LocalRb;
pub use observer::Observer;
pub use producer::Producer;
pub use ring_buffer::{RingBuffer, Split};
pub use shared::SharedRb;

pub mod prelude {
pub use super::{
consumer::Consumer,
observer::Observer,
producer::Producer,
ring_buffer::{RingBuffer, Split},
stored::{HeapRb, StaticRb},
Consumer, Observer, Producer, RingBuffer, Split,
};
}
67 changes: 32 additions & 35 deletions src/local.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
use crate::storage::StoredRb;

use super::{
consumer::Consumer,
observer::Observer,
producer::Producer,
use crate::{
raw::RawRb,
storage::{Shared, Storage},
RingBuffer,
stored::StoredRb,
Consumer, Observer, Producer, RingBuffer,
};
use core::{cell::Cell, mem::ManuallyDrop, ptr};

Expand All @@ -20,16 +16,17 @@ use core::{cell::Cell, mem::ManuallyDrop, ptr};
This code must fail to compile:
```compile_fail
use std::{thread, vec::Vec};
use ringbuf::LocalRb;
use std::{thread, sync::Arc};
use ringbuf::{LocalRb, storage::Static, prelude::*};
let (mut prod, mut cons) = LocalRb::<i32, Vec<_>>::new(256).split();
let rb = LocalRb::<Static<i32, 16>>::default();
let (mut prod, mut cons) = Split::<Arc<_>>::split(rb);
thread::spawn(move || {
prod.push(123).unwrap();
prod.try_push(123).unwrap();
})
.join();
thread::spawn(move || {
assert_eq!(cons.pop().unwrap(), 123);
assert_eq!(cons.try_pop().unwrap(), 123);
})
.join();
```
Expand All @@ -41,6 +38,29 @@ pub struct LocalRb<S: Storage> {
write: Cell<usize>,
}

impl<S: Storage> StoredRb for LocalRb<S> {
type Storage = S;

unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
Self {
storage: Shared::new(storage),
read: Cell::new(read),
write: Cell::new(write),
}
}

unsafe fn into_raw_parts(self) -> (S, usize, usize) {
let (read, write) = (self.read_end(), self.write_end());
let self_ = ManuallyDrop::new(self);
(ptr::read(&self_.storage).into_inner(), read, write)
}

#[inline]
fn storage(&self) -> &Shared<Self::Storage> {
&self.storage
}
}

impl<S: Storage> RawRb for LocalRb<S> {
#[inline]
fn read_end(&self) -> usize {
Expand Down Expand Up @@ -84,26 +104,3 @@ impl<S: Storage> Drop for LocalRb<S> {
self.clear();
}
}

impl<S: Storage> StoredRb for LocalRb<S> {
type Storage = S;

unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
Self {
storage: Shared::new(storage),
read: Cell::new(read),
write: Cell::new(write),
}
}

unsafe fn into_raw_parts(self) -> (S, usize, usize) {
let (read, write) = (self.read_end(), self.write_end());
let self_ = ManuallyDrop::new(self);
(ptr::read(&self_.storage).into_inner(), read, write)
}

#[inline]
fn storage(&self) -> &Shared<Self::Storage> {
&self.storage
}
}
4 changes: 2 additions & 2 deletions src/observer.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::raw::{RawRb, RawStorage};
use crate::raw::{RawBuffer, RawRb};
use core::ops::Deref;

pub trait Observer {
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<R: Deref> Observer for Wrap<R>
where
R::Target: RawRb + Sized,
{
type Item = <R::Target as RawStorage>::Item;
type Item = <R::Target as RawBuffer>::Item;
type Raw = R::Target;
fn as_raw(&self) -> &Self::Raw {
&self.raw
Expand Down
4 changes: 2 additions & 2 deletions src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::{
raw::{RawRb, RawStorage},
raw::{RawBuffer, RawRb},
utils::write_slice,
Observer,
};
Expand Down Expand Up @@ -137,7 +137,7 @@ impl<R: Deref> Observer for Wrap<R>
where
R::Target: RawRb + Sized,
{
type Item = <R::Target as RawStorage>::Item;
type Item = <R::Target as RawBuffer>::Item;
type Raw = R::Target;
fn as_raw(&self) -> &Self::Raw {
&self.raw
Expand Down
4 changes: 2 additions & 2 deletions src/raw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub fn ranges(capacity: NonZeroUsize, begin: usize, end: usize) -> (Range<usize>
}
}

pub trait RawStorage {
pub trait RawBuffer {
type Item: Sized;

/// Capacity of the ring buffer.
Expand Down Expand Up @@ -78,7 +78,7 @@ pub trait RawStorage {
/// It allows us to distinguish situations when the buffer is empty (`read == write`) and when the buffer is full (`write - read` modulo `2 * capacity` equals to `capacity`)
/// without using the space for an extra element in container.
/// And obviously we cannot store more than `capacity` items in the buffer, so `write - read` modulo `2 * capacity` is not allowed to be greater than `capacity`.
pub trait RawRb: RawStorage {
pub trait RawRb: RawBuffer {
/// Read end position.
fn read_end(&self) -> usize;

Expand Down
113 changes: 113 additions & 0 deletions src/shared.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
use crate::{
consumer::Consumer,
observer::Observer,
producer::Producer,
raw::RawRb,
ring_buffer::RingBuffer,
storage::{Shared, Storage},
stored::StoredRb,
};
use core::{
mem::ManuallyDrop,
ptr,
sync::atomic::{AtomicUsize, Ordering},
};
use crossbeam_utils::CachePadded;

/// Ring buffer that could be shared between threads.
///
/// Implements [`Sync`] *if `T` implements [`Send`]*. And therefore its [`Producer`] and [`Consumer`] implement [`Send`].
///
/// Note that there is no explicit requirement of `T: Send`. Instead [`SharedRb`] will work just fine even with `T: !Send`
/// until you try to send its [`Producer`] or [`Consumer`] to another thread.
#[cfg_attr(
feature = "std",
doc = r##"
```
use std::{thread, sync::Arc};
use ringbuf::{SharedRb, storage::Heap, prelude::*};
let rb = SharedRb::<Heap<i32>>::new(256);
let (mut prod, mut cons) = Split::<Arc<_>>::split(rb);
thread::spawn(move || {
prod.try_push(123).unwrap();
})
.join();
thread::spawn(move || {
assert_eq!(cons.try_pop().unwrap(), 123);
})
.join();
```
"##
)]
pub struct SharedRb<S: Storage> {
storage: Shared<S>,
read: CachePadded<AtomicUsize>,
write: CachePadded<AtomicUsize>,
}

impl<S: Storage> StoredRb for SharedRb<S> {
type Storage = S;

unsafe fn from_raw_parts(storage: S, read: usize, write: usize) -> Self {
Self {
storage: Shared::new(storage),
read: CachePadded::new(AtomicUsize::new(read)),
write: CachePadded::new(AtomicUsize::new(write)),
}
}

unsafe fn into_raw_parts(self) -> (S, usize, usize) {
let (read, write) = (self.read_end(), self.write_end());
let self_ = ManuallyDrop::new(self);
(ptr::read(&self_.storage).into_inner(), read, write)
}

fn storage(&self) -> &Shared<S> {
&self.storage
}
}

impl<S: Storage> RawRb for SharedRb<S> {
#[inline]
fn read_end(&self) -> usize {
self.read.load(Ordering::Acquire)
}

#[inline]
fn write_end(&self) -> usize {
self.write.load(Ordering::Acquire)
}

#[inline]
unsafe fn set_read_end(&self, value: usize) {
self.read.store(value, Ordering::Release)
}

#[inline]
unsafe fn set_write_end(&self, value: usize) {
self.write.store(value, Ordering::Release)
}
}

impl<S: Storage> Observer for SharedRb<S> {
type Item = S::Item;

type Raw = Self;

fn as_raw(&self) -> &Self::Raw {
self
}
}

impl<S: Storage> Producer for SharedRb<S> {}

impl<S: Storage> Consumer for SharedRb<S> {}

impl<S: Storage> RingBuffer for SharedRb<S> {}

impl<S: Storage> Drop for SharedRb<S> {
fn drop(&mut self) {
self.clear();
}
}

0 comments on commit d38a509

Please sign in to comment.