Skip to content

Commit

Permalink
Merge e157e17 into 73a68a8
Browse files Browse the repository at this point in the history
  • Loading branch information
haraldh committed Jul 20, 2020
2 parents 73a68a8 + e157e17 commit bf96d0b
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 79 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ keywords = [ "parallel", "performance", "thread", "join", "concurrency"]
categories = [ "concurrency" ]

[dependencies]
rayon = "1.3.0"
rayon = "1.3"
crossbeam-queue = { version = "0.2", optional = true }
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,21 @@ A `DynQueue<T>` can be iterated with `into_par_iter` producing `(DynQueueHandle,
With the `DynQueueHandle<T>` a new `T` can be inserted in the `DynQueue<T>`,
which is currently iterated over.

```rust
use dynqueue::DynQueue;
A `Vec<T>`, `VecDeque<T>` and `crossbeam_queue::SegQueue<T>` (with `feature = "crossbeam-queue"`)
can be turned into a `DynQueue<T>` with `.into_dyn_queue()`.

```rust
use rayon::iter::IntoParallelIterator as _;
use rayon::iter::ParallelIterator as _;

use dynqueue::IntoDynQueue as _;

fn main() {
let mut result = DynQueue::new(vec![1, 2, 3])
.into_par_iter()
.map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
.collect::<Vec<_>>();
let mut result = vec![1, 2, 3]
.into_dyn_queue()
.into_par_iter()
.map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
.collect::<Vec<_>>();
result.sort();

assert_eq!(result, vec![1, 2, 3, 4]);
Expand Down
174 changes: 110 additions & 64 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,21 @@
//! # Example
//!
//! ```
//! use dynqueue::{DynQueue, DynQueueHandle};
//!
//! use rayon::iter::IntoParallelIterator as _;
//! use rayon::iter::ParallelIterator as _;
//!
//! let mut result = DynQueue::new(vec![1, 2, 3])
//! .into_par_iter()
//! .map(|(handle, value)| { if value == 2 { handle.enqueue(4) }; value })
//! .collect::<Vec<_>>();
//! use dynqueue::IntoDynQueue as _;
//!
//! let mut result = vec![1, 2, 3]
//! .into_dyn_queue()
//! .into_par_iter()
//! .map(|(handle, value)| {
//! if value == 2 {
//! handle.enqueue(4)
//! };
//! value
//! })
//! .collect::<Vec<_>>();
//! result.sort();
//!
//! assert_eq!(result, vec![1, 2, 3, 4]);
Expand All @@ -26,27 +32,45 @@
//! The `DynQueueHandle` shall not outlive the `DynQueue` iterator
//!
//! ```should_panic
//! use dynqueue::{DynQueue, DynQueueHandle};
//! use dynqueue::{DynQueue, DynQueueHandle, IntoDynQueue};
//!
//! use rayon::iter::IntoParallelIterator as _;
//! use rayon::iter::ParallelIterator as _;
//! use std::sync::RwLock;
//!
//! static mut STALE_HANDLE : Option<DynQueueHandle<u8, Vec<u8>>> = None;
//! static mut STALE_HANDLE: Option<DynQueueHandle<u8, RwLock<Vec<u8>>>> = None;
//!
//! pub fn test_func() -> Vec<u8> {
//! DynQueue::new(vec![1u8, 2u8, 3u8])
//! vec![1u8, 2u8, 3u8]
//! .into_dyn_queue()
//! .into_par_iter()
//! .map(|(handle, value)| unsafe { STALE_HANDLE.replace(handle); value })
//! .map(|(handle, value)| unsafe {
//! STALE_HANDLE.replace(handle);
//! value
//! })
//! .collect::<Vec<_>>()
//! }
//! // test_func() panics
//! let result = test_func();
//! unsafe { STALE_HANDLE.as_ref().unwrap().enqueue(4); }
//! unsafe {
//! STALE_HANDLE.as_ref().unwrap().enqueue(4);
//! }
//! ```

#![deny(clippy::all)]
#![deny(missing_docs)]

#[allow(unused)]
macro_rules! doc_comment {
($x:expr) => {
#[doc = $x]
#[doc(hidden)]
mod readme_tests {}
};
}

doc_comment!(include_str!("../README.md"));

use rayon::iter::plumbing::{
bridge_unindexed, Consumer, Folder, UnindexedConsumer, UnindexedProducer,
};
Expand All @@ -57,80 +81,123 @@ use std::sync::{Arc, RwLock};
#[cfg(test)]
mod tests;

/// Trait to produce a new DynQueue
pub trait IntoDynQueue<T, U: Queue<T>> {
/// new
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, U>;
}

/// Everything implementing `Queue` can be handled by DynQueue
#[allow(clippy::len_without_is_empty)]
pub trait Queue<T>
where
Self: Sized,
{
/// push an element in the queue
fn push(&mut self, v: T);
fn push(&self, v: T);

/// pop an element from the queue
fn pop(&mut self) -> Option<T>;
fn pop(&self) -> Option<T>;

/// number of elements in the queue
fn len(&self) -> usize;

/// split off `size` elements
fn split_off(&mut self, size: usize) -> Self;
fn split_off(&self, size: usize) -> Self;
}

impl<T> Queue<T> for Vec<T> {
impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for Vec<T> {
#[inline(always)]
fn push(&mut self, v: T) {
Vec::push(self, v)
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
}
}

impl<T> IntoDynQueue<T, RwLock<Vec<T>>> for RwLock<Vec<T>> {
#[inline(always)]
fn pop(&mut self) -> Option<T> {
Vec::pop(self)
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<Vec<T>>> {
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
}
}

impl<T> Queue<T> for RwLock<Vec<T>> {
#[inline(always)]
fn push(&self, v: T) {
self.write().unwrap().push(v)
}

#[inline(always)]
fn pop(&self) -> Option<T> {
self.write().unwrap().pop()
}

#[inline(always)]
fn len(&self) -> usize {
Vec::len(self)
self.read().unwrap().len()
}

#[inline(always)]
fn split_off(&mut self, size: usize) -> Self {
Vec::split_off(self, size)
fn split_off(&self, size: usize) -> Self {
RwLock::new(self.write().unwrap().split_off(size))
}
}

impl<T> Queue<T> for VecDeque<T> {
impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for VecDeque<T> {
#[inline(always)]
fn push(&mut self, v: T) {
VecDeque::push_back(self, v)
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
DynQueue(Arc::new(DynQueueInner(RwLock::new(self), PhantomData)))
}
}

impl<T> IntoDynQueue<T, RwLock<VecDeque<T>>> for RwLock<VecDeque<T>> {
#[inline(always)]
fn pop(&mut self) -> Option<T> {
VecDeque::pop_front(self)
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, RwLock<VecDeque<T>>> {
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
}
}

impl<T> Queue<T> for RwLock<VecDeque<T>> {
#[inline(always)]
fn push(&self, v: T) {
self.write().unwrap().push_back(v)
}

#[inline(always)]
fn pop(&self) -> Option<T> {
self.write().unwrap().pop_front()
}

#[inline(always)]
fn len(&self) -> usize {
VecDeque::len(self)
self.read().unwrap().len()
}

#[inline(always)]
fn split_off(&mut self, size: usize) -> Self {
VecDeque::split_off(self, size)
fn split_off(&self, size: usize) -> Self {
RwLock::new(self.write().unwrap().split_off(size))
}
}

#[cfg(feature = "crossbeam-queue")]
use crossbeam_queue::SegQueue;

#[cfg(feature = "crossbeam-queue")]
impl<T> IntoDynQueue<T, SegQueue<T>> for SegQueue<T> {
#[inline(always)]
fn into_dyn_queue<'a>(self) -> DynQueue<'a, T, Self> {
DynQueue(Arc::new(DynQueueInner(self, PhantomData)))
}
}

#[cfg(feature = "crossbeam-queue")]
impl<T> Queue<T> for SegQueue<T> {
#[inline(always)]
fn push(&mut self, v: T) {
fn push(&self, v: T) {
SegQueue::push(self, v);
}

#[inline(always)]
fn pop(&mut self) -> Option<T> {
fn pop(&self) -> Option<T> {
SegQueue::pop(self).ok()
}

Expand All @@ -140,18 +207,18 @@ impl<T> Queue<T> for SegQueue<T> {
}

#[inline(always)]
fn split_off(&mut self, size: usize) -> Self {
fn split_off(&self, size: usize) -> Self {
let q = SegQueue::new();
(0..size)
.filter_map(|_| self.pop())
.filter_map(|_| Queue::pop(self))
.for_each(|ele| q.push(ele));
q
}
}

// PhantomData should prevent `DynQueueInner` to outlive the original `DynQueue`
// but does not always.
struct DynQueueInner<'a, T, U: Queue<T>>(std::sync::RwLock<U>, PhantomData<&'a T>);
struct DynQueueInner<'a, T, U: Queue<T>>(U, PhantomData<&'a T>);

/// The `DynQueueHandle` returned by the iterator in addition to `T`
pub struct DynQueueHandle<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);
Expand All @@ -160,44 +227,26 @@ impl<'a, T, U: Queue<T>> DynQueueHandle<'a, T, U> {
/// Enqueue `T` in the `DynQueue<T>`, which is currently iterated.
#[inline]
pub fn enqueue(&self, job: T) {
(self.0).0.write().unwrap().push(job)
(self.0).0.push(job)
}
}

/// The `DynQueue<T>` which can be parallel iterated over
pub struct DynQueue<'a, T, U: Queue<T>>(Arc<DynQueueInner<'a, T, U>>);

impl<'a, T, U: Queue<T>> DynQueue<'a, T, U>
where
T: Send + Sync,
U: Queue<T> + Send + Sync,
{
/// Create a new `DynQueue<T>` from a `Vec<T>`
#[inline]
pub fn new(lifo: U) -> Self {
Self(Arc::new(DynQueueInner(RwLock::new(lifo), PhantomData)))
}
}

impl<'a, T, U> UnindexedProducer for DynQueue<'a, T, U>
where
T: Send + Sync,
U: Queue<T> + Send + Sync,
U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
{
type Item = (DynQueueHandle<'a, T, U>, T);

fn split(self) -> (Self, Option<Self>) {
let len = {
let q = (self.0).0.read().unwrap();
q.len()
};
let len = (self.0).0.len();

if len >= 2 {
let new_q = {
let mut q = (self.0).0.write().unwrap();
let split_off = q.split_off(len / 2);
DynQueue::new(split_off)
};
(self, Some(new_q))
let new_q = (self.0).0.split_off(len / 2);
(self, Some(new_q.into_dyn_queue()))
} else {
(self, None)
}
Expand All @@ -209,10 +258,7 @@ where
{
let mut folder = folder;
loop {
let ret = {
let mut q = (self.0).0.write().unwrap();
q.pop()
};
let ret = (self.0).0.pop();

if let Some(v) = ret {
folder = folder.consume((DynQueueHandle(self.0.clone()), v));
Expand All @@ -233,7 +279,7 @@ where
impl<'a, T, U> rayon::iter::ParallelIterator for DynQueue<'a, T, U>
where
T: Send + Sync,
U: Queue<T> + Send + Sync,
U: IntoDynQueue<T, U> + Queue<T> + Send + Sync,
{
type Item = (DynQueueHandle<'a, T, U>, T);

Expand Down
Loading

0 comments on commit bf96d0b

Please sign in to comment.