Skip to content

Commit

Permalink
feat: add futures::Stream support
Browse files Browse the repository at this point in the history
  • Loading branch information
aatifsyed committed Jun 23, 2023
1 parent 4ffc40e commit 151d6da
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 0 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ rust-version = "1.62.1"

[dependencies]
console = { version = "0.15", default-features = false, features = ["ansi-parsing"] }
futures-core = { version = "0.3", default-features = false, optional = true }
number_prefix = "0.4"
portable-atomic = "1.0.0"
rayon = { version = "1.1", optional = true }
Expand All @@ -27,6 +28,7 @@ clap = { version = "4", features = ["color", "derive"] }
once_cell = "1"
rand = "0.8"
tokio = { version = "1", features = ["fs", "time", "rt"] }
futures = "0.3"

[target.'cfg(target_arch = "wasm32")'.dependencies]
instant = "0.1"
Expand All @@ -35,6 +37,7 @@ instant = "0.1"
default = ["unicode-width", "console/unicode-width"]
improved_unicode = ["unicode-segmentation", "unicode-width", "console/unicode-width"]
in_memory = ["vt100"]
futures = ["dep:futures-core"]

[package.metadata.docs.rs]
all-features = true
Expand Down
20 changes: 20 additions & 0 deletions src/iter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,26 @@ impl<W: tokio::io::AsyncBufRead + Unpin + tokio::io::AsyncRead> tokio::io::Async
}
}

#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
impl<S: futures_core::Stream + Unpin> futures_core::Stream for ProgressBarIter<S> {
type Item = S::Item;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
let item = std::pin::Pin::new(&mut this.it).poll_next(cx);
match &item {
std::task::Poll::Ready(Some(_)) => this.progress.inc(1),
std::task::Poll::Ready(None) => this.progress.finish_using_style(),
std::task::Poll::Pending => {}
}
item
}
}

impl<W: io::Write> io::Write for ProgressBarIter<W> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.it.write(buf).map(|inc| {
Expand Down
24 changes: 24 additions & 0 deletions src/progress_bar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,30 @@ impl ProgressBar {
}
}

#[cfg(feature = "futures")]
#[cfg_attr(docsrs, doc(cfg(feature = "futures")))]
/// Wraps a [`futures::Stream`] with the progress bar
///
/// ```
/// # use indicatif::ProgressBar;
/// use futures::stream::{self, StreamExt};
/// # async fn test() {
/// let pb = ProgressBar::new(10);
///
/// let stream = pb.wrap_stream(stream::iter(1..=10));
/// assert_eq!(
/// stream.count().await,
/// 10,
/// );
/// # }
/// ```
pub fn wrap_stream<S: futures_core::Stream>(&self, stream: S) -> ProgressBarIter<S> {
ProgressBarIter {
progress: self.clone(),
it: stream,
}
}

/// Returns the current position
pub fn position(&self) -> u64 {
self.state().state.pos()
Expand Down

0 comments on commit 151d6da

Please sign in to comment.