Skip to content

Commit

Permalink
Add tick to MultiProgress to complement join
Browse files Browse the repository at this point in the history
* Changed `joining` field of `MultiProgress` from `AtomicBool` to `Mutex`.
  Because the internal `join_impl` can now be called multiple times
  I figured it's better to use a Mutex as it automatically unlocks
  preventing panics "bricking" the progress bar.
* Because MultiProgress can contain multiple child progress bars I feel
  like the meaning of a single tick isn't really clear. Just a single
  state update (`self.rx.recv()`) seems too small. Because of that I
  allow the user to configure how long do they want a tick to last
  adding greater flexibility.
* Made `is_done` public to let the user know whether they still need
  to invoke `tick`
  • Loading branch information
mibac138 committed Mar 17, 2020
1 parent 0529656 commit 0966eb2
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 22 deletions.
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,9 @@ mod utils;

pub use crate::format::{BinaryBytes, DecimalBytes, FormattedDuration, HumanBytes, HumanDuration};
pub use crate::iter::{ProgressBarIter, ProgressIterator};
pub use crate::progress::{MultiProgress, ProgressBar, ProgressBarWrap, ProgressDrawTarget};
pub use crate::progress::{
MultiProgress, ProgressBar, ProgressBarWrap, ProgressDrawTarget, TickTimeLimit,
};
pub use crate::style::ProgressStyle;

#[cfg(feature = "with_rayon")]
Expand Down
117 changes: 96 additions & 21 deletions src/progress.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::fmt;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};
use std::sync::{Arc, TryLockError};
use std::sync::{Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -859,9 +858,8 @@ struct MultiProgressState {
/// Manages multiple progress bars from different threads.
pub struct MultiProgress {
state: RwLock<MultiProgressState>,
joining: AtomicBool,
tx: Sender<(usize, ProgressDrawState)>,
rx: Receiver<(usize, ProgressDrawState)>,
rx: Mutex<Receiver<(usize, ProgressDrawState)>>,
}

impl fmt::Debug for MultiProgress {
Expand Down Expand Up @@ -897,9 +895,8 @@ impl MultiProgress {
draw_target,
move_cursor: false,
}),
joining: AtomicBool::new(false),
tx,
rx,
rx: Mutex::new(rx),
}
}

Expand Down Expand Up @@ -938,19 +935,34 @@ impl MultiProgress {

/// Waits for all progress bars to report that they are finished.
///
/// You need to call this as this will request the draw instructions
/// from the remote progress bars. Not calling this will deadlock
/// your program.
/// You need to call this or `tick` as this will request the draw instructions
/// from the remote progress bars. Not calling this will deadlock your program.
pub fn join(&self) -> io::Result<()> {
self.join_impl(false)
self.join_impl(false, None)
}

/// Works like `join` but clears the progress bar in the end.
pub fn join_and_clear(&self) -> io::Result<()> {
self.join_impl(true)
self.join_impl(true, None)
}

fn is_done(&self) -> bool {
/// Blocks only as long as the `time_limit` allows for.
/// Requires to be called as long as `is_done` returns false to prevent
/// deadlocking your program.
///
/// You need to call this or `join` as this will request the draw instructions
/// from the remote progress bars. Not calling this will deadlock your program.
pub fn tick(&self, time_limit: TickTimeLimit) -> io::Result<()> {
self.join_impl(false, Some(time_limit))
}

/// Works like `tick` but if it so happens that the progress bar finishes
/// before updating ends it also clears the progress bar.
pub fn tick_and_clear(&self, time_limit: TickTimeLimit) -> io::Result<()> {
self.join_impl(true, Some(time_limit))
}

pub fn is_done(&self) -> bool {
let state = self.state.read().unwrap();
if state.objects.is_empty() {
return true;
Expand All @@ -963,15 +975,25 @@ impl MultiProgress {
true
}

fn join_impl(&self, clear: bool) -> io::Result<()> {
if self.joining.load(Ordering::Acquire) {
panic!("Already joining!");
}
self.joining.store(true, Ordering::Release);
fn join_impl(&self, clear: bool, time_limit: Option<TickTimeLimit>) -> io::Result<()> {
let rx = match self.rx.try_lock() {
Err(TryLockError::WouldBlock) => panic!("Update already in progress"),
rx => rx.unwrap(),
};

let move_cursor = self.state.read().unwrap().move_cursor;
let time_left: TickTimeTracker = time_limit.into();
while !self.is_done() {
let (idx, draw_state) = self.rx.recv().unwrap();
let (idx, draw_state) = if time_left.can_block() {
rx.recv().unwrap()
} else if time_left.can_probe() {
match rx.try_recv() {
Err(TryRecvError::Empty) => return Ok(()),
recv => recv.unwrap(),
}
} else {
return Ok(());
};
let ts = draw_state.ts;
let force_draw = draw_state.finished || draw_state.force_draw;

Expand Down Expand Up @@ -1038,12 +1060,65 @@ impl MultiProgress {
})?;
}

self.joining.store(false, Ordering::Release);

Ok(())
}
}

/// Defines an upper time limit for a single `MultiProgress` bar tick. A tick may contain multiple
/// internal updates. Each update comes from a single child progress bar and contains an internal
/// state change.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum TickTimeLimit {
/// Function will block until there isn't any immediately available state update.
Indefinite,
/// Function will block at most until duration elapses (counting from just before the
/// first update) and at least until there isn't any immediately available state update.
Timeout(Duration),
/// Function will block at most until the instant and at least until there isn't any
/// immediately available state update.
Deadline(Instant),
}

enum TickTimeTracker {
/// A `join` function call
Unlimited,
Indefinite,
Deadline(Instant),
}

impl TickTimeTracker {
/// Returns true if the time limit allows for unlimited blocking (created by invoking `join`)
fn can_block(&self) -> bool {
match self {
TickTimeTracker::Unlimited => true,
_ => false,
}
}
/// Returns true if there is still time left and it's allowed to try to update and false if
/// the deadline has already been reached
fn can_probe(&self) -> bool {
match self {
TickTimeTracker::Deadline(instant) => *instant > Instant::now(),
_ => true,
}
}
}

impl From<Option<TickTimeLimit>> for TickTimeTracker {
fn from(limit: Option<TickTimeLimit>) -> Self {
match limit {
Some(limit) => match limit {
TickTimeLimit::Indefinite => TickTimeTracker::Indefinite,
TickTimeLimit::Timeout(duration) => {
TickTimeTracker::Deadline(Instant::now() + duration)
}
TickTimeLimit::Deadline(instant) => TickTimeTracker::Deadline(instant),
},
None => TickTimeTracker::Unlimited,
}
}
}

/// Iterator for `wrap_iter`.
#[derive(Debug)]
pub struct ProgressBarIter<I> {
Expand Down

0 comments on commit 0966eb2

Please sign in to comment.