Skip to content

Commit

Permalink
combine Sink::{poll, output}, make naming consistent
Browse files Browse the repository at this point in the history
  • Loading branch information
alecmocatta committed Jul 21, 2020
1 parent c02e005 commit c693f05
Show file tree
Hide file tree
Showing 38 changed files with 1,244 additions and 1,321 deletions.
1 change: 1 addition & 0 deletions amadeus-core/src/into_par_stream/collections.rs
Expand Up @@ -13,6 +13,7 @@ impl<'a, 'b, I: Iterator<Item = (&'a A, &'b B)>, A: Clone + 'a, B: Clone + 'b> I
for TupleCloned<I>
{
type Item = (A, B);

fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|(a, b)| (a.clone(), b.clone()))
}
Expand Down
5 changes: 3 additions & 2 deletions amadeus-core/src/into_par_stream/iterator.rs
Expand Up @@ -21,6 +21,7 @@ pub trait IteratorExt: Iterator + Sized {
impl<I: Iterator + Sized> IteratorExt for I {}

impl_par_dist_rename! {
#[pin_project]
pub struct IterParStream<I>(pub(crate) I);

impl<I: Iterator> ParallelStream for IterParStream<I>
Expand All @@ -33,8 +34,8 @@ impl_par_dist_rename! {
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
fn next_task(&mut self) -> Option<Self::Task> {
self.0.next().map(IterStreamTask::new)
fn next_task(mut self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> {
Poll::Ready(self.0.next().map(IterStreamTask::new))
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion amadeus-core/src/into_par_stream/slice.rs
Expand Up @@ -47,7 +47,7 @@ impl_par_dist_rename! {
fn size_hint(&self) -> (usize, Option<usize>) {
unreachable!()
}
fn next_task(&mut self) -> Option<Self::Task> {
fn next_task(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Option<Self::Task>> {
unreachable!()
}
}
Expand Down
91 changes: 45 additions & 46 deletions amadeus-core/src/par_pipe.rs
Expand Up @@ -5,14 +5,13 @@ use futures::Stream;
use serde_closure::traits;
use std::{cmp::Ordering, hash::Hash, iter, ops};

use crate::{pipe::Pipe, pool::ProcessSend};

use super::{par_sink::*, par_stream::*};
use crate::{pipe::Pipe, pool::ProcessSend};

#[must_use]
pub trait PipeTask<Source> {
type Item;
type Async: Pipe<Source, Item = Self::Item>;
pub trait PipeTask<Input> {
type Output;
type Async: Pipe<Input, Output = Self::Output>;

fn into_async(self) -> Self::Async;
}
Expand All @@ -21,39 +20,39 @@ macro_rules! pipe {
($pipe:ident $sink:ident $from_sink:ident $send:ident $fns:ident $assert_pipe:ident $assert_sink:ident $($meta:meta)*) => {
$(#[$meta])*
#[must_use]
pub trait $pipe<Source> {
type Item;
type Task: PipeTask<Source, Item = Self::Item> + $send;
pub trait $pipe<Input> {
type Output;
type Task: PipeTask<Input, Output = Self::Output> + $send;

fn task(&self) -> Self::Task;

fn inspect<F>(self, f: F) -> Inspect<Self, F>
where
F: $fns::FnMut(&Self::Item) + Clone + $send + 'static,
F: $fns::FnMut(&Self::Output) + Clone + $send + 'static,
Self: Sized,
{
$assert_pipe(Inspect::new(self, f))
}

fn update<F>(self, f: F) -> Update<Self, F>
where
F: $fns::FnMut(&mut Self::Item) + Clone + $send + 'static,
F: $fns::FnMut(&mut Self::Output) + Clone + $send + 'static,
Self: Sized,
{
$assert_pipe(Update::new(self, f))
}

fn map<B, F>(self, f: F) -> Map<Self, F>
where
F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static,
F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static,
Self: Sized,
{
$assert_pipe(Map::new(self, f))
}

fn flat_map<B, F>(self, f: F) -> FlatMap<Self, F>
where
F: $fns::FnMut(Self::Item) -> B + Clone + $send + 'static,
F: $fns::FnMut(Self::Output) -> B + Clone + $send + 'static,
B: Stream,
Self: Sized,
{
Expand All @@ -62,52 +61,52 @@ macro_rules! pipe {

fn filter<F>(self, f: F) -> Filter<Self, F>
where
F: $fns::FnMut(&Self::Item) -> bool + Clone + $send + 'static,
F: $fns::FnMut(&Self::Output) -> bool + Clone + $send + 'static,
Self: Sized,
{
$assert_pipe(Filter::new(self, f))
}

fn cloned<'a, T>(self) -> Cloned<Self, T, Source>
fn cloned<'a, T>(self) -> Cloned<Self, T, Input>
where
T: Clone + 'a,
Source: 'a,
Self: $pipe<&'a Source, Item = &'a T> + Sized,
Input: 'a,
Self: $pipe<&'a Input, Output = &'a T> + Sized,
{
$assert_pipe(Cloned::new(self))
}

// #[must_use]
// fn chain<C>(self, chain: C) -> Chain<Self, C::Iter>
// where
// C: IntoParallelStream<Item = Self::Item>,
// C: IntoParallelStream<Output = Self::Output>,
// Self: Sized,
// {
// $assert_pipe(Chain::new(self, chain.into_par_stream()))
// }

fn pipe<S>(self, sink: S) -> super::par_sink::Pipe<Self, S>
where
S: $sink<Self::Item>,
S: $sink<Self::Output>,
Self: Sized,
{
$assert_sink(super::par_sink::Pipe::new(self, sink))
}

fn fork<A, B, RefAItem>(
self, sink: A, sink_ref: B,
) -> Fork<Self, A, B, &'static Self::Item>
) -> Fork<Self, A, B, &'static Self::Output>
where
A: $sink<Self::Item>,
B: for<'a> $sink<&'a Self::Item>,
A: $sink<Self::Output>,
B: for<'a> $sink<&'a Self::Output>,
Self: Sized,
{
$assert_sink(Fork::new(self, sink, sink_ref))
}

fn for_each<F>(self, f: F) -> ForEach<Self, F>
where
F: $fns::FnMut(Self::Item) + Clone + $send + 'static,
F: $fns::FnMut(Self::Output) + Clone + $send + 'static,
Self: Sized,
{
$assert_sink(ForEach::new(self, f))
Expand All @@ -116,7 +115,7 @@ macro_rules! pipe {
fn fold<ID, F, B>(self, identity: ID, op: F) -> Fold<Self, ID, F, B>
where
ID: $fns::FnMut() -> B + Clone + $send + 'static,
F: $fns::FnMut(B, Either<Self::Item, B>) -> B + Clone + $send + 'static,
F: $fns::FnMut(B, Either<Self::Output, B>) -> B + Clone + $send + 'static,
B: $send + 'static,
Self: Sized,
{
Expand All @@ -130,15 +129,15 @@ macro_rules! pipe {
<S::Pipe as $pipe<B>>::Task: Clone + $send + 'static,
S::ReduceA: 'static,
S::ReduceC: Clone,
S::Output: $send + 'static,
Self: $pipe<Source, Item = (A, B)> + Sized,
S::Done: $send + 'static,
Self: $pipe<Input, Output = (A, B)> + Sized,
{
$assert_sink(GroupBy::new(self, sink))
}

fn histogram(self) -> Histogram<Self>
where
Self::Item: Hash + Ord + $send + 'static,
Self::Output: Hash + Ord + $send + 'static,
Self: Sized,
{
$assert_sink(Histogram::new(self))
Expand All @@ -153,78 +152,78 @@ macro_rules! pipe {

fn sum<B>(self) -> Sum<Self, B>
where
B: iter::Sum<Self::Item> + iter::Sum<B> + $send + 'static,
B: iter::Sum<Self::Output> + iter::Sum<B> + $send + 'static,
Self: Sized,
{
$assert_sink(Sum::new(self))
}

fn combine<F>(self, f: F) -> Combine<Self, F>
where
F: $fns::FnMut(Self::Item, Self::Item) -> Self::Item + Clone + $send + 'static,
Self::Item: $send + 'static,
F: $fns::FnMut(Self::Output, Self::Output) -> Self::Output + Clone + $send + 'static,
Self::Output: $send + 'static,
Self: Sized,
{
$assert_sink(Combine::new(self, f))
}

fn max(self) -> Max<Self>
where
Self::Item: Ord + $send + 'static,
Self::Output: Ord + $send + 'static,
Self: Sized,
{
$assert_sink(Max::new(self))
}

fn max_by<F>(self, f: F) -> MaxBy<Self, F>
where
F: $fns::FnMut(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
Self::Item: $send + 'static,
F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
Self::Output: $send + 'static,
Self: Sized,
{
$assert_sink(MaxBy::new(self, f))
}

fn max_by_key<F, B>(self, f: F) -> MaxByKey<Self, F>
where
F: $fns::FnMut(&Self::Item) -> B + Clone + $send + 'static,
F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static,
B: Ord + 'static,
Self::Item: $send + 'static,
Self::Output: $send + 'static,
Self: Sized,
{
$assert_sink(MaxByKey::new(self, f))
}

fn min(self) -> Min<Self>
where
Self::Item: Ord + $send + 'static,
Self::Output: Ord + $send + 'static,
Self: Sized,
{
$assert_sink(Min::new(self))
}

fn min_by<F>(self, f: F) -> MinBy<Self, F>
where
F: $fns::FnMut(&Self::Item, &Self::Item) -> Ordering + Clone + $send + 'static,
Self::Item: $send + 'static,
F: $fns::FnMut(&Self::Output, &Self::Output) -> Ordering + Clone + $send + 'static,
Self::Output: $send + 'static,
Self: Sized,
{
$assert_sink(MinBy::new(self, f))
}

fn min_by_key<F, B>(self, f: F) -> MinByKey<Self, F>
where
F: $fns::FnMut(&Self::Item) -> B + Clone + $send + 'static,
F: $fns::FnMut(&Self::Output) -> B + Clone + $send + 'static,
B: Ord + 'static,
Self::Item: $send + 'static,
Self::Output: $send + 'static,
Self: Sized,
{
$assert_sink(MinByKey::new(self, f))
}

fn most_frequent(self, n: usize, probability: f64, tolerance: f64) -> MostFrequent<Self>
where
Self::Item: Hash + Eq + Clone + $send + 'static,
Self::Output: Hash + Eq + Clone + $send + 'static,
Self: Sized,
{
$assert_sink(MostFrequent::new(self, n, probability, tolerance))
Expand All @@ -234,7 +233,7 @@ macro_rules! pipe {
self, n: usize, probability: f64, tolerance: f64, error_rate: f64,
) -> MostDistinct<Self>
where
Self: $pipe<Source, Item = (A, B)> + Sized,
Self: $pipe<Input, Output = (A, B)> + Sized,
A: Hash + Eq + Clone + $send + 'static,
B: Hash + 'static,
{
Expand All @@ -249,39 +248,39 @@ macro_rules! pipe {

fn sample_unstable(self, samples: usize) -> SampleUnstable<Self>
where
Self::Item: $send + 'static,
Self::Output: $send + 'static,
Self: Sized,
{
$assert_sink(SampleUnstable::new(self, samples))
}

fn all<F>(self, f: F) -> All<Self, F>
where
F: $fns::FnMut(Self::Item) -> bool + Clone + $send + 'static,
F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static,
Self: Sized,
{
$assert_sink(All::new(self, f))
}

fn any<F>(self, f: F) -> Any<Self, F>
where
F: $fns::FnMut(Self::Item) -> bool + Clone + $send + 'static,
F: $fns::FnMut(Self::Output) -> bool + Clone + $send + 'static,
Self: Sized,
{
$assert_sink(Any::new(self, f))
}

fn collect<B>(self) -> Collect<Self, B>
where
B: $from_sink<Self::Item>,
B: $from_sink<Self::Output>,
Self: Sized,
{
$assert_sink(Collect::new(self))
}
}

#[inline(always)]
pub(crate) fn $assert_pipe<T, I: $pipe<Source, Item = T>, Source>(i: I) -> I {
pub(crate) fn $assert_pipe<T, I: $pipe<Input, Output = T>, Input>(i: I) -> I {
i
}
};
Expand Down

0 comments on commit c693f05

Please sign in to comment.