Skip to content

Commit

Permalink
Merge #179
Browse files Browse the repository at this point in the history
179: adds stream::find combinator r=yoshuawuyts a=montekki

A find combinator
---
Stdlib: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.find
Ref: #129 

Co-authored-by: Fedor Sakharov <fedor.sakharov@gmail.com>
  • Loading branch information
bors[bot] and montekki committed Sep 10, 2019
2 parents 6f9ec66 + 97a5f9b commit 568f6a6
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
49 changes: 49 additions & 0 deletions src/stream/stream/find.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use crate::task::{Context, Poll};
use std::marker::PhantomData;
use std::pin::Pin;

#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct FindFuture<'a, S, P, T> {
stream: &'a mut S,
p: P,
__t: PhantomData<T>,
}

impl<'a, S, P, T> FindFuture<'a, S, P, T> {
pin_utils::unsafe_pinned!(stream: &'a mut S);
pin_utils::unsafe_unpinned!(p: P);

pub(super) fn new(stream: &'a mut S, p: P) -> Self {
FindFuture {
stream,
p,
__t: PhantomData,
}
}
}

impl<'a, S, P> futures_core::future::Future for FindFuture<'a, S, P, S::Item>
where
S: futures_core::stream::Stream + Unpin + Sized,
P: FnMut(&S::Item) -> bool,
{
type Output = Option<S::Item>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
use futures_core::stream::Stream;

let item = futures_core::ready!(self.as_mut().stream().poll_next(cx));

match item {
Some(v) => match (self.as_mut().p())(&v) {
true => Poll::Ready(Some(v)),
false => {
cx.waker().wake_by_ref();
Poll::Pending
}
},
None => Poll::Ready(None),
}
}
}
46 changes: 46 additions & 0 deletions src/stream/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
mod all;
mod any;
mod filter_map;
mod find;
mod find_map;
mod min_by;
mod next;
Expand All @@ -35,6 +36,7 @@ pub use take::Take;
use all::AllFuture;
use any::AnyFuture;
use filter_map::FilterMap;
use find::FindFuture;
use find_map::FindMapFuture;
use min_by::MinByFuture;
use next::NextFuture;
Expand Down Expand Up @@ -321,6 +323,50 @@ pub trait Stream {
}
}

/// Searches for an element in a stream that satisfies a predicate.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use std::collections::VecDeque;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
/// let res = s.find(|x| *x == 2).await;
/// assert_eq!(res, Some(2));
/// #
/// # }) }
/// ```
///
/// Resuming after a first find:
///
/// ```
/// # fn main() { async_std::task::block_on(async {
/// #
/// use async_std::prelude::*;
/// use std::collections::VecDeque;
///
/// let mut s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
/// let res = s.find(|x| *x == 2).await;
/// assert_eq!(res, Some(2));
///
/// let next = s.next().await;
/// assert_eq!(next, Some(3));
/// #
/// # }) }
/// ```
fn find<P>(&mut self, p: P) -> ret!('_, FindFuture, Option<Self::Item>, P, Self::Item)
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
FindFuture::new(self, p)
}

/// Applies function to the elements of stream and returns the first non-none result.
///
/// ```
Expand Down

0 comments on commit 568f6a6

Please sign in to comment.