Skip to content

Commit

Permalink
Step 6 - Display stats 10 times per second. MVP Complete
Browse files Browse the repository at this point in the history
We use Arc and AtomicUsize for thread safety and make Stats struct own the
incrementing of its lines and bytes.

The minimum viable product is complete. We can put this program in
between pipes and stats will spew to console nicely without interupting
the final data output.
  • Loading branch information
Scott Pierce committed Mar 17, 2018
1 parent d2c5ef1 commit e0b51a9
Showing 1 changed file with 28 additions and 12 deletions.
40 changes: 28 additions & 12 deletions stream_stats.rs
@@ -1,31 +1,40 @@
use std::fmt; use std::fmt;
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};
use std::io::{self, BufRead, BufReader, BufWriter, Write}; use std::io::{self, BufRead, BufReader, BufWriter, Write};
use std::time::Instant; use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread::{self, sleep};
use std::time::{Duration, Instant};


static READ_BUF_SIZE: usize = 1024 * 1024; static READ_BUF_SIZE: usize = 1024 * 1024;
static CLEAR_LINE: &str = "\x1B[1G\x1B[2K"; static CLEAR_LINE: &str = "\x1B[1G\x1B[2K";
static UPDATE_INTERVAL_MS: u64 = 100;


struct Stats { struct Stats {
started: Instant, started: Instant,
lines: usize, lines: AtomicUsize,
bytes: usize, bytes: AtomicUsize,
tty: File, tty: File,
} }


impl Stats { impl Stats {
fn new(tty: &str) -> Stats { fn new(tty: &str) -> Stats {
Stats { Stats {
started: Instant::now(), started: Instant::now(),
lines: 0, lines: AtomicUsize::new(0),
bytes: 0, bytes: AtomicUsize::new(0),
tty: OpenOptions::new() tty: OpenOptions::new()
.write(true) .write(true)
.append(true) .append(true)
.open(tty) .open(tty)
.expect("Cannot open tty for writing!"), .expect("Cannot open tty for writing!"),
} }
} }

fn add(&self, buffer: &Vec<u8>) {
self.lines.fetch_add(1, Ordering::Relaxed);
self.bytes.fetch_add(buffer.len(), Ordering::Relaxed);
}
} }


impl fmt::Display for Stats { impl fmt::Display for Stats {
Expand All @@ -36,17 +45,19 @@ impl fmt::Display for Stats {
if seconds == 0.0 { if seconds == 0.0 {
return write!(f, ""); return write!(f, "");
} }
let kb = self.bytes as f64 / 1024 as f64; let bytes = self.bytes.load(Ordering::Relaxed) as f64;
let lines = self.lines.load(Ordering::Relaxed) as f64;
let kb = bytes / 1024 as f64;
let kb_per_sec = kb / seconds; let kb_per_sec = kb / seconds;
let lines_per_sec = self.lines as f64 / seconds; let lines_per_sec = lines / seconds;
write!( write!(
f, f,
"{}{:.1} sec | {:.0} kb [ {:.1}/s ] | {} lines [ {:.0}/s ]", "{}{:.1} sec | {:.0} kb [ {:.1}/s ] | {:.0} lines [ {:.0}/s ]",
CLEAR_LINE, CLEAR_LINE,
seconds, seconds,
kb, kb,
kb_per_sec, kb_per_sec,
self.lines, lines,
lines_per_sec lines_per_sec
) )
} }
Expand All @@ -56,12 +67,17 @@ fn main() {
let mut reader = BufReader::with_capacity(READ_BUF_SIZE, io::stdin()); let mut reader = BufReader::with_capacity(READ_BUF_SIZE, io::stdin());
let mut writer = BufWriter::new(io::stdout()); let mut writer = BufWriter::new(io::stdout());
let mut buffer = vec![]; let mut buffer = vec![];
let mut stats = Stats::new("/dev/tty"); let stats = Arc::new(Stats::new("/dev/tty"));

let stats_clone = stats.clone();
thread::spawn(move || loop {
sleep(Duration::from_millis(UPDATE_INTERVAL_MS));
write!(&stats_clone.tty, "{}", &stats_clone).expect("Could not write to tty!");
});


while reader.read_until(b'\n', &mut buffer).unwrap() > 0 { while reader.read_until(b'\n', &mut buffer).unwrap() > 0 {
writer.write(&buffer).unwrap(); writer.write(&buffer).unwrap();
stats.lines += 1; stats.add(&buffer);
stats.bytes += &buffer.len();
buffer.clear(); buffer.clear();
} }
writer.flush().unwrap(); writer.flush().unwrap();
Expand Down

0 comments on commit e0b51a9

Please sign in to comment.