Skip to content

Commit

Permalink
fix(fbs): improved thread completion counter using atomics
Browse files Browse the repository at this point in the history
  • Loading branch information
Jamesmallon1 committed Jan 8, 2024
1 parent 92e0478 commit b92aa04
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 20 deletions.
2 changes: 1 addition & 1 deletion src/networking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl BlockFetcher for SolanaClient {
)
.map_err(|err| {
if err.to_string().contains("-32009") || err.to_string().contains("-32004") {
// set verbosity lower as slot has been skipped or is not available
// set verbosity lower as slot has been skipped or is not available for a specific slot
debug!("Could not retrieve block {} due to error: {}", slot, err)
} else {
error!("Could not retrieve block {} due to error: {}", slot, err);
Expand Down
30 changes: 11 additions & 19 deletions src/services/fetch_block_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::utilities::rate_limiter::RateLimiting;
use crate::utilities::threading::{JobDispatcher, WorkerCounter};
use log::{debug, info};
use std::error::Error;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;
Expand Down Expand Up @@ -80,7 +81,7 @@ impl<
let no_of_threads = self.thread_pool.get_number_of_workers();
let total_slots = to_slot - from_slot + 1;
let slots_per_thread = total_slots / no_of_threads as u64;
let completed_count = Arc::new(Mutex::new(0));
let completed_count = Arc::new(AtomicI32::new(0));
let number_of_block_batches =
((slots_per_thread as f64 / solana_block::BATCH_SIZE as f64).ceil() as u64).max(1);

Expand Down Expand Up @@ -123,7 +124,7 @@ impl<
condvar_clone.notify_one();
}

*completed_clone.lock().unwrap() += 1;
completed_clone.fetch_add(1, Ordering::SeqCst);
};

self.thread_pool.execute(closure);
Expand Down Expand Up @@ -181,13 +182,11 @@ impl<
Ok((start_slot, end_slot))
}

fn wait_for_thread_pool_completion(&self, completed_count: Arc<Mutex<i32>>, no_of_threads: usize) {
fn wait_for_thread_pool_completion(&self, completed_count: Arc<AtomicI32>, no_of_threads: usize) {
loop {
let completed = completed_count.lock().unwrap();
if *completed == no_of_threads as i32 {
if completed_count.load(Ordering::SeqCst) == no_of_threads as i32 {
break;
}
drop(completed);
thread::sleep(Duration::from_millis(500));
}
}
Expand Down Expand Up @@ -291,14 +290,11 @@ mod tests {

#[test]
fn test_wait_for_thread_pool_completion_all_complete() {
let completed_count = Arc::new(Mutex::new(0));
let mut completed_count = Arc::new(AtomicI32::new(0));
let no_of_threads = 5;

// simulate all threads completion
{
let mut completed = completed_count.lock().unwrap();
*completed = no_of_threads as i32;
}
completed_count.fetch_add(no_of_threads as i32, Ordering::Relaxed);

let service = FetchBlockService::<MockRateLimiting, MockQueue<Reverse<BlockBatch>>, MockThreadPool>::new(
Arc::new(Mutex::new(MockQueue::new())),
Expand All @@ -319,21 +315,17 @@ mod tests {

#[test]
fn test_wait_for_thread_pool_completion_partial_complete() {
let completed_count = Arc::new(Mutex::new(0));
let no_of_threads = 5;
let mut completed_count = Arc::new(AtomicI32::new(0));
let no_of_threads = 5usize;

// simulate one thread has not completed
{
let mut completed = completed_count.lock().unwrap();
*completed = no_of_threads as i32 - 1;
}
completed_count.fetch_add(no_of_threads as i32 - 1, Ordering::Relaxed);

// spawn a thread to simulate a completion event after 200ms (< 500ms wait time)
let completed_clone = completed_count.clone();
thread::spawn(move || {
thread::sleep(Duration::from_millis(200));
let mut completed = completed_clone.lock().unwrap();
*completed += 1
completed_clone.fetch_add(1, Ordering::Relaxed);
});

let service = FetchBlockService::<MockRateLimiting, MockQueue<Reverse<BlockBatch>>, MockThreadPool>::new(
Expand Down

0 comments on commit b92aa04

Please sign in to comment.