diff --git a/Cargo.toml b/Cargo.toml index 1957dec..1ec7f81 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,4 +14,5 @@ colored = "2.0" sysinfo = "0.30.0" serde = { version = "1.0.193", features = ["derive"] } serde_json = "1.0.108" -time = "0.3.31" \ No newline at end of file +time = "0.3.31" +indicatif = "0.17.7" \ No newline at end of file diff --git a/src/services/fetch_block_service.rs b/src/services/fetch_block_service.rs index bfd4155..c700d5e 100644 --- a/src/services/fetch_block_service.rs +++ b/src/services/fetch_block_service.rs @@ -3,7 +3,7 @@ use crate::model::solana_block::{BlockBatch, Reverse, SerializedSolanaBlock}; use crate::utilities::priority_queue::PriorityQueue; use crate::utilities::rate_limiter::RateLimiter; use crate::utilities::threading::ThreadPool; -use log::{error, info}; +use log::{debug, error, info}; use solana_client::rpc_client::RpcClient; use std::sync::{Arc, Condvar, Mutex}; use std::thread; @@ -101,7 +101,7 @@ impl FetchBlockService { } // ship batch to write thread - info!( + debug!( "Dispatching block batch {}-{} to be written to file.", current_slot - solana_block::BATCH_SIZE + 1, current_slot @@ -122,7 +122,7 @@ impl FetchBlockService { self.thread_pool.execute(closure); } - info!("Block caching has started"); + info!("Block caching is starting, please wait.."); loop { let completed = completed_count.lock().unwrap(); if *completed == no_of_threads as u32 { diff --git a/src/services/write_service.rs b/src/services/write_service.rs index 3c84c66..baf2cdf 100644 --- a/src/services/write_service.rs +++ b/src/services/write_service.rs @@ -1,6 +1,8 @@ +use crate::model::solana_block; use crate::model::solana_block::{BlockBatch, Reverse}; use crate::utilities::priority_queue::PriorityQueue; -use log::{error, info}; +use indicatif::{ProgressBar, ProgressStyle}; +use log::{debug, error}; use std::fs::OpenOptions; use std::io::Write; use std::sync::{Arc, Condvar, Mutex}; @@ -96,21 +98,28 @@ impl WriteService { /// let write_service = WriteService::new(write_queue); /// write_service.initialize(String::from("path/to/output_file.txt")); /// ``` - pub fn initialize(&self, output_file: String) { + pub fn initialize(&self, output_file: String, slot_range: u64) { let queue_clone = self.write_queue.clone(); let condvar_clone = self.condvar.clone(); thread::spawn(move || { let mut next_sequence_id = 1_u64; + let progress_bar = ProgressBar::new(slot_range); + progress_bar.set_style( + ProgressStyle::default_bar() + .template("{prefix:.bold.dim} [{wide_bar:.cyan/blue}] {pos}/{len} ({eta})") + .expect("Template style for progress bar is invalid.") + .progress_chars("##-"), + ); { loop { wait_for_data(queue_clone.clone(), &condvar_clone.clone(), next_sequence_id); let mut queue = queue_clone.lock().unwrap(); let mut file = OpenOptions::new().append(true).create(true).open(&output_file).expect("Unable to open file"); - info!("Checking to see if there are any blocks to write to file"); + debug!("Checking to see if there are any blocks to write to file"); while queue.peek().is_some() && queue.peek().unwrap().0.sequence_number == next_sequence_id { let block_batch = queue.pop().unwrap(); - info!( + debug!( "Attempting to write block batch {} to file", block_batch.0.sequence_number ); @@ -119,7 +128,11 @@ impl WriteService { error!("Could not write block on slot {} to file: {}", block.slot_number, e); } } - info!("Block batch {} written to file", block_batch.0.sequence_number); + debug!("Block batch {} written to file", block_batch.0.sequence_number); + progress_bar.inc(solana_block::BATCH_SIZE); + if progress_bar.position() > slot_range - solana_block::BATCH_SIZE { + progress_bar.finish_and_clear(); + } next_sequence_id += 1; } } @@ -133,11 +146,6 @@ pub fn wait_for_data(pq: Arc>>>, condvar while queue.peek().is_none() || (queue.peek().is_some() && queue.peek().unwrap().0.sequence_number != next_sequence_id) { - if queue.peek().is_some() { - let result = &queue.peek().unwrap().0; - queue = condvar.wait(queue).unwrap(); - } else { - queue = condvar.wait(queue).unwrap(); - } + queue = condvar.wait(queue).unwrap(); } }