diff --git a/core/src/cluster_nodes.rs b/core/src/cluster_nodes.rs index 270c83990519a8..ec78c120891423 100644 --- a/core/src/cluster_nodes.rs +++ b/core/src/cluster_nodes.rs @@ -5,7 +5,7 @@ use { rand::{seq::SliceRandom, Rng, SeedableRng}, rand_chacha::ChaChaRng, solana_gossip::{ - cluster_info::{compute_retransmit_peers, ClusterInfo}, + cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT}, contact_info::ContactInfo, crds::GossipRoute, crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS, @@ -35,6 +35,8 @@ use { }, }; +pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4; + #[allow(clippy::large_enum_variant)] enum NodeId { // TVU node obtained through gossip (staked or not). @@ -233,8 +235,10 @@ impl ClusterNodes { 0 } else if self_index <= fanout { 1 - } else { + } else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) { 2 + } else { + 3 // If changed, update MAX_NUM_TURBINE_HOPS. }; let peers = get_retransmit_peers(fanout, self_index, &nodes); return RetransmitPeers { @@ -249,8 +253,10 @@ impl ClusterNodes { 0 } else if self_index < fanout { 1 - } else { + } else if self_index < fanout.saturating_add(1).saturating_mul(fanout) { 2 + } else { + 3 // If changed, update MAX_NUM_TURBINE_HOPS. }; let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes); // Assert that the node itself is included in the set of neighbors, at @@ -480,11 +486,47 @@ pub fn make_test_cluster( (nodes, stakes, cluster_info) } +pub(crate) fn get_data_plane_fanout(shred_slot: Slot, root_bank: &Bank) -> usize { + if enable_turbine_fanout_experiments(shred_slot, root_bank) { + // Allocate ~2% of slots to turbine fanout experiments. + match shred_slot % 359 { + 11 => 64, + 61 => 768, + 111 => 128, + 161 => 640, + 211 => 256, + 261 => 512, + 311 => 384, + _ => DATA_PLANE_FANOUT, + } + } else { + DATA_PLANE_FANOUT + } +} + fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool { - let feature_slot = root_bank - .feature_set - .activated_slot(&feature_set::drop_redundant_turbine_path::id()); - match feature_slot { + check_feature_activation( + &feature_set::drop_redundant_turbine_path::id(), + shred_slot, + root_bank, + ) +} + +fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool { + check_feature_activation( + &feature_set::enable_turbine_fanout_experiments::id(), + shred_slot, + root_bank, + ) && !check_feature_activation( + &feature_set::disable_turbine_fanout_experiments::id(), + shred_slot, + root_bank, + ) +} + +// Returns true if the feature is effective for the shred slot. +fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool { + match root_bank.feature_set.activated_slot(feature) { None => false, Some(feature_slot) => { let epoch_schedule = root_bank.epoch_schedule(); diff --git a/core/src/retransmit_stage.rs b/core/src/retransmit_stage.rs index b8ea2356b55e30..8406df409c8ba0 100644 --- a/core/src/retransmit_stage.rs +++ b/core/src/retransmit_stage.rs @@ -3,17 +3,14 @@ use { crate::{ - cluster_nodes::{ClusterNodes, ClusterNodesCache}, + cluster_nodes::{self, ClusterNodes, ClusterNodesCache, MAX_NUM_TURBINE_HOPS}, packet_hasher::PacketHasher, }, crossbeam_channel::{Receiver, RecvTimeoutError}, itertools::{izip, Itertools}, lru::LruCache, rayon::{prelude::*, ThreadPool, ThreadPoolBuilder}, - solana_gossip::{ - cluster_info::{ClusterInfo, DATA_PLANE_FANOUT}, - contact_info::ContactInfo, - }, + solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo}, solana_ledger::{ leader_schedule_cache::LeaderScheduleCache, shred::{self, ShredId}, @@ -56,8 +53,8 @@ struct RetransmitSlotStats { outset: u64, // 1st shred retransmit timestamp. // Number of shreds sent and received at different // distances from the turbine broadcast root. - num_shreds_received: [usize; 3], - num_shreds_sent: [usize; 3], + num_shreds_received: [usize; MAX_NUM_TURBINE_HOPS], + num_shreds_sent: [usize; MAX_NUM_TURBINE_HOPS], } struct RetransmitStats { @@ -300,8 +297,9 @@ fn retransmit_shred( stats: &RetransmitStats, ) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) { let mut compute_turbine_peers = Measure::start("turbine_start"); + let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank); let (root_distance, addrs) = - cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, DATA_PLANE_FANOUT); + cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout); let addrs: Vec<_> = addrs .into_iter() .filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space)) @@ -441,7 +439,7 @@ impl AddAssign for RetransmitSlotStats { } else { self.outset.min(outset) }; - for k in 0..3 { + for k in 0..MAX_NUM_TURBINE_HOPS { self.num_shreds_received[k] += num_shreds_received[k]; self.num_shreds_sent[k] += num_shreds_sent[k]; } @@ -555,9 +553,15 @@ impl RetransmitSlotStats { self.num_shreds_received[2], i64 ), + ( + "num_shreds_received_3rd_layer", + self.num_shreds_received[3], + i64 + ), ("num_shreds_sent_root", self.num_shreds_sent[0], i64), ("num_shreds_sent_1st_layer", self.num_shreds_sent[1], i64), ("num_shreds_sent_2nd_layer", self.num_shreds_sent[2], i64), + ("num_shreds_sent_3rd_layer", self.num_shreds_sent[3], i64), ); } } diff --git a/sdk/src/feature_set.rs b/sdk/src/feature_set.rs index 8e30dd3d6c44ef..228dc56a0559da 100644 --- a/sdk/src/feature_set.rs +++ b/sdk/src/feature_set.rs @@ -558,6 +558,14 @@ pub mod commission_updates_only_allowed_in_first_half_of_epoch { solana_sdk::declare_id!("noRuG2kzACwgaY7TVmLRnUNPLKNVQE1fb7X55YWBehp"); } +pub mod enable_turbine_fanout_experiments { + solana_sdk::declare_id!("D31EFnLgdiysi84Woo3of4JMu7VmasUS3Z7j9HYXCeLY"); +} + +pub mod disable_turbine_fanout_experiments { + solana_sdk::declare_id!("Gz1aLrbeQ4Q6PTSafCZcGWZXz91yVRi7ASFzFEr1U4sa"); +} + lazy_static! { /// Map of feature identifiers to user-visible description pub static ref FEATURE_NAMES: HashMap = [ @@ -692,6 +700,8 @@ lazy_static! { (enable_alt_bn128_syscall::id(), "add alt_bn128 syscalls #27961"), (enable_program_redeployment_cooldown::id(), "enable program redeployment cooldown #29135"), (commission_updates_only_allowed_in_first_half_of_epoch::id(), "validator commission updates are only allowed in the first half of an epoch #29362"), + (enable_turbine_fanout_experiments::id(), "enable turbine fanout experiments #29393"), + (disable_turbine_fanout_experiments::id(), "disable turbine fanout experiments #29393"), /*************** ADD NEW FEATURES HERE ***************/ ] .iter()