diff --git a/Cargo.toml b/Cargo.toml index 8c841d22..d8e6ad89 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ rust-version = "1.62.1" [dependencies] console = { version = "0.15", default-features = false, features = ["ansi-parsing"] } +futures-core = { version = "0.3", default-features = false, optional = true } number_prefix = "0.4" portable-atomic = "1.0.0" rayon = { version = "1.1", optional = true } @@ -27,6 +28,7 @@ clap = { version = "4", features = ["color", "derive"] } once_cell = "1" rand = "0.8" tokio = { version = "1", features = ["fs", "time", "rt"] } +futures = "0.3" # so the doctest for wrap_stream is nice [target.'cfg(target_arch = "wasm32")'.dependencies] instant = "0.1" @@ -35,6 +37,7 @@ instant = "0.1" default = ["unicode-width", "console/unicode-width"] improved_unicode = ["unicode-segmentation", "unicode-width", "console/unicode-width"] in_memory = ["vt100"] +futures = ["dep:futures-core"] [package.metadata.docs.rs] all-features = true diff --git a/src/iter.rs b/src/iter.rs index 91534885..43792576 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -277,6 +277,26 @@ impl tokio::io::Async } } +#[cfg(feature = "futures")] +#[cfg_attr(docsrs, doc(cfg(feature = "futures")))] +impl futures_core::Stream for ProgressBarIter { + type Item = S::Item; + + fn poll_next( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + let this = self.get_mut(); + let item = std::pin::Pin::new(&mut this.it).poll_next(cx); + match &item { + std::task::Poll::Ready(Some(_)) => this.progress.inc(1), + std::task::Poll::Ready(None) => this.progress.finish_using_style(), + std::task::Poll::Pending => {} + } + item + } +} + impl io::Write for ProgressBarIter { fn write(&mut self, buf: &[u8]) -> io::Result { self.it.write(buf).map(|inc| { diff --git a/src/progress_bar.rs b/src/progress_bar.rs index 6fc87033..b8e0986a 100644 --- a/src/progress_bar.rs +++ b/src/progress_bar.rs @@ -511,6 +511,28 @@ impl ProgressBar { } } + /// Wraps a [`futures::Stream`](https://docs.rs/futures/0.3/futures/stream/trait.StreamExt.html) with the progress bar + /// + /// ``` + /// # use indicatif::ProgressBar; + /// # futures::executor::block_on(async { + /// use futures::stream::{self, StreamExt}; + /// let pb = ProgressBar::new(10); + /// let mut stream = pb.wrap_stream(stream::iter('a'..='z')); + /// + /// assert_eq!(stream.next().await, Some('a')); + /// assert_eq!(stream.count().await, 25); + /// # }); // block_on + /// ``` + #[cfg(feature = "futures")] + #[cfg_attr(docsrs, doc(cfg(feature = "futures")))] + pub fn wrap_stream(&self, stream: S) -> ProgressBarIter { + ProgressBarIter { + progress: self.clone(), + it: stream, + } + } + /// Returns the current position pub fn position(&self) -> u64 { self.state().state.pos()