Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stream-partition #541

Merged
merged 2 commits into from Nov 15, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/stream/stream/mod.rs
Expand Up @@ -120,8 +120,11 @@ cfg_unstable! {

use crate::stream::into_stream::IntoStream;
use crate::stream::{FromStream, Product, Sum};
use crate::stream::Extend;

use count::CountFuture;
use partition::PartitionFuture;

pub use merge::Merge;
pub use flatten::Flatten;
pub use flat_map::FlatMap;
Expand All @@ -132,6 +135,7 @@ cfg_unstable! {
mod merge;
mod flatten;
mod flat_map;
mod partition;
mod timeout;
mod throttle;
}
Expand Down Expand Up @@ -1308,6 +1312,44 @@ extension_trait! {
FoldFuture::new(self, init, f)
}

#[doc = r#"
A combinator that applies a function to every element in a stream
creating two collections from it.

# Examples

Basic usage:

```
# fn main() { async_std::task::block_on(async {
#
use async_std::prelude::*;
use async_std::stream;

let (even, odd): (Vec<i32>, Vec<i32>) = stream::from_iter(vec![1, 2, 3])
.partition(|&n| n % 2 == 0).await;

assert_eq!(even, vec![2]);
assert_eq!(odd, vec![1, 3]);

#
# }) }
```
"#]
#[cfg(feature = "unstable")]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
fn partition<B, F>(
self,
f: F,
) -> impl Future<Output = (B, B)> [PartitionFuture<Self, F, B>]
where
Self: Sized,
F: FnMut(&Self::Item) -> bool,
B: Default + Extend<Self::Item>,
{
PartitionFuture::new(self, f)
}

#[doc = r#"
Call a closure on each element of the stream.

Expand Down
60 changes: 60 additions & 0 deletions src/stream/stream/partition.rs
@@ -0,0 +1,60 @@
use std::future::Future;
use std::pin::Pin;
use std::default::Default;
use pin_project_lite::pin_project;

use crate::stream::Stream;
use crate::task::{Context, Poll};

pin_project! {
#[derive(Debug)]
#[allow(missing_debug_implementations)]
#[cfg(all(feature = "default", feature = "unstable"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
pub struct PartitionFuture<S, F, B> {
#[pin]
stream: S,
f: F,
res: Option<(B, B)>,
}
}

impl<S, F, B: Default> PartitionFuture<S, F, B> {
pub(super) fn new(stream: S, f: F) -> Self {
Self {
stream,
f,
res: Some((B::default(), B::default())),
}
}
}

impl<S, F, B> Future for PartitionFuture<S, F, B>
where
S: Stream + Sized,
F: FnMut(&S::Item) -> bool,
B: Default + Extend<S::Item>,
{
type Output = (B, B);

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();

loop {
let next = futures_core::ready!(this.stream.as_mut().poll_next(cx));

match next {
Some(v) => {
let mut res = this.res.take().unwrap();
match (this.f)(&v) {
true => res.0.extend(Some(v)),
false => res.1.extend(Some(v)),
};

*this.res = Some(res);
}
None => return Poll::Ready(this.res.take().unwrap()),
}
}
}
}