Skip to content

Commit

Permalink
feat(progress-bar): added progress bar to cli
Browse files Browse the repository at this point in the history
  • Loading branch information
Jamesmallon1 committed Dec 27, 2023
1 parent 8862f58 commit ff53a86
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 15 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ colored = "2.0"
sysinfo = "0.30.0"
serde = { version = "1.0.193", features = ["derive"] }
serde_json = "1.0.108"
time = "0.3.31"
time = "0.3.31"
indicatif = "0.17.7"
6 changes: 3 additions & 3 deletions src/services/fetch_block_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::model::solana_block::{BlockBatch, Reverse, SerializedSolanaBlock};
use crate::utilities::priority_queue::PriorityQueue;
use crate::utilities::rate_limiter::RateLimiter;
use crate::utilities::threading::ThreadPool;
use log::{error, info};
use log::{debug, error, info};
use solana_client::rpc_client::RpcClient;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
Expand Down Expand Up @@ -101,7 +101,7 @@ impl FetchBlockService {
}

// ship batch to write thread
info!(
debug!(
"Dispatching block batch {}-{} to be written to file.",
current_slot - solana_block::BATCH_SIZE + 1,
current_slot
Expand All @@ -122,7 +122,7 @@ impl FetchBlockService {
self.thread_pool.execute(closure);
}

info!("Block caching has started");
info!("Block caching is starting, please wait..");
loop {
let completed = completed_count.lock().unwrap();
if *completed == no_of_threads as u32 {
Expand Down
30 changes: 19 additions & 11 deletions src/services/write_service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::model::solana_block;
use crate::model::solana_block::{BlockBatch, Reverse};
use crate::utilities::priority_queue::PriorityQueue;
use log::{error, info};
use indicatif::{ProgressBar, ProgressStyle};
use log::{debug, error};
use std::fs::OpenOptions;
use std::io::Write;
use std::sync::{Arc, Condvar, Mutex};
Expand Down Expand Up @@ -96,21 +98,28 @@ impl WriteService {
/// let write_service = WriteService::new(write_queue);
/// write_service.initialize(String::from("path/to/output_file.txt"));
/// ```
pub fn initialize(&self, output_file: String) {
pub fn initialize(&self, output_file: String, slot_range: u64) {
let queue_clone = self.write_queue.clone();
let condvar_clone = self.condvar.clone();
thread::spawn(move || {
let mut next_sequence_id = 1_u64;
let progress_bar = ProgressBar::new(slot_range);
progress_bar.set_style(
ProgressStyle::default_bar()
.template("{prefix:.bold.dim} [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})")
.expect("Template style for progress bar is invalid.")
.progress_chars("##-"),
);
{
loop {
wait_for_data(queue_clone.clone(), &condvar_clone.clone(), next_sequence_id);
let mut queue = queue_clone.lock().unwrap();
let mut file =
OpenOptions::new().append(true).create(true).open(&output_file).expect("Unable to open file");
info!("Checking to see if there are any blocks to write to file");
debug!("Checking to see if there are any blocks to write to file");
while queue.peek().is_some() && queue.peek().unwrap().0.sequence_number == next_sequence_id {
let block_batch = queue.pop().unwrap();
info!(
debug!(
"Attempting to write block batch {} to file",
block_batch.0.sequence_number
);
Expand All @@ -119,7 +128,11 @@ impl WriteService {
error!("Could not write block on slot {} to file: {}", block.slot_number, e);
}
}
info!("Block batch {} written to file", block_batch.0.sequence_number);
debug!("Block batch {} written to file", block_batch.0.sequence_number);
progress_bar.inc(solana_block::BATCH_SIZE);
if progress_bar.position() > slot_range - solana_block::BATCH_SIZE {
progress_bar.finish_and_clear();
}
next_sequence_id += 1;
}
}
Expand All @@ -133,11 +146,6 @@ pub fn wait_for_data(pq: Arc<Mutex<PriorityQueue<Reverse<BlockBatch>>>>, condvar
while queue.peek().is_none()
|| (queue.peek().is_some() && queue.peek().unwrap().0.sequence_number != next_sequence_id)
{
if queue.peek().is_some() {
let result = &queue.peek().unwrap().0;
queue = condvar.wait(queue).unwrap();
} else {
queue = condvar.wait(queue).unwrap();
}
queue = condvar.wait(queue).unwrap();
}
}

0 comments on commit ff53a86

Please sign in to comment.