Navigation Menu

Skip to content

Commit

Permalink
feat(mpsc): initial sync and async channel APIs (#2)
Browse files Browse the repository at this point in the history
Signed-off-by: Eliza Weisman <eliza@buoyant.io>
  • Loading branch information
hawkw committed Nov 26, 2021
1 parent 4424633 commit 1c28c84
Show file tree
Hide file tree
Showing 17 changed files with 1,486 additions and 84 deletions.
10 changes: 9 additions & 1 deletion Cargo.toml
Expand Up @@ -13,4 +13,12 @@ default = ["std"]
[dependencies]

[dev-dependencies]
loom = "0.5"
loom = { version = "0.5", features = ["checkpoint", "futures"] }
# So that we can use `poll_fn` in tests.
futures-util = "0.3"

[profile.test]
opt-level = 3

[patch.crates-io]
loom = { git = "https://github.com/tokio-rs/loom", branch = "master"}
69 changes: 23 additions & 46 deletions src/lib.rs
@@ -1,48 +1,25 @@
#![cfg_attr(not(feature = "std"), no_std)]

#![cfg_attr(docsrs, feature(doc_cfg))]
use core::{fmt, mem::MaybeUninit, ops::Index};

#[cfg(feature = "alloc")]
extern crate alloc;

macro_rules! test_println {
($($arg:tt)*) => {
if cfg!(test) {
if crate::util::panicking() {
// getting the thread ID while panicking doesn't seem to play super nicely with loom's
// mock lazy_static...
println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*))
} else {
println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*))
}
}
}
}

macro_rules! test_dbg {
($e:expr) => {
match $e {
e => {
#[cfg(test)]
test_println!("{} = {:?}", stringify!($e), &e);
e
}
}
};
}
#[macro_use]
mod macros;

mod loom;
mod util;

#[cfg(feature = "alloc")]
mod thingbuf;
#[cfg(feature = "alloc")]
pub use self::thingbuf::ThingBuf;
#[cfg(feature = "alloc")]
mod stringbuf;
feature! {
#![feature = "alloc"]
extern crate alloc;

#[cfg(feature = "alloc")]
pub use stringbuf::{StaticStringBuf, StringBuf};
mod thingbuf;
pub use self::thingbuf::ThingBuf;

mod stringbuf;
pub use stringbuf::{StaticStringBuf, StringBuf};

pub mod mpsc;
}

mod static_thingbuf;
pub use self::static_thingbuf::StaticThingBuf;
Expand All @@ -55,6 +32,14 @@ use crate::{
util::{Backoff, CachePadded},
};

pub struct Ref<'slot, T> {
slot: &'slot Slot<T>,
new_state: usize,
}

#[derive(Debug)]
pub struct AtCapacity(pub(crate) usize);

#[derive(Debug)]
struct Core {
head: CachePadded<AtomicUsize>,
Expand All @@ -65,15 +50,7 @@ struct Core {
capacity: usize,
}

pub struct Ref<'slot, T> {
slot: &'slot Slot<T>,
new_state: usize,
}

#[derive(Debug)]
pub struct AtCapacity(usize);

pub struct Slot<T> {
struct Slot<T> {
value: UnsafeCell<MaybeUninit<T>>,
state: AtomicUsize,
}
Expand Down
9 changes: 8 additions & 1 deletion src/loom.rs
Expand Up @@ -6,7 +6,7 @@ mod inner {
pub use loom::sync::atomic::*;
pub use std::sync::atomic::Ordering;
}
pub(crate) use loom::{cell::UnsafeCell, hint, thread};
pub(crate) use loom::{cell::UnsafeCell, future, hint, sync, thread};

pub(crate) fn model(f: impl Fn() + Sync + Send + 'static) {
let iteration = core::sync::atomic::AtomicUsize::new(0);
Expand Down Expand Up @@ -72,6 +72,13 @@ mod inner {
#[cfg(not(test))]
mod inner {
#![allow(dead_code)]
pub(crate) mod sync {
pub use core::sync::*;

#[cfg(feature = "alloc")]
pub use alloc::sync::*;
}

pub(crate) use core::sync::atomic;

#[cfg(feature = "std")]
Expand Down
67 changes: 67 additions & 0 deletions src/macros.rs
@@ -0,0 +1,67 @@
macro_rules! test_println {
($($arg:tt)*) => {
if cfg!(test) {
if crate::util::panic::panicking() {
// getting the thread ID while panicking doesn't seem to play super nicely with loom's
// mock lazy_static...
println!("[PANIC {:>17}:{:<3}] {}", file!(), line!(), format_args!($($arg)*))
} else {
println!("[{:?} {:>17}:{:<3}] {}", crate::loom::thread::current().id(), file!(), line!(), format_args!($($arg)*))
}
}
}
}

macro_rules! test_dbg {
($e:expr) => {
match $e {
e => {
#[cfg(test)]
test_println!("{} = {:?}", stringify!($e), &e);
e
}
}
};
}

macro_rules! feature {
(
#![$meta:meta]
$($item:item)*
) => {
$(
#[cfg($meta)]
#[cfg_attr(docsrs, doc(cfg($meta)))]
$item
)*
}
}

#[allow(unused_macros)]
macro_rules! unreachable_unchecked {
($($arg:tt)+) => {
crate::unreachable_unchecked!(@inner , format_args!(": {}", format_args!($($arg)*)))
};

() => {
crate::unreachable_unchecked!(@inner ".")
};

(@inner $msg:expr) => {
#[cfg(debug_assertions)] {
panic!(
"internal error: entered unreachable code{}\n\n\
/!\\ EXTREMELY SERIOUS WARNING /!\\\n
This code should NEVER be entered; in release mode, this would \
have been an `unreachable_unchecked` hint. The fact that this \
occurred means something VERY bad is going on. \n\
Please contact the `thingbuf` maintainers immediately. Sorry!",
$msg,
);
}
#[cfg(not(debug_assertions))]
unsafe {
core::hint::unreachable_unchecked();
}
};
}
31 changes: 31 additions & 0 deletions src/mpsc.rs
@@ -0,0 +1,31 @@
//! Multi-producer, single-consumer channels using [`ThingBuf`](crate::ThingBuf).
//!
//! The default MPSC channel returned by the [`channel`] function is
//! _asynchronous_: receiving from the channel is an `async fn`, and the
//! receiving task willwait when there are no messages in the channel.
//!
//! If the "std" feature flag is enabled, this module also provides a
//! synchronous channel, in the [`sync`] module. The synchronous receiver will
//! instead wait for new messages by blocking the current thread. Naturally,
//! this requires the Rust standard library. A synchronous channel
//! can be constructed using the [`sync::channel`] function.
mod async_impl;
pub use self::async_impl::*;

feature! {
#![feature = "std"]
pub mod sync;
}

#[derive(Debug)]
#[non_exhaustive]
pub enum TrySendError {
AtCapacity(crate::AtCapacity),
Closed(Closed),
}

#[derive(Debug)]
pub struct Closed(pub(crate) ());

#[cfg(test)]
mod tests;

0 comments on commit 1c28c84

Please sign in to comment.