Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add tick to MultiProgress to complement join #132

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ mod utils;

pub use crate::format::{BinaryBytes, DecimalBytes, FormattedDuration, HumanBytes, HumanDuration};
pub use crate::iter::{ProgressBarIter, ProgressIterator};
pub use crate::progress::{MultiProgress, ProgressBar, ProgressBarWrap, ProgressDrawTarget};
pub use crate::progress::{
MultiProgress, ProgressBar, ProgressBarWrap, ProgressDrawTarget, TickTimeLimit,
};
pub use crate::style::ProgressStyle;

#[cfg(feature = "with_rayon")]
Expand Down
128 changes: 104 additions & 24 deletions src/progress.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::{Arc, TryLockError};
use std::sync::{Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -859,9 +858,8 @@ struct MultiProgressState {
/// Manages multiple progress bars from different threads.
pub struct MultiProgress {
state: RwLock<MultiProgressState>,
joining: AtomicBool,
tx: Sender<(usize, ProgressDrawState)>,
rx: Receiver<(usize, ProgressDrawState)>,
rx: Mutex<Receiver<(usize, ProgressDrawState)>>,
}

impl fmt::Debug for MultiProgress {
Expand Down Expand Up @@ -897,9 +895,8 @@ impl MultiProgress {
draw_target,
move_cursor: false,
}),
joining: AtomicBool::new(false),
tx,
rx,
rx: Mutex::new(rx),
}
}

Expand Down Expand Up @@ -938,23 +935,37 @@ impl MultiProgress {

/// Waits for all progress bars to report that they are finished.
///
/// You need to call this as this will request the draw instructions
/// from the remote progress bars. Not calling this will deadlock
/// your program.
/// You need to call this or `tick` as this will request the draw instructions
/// from the remote progress bars. Not calling this will deadlock your program.
pub fn join(&self) -> io::Result<()> {
self.join_impl(false)
self.join_impl(false, None)
}

/// Works like `join` but clears the progress bar in the end.
pub fn join_and_clear(&self) -> io::Result<()> {
self.join_impl(true)
self.join_impl(true, None)
}

fn is_done(&self) -> bool {
/// Blocks only as long as the `time_limit` allows for.
/// Requires to be called as long as `is_done` returns false to prevent
/// deadlocking your program.
///
/// You need to call this or `join` as this will request the draw instructions
/// from the remote progress bars. Not calling this will deadlock your program.
pub fn tick(&self, time_limit: TickTimeLimit) -> io::Result<()> {
self.join_impl(false, Some(time_limit))
}

/// Works like `tick` but if it so happens that the progress bar finishes
/// before updating ends it also clears the progress bar.
pub fn tick_and_clear(&self, time_limit: TickTimeLimit) -> io::Result<()> {
self.join_impl(true, Some(time_limit))
}

/// A MultiProgress is considered done when all connected ProgressBars are finished.
/// A ProgressBar is considered finished when it's finished, abandoned or dropped.
pub fn is_done(&self) -> bool {
let state = self.state.read().unwrap();
if state.objects.is_empty() {
return true;
}
for obj in &state.objects {
if !obj.done {
return false;
Expand All @@ -963,15 +974,24 @@ impl MultiProgress {
true
}

fn join_impl(&self, clear: bool) -> io::Result<()> {
if self.joining.load(Ordering::Acquire) {
panic!("Already joining!");
}
self.joining.store(true, Ordering::Release);
fn join_impl(&self, clear: bool, time_limit: Option<TickTimeLimit>) -> io::Result<()> {
let rx = match self.rx.try_lock() {
Err(TryLockError::WouldBlock) => panic!("Update already in progress"),
rx => rx.unwrap(),
};

let move_cursor = self.state.read().unwrap().move_cursor;
let time_tracker: TickTimeTracker = time_limit.into();
while !self.is_done() {
let (idx, draw_state) = self.rx.recv().unwrap();
let (idx, draw_state) = match time_tracker.time_left() {
TickTimeLeft::CanBlock => rx.recv().unwrap(),
TickTimeLeft::CanProbe => match rx.try_recv() {
Err(TryRecvError::Empty) => return Ok(()),
recv => recv.unwrap(),
},
TickTimeLeft::None => return Ok(()),
};

let ts = draw_state.ts;
let force_draw = draw_state.finished || draw_state.force_draw;

Expand Down Expand Up @@ -1038,12 +1058,72 @@ impl MultiProgress {
})?;
}

self.joining.store(false, Ordering::Release);

Ok(())
}
}

/// Defines an upper time limit for a single `MultiProgress` bar tick. A tick may contain multiple
/// internal updates. Each update comes from a single child progress bar and contains an internal
/// state change.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum TickTimeLimit {
/// Function will block until there isn't any immediately available state update.
Indefinite,
/// Function will block at most until duration elapses (counting from just before the
/// first update) and at least until there isn't any immediately available state update.
Timeout(Duration),
/// Function will block at most until the instant and at least until there isn't any
/// immediately available state update.
Deadline(Instant),
}

enum TickTimeTracker {
/// A `join` function call
Unlimited,
Indefinite,
Deadline(Instant),
}

enum TickTimeLeft {
/// Caller can indefinitely block the thread waiting for updates
CanBlock,
/// Caller can block as long as there are continuous updates
CanProbe,
/// The allocated time has run out and the caller must return
None,
}

impl TickTimeTracker {
fn time_left(&self) -> TickTimeLeft {
match self {
TickTimeTracker::Unlimited => TickTimeLeft::CanBlock,
TickTimeTracker::Deadline(deadline) => {
if *deadline > Instant::now() {
TickTimeLeft::CanProbe
} else {
TickTimeLeft::None
}
}
TickTimeTracker::Indefinite => TickTimeLeft::CanProbe,
}
}
}

impl From<Option<TickTimeLimit>> for TickTimeTracker {
fn from(limit: Option<TickTimeLimit>) -> Self {
match limit {
Some(limit) => match limit {
TickTimeLimit::Indefinite => TickTimeTracker::Indefinite,
TickTimeLimit::Timeout(duration) => {
TickTimeTracker::Deadline(Instant::now() + duration)
}
TickTimeLimit::Deadline(instant) => TickTimeTracker::Deadline(instant),
},
None => TickTimeTracker::Unlimited,
}
}
}

/// Iterator for `wrap_iter`.
#[derive(Debug)]
pub struct ProgressBarIter<I> {
Expand Down