Skip to content

Commit

Permalink
removes lazy-static thread-pool from sigverify-shreds
Browse files Browse the repository at this point in the history
Instead, the thread-pool is passed explicitly from higher in the call stack, so that solana-labs#30786 can use the same thread-pool for shred deduplication.
  • Loading branch information
behzadnouri committed Mar 20, 2023
1 parent 2877fda commit 6cd2ffc
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 57 deletions.
27 changes: 22 additions & 5 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use {
crossbeam_channel::{Receiver, RecvTimeoutError, SendError, Sender},
rayon::{ThreadPool, ThreadPoolBuilder},
solana_gossip::cluster_info::ClusterInfo,
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache, shred, sigverify_shreds::verify_shreds_gpu,
},
solana_perf::{self, packet::PacketBatch, recycler_cache::RecyclerCache},
solana_rayon_threadlimit::get_thread_count,
solana_runtime::{bank::Bank, bank_forks::BankForks},
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
Expand Down Expand Up @@ -36,10 +38,15 @@ pub(crate) fn spawn_shred_sigverify(
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Builder::new()
.name("solShredVerifr".to_string())
.spawn(move || loop {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.thread_name(|i| format!("solSvrfyShred{i:02}"))
.build()
.unwrap();
let run_shred_sigverify = move || {
loop {
match run_shred_sigverify(
&thread_pool,
// We can't store the pubkey outside the loop
// because the identity might be hot swapped.
&cluster_info.id(),
Expand All @@ -58,11 +65,17 @@ pub(crate) fn spawn_shred_sigverify(
Err(Error::SendError) => break,
}
stats.maybe_submit();
})
}
};
Builder::new()
.name("solShredVerifr".to_string())
.spawn(run_shred_sigverify)
.unwrap()
}

#[allow(clippy::too_many_arguments)]
fn run_shred_sigverify(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
Expand All @@ -83,6 +96,7 @@ fn run_shred_sigverify(
stats.num_packets += packets.iter().map(PacketBatch::len).sum::<usize>();
stats.num_discards_pre += count_discards(&packets);
verify_packets(
thread_pool,
self_pubkey,
bank_forks,
leader_schedule_cache,
Expand All @@ -108,6 +122,7 @@ fn run_shred_sigverify(
}

fn verify_packets(
thread_pool: &ThreadPool,
self_pubkey: &Pubkey,
bank_forks: &RwLock<BankForks>,
leader_schedule_cache: &LeaderScheduleCache,
Expand All @@ -121,7 +136,7 @@ fn verify_packets(
.filter_map(|(slot, pubkey)| Some((slot, pubkey?.to_bytes())))
.chain(std::iter::once((Slot::MAX, [0u8; 32])))
.collect();
let out = verify_shreds_gpu(packets, &leader_slots, recycler_cache);
let out = verify_shreds_gpu(thread_pool, packets, &leader_slots, recycler_cache);
solana_perf::sigverify::mark_disabled(packets, &out);
}

Expand Down Expand Up @@ -284,7 +299,9 @@ mod tests {
batches[0][1].buffer_mut()[..shred.payload().len()].copy_from_slice(shred.payload());
batches[0][1].meta_mut().size = shred.payload().len();

let thread_pool = ThreadPoolBuilder::new().num_threads(3).build().unwrap();
verify_packets(
&thread_pool,
&Pubkey::new_unique(), // self_pubkey
&bank_forks,
&leader_schedule_cache,
Expand Down
28 changes: 25 additions & 3 deletions ledger/benches/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

extern crate test;
use {
rayon::ThreadPoolBuilder,
solana_ledger::{
shred::{Shred, ShredFlags, LEGACY_SHRED_DATA_CAPACITY},
sigverify_shreds::{sign_shreds_cpu, sign_shreds_gpu, sign_shreds_gpu_pinned_keypair},
Expand All @@ -10,6 +11,7 @@ use {
packet::{Packet, PacketBatch},
recycler_cache::RecyclerCache,
},
solana_rayon_threadlimit::get_thread_count,
solana_sdk::signature::Keypair,
std::sync::Arc,
test::Bencher,
Expand All @@ -19,6 +21,10 @@ const NUM_PACKETS: usize = 256;
const NUM_BATCHES: usize = 1;
#[bench]
fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap();
let recycler_cache = RecyclerCache::default();

let mut packet_batch = PacketBatch::new_pinned_with_capacity(NUM_PACKETS);
Expand All @@ -43,15 +49,31 @@ fn bench_sigverify_shreds_sign_gpu(bencher: &mut Bencher) {
let pinned_keypair = Some(Arc::new(pinned_keypair));
//warmup
for _ in 0..100 {
sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache);
sign_shreds_gpu(
&thread_pool,
&keypair,
&pinned_keypair,
&mut batches,
&recycler_cache,
);
}
bencher.iter(|| {
sign_shreds_gpu(&keypair, &pinned_keypair, &mut batches, &recycler_cache);
sign_shreds_gpu(
&thread_pool,
&keypair,
&pinned_keypair,
&mut batches,
&recycler_cache,
);
})
}

#[bench]
fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) {
let thread_pool = ThreadPoolBuilder::new()
.num_threads(get_thread_count())
.build()
.unwrap();
let mut packet_batch = PacketBatch::default();
let slot = 0xdead_c0de;
packet_batch.resize(NUM_PACKETS, Packet::default());
Expand All @@ -71,6 +93,6 @@ fn bench_sigverify_shreds_sign_cpu(bencher: &mut Bencher) {
let mut batches = vec![packet_batch; NUM_BATCHES];
let keypair = Keypair::new();
bencher.iter(|| {
sign_shreds_cpu(&keypair, &mut batches);
sign_shreds_cpu(&thread_pool, &keypair, &mut batches);
})
}
Loading

0 comments on commit 6cd2ffc

Please sign in to comment.