Skip to content

Commit

Permalink
Merge pull request #2 from Amjad50/static_mem_channel
Browse files Browse the repository at this point in the history
Static mem channel
  • Loading branch information
Amjad50 committed Feb 22, 2024
2 parents 88f62a8 + 271a566 commit 28bd7f7
Show file tree
Hide file tree
Showing 9 changed files with 777 additions and 386 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ jobs:
# https://docs.github.com/en/actions/learn-github-actions/contexts#context-availability
strategy:
matrix:
msrv: ["1.60.0"]
msrv: ["1.61.0"]
name: ubuntu / ${{ matrix.msrv }}
steps:
- uses: actions/checkout@v4
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/safety.yml
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,4 @@ jobs:
env:
LOOM_MAX_PREEMPTIONS: 2
RUSTFLAGS: "--cfg loom"
RUSTDOCFLAGS: "--cfg loom"
7 changes: 6 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ description = "Fast, bounded, multiple-producer, multiple-consumer, lossy, broad
repository = "https://github.com/Amjad50/blinkcast.git"
keywords = ["channel", "broadcast", "mpmc", "no_std"]
categories = ["concurrency", "no-std"]
rust-version = "1.60.0"
rust-version = "1.61.0"

[features]
default = ["alloc"]

alloc = []
# This is used only for testing using `bench`, as it requires nightly
# doesn't provide any extra functionality
unstable = []

[target.'cfg(loom)'.dependencies]
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ up to date is better than delayed audio.
See [the documentation](https://docs.rs/blinkcast) for examples.

# Minimum Supported Rust Version (MSRV)
The minimum supported Rust version for this crate is `1.60.0`
The minimum supported Rust version for this crate is `1.61.0`

# License
Licensed under `MIT` ([LICENSE](./LICENSE) or http://opensource.org/licenses/MIT)
277 changes: 277 additions & 0 deletions src/alloc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
//! A channel implemented with heap allocated buffers (require `alloc` feature).
//!
//! This module implements a broadcast channel with a fixed-size buffer, allocated
//! in the heap. This means that multiple senders can be used without the need to use the same reference.
//!
//! The channel overwrites the oldest message if the buffer is full, prioritizing the latest data.
//!
//! **Key Features:**
//!
//! * **Broadcast:** Multiple senders can send messages to multiple receivers simultaneously.
//! * **Heap-allocated Buffers:** Ensures data storage flexibility when the application requires it.
//! * **Fixed-size Buffer:** Provides bounded memory usage with predictable performance.
//! * **Overwriting Behavior:** Prioritizes the latest data in scenarios where the buffer becomes full.
//! * **Cloneable:** Both `Sender` and `Receiver` are cloneable, enabling flexible message distribution patterns.
//!
//! **Usage Considerations:**
//! * Well-suited for scenarios where multiple components need to broadcast messages and the latest data takes priority.
//! * Ideal when heap allocation is necessary or desirable.
//! * Receivers must be fast enough to keep up with the senders and avoid losing messages due to overwriting.
//!
//! # Examples
//! ```
//! # #[cfg(not(loom))]
//! # {
//! use blinkcast::alloc::channel;
//! let (sender, mut receiver) = channel::<i32>(4);
//! sender.send(1);
//! assert_eq!(receiver.recv(), Some(1));
//!
//! sender.send(2);
//! sender.send(3);
//!
//! assert_eq!(receiver.recv(), Some(2));
//!
//! // clone the receiver
//! let mut receiver2 = receiver.clone();
//! assert_eq!(receiver.recv(), Some(3));
//! assert_eq!(receiver2.recv(), Some(3));
//! assert_eq!(receiver.recv(), None);
//! assert_eq!(receiver2.recv(), None);
//! # }
//! ```

extern crate alloc;
use alloc::{boxed::Box, vec::Vec};

use crate::{core_impl, unpack_data_index, AtomicUsize, Node, Ordering, ReaderData, MAX_LEN};

#[cfg(not(loom))]
use alloc::sync::Arc;
#[cfg(loom)]
use loom::sync::Arc;

struct InnerChannel<T> {
buffer: Box<[Node<T>]>,
head: AtomicUsize,
}

impl<T: Clone> InnerChannel<T> {
fn new(size: usize) -> Self {
assert!(size <= MAX_LEN, "Exceeded the maximum length");

let mut buffer = Vec::with_capacity(size);
for _ in 0..size {
buffer.push(Default::default());
}
let buffer = buffer.into_boxed_slice();
Self {
buffer,
head: AtomicUsize::new(0),
}
}

fn push(&self, value: T) {
core_impl::push(&self.buffer, &self.head, value);
}

fn pop(&self, reader: &mut ReaderData) -> Option<T> {
core_impl::pop(&self.buffer, &self.head, reader)
}
}

/// The sender of the [`channel`].
///
/// This is a cloneable sender, so you can have multiple senders that will send to the same
/// channel.
///
/// Broadcast messages sent by using the [`send`](Sender::send) method.
///
/// # Examples
/// ```
/// # #[cfg(not(loom))]
/// # {
/// use blinkcast::alloc::channel;
///
/// let (sender, mut receiver) = channel::<i32>(4);
///
/// sender.send(1);
/// let sender2 = sender.clone();
/// sender2.send(2);
///
/// assert_eq!(receiver.recv(), Some(1));
/// assert_eq!(receiver.recv(), Some(2));
/// assert_eq!(receiver.recv(), None);
/// # }
/// ```
/// Or using the [`new`](Sender::new) method:
/// ```
/// # #[cfg(not(loom))]
/// # {
/// use blinkcast::alloc::Sender;
///
/// let sender = Sender::<i32>::new(4);
///
/// let mut receiver = sender.new_receiver();
///
/// sender.send(1);
/// sender.send(2);
/// assert_eq!(receiver.recv(), Some(1));
/// assert_eq!(receiver.recv(), Some(2));
/// assert_eq!(receiver.recv(), None);
/// # }
/// ```
pub struct Sender<T> {
queue: Arc<InnerChannel<T>>,
}

unsafe impl<T: Clone + Send> Send for Sender<T> {}
unsafe impl<T: Clone + Send> Sync for Sender<T> {}

impl<T: Clone> Sender<T> {
/// Sends a message to the channel.
/// If the channel is full, the oldest message will be overwritten.
/// So the receiver must be quick or it will lose the old data.
pub fn send(&self, value: T) {
self.queue.push(value);
}

/// Creates a new channel with a buffer of size `N`.
#[allow(clippy::new_without_default)]
pub fn new(size: usize) -> Self {
Self {
queue: Arc::new(InnerChannel::<T>::new(size)),
}
}

/// Creates a new receiver that starts from the same point as the sender.
///
/// # Examples
/// ```
/// # #[cfg(not(loom))]
/// # {
/// use blinkcast::alloc::Sender;
///
/// let sender = Sender::<i32>::new(4);
///
/// sender.send(1);
///
/// let mut receiver = sender.new_receiver();
/// assert_eq!(receiver.recv(), None);
///
/// sender.send(2);
/// assert_eq!(receiver.recv(), Some(2));
/// assert_eq!(receiver.recv(), None);
/// # }
/// ```
pub fn new_receiver(&self) -> Receiver<T> {
let head = self.queue.head.load(Ordering::Relaxed);
let (lap, index) = unpack_data_index(head);

Receiver {
queue: self.queue.clone(),
reader: ReaderData { index, lap },
}
}
}

impl<T> Clone for Sender<T> {
fn clone(&self) -> Self {
Self {
queue: self.queue.clone(),
}
}
}

/// The receiver of the [`channel`].
///
/// Can also be created with the [`new_receiver`](Sender::new_receiver) method of the [`Sender`].
///
/// This is a cloneable receiver, so you can have multiple receivers that start from the same
/// point.
///
/// Broadcast messages sent by the channel are received by the [`recv`](Receiver::recv) method.
///
/// # Examples
/// ```
/// # #[cfg(not(loom))]
/// # {
/// use blinkcast::alloc::channel;
/// let (sender, mut receiver) = channel::<i32>(4);
/// sender.send(1);
/// assert_eq!(receiver.recv(), Some(1));
///
/// sender.send(2);
/// sender.send(3);
///
/// assert_eq!(receiver.recv(), Some(2));
///
/// // clone the receiver
/// let mut receiver2 = receiver.clone();
/// assert_eq!(receiver.recv(), Some(3));
/// assert_eq!(receiver2.recv(), Some(3));
/// assert_eq!(receiver.recv(), None);
/// assert_eq!(receiver2.recv(), None);
/// # }
/// ```
pub struct Receiver<T> {
queue: Arc<InnerChannel<T>>,
reader: ReaderData,
}

unsafe impl<T: Clone + Send> Send for Receiver<T> {}
unsafe impl<T: Clone + Send> Sync for Receiver<T> {}

impl<T: Clone> Receiver<T> {
/// Receives a message from the channel.
///
/// If there is no message available, this method will return `None`.
pub fn recv(&mut self) -> Option<T> {
self.queue.pop(&mut self.reader)
}
}

impl<T: Clone> Clone for Receiver<T> {
fn clone(&self) -> Self {
Self {
queue: self.queue.clone(),
reader: self.reader.clone(),
}
}
}

/// Creates a new channel, returning the [`Sender`] and [`Receiver`] for it.
///
/// Both of the sender and receiver are cloneable, so you can have multiple senders and receivers.
///
/// Another method to create a channel is using the [`Sender::new`] and [`Sender::new_receiver`] methods.
///
/// # Examples
/// ```
/// # #[cfg(not(loom))]
/// # {
/// use blinkcast::alloc::channel;
/// let (sender, mut receiver) = channel::<i32>(4);
///
/// sender.send(1);
/// sender.send(2);
///
/// assert_eq!(receiver.recv(), Some(1));
///
/// let sender2 = sender.clone();
/// sender2.send(3);
///
/// assert_eq!(receiver.recv(), Some(2));
///
/// let mut receiver2 = receiver.clone();
/// assert_eq!(receiver.recv(), Some(3));
/// assert_eq!(receiver2.recv(), Some(3));
/// assert_eq!(receiver.recv(), None);
/// assert_eq!(receiver2.recv(), None);
/// # }
/// ```
pub fn channel<T: Clone>(size: usize) -> (Sender<T>, Receiver<T>) {
let sender = Sender::<T>::new(size);
let receiver = sender.new_receiver();
(sender, receiver)
}

0 comments on commit 28bd7f7

Please sign in to comment.