Skip to content

Commit

Permalink
Moves the turbine-disabled check to shred-fetch-stage
Browse files Browse the repository at this point in the history
If turbine_disabled is true, this commit discards turbine packets
earlier in the pipeline (in shred-fetch-stage, before the deduper is
consulted), so that they won't interfere with the deduper and the same
packets can get through once turbine is enabled again.

This is a prerequisite for
solana-labs#30786,
so that local-cluster tests pass.
  • Loading branch information
behzadnouri committed Mar 20, 2023
1 parent f42f618 commit 8cdc5bc
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
34 changes: 24 additions & 10 deletions core/src/shred_fetch_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@ use {
solana_streamer::streamer::{self, PacketBatchReceiver, StreamerReceiveStats},
std::{
net::UdpSocket,
sync::{atomic::AtomicBool, Arc, RwLock},
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
thread::{self, Builder, JoinHandle},
time::{Duration, Instant},
},
Expand All @@ -41,6 +44,7 @@ impl ShredFetchStage {
name: &'static str,
flags: PacketFlags,
repair_context: Option<(&UdpSocket, &ClusterInfo)>,
turbine_disabled: Arc<AtomicBool>,
) {
const STATS_SUBMIT_CADENCE: Duration = Duration::from_secs(1);
let mut rng = rand::thread_rng();
Expand Down Expand Up @@ -95,16 +99,19 @@ impl ShredFetchStage {
let max_slot = last_slot + 2 * slots_per_epoch;
let should_drop_merkle_shreds =
|shred_slot| should_drop_merkle_shreds(shred_slot, &root_bank);
let turbine_disabled = turbine_disabled.load(Ordering::Relaxed);
for packet in packet_batch.iter_mut() {
if should_discard_packet(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
) {
if turbine_disabled
|| should_discard_packet(
packet,
last_root,
max_slot,
shred_version,
&deduper,
should_drop_merkle_shreds,
&mut stats,
)
{
packet.meta_mut().set_discard(true);
} else {
packet.meta_mut().flags.insert(flags);
Expand All @@ -117,6 +124,7 @@ impl ShredFetchStage {
}
}

#[allow(clippy::too_many_arguments)]
fn packet_modifier(
sockets: Vec<Arc<UdpSocket>>,
exit: &Arc<AtomicBool>,
Expand All @@ -127,6 +135,7 @@ impl ShredFetchStage {
name: &'static str,
flags: PacketFlags,
repair_context: Option<(Arc<UdpSocket>, Arc<ClusterInfo>)>,
turbine_disabled: Arc<AtomicBool>,
) -> (Vec<JoinHandle<()>>, JoinHandle<()>) {
let (packet_sender, packet_receiver) = unbounded();
let streamers = sockets
Expand Down Expand Up @@ -158,6 +167,7 @@ impl ShredFetchStage {
name,
flags,
repair_context,
turbine_disabled,
)
})
.unwrap();
Expand All @@ -172,6 +182,7 @@ impl ShredFetchStage {
shred_version: u16,
bank_forks: Arc<RwLock<BankForks>>,
cluster_info: Arc<ClusterInfo>,
turbine_disabled: Arc<AtomicBool>,
exit: &Arc<AtomicBool>,
) -> Self {
let recycler = PacketBatchRecycler::warmed(100, 1024);
Expand All @@ -186,6 +197,7 @@ impl ShredFetchStage {
"shred_fetch",
PacketFlags::empty(),
None, // repair_context
turbine_disabled.clone(),
);

let (tvu_forwards_threads, fwd_thread_hdl) = Self::packet_modifier(
Expand All @@ -198,6 +210,7 @@ impl ShredFetchStage {
"shred_fetch_tvu_forwards",
PacketFlags::FORWARDED,
None, // repair_context
turbine_disabled.clone(),
);

let (repair_receiver, repair_handler) = Self::packet_modifier(
Expand All @@ -210,6 +223,7 @@ impl ShredFetchStage {
"shred_fetch_repair",
PacketFlags::REPAIR,
Some((repair_socket, cluster_info)),
turbine_disabled,
);

tvu_threads.extend(tvu_forwards_threads.into_iter());
Expand Down
14 changes: 3 additions & 11 deletions core/src/sigverify_shreds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,7 @@ use {
solana_sdk::{clock::Slot, pubkey::Pubkey},
std::{
collections::HashMap,
sync::{
atomic::{AtomicBool, Ordering},
Arc, RwLock,
},
sync::{Arc, RwLock},
thread::{Builder, JoinHandle},
time::{Duration, Instant},
},
Expand All @@ -32,7 +29,6 @@ pub(crate) fn spawn_shred_sigverify(
shred_fetch_receiver: Receiver<PacketBatch>,
retransmit_sender: Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: Sender<Vec<PacketBatch>>,
turbine_disabled: Arc<AtomicBool>,
) -> JoinHandle<()> {
let recycler_cache = RecyclerCache::warmed();
let mut stats = ShredSigVerifyStats::new(Instant::now());
Expand All @@ -49,7 +45,6 @@ pub(crate) fn spawn_shred_sigverify(
&shred_fetch_receiver,
&retransmit_sender,
&verified_sender,
&turbine_disabled,
&mut stats,
) {
Ok(()) => (),
Expand All @@ -70,7 +65,6 @@ fn run_shred_sigverify(
shred_fetch_receiver: &Receiver<PacketBatch>,
retransmit_sender: &Sender<Vec</*shred:*/ Vec<u8>>>,
verified_sender: &Sender<Vec<PacketBatch>>,
turbine_disabled: &AtomicBool,
stats: &mut ShredSigVerifyStats,
) -> Result<(), Error> {
const RECV_TIMEOUT: Duration = Duration::from_secs(1);
Expand Down Expand Up @@ -99,10 +93,8 @@ fn run_shred_sigverify(
.map(<[u8]>::to_vec)
.collect();
stats.num_retransmit_shreds += shreds.len();
if !turbine_disabled.load(Ordering::Relaxed) {
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
}
retransmit_sender.send(shreds)?;
verified_sender.send(packets)?;
stats.elapsed_micros += now.elapsed().as_micros() as u64;
Ok(())
}
Expand Down
2 changes: 1 addition & 1 deletion core/src/tvu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@ impl Tvu {
tvu_config.shred_version,
bank_forks.clone(),
cluster_info.clone(),
turbine_disabled,
exit,
);

Expand All @@ -173,7 +174,6 @@ impl Tvu {
fetch_receiver,
retransmit_sender.clone(),
verified_sender,
turbine_disabled,
);

let retransmit_stage = RetransmitStage::new(
Expand Down

0 comments on commit 8cdc5bc

Please sign in to comment.