diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1b2..902ea8781 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -44,6 +44,7 @@ mod min_by; mod next; mod nth; mod partial_cmp; +mod peekable; mod scan; mod skip; mod skip_while; @@ -80,6 +81,7 @@ pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; pub use map::Map; +pub use peekable::Peekable; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -974,6 +976,37 @@ extension_trait! { } } + #[doc = r#" + ## Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + let mut s = s.peekable(); + + assert_eq!(s.next().await, Some(1)); + assert_eq!(s.peek().await, Some(1)); + assert_eq!(s.next().await, Some(2)); + assert_eq!(s.next().await, Some(3)); + assert_eq!(s.next().await, None); + # + # }) } + ``` + "#] + #[inline] + fn peekable(self) -> Peekable + where + Self: Sized, + { + Peekable::new(self) + } + + #[doc = r#" A stream adaptor similar to [`fold`] that holds internal state and produces a new stream. diff --git a/src/stream/stream/peekable.rs b/src/stream/stream/peekable.rs new file mode 100644 index 000000000..4095f84c2 --- /dev/null +++ b/src/stream/stream/peekable.rs @@ -0,0 +1,66 @@ +use std::pin::Pin; + +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; +use crate::stream::stream::StreamExt; + +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct Peekable +where + S: Sized, +{ + stream: S, + peeked: Option>>, +} + +impl Peekable +where + S: Stream, +{ + pin_utils::unsafe_pinned!(stream: S); + pin_utils::unsafe_unpinned!(peeked: Option>>); + + pub(crate) fn new(stream: S) -> Self { + Peekable { + stream: stream, + peeked: None, + } + } + + pub fn peek(mut self: Pin<&mut Self>) -> &Poll> { + match &self.peeked { + Some(peeked) => &peeked, + None => { + // how to get the next `next` value? What about `Context` + let next = self.stream.next(); + *self.as_mut().peeked() = Some(next); + &next + } + } + } +} + +impl Stream for Peekable +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match &self.peeked { + Some(peeked) => { + let v = self.as_mut().peeked().take().unwrap(); + *self.as_mut().peeked() = None; + + v + } + None => { + Poll::Ready(next) + } + } + } +}