From 55c14c90865cabd233158ceca16295233de052e9 Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Mon, 24 Aug 2020 22:44:41 +0200 Subject: [PATCH 01/13] Added rayon feature and rayon::iter::IntoParallelIterator impl for ArrayVec --- Cargo.toml | 5 ++ src/lib.rs | 174 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 179 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index 66267cd3..0b4603c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,11 @@ default-features = false [dev-dependencies.serde_test] version = "1.0" +[dependencies.rayon] +version = "1.0" +optional = true +default-features = false + [dev-dependencies] matches = { version = "0.1" } bencher = "0.1.4" diff --git a/src/lib.rs b/src/lib.rs index 0268334f..ce43ec8d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,11 @@ use crate::maybe_uninit::MaybeUninit; #[cfg(feature="serde")] use serde::{Serialize, Deserialize, Serializer, Deserializer}; +#[cfg(feature="rayon")] +use rayon::iter::plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}; +#[cfg(feature="rayon")] +use rayon::iter::{IntoParallelIterator, ParallelIterator, IndexedParallelIterator}; + mod array; mod array_string; mod char; @@ -1196,3 +1201,172 @@ impl<'de, T: Deserialize<'de>, A: Array> Deserialize<'de> for ArrayVec(PhantomData)) } } + +// Adapted from `rayon/vec.rs` + +#[cfg(feature="rayon")] +/// Parallel iterator that moves out of an `ArrayVec`. +#[derive(Debug, Clone)] +pub struct IntoParIter + Send> { + vec: ArrayVec, +} + +#[cfg(feature="rayon")] +impl IntoParallelIterator for ArrayVec +where + T: Send, + A: Array + Send, + ::Index: Send +{ + type Item = T; + type Iter = IntoParIter; + + fn into_par_iter(self) -> Self::Iter { + IntoParIter { vec: self } + } +} + +#[cfg(feature="rayon")] +impl ParallelIterator for IntoParIter +where + T: Send, + A: Array + Send, + ::Index: Send +{ + type Item = T; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + bridge(self, consumer) + } + + fn opt_len(&self) -> Option { + Some(self.len()) + } +} + +#[cfg(feature="rayon")] +impl IndexedParallelIterator for IntoParIter +where + T: Send, + A: Array + Send, + ::Index: Send +{ + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + bridge(self, consumer) + } + + fn len(&self) -> usize { + self.vec.len() + } + + fn with_producer(mut self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + // The producer will move or drop each item from its slice, effectively taking ownership of + // them. When we're done, the vector only needs to free its buffer. + unsafe { + // Make the `ArrayVec` forget about the actual items. + let len = self.vec.len(); + self.vec.set_len(0); + + // Get a correct borrow, then extend it to the original length. + let mut slice = self.vec.as_mut_slice(); + slice = std::slice::from_raw_parts_mut(slice.as_mut_ptr(), len); + + callback.callback(ArrayVecProducer { slice }) + } + } +} + +/// //////////////////////////////////////////////////////////////////////// + +#[cfg(feature="rayon")] +struct ArrayVecProducer<'data, T: Send> { + slice: &'data mut [T], +} + +#[cfg(feature="rayon")] +impl<'data, T: 'data + Send> Producer for ArrayVecProducer<'data, T> { + type Item = T; + type IntoIter = SliceDrain<'data, T>; + + fn into_iter(mut self) -> Self::IntoIter { + // replace the slice so we don't drop it twice + let slice = std::mem::replace(&mut self.slice, &mut []); + SliceDrain { + iter: slice.iter_mut(), + } + } + + fn split_at(mut self, index: usize) -> (Self, Self) { + // replace the slice so we don't drop it twice + let slice = std::mem::replace(&mut self.slice, &mut []); + let (left, right) = slice.split_at_mut(index); + (ArrayVecProducer { slice: left }, ArrayVecProducer { slice: right }) + } +} + +#[cfg(feature="rayon")] +impl<'data, T: 'data + Send> Drop for ArrayVecProducer<'data, T> { + fn drop(&mut self) { + SliceDrain { + iter: self.slice.iter_mut(), + }; + } +} + +/// //////////////////////////////////////////////////////////////////////// + +// like std::vec::Drain, without updating a source Vec +#[cfg(feature="rayon")] +struct SliceDrain<'data, T> { + iter: std::slice::IterMut<'data, T>, +} + +#[cfg(feature="rayon")] +impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { + type Item = T; + + fn next(&mut self) -> Option { + let ptr = self.iter.next()?; + Some(unsafe { std::ptr::read(ptr) }) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } +} + +#[cfg(feature="rayon")] +impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> { + fn next_back(&mut self) -> Option { + let ptr = self.iter.next_back()?; + Some(unsafe { std::ptr::read(ptr) }) + } +} + +#[cfg(feature="rayon")] +impl<'data, T: 'data> ExactSizeIterator for SliceDrain<'data, T> { + fn len(&self) -> usize { + self.iter.len() + } +} + +#[cfg(feature="rayon")] +impl<'data, T: 'data> Drop for SliceDrain<'data, T> { + fn drop(&mut self) { + for ptr in &mut self.iter { + unsafe { + std::ptr::drop_in_place(ptr); + } + } + } +} From cae4c5386fc9b60bed0ab8cba1edd17aa0af2c5c Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Mon, 24 Aug 2020 22:46:58 +0200 Subject: [PATCH 02/13] Added tests for rayon feature --- tests/rayon.rs | 120 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) create mode 100644 tests/rayon.rs diff --git a/tests/rayon.rs b/tests/rayon.rs new file mode 100644 index 00000000..eb201968 --- /dev/null +++ b/tests/rayon.rs @@ -0,0 +1,120 @@ +#![cfg(feature = "rayon")] + +use arrayvec::ArrayVec; + +// Adapted from `rayon/tests/producer_split_at.rs` + +use rayon::iter::plumbing::*; +use rayon::prelude::*; + +fn check(expected: &[I::Item], mut f: F) +where + F: FnMut() -> I, + I: IntoParallelIterator, + I::Iter: IndexedParallelIterator, + I::Item: PartialEq + std::fmt::Debug, +{ + map_triples(expected.len() + 1, |i, j, k| { + Split::forward(f(), i, j, k, expected); + Split::reverse(f(), i, j, k, expected); + }); +} + +fn map_triples(end: usize, mut f: F) +where + F: FnMut(usize, usize, usize), +{ + for i in 0..end { + for j in i..end { + for k in j..end { + f(i, j, k); + } + } + } +} + +#[derive(Debug)] +struct Split { + i: usize, + j: usize, + k: usize, + reverse: bool, +} + +impl Split { + fn forward(iter: I, i: usize, j: usize, k: usize, expected: &[I::Item]) + where + I: IntoParallelIterator, + I::Iter: IndexedParallelIterator, + I::Item: PartialEq + std::fmt::Debug, + { + let result = iter.into_par_iter().with_producer(Split { + i, + j, + k, + reverse: false, + }); + assert_eq!(result, expected); + } + + fn reverse(iter: I, i: usize, j: usize, k: usize, expected: &[I::Item]) + where + I: IntoParallelIterator, + I::Iter: IndexedParallelIterator, + I::Item: PartialEq + std::fmt::Debug, + { + let result = iter.into_par_iter().with_producer(Split { + i, + j, + k, + reverse: true, + }); + assert!(result.iter().eq(expected.iter().rev())); + } +} + +impl ProducerCallback for Split { + type Output = Vec; + + fn callback

(self, producer: P) -> Self::Output + where + P: Producer, + { + println!("{:?}", self); + + // Splitting the outer indexes first gets us an arbitrary mid section, + // which we then split further to get full test coverage. + let (left, d) = producer.split_at(self.k); + let (a, mid) = left.split_at(self.i); + let (b, c) = mid.split_at(self.j - self.i); + + let a = a.into_iter(); + let b = b.into_iter(); + let c = c.into_iter(); + let d = d.into_iter(); + + check_len(&a, self.i); + check_len(&b, self.j - self.i); + check_len(&c, self.k - self.j); + + let chain = a.chain(b).chain(c).chain(d); + if self.reverse { + chain.rev().collect() + } else { + chain.collect() + } + } +} + +fn check_len(iter: &I, len: usize) { + assert_eq!(iter.size_hint(), (len, Some(len))); + assert_eq!(iter.len(), len); +} + +// Actual test + +#[test] +fn rayon_arrayvec_producer_split_at() { + let v: ArrayVec<[u8; 10]> = (0..10).collect(); + check(&v, || v.clone()); +} \ No newline at end of file From a8f332729452f326d16f5321e2fbcbab1f857e66 Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Mon, 24 Aug 2020 22:52:52 +0200 Subject: [PATCH 03/13] Add missing trailing newline in tests/rayon.rs --- tests/rayon.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rayon.rs b/tests/rayon.rs index eb201968..1c9967cd 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -117,4 +117,4 @@ fn check_len(iter: &I, len: usize) { fn rayon_arrayvec_producer_split_at() { let v: ArrayVec<[u8; 10]> = (0..10).collect(); check(&v, || v.clone()); -} \ No newline at end of file +} From 6216f04e8370655e7e81cb421648301918d7247d Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Mon, 24 Aug 2020 22:55:03 +0200 Subject: [PATCH 04/13] Fix wrong path of rayon file in comment --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index ce43ec8d..ec74605b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1202,7 +1202,7 @@ impl<'de, T: Deserialize<'de>, A: Array> Deserialize<'de> for ArrayVec Date: Mon, 24 Aug 2020 23:00:34 +0200 Subject: [PATCH 05/13] Updated .travis.yml to run rayon tests --- .travis.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 42816fef..d514c90e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,7 @@ language: rust sudo: false env: - - FEATURES='serde' + - FEATURES='serde rayon' matrix: include: - rust: 1.36.0 @@ -11,6 +11,9 @@ matrix: - rust: stable env: - FEATURES='serde' + - rust: stable + env: + - FEATURES='rayon' - rust: stable env: - FEATURES='array-sizes-33-128 array-sizes-129-255' @@ -19,6 +22,9 @@ matrix: - rust: nightly env: - FEATURES='serde' + - rust: nightly + env: + - FEATURES='rayon' - rust: nightly env: - FEATURES='array-sizes-33-128 array-sizes-129-255' From c5b1416019b97dc0489a7cbda55dfa4062c8ab10 Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 13:23:08 +0200 Subject: [PATCH 06/13] Added FromParallelIterator and ParallelExtend impls for ArrayVec --- src/lib.rs | 54 +++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ec74605b..5b699595 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -50,9 +50,11 @@ use crate::maybe_uninit::MaybeUninit; use serde::{Serialize, Deserialize, Serializer, Deserializer}; #[cfg(feature="rayon")] -use rayon::iter::plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}; -#[cfg(feature="rayon")] -use rayon::iter::{IntoParallelIterator, ParallelIterator, IndexedParallelIterator}; +use rayon::iter::{ + plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}, + FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelExtend, + ParallelIterator, +}; mod array; mod array_string; @@ -1370,3 +1372,49 @@ impl<'data, T: 'data> Drop for SliceDrain<'data, T> { } } } + +#[cfg(feature = "rayon")] +impl FromParallelIterator for ArrayVec +where + T: Send, + A: Array + Send, + ::Index: Send, +{ + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + let mut arrayvec = Self::new(); + arrayvec.par_extend(par_iter); + arrayvec + } +} + +#[cfg(feature = "rayon")] +impl ParallelExtend for ArrayVec +where + T: Send, + A: Array + Send, + ::Index: Send, +{ + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + self.extend( + par_iter + .into_par_iter() + .fold( + || Self::new(), + |mut arrayvec, element| { + let _ = arrayvec.try_push(element); + arrayvec + }, + ) + .reduce(Self::new, |mut arrayvec1, arrayvec2| { + arrayvec1.extend(arrayvec2); + arrayvec1 + }), + ) + } +} From 07655e80caf2422621e443208fc0bdc576a8237b Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 18:40:44 +0200 Subject: [PATCH 07/13] Added basic tests for FromParallelIterator and ParallelExtend for ArrayVec --- tests/rayon.rs | 40 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 39 insertions(+), 1 deletion(-) diff --git a/tests/rayon.rs b/tests/rayon.rs index 1c9967cd..ddd8dbba 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -111,10 +111,48 @@ fn check_len(iter: &I, len: usize) { assert_eq!(iter.len(), len); } -// Actual test +// Actual tests #[test] fn rayon_arrayvec_producer_split_at() { let v: ArrayVec<[u8; 10]> = (0..10).collect(); check(&v, || v.clone()); } + +#[test] +fn rayon_arrayvec_collect() { + // Iterator length == capacity + let v: ArrayVec<[u8; 10]> = (0..10u8).into_par_iter().collect(); + assert_eq!(v.len(), 10); + + // Iterator length > capacity + let v: ArrayVec<[u8; 10]> = (0..20u8).into_par_iter().collect(); + assert_eq!(v.len(), 10); + + // Iterator length < capacity + let v: ArrayVec<[u8; 10]> = (0..5u8).into_par_iter().collect(); + assert_eq!(v.len(), 5); +} + +#[test] +fn rayon_arrayvec_extend() { + let mut v = ArrayVec::<[u8; 20]>::new(); + + // Iterator length == remaining capacity + v.extend(0..10); + v.par_extend(0..10u8); + assert_eq!(v.len(), 20); + v.clear(); + + // Iterator length > remaining capacity + v.extend(0..10); + v.par_extend(0..30u8); + assert_eq!(v.len(), 20); + v.clear(); + + // Iterator length < remaining capacity + v.extend(0..10); + v.par_extend(0..5u8); + assert_eq!(v.len(), 15); + v.clear(); +} From 9465199cca2dcae9509e0a0906091c747ae32279 Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 18:47:16 +0200 Subject: [PATCH 08/13] Simplified generics bounds for rayon traits impls --- src/lib.rs | 56 +++++++++++++++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5b699595..95237751 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1209,19 +1209,19 @@ impl<'de, T: Deserialize<'de>, A: Array> Deserialize<'de> for ArrayVec + Send> { +pub struct IntoParIter> { vec: ArrayVec, } #[cfg(feature="rayon")] -impl IntoParallelIterator for ArrayVec -where - T: Send, - A: Array + Send, - ::Index: Send +impl IntoParallelIterator for ArrayVec +where + A: Array + Send, + A::Item: Send, + A::Index: Send, { - type Item = T; - type Iter = IntoParIter; + type Item = A::Item; + type Iter = IntoParIter; fn into_par_iter(self) -> Self::Iter { IntoParIter { vec: self } @@ -1229,13 +1229,13 @@ where } #[cfg(feature="rayon")] -impl ParallelIterator for IntoParIter +impl ParallelIterator for IntoParIter where - T: Send, - A: Array + Send, - ::Index: Send + A: Array + Send, + A::Item: Send, + A::Index: Send, { - type Item = T; + type Item = A::Item; fn drive_unindexed(self, consumer: C) -> C::Result where @@ -1250,11 +1250,11 @@ where } #[cfg(feature="rayon")] -impl IndexedParallelIterator for IntoParIter -where - T: Send, - A: Array + Send, - ::Index: Send +impl IndexedParallelIterator for IntoParIter +where + A: Array + Send, + A::Item: Send, + A::Index: Send, { fn drive(self, consumer: C) -> C::Result where @@ -1374,15 +1374,15 @@ impl<'data, T: 'data> Drop for SliceDrain<'data, T> { } #[cfg(feature = "rayon")] -impl FromParallelIterator for ArrayVec +impl FromParallelIterator for ArrayVec where - T: Send, - A: Array + Send, - ::Index: Send, + A: Array + Send, + A::Item: Send, + A::Index: Send, { fn from_par_iter(par_iter: I) -> Self where - I: IntoParallelIterator, + I: IntoParallelIterator, { let mut arrayvec = Self::new(); arrayvec.par_extend(par_iter); @@ -1391,15 +1391,15 @@ where } #[cfg(feature = "rayon")] -impl ParallelExtend for ArrayVec +impl ParallelExtend for ArrayVec where - T: Send, - A: Array + Send, - ::Index: Send, + A: Array + Send, + A::Item: Send, + A::Index: Send, { fn par_extend(&mut self, par_iter: I) where - I: IntoParallelIterator, + I: IntoParallelIterator, { self.extend( par_iter From 7a06b470cd6de85e28e8869c8df8da7fa29fc46d Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 18:51:03 +0200 Subject: [PATCH 09/13] Formatted rayon impls --- src/lib.rs | 31 +++++++++++++++++-------------- 1 file changed, 17 insertions(+), 14 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 95237751..8d139148 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1206,14 +1206,14 @@ impl<'de, T: Deserialize<'de>, A: Array> Deserialize<'de> for ArrayVec> { vec: ArrayVec, } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl IntoParallelIterator for ArrayVec where A: Array + Send, @@ -1228,9 +1228,9 @@ where } } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl ParallelIterator for IntoParIter -where +where A: Array + Send, A::Item: Send, A::Index: Send, @@ -1249,7 +1249,7 @@ where } } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl IndexedParallelIterator for IntoParIter where A: Array + Send, @@ -1289,12 +1289,12 @@ where /// //////////////////////////////////////////////////////////////////////// -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] struct ArrayVecProducer<'data, T: Send> { slice: &'data mut [T], } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl<'data, T: 'data + Send> Producer for ArrayVecProducer<'data, T> { type Item = T; type IntoIter = SliceDrain<'data, T>; @@ -1311,11 +1311,14 @@ impl<'data, T: 'data + Send> Producer for ArrayVecProducer<'data, T> { // replace the slice so we don't drop it twice let slice = std::mem::replace(&mut self.slice, &mut []); let (left, right) = slice.split_at_mut(index); - (ArrayVecProducer { slice: left }, ArrayVecProducer { slice: right }) + ( + ArrayVecProducer { slice: left }, + ArrayVecProducer { slice: right }, + ) } } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl<'data, T: 'data + Send> Drop for ArrayVecProducer<'data, T> { fn drop(&mut self) { SliceDrain { @@ -1327,12 +1330,12 @@ impl<'data, T: 'data + Send> Drop for ArrayVecProducer<'data, T> { /// //////////////////////////////////////////////////////////////////////// // like std::vec::Drain, without updating a source Vec -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] struct SliceDrain<'data, T> { iter: std::slice::IterMut<'data, T>, } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { type Item = T; @@ -1347,7 +1350,7 @@ impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { } } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> { fn next_back(&mut self) -> Option { let ptr = self.iter.next_back()?; @@ -1355,14 +1358,14 @@ impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> { } } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl<'data, T: 'data> ExactSizeIterator for SliceDrain<'data, T> { fn len(&self) -> usize { self.iter.len() } } -#[cfg(feature="rayon")] +#[cfg(feature = "rayon")] impl<'data, T: 'data> Drop for SliceDrain<'data, T> { fn drop(&mut self) { for ptr in &mut self.iter { From d04154fac4b748b2c60f289ff606cc0135a7d3b9 Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 19:11:05 +0200 Subject: [PATCH 10/13] Moved rayon impls in a separated module --- src/lib.rs | 229 +-------------------------------------------- src/rayon_impls.rs | 211 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+), 225 deletions(-) create mode 100644 src/rayon_impls.rs diff --git a/src/lib.rs b/src/lib.rs index 8d139148..bca38f21 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,13 +49,6 @@ use crate::maybe_uninit::MaybeUninit; #[cfg(feature="serde")] use serde::{Serialize, Deserialize, Serializer, Deserializer}; -#[cfg(feature="rayon")] -use rayon::iter::{ - plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}, - FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelExtend, - ParallelIterator, -}; - mod array; mod array_string; mod char; @@ -66,6 +59,10 @@ use crate::array::Index; pub use crate::array_string::ArrayString; pub use crate::errors::CapacityError; +#[cfg(feature="rayon")] +mod rayon_impls; +#[cfg(feature="rayon")] +pub use rayon_impls::IntoParIter; /// A vector with a fixed capacity. /// @@ -1203,221 +1200,3 @@ impl<'de, T: Deserialize<'de>, A: Array> Deserialize<'de> for ArrayVec(PhantomData)) } } - -// Adapted from `rayon/srs/vec.rs` - -#[cfg(feature = "rayon")] -/// Parallel iterator that moves out of an `ArrayVec`. -#[derive(Debug, Clone)] -pub struct IntoParIter> { - vec: ArrayVec, -} - -#[cfg(feature = "rayon")] -impl IntoParallelIterator for ArrayVec -where - A: Array + Send, - A::Item: Send, - A::Index: Send, -{ - type Item = A::Item; - type Iter = IntoParIter; - - fn into_par_iter(self) -> Self::Iter { - IntoParIter { vec: self } - } -} - -#[cfg(feature = "rayon")] -impl ParallelIterator for IntoParIter -where - A: Array + Send, - A::Item: Send, - A::Index: Send, -{ - type Item = A::Item; - - fn drive_unindexed(self, consumer: C) -> C::Result - where - C: UnindexedConsumer, - { - bridge(self, consumer) - } - - fn opt_len(&self) -> Option { - Some(self.len()) - } -} - -#[cfg(feature = "rayon")] -impl IndexedParallelIterator for IntoParIter -where - A: Array + Send, - A::Item: Send, - A::Index: Send, -{ - fn drive(self, consumer: C) -> C::Result - where - C: Consumer, - { - bridge(self, consumer) - } - - fn len(&self) -> usize { - self.vec.len() - } - - fn with_producer(mut self, callback: CB) -> CB::Output - where - CB: ProducerCallback, - { - // The producer will move or drop each item from its slice, effectively taking ownership of - // them. When we're done, the vector only needs to free its buffer. - unsafe { - // Make the `ArrayVec` forget about the actual items. - let len = self.vec.len(); - self.vec.set_len(0); - - // Get a correct borrow, then extend it to the original length. - let mut slice = self.vec.as_mut_slice(); - slice = std::slice::from_raw_parts_mut(slice.as_mut_ptr(), len); - - callback.callback(ArrayVecProducer { slice }) - } - } -} - -/// //////////////////////////////////////////////////////////////////////// - -#[cfg(feature = "rayon")] -struct ArrayVecProducer<'data, T: Send> { - slice: &'data mut [T], -} - -#[cfg(feature = "rayon")] -impl<'data, T: 'data + Send> Producer for ArrayVecProducer<'data, T> { - type Item = T; - type IntoIter = SliceDrain<'data, T>; - - fn into_iter(mut self) -> Self::IntoIter { - // replace the slice so we don't drop it twice - let slice = std::mem::replace(&mut self.slice, &mut []); - SliceDrain { - iter: slice.iter_mut(), - } - } - - fn split_at(mut self, index: usize) -> (Self, Self) { - // replace the slice so we don't drop it twice - let slice = std::mem::replace(&mut self.slice, &mut []); - let (left, right) = slice.split_at_mut(index); - ( - ArrayVecProducer { slice: left }, - ArrayVecProducer { slice: right }, - ) - } -} - -#[cfg(feature = "rayon")] -impl<'data, T: 'data + Send> Drop for ArrayVecProducer<'data, T> { - fn drop(&mut self) { - SliceDrain { - iter: self.slice.iter_mut(), - }; - } -} - -/// //////////////////////////////////////////////////////////////////////// - -// like std::vec::Drain, without updating a source Vec -#[cfg(feature = "rayon")] -struct SliceDrain<'data, T> { - iter: std::slice::IterMut<'data, T>, -} - -#[cfg(feature = "rayon")] -impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { - type Item = T; - - fn next(&mut self) -> Option { - let ptr = self.iter.next()?; - Some(unsafe { std::ptr::read(ptr) }) - } - - fn size_hint(&self) -> (usize, Option) { - let len = self.len(); - (len, Some(len)) - } -} - -#[cfg(feature = "rayon")] -impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> { - fn next_back(&mut self) -> Option { - let ptr = self.iter.next_back()?; - Some(unsafe { std::ptr::read(ptr) }) - } -} - -#[cfg(feature = "rayon")] -impl<'data, T: 'data> ExactSizeIterator for SliceDrain<'data, T> { - fn len(&self) -> usize { - self.iter.len() - } -} - -#[cfg(feature = "rayon")] -impl<'data, T: 'data> Drop for SliceDrain<'data, T> { - fn drop(&mut self) { - for ptr in &mut self.iter { - unsafe { - std::ptr::drop_in_place(ptr); - } - } - } -} - -#[cfg(feature = "rayon")] -impl FromParallelIterator for ArrayVec -where - A: Array + Send, - A::Item: Send, - A::Index: Send, -{ - fn from_par_iter(par_iter: I) -> Self - where - I: IntoParallelIterator, - { - let mut arrayvec = Self::new(); - arrayvec.par_extend(par_iter); - arrayvec - } -} - -#[cfg(feature = "rayon")] -impl ParallelExtend for ArrayVec -where - A: Array + Send, - A::Item: Send, - A::Index: Send, -{ - fn par_extend(&mut self, par_iter: I) - where - I: IntoParallelIterator, - { - self.extend( - par_iter - .into_par_iter() - .fold( - || Self::new(), - |mut arrayvec, element| { - let _ = arrayvec.try_push(element); - arrayvec - }, - ) - .reduce(Self::new, |mut arrayvec1, arrayvec2| { - arrayvec1.extend(arrayvec2); - arrayvec1 - }), - ) - } -} diff --git a/src/rayon_impls.rs b/src/rayon_impls.rs new file mode 100644 index 00000000..3f733cbd --- /dev/null +++ b/src/rayon_impls.rs @@ -0,0 +1,211 @@ +#![cfg(feature = "rayon")] + +use crate::{ArrayVec, Array}; +use rayon::iter::{ + plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}, + FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelExtend, + ParallelIterator, +}; + +// Adapted from `rayon/srs/vec.rs` + +/// Parallel iterator that moves out of an `ArrayVec`. +#[derive(Debug, Clone)] +pub struct IntoParIter> { + vec: ArrayVec, +} + +impl IntoParallelIterator for ArrayVec +where + A: Array + Send, + A::Item: Send, + A::Index: Send, +{ + type Item = A::Item; + type Iter = IntoParIter; + + fn into_par_iter(self) -> Self::Iter { + IntoParIter { vec: self } + } +} + +impl ParallelIterator for IntoParIter +where + A: Array + Send, + A::Item: Send, + A::Index: Send, +{ + type Item = A::Item; + + fn drive_unindexed(self, consumer: C) -> C::Result + where + C: UnindexedConsumer, + { + bridge(self, consumer) + } + + fn opt_len(&self) -> Option { + Some(self.len()) + } +} + +impl IndexedParallelIterator for IntoParIter +where + A: Array + Send, + A::Item: Send, + A::Index: Send, +{ + fn drive(self, consumer: C) -> C::Result + where + C: Consumer, + { + bridge(self, consumer) + } + + fn len(&self) -> usize { + self.vec.len() + } + + fn with_producer(mut self, callback: CB) -> CB::Output + where + CB: ProducerCallback, + { + // The producer will move or drop each item from its slice, effectively taking ownership of + // them. When we're done, the vector only needs to free its buffer. + unsafe { + // Make the `ArrayVec` forget about the actual items. + let len = self.vec.len(); + self.vec.set_len(0); + + // Get a correct borrow, then extend it to the original length. + let mut slice = self.vec.as_mut_slice(); + slice = std::slice::from_raw_parts_mut(slice.as_mut_ptr(), len); + + callback.callback(ArrayVecProducer { slice }) + } + } +} + +struct ArrayVecProducer<'data, T: Send> { + slice: &'data mut [T], +} + +impl<'data, T: 'data + Send> Producer for ArrayVecProducer<'data, T> { + type Item = T; + type IntoIter = SliceDrain<'data, T>; + + fn into_iter(mut self) -> Self::IntoIter { + // replace the slice so we don't drop it twice + let slice = std::mem::replace(&mut self.slice, &mut []); + SliceDrain { + iter: slice.iter_mut(), + } + } + + fn split_at(mut self, index: usize) -> (Self, Self) { + // replace the slice so we don't drop it twice + let slice = std::mem::replace(&mut self.slice, &mut []); + let (left, right) = slice.split_at_mut(index); + ( + ArrayVecProducer { slice: left }, + ArrayVecProducer { slice: right }, + ) + } +} + +impl<'data, T: 'data + Send> Drop for ArrayVecProducer<'data, T> { + fn drop(&mut self) { + SliceDrain { + iter: self.slice.iter_mut(), + }; + } +} + +/// //////////////////////////////////////////////////////////////////////// + +// like std::vec::Drain, without updating a source Vec +struct SliceDrain<'data, T> { + iter: std::slice::IterMut<'data, T>, +} + +impl<'data, T: 'data> Iterator for SliceDrain<'data, T> { + type Item = T; + + fn next(&mut self) -> Option { + let ptr = self.iter.next()?; + Some(unsafe { std::ptr::read(ptr) }) + } + + fn size_hint(&self) -> (usize, Option) { + let len = self.len(); + (len, Some(len)) + } +} + +impl<'data, T: 'data> DoubleEndedIterator for SliceDrain<'data, T> { + fn next_back(&mut self) -> Option { + let ptr = self.iter.next_back()?; + Some(unsafe { std::ptr::read(ptr) }) + } +} + +impl<'data, T: 'data> ExactSizeIterator for SliceDrain<'data, T> { + fn len(&self) -> usize { + self.iter.len() + } +} + +impl<'data, T: 'data> Drop for SliceDrain<'data, T> { + fn drop(&mut self) { + for ptr in &mut self.iter { + unsafe { + std::ptr::drop_in_place(ptr); + } + } + } +} + +impl FromParallelIterator for ArrayVec +where + A: Array + Send, + A::Item: Send, + A::Index: Send, +{ + fn from_par_iter(par_iter: I) -> Self + where + I: IntoParallelIterator, + { + let mut arrayvec = Self::new(); + arrayvec.par_extend(par_iter); + arrayvec + } +} + +impl ParallelExtend for ArrayVec +where + A: Array + Send, + A::Item: Send, + A::Index: Send, +{ + fn par_extend(&mut self, par_iter: I) + where + I: IntoParallelIterator, + { + self.extend( + par_iter + .into_par_iter() + .fold( + || Self::new(), + |mut arrayvec, element| { + let _ = arrayvec.try_push(element); + arrayvec + }, + ) + .reduce(Self::new, |mut arrayvec1, arrayvec2| { + // TODO: use `ArrayVec::append/try_append` when it becomes available + arrayvec1.extend(arrayvec2); + arrayvec1 + }), + ) + } +} From dadef1804a96a52e83956417290fc964e86ffb84 Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 21:38:21 +0200 Subject: [PATCH 11/13] Optimized ParallelExtend impl for ArrayVec --- src/rayon_impls.rs | 290 ++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 271 insertions(+), 19 deletions(-) diff --git a/src/rayon_impls.rs b/src/rayon_impls.rs index 3f733cbd..d1cab1e2 100644 --- a/src/rayon_impls.rs +++ b/src/rayon_impls.rs @@ -1,11 +1,12 @@ #![cfg(feature = "rayon")] -use crate::{ArrayVec, Array}; +use crate::{Array, ArrayVec}; use rayon::iter::{ - plumbing::{bridge, Consumer, Producer, ProducerCallback, UnindexedConsumer}, - FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelExtend, - ParallelIterator, + plumbing::*, FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, + ParallelExtend, ParallelIterator, }; +use std::marker::PhantomData; +use std::{ptr, slice}; // Adapted from `rayon/srs/vec.rs` @@ -165,6 +166,8 @@ impl<'data, T: 'data> Drop for SliceDrain<'data, T> { } } +// Adapted from `rayon/srs/iter/collect/mod.rs` and ``rayon/srs/iter/collect/consumer.rs`` + impl FromParallelIterator for ArrayVec where A: Array + Send, @@ -191,21 +194,270 @@ where where I: IntoParallelIterator, { - self.extend( - par_iter - .into_par_iter() - .fold( - || Self::new(), - |mut arrayvec, element| { - let _ = arrayvec.try_push(element); - arrayvec - }, - ) - .reduce(Self::new, |mut arrayvec1, arrayvec2| { - // TODO: use `ArrayVec::append/try_append` when it becomes available - arrayvec1.extend(arrayvec2); - arrayvec1 - }), + let par_iter = par_iter.into_par_iter(); + + if let Some(len) = par_iter.opt_len() { + // When Rust gets specialization, we can get here for indexed iterators + // without relying on `opt_len`. Until then, `special_extend()` fakes + // an unindexed mode on the promise that `opt_len()` is accurate. + Collect::new(self, len).with_consumer(|consumer| par_iter.drive_unindexed(consumer)); + } else { + self.extend( + par_iter + .into_par_iter() + .fold( + || Self::new(), + |mut arrayvec, element| { + let _ = arrayvec.try_push(element); + arrayvec + }, + ) + .reduce(Self::new, |mut arrayvec1, arrayvec2| { + // TODO: use `ArrayVec::append/try_append` when it becomes available + arrayvec1.extend(arrayvec2); + arrayvec1 + }), + ) + } + } +} + +/// Manage the collection vector. +struct Collect<'c, A: Array> { + vec: &'c mut ArrayVec, + len: usize, +} + +impl<'c, A> Collect<'c, A> +where + A: Array + Send, + A::Item: Send, + A::Index: Send, +{ + fn new(vec: &'c mut ArrayVec, len: usize) -> Self { + Collect { vec, len } + } + + /// Create a consumer on the slice of memory we are collecting into. + /// + /// The consumer needs to be used inside the scope function, and the + /// complete collect result passed back. + /// + /// This method will verify the collect result, and panic if the slice + /// was not fully written into. Otherwise, in the successful case, + /// the vector is complete with the collected result. + fn with_consumer(mut self, scope_fn: F) + where + F: FnOnce(CollectConsumer<'_, A::Item>) -> CollectResult<'_, A::Item>, + { + unsafe { + let slice = Self::reserve_get_tail_slice(&mut self.vec, self.len); + let expected_writes = slice.len(); + let result = scope_fn(CollectConsumer::new(slice)); + + // The CollectResult represents a contiguous part of the + // slice, that has been written to. + // On unwind here, the CollectResult will be dropped. + // If some producers on the way did not produce enough elements, + // partial CollectResults may have been dropped without + // being reduced to the final result, and we will see + // that as the length coming up short. + // + // Here, we assert that `slice` is fully initialized. This is + // checked by the following assert, which verifies if a + // complete CollectResult was produced; if the length is + // correct, it is necessarily covering the target slice. + // Since we know that the consumer cannot have escaped from + // `drive` (by parametricity, essentially), we know that any + // stores that will happen, have happened. Unless some code is buggy, + // that means we should have seen `len` total writes. + let actual_writes = result.len(); + assert!( + actual_writes == expected_writes, + "expected {} total writes, but got {}", + expected_writes, + actual_writes + ); + + // Release the result's mutable borrow and "proxy ownership" + // of the elements, before the vector takes it over. + result.release_ownership(); + + let new_len = self.vec.len() + expected_writes; + self.vec.set_len(new_len); + } + } + + /// Reserve space for `len` more elements in the vector, + /// and return a slice to the uninitialized tail of the vector + /// + /// Safety: The tail slice is uninitialized + unsafe fn reserve_get_tail_slice(vec: &mut ArrayVec, len: usize) -> &mut [A::Item] { + // Cap the slice length + let actual_len = std::cmp::min(A::CAPACITY - vec.len(), len); + // Get a correct borrow, then extend it for the newly added length. + let start = vec.len(); + let slice = &mut vec[start..]; + slice::from_raw_parts_mut(slice.as_mut_ptr(), actual_len) + } +} + +pub(super) struct CollectConsumer<'c, T: Send> { + /// A slice covering the target memory, not yet initialized! + target: &'c mut [T], +} + +pub(super) struct CollectFolder<'c, T: Send> { + /// The folder writes into `result` and must extend the result + /// up to exactly this number of elements. + final_len: usize, + + /// The current written-to part of our slice of the target + result: CollectResult<'c, T>, +} + +impl<'c, T: Send + 'c> CollectConsumer<'c, T> { + /// The target memory is considered uninitialized, and will be + /// overwritten without reading or dropping existing values. + pub(super) fn new(target: &'c mut [T]) -> Self { + CollectConsumer { target } + } +} + +/// CollectResult represents an initialized part of the target slice. +/// +/// This is a proxy owner of the elements in the slice; when it drops, +/// the elements will be dropped, unless its ownership is released before then. +#[must_use] +pub(super) struct CollectResult<'c, T> { + start: *mut T, + len: usize, + invariant_lifetime: PhantomData<&'c mut &'c mut [T]>, +} + +unsafe impl<'c, T> Send for CollectResult<'c, T> where T: Send {} + +impl<'c, T> CollectResult<'c, T> { + /// The current length of the collect result + pub(super) fn len(&self) -> usize { + self.len + } + + /// Release ownership of the slice of elements, and return the length + pub(super) fn release_ownership(mut self) -> usize { + let ret = self.len; + self.len = 0; + ret + } +} + +impl<'c, T> Drop for CollectResult<'c, T> { + fn drop(&mut self) { + // Drop the first `self.len` elements, which have been recorded + // to be initialized by the folder. + unsafe { + ptr::drop_in_place(slice::from_raw_parts_mut(self.start, self.len)); + } + } +} + +impl<'c, T: Send + 'c> Consumer for CollectConsumer<'c, T> { + type Folder = CollectFolder<'c, T>; + type Reducer = CollectReducer; + type Result = CollectResult<'c, T>; + + fn split_at(self, index: usize) -> (Self, Self, CollectReducer) { + let CollectConsumer { target } = self; + + // Produce new consumers. Normal slicing ensures that the + // memory range given to each consumer is disjoint. + + let (left, right) = if index < target.len() { + target.split_at_mut(index) + } else { + (target, &mut [][..]) + }; + ( + CollectConsumer::new(left), + CollectConsumer::new(right), + CollectReducer, ) } + + fn into_folder(self) -> CollectFolder<'c, T> { + // Create a folder that consumes values and writes them + // into target. The initial result has length 0. + CollectFolder { + final_len: self.target.len(), + result: CollectResult { + start: self.target.as_mut_ptr(), + len: 0, + invariant_lifetime: PhantomData, + }, + } + } + + fn full(&self) -> bool { + self.target.len() == 0 + } +} + +impl<'c, T: Send + 'c> Folder for CollectFolder<'c, T> { + type Result = CollectResult<'c, T>; + + fn consume(mut self, item: T) -> CollectFolder<'c, T> { + if self.result.len >= self.final_len { + panic!("too many values pushed to consumer"); + } + + // Compute target pointer and write to it, and + // extend the current result by one element + unsafe { + self.result.start.add(self.result.len).write(item); + self.result.len += 1; + } + + self + } + + fn complete(self) -> Self::Result { + // NB: We don't explicitly check that the local writes were complete, + // but Collect will assert the total result length in the end. + self.result + } + + fn full(&self) -> bool { + self.result.len == self.final_len + } +} + +/// Pretend to be unindexed for `special_collect_into_vec`, +/// but we should never actually get used that way... +impl<'c, T: Send + 'c> UnindexedConsumer for CollectConsumer<'c, T> { + fn split_off_left(&self) -> Self { + unreachable!("CollectConsumer must be indexed!") + } + fn to_reducer(&self) -> Self::Reducer { + CollectReducer + } +} + +/// CollectReducer combines adjacent chunks; the result must always +/// be contiguous so that it is one combined slice. +pub(super) struct CollectReducer; + +impl<'c, T> Reducer> for CollectReducer { + fn reduce( + self, + mut left: CollectResult<'c, T>, + right: CollectResult<'c, T>, + ) -> CollectResult<'c, T> { + // Merge if the CollectResults are adjacent and in left to right order + // else: drop the right piece now and total length will end up short in the end, + // when the correctness of the collected result is asserted. + if left.start.wrapping_add(left.len) == right.start { + left.len += right.release_ownership(); + } + left + } } From 1ea25dc68d4254758f32d37c618c52fc5905f39c Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 21:39:52 +0200 Subject: [PATCH 12/13] Duplicated rayon tests for indexed and unindexed ParallelIterator --- tests/rayon.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/rayon.rs b/tests/rayon.rs index ddd8dbba..230e7048 100644 --- a/tests/rayon.rs +++ b/tests/rayon.rs @@ -138,6 +138,8 @@ fn rayon_arrayvec_collect() { fn rayon_arrayvec_extend() { let mut v = ArrayVec::<[u8; 20]>::new(); + // Test IndexedParallelIterator + // Iterator length == remaining capacity v.extend(0..10); v.par_extend(0..10u8); @@ -155,4 +157,24 @@ fn rayon_arrayvec_extend() { v.par_extend(0..5u8); assert_eq!(v.len(), 15); v.clear(); + + // Test ParallelIterator (Unindexed) + + // Iterator length == remaining capacity + v.extend(0..10); + v.par_extend((0..10u8).into_par_iter().filter(|_| true)); + assert_eq!(v.len(), 20); + v.clear(); + + // Iterator length > remaining capacity + v.extend(0..10); + v.par_extend((0..30u8).into_par_iter().filter(|_| true)); + assert_eq!(v.len(), 20); + v.clear(); + + // Iterator length < remaining capacity + v.extend(0..10); + v.par_extend((0..5u8).into_par_iter().filter(|_| true)); + assert_eq!(v.len(), 15); + v.clear(); } From 5e5f34e4504e97a1c8a0a665856a9e8d12af97bb Mon Sep 17 00:00:00 2001 From: Giacomo Stevanato Date: Wed, 26 Aug 2020 21:51:00 +0200 Subject: [PATCH 13/13] Fixed comment typos --- src/rayon_impls.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rayon_impls.rs b/src/rayon_impls.rs index d1cab1e2..0602c499 100644 --- a/src/rayon_impls.rs +++ b/src/rayon_impls.rs @@ -8,7 +8,7 @@ use rayon::iter::{ use std::marker::PhantomData; use std::{ptr, slice}; -// Adapted from `rayon/srs/vec.rs` +// Adapted from `rayon/src/vec.rs` /// Parallel iterator that moves out of an `ArrayVec`. #[derive(Debug, Clone)] @@ -166,7 +166,7 @@ impl<'data, T: 'data> Drop for SliceDrain<'data, T> { } } -// Adapted from `rayon/srs/iter/collect/mod.rs` and ``rayon/srs/iter/collect/consumer.rs`` +// Adapted from `rayon/src/iter/collect/mod.rs` and `rayon/src/iter/collect/consumer.rs` impl FromParallelIterator for ArrayVec where