Skip to content

Commit

Permalink
experiments different turbine fanouts for propagating shreds (solana-…
Browse files Browse the repository at this point in the history
…labs#29393)

The commit allocates 2% of slots to running experiments with different
turbine fanouts based on the slot number.
The experiment is feature gated with an additional feature to disable
the experiment.
  • Loading branch information
behzadnouri authored and gnapoli23 committed Jan 10, 2023
1 parent 5895e5e commit 9effc44
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 16 deletions.
56 changes: 49 additions & 7 deletions core/src/cluster_nodes.rs
Expand Up @@ -5,7 +5,7 @@ use {
rand::{seq::SliceRandom, Rng, SeedableRng},
rand_chacha::ChaChaRng,
solana_gossip::{
cluster_info::{compute_retransmit_peers, ClusterInfo},
cluster_info::{compute_retransmit_peers, ClusterInfo, DATA_PLANE_FANOUT},
contact_info::ContactInfo,
crds::GossipRoute,
crds_gossip_pull::CRDS_GOSSIP_PULL_CRDS_TIMEOUT_MS,
Expand Down Expand Up @@ -35,6 +35,8 @@ use {
},
};

pub(crate) const MAX_NUM_TURBINE_HOPS: usize = 4;

#[allow(clippy::large_enum_variant)]
enum NodeId {
// TVU node obtained through gossip (staked or not).
Expand Down Expand Up @@ -233,8 +235,10 @@ impl ClusterNodes<RetransmitStage> {
0
} else if self_index <= fanout {
1
} else {
} else if self_index <= fanout.saturating_add(1).saturating_mul(fanout) {
2
} else {
3 // If changed, update MAX_NUM_TURBINE_HOPS.
};
let peers = get_retransmit_peers(fanout, self_index, &nodes);
return RetransmitPeers {
Expand All @@ -249,8 +253,10 @@ impl ClusterNodes<RetransmitStage> {
0
} else if self_index < fanout {
1
} else {
} else if self_index < fanout.saturating_add(1).saturating_mul(fanout) {
2
} else {
3 // If changed, update MAX_NUM_TURBINE_HOPS.
};
let (neighbors, children) = compute_retransmit_peers(fanout, self_index, &nodes);
// Assert that the node itself is included in the set of neighbors, at
Expand Down Expand Up @@ -480,11 +486,47 @@ pub fn make_test_cluster<R: Rng>(
(nodes, stakes, cluster_info)
}

pub(crate) fn get_data_plane_fanout(shred_slot: Slot, root_bank: &Bank) -> usize {
if enable_turbine_fanout_experiments(shred_slot, root_bank) {
// Allocate ~2% of slots to turbine fanout experiments.
match shred_slot % 359 {
11 => 64,
61 => 768,
111 => 128,
161 => 640,
211 => 256,
261 => 512,
311 => 384,
_ => DATA_PLANE_FANOUT,
}
} else {
DATA_PLANE_FANOUT
}
}

fn drop_redundant_turbine_path(shred_slot: Slot, root_bank: &Bank) -> bool {
let feature_slot = root_bank
.feature_set
.activated_slot(&feature_set::drop_redundant_turbine_path::id());
match feature_slot {
check_feature_activation(
&feature_set::drop_redundant_turbine_path::id(),
shred_slot,
root_bank,
)
}

fn enable_turbine_fanout_experiments(shred_slot: Slot, root_bank: &Bank) -> bool {
check_feature_activation(
&feature_set::enable_turbine_fanout_experiments::id(),
shred_slot,
root_bank,
) && !check_feature_activation(
&feature_set::disable_turbine_fanout_experiments::id(),
shred_slot,
root_bank,
)
}

// Returns true if the feature is effective for the shred slot.
fn check_feature_activation(feature: &Pubkey, shred_slot: Slot, root_bank: &Bank) -> bool {
match root_bank.feature_set.activated_slot(feature) {
None => false,
Some(feature_slot) => {
let epoch_schedule = root_bank.epoch_schedule();
Expand Down
22 changes: 13 additions & 9 deletions core/src/retransmit_stage.rs
Expand Up @@ -3,17 +3,14 @@

use {
crate::{
cluster_nodes::{ClusterNodes, ClusterNodesCache},
cluster_nodes::{self, ClusterNodes, ClusterNodesCache, MAX_NUM_TURBINE_HOPS},
packet_hasher::PacketHasher,
},
crossbeam_channel::{Receiver, RecvTimeoutError},
itertools::{izip, Itertools},
lru::LruCache,
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_gossip::{
cluster_info::{ClusterInfo, DATA_PLANE_FANOUT},
contact_info::ContactInfo,
},
solana_gossip::{cluster_info::ClusterInfo, contact_info::ContactInfo},
solana_ledger::{
leader_schedule_cache::LeaderScheduleCache,
shred::{self, ShredId},
Expand Down Expand Up @@ -56,8 +53,8 @@ struct RetransmitSlotStats {
outset: u64, // 1st shred retransmit timestamp.
// Number of shreds sent and received at different
// distances from the turbine broadcast root.
num_shreds_received: [usize; 3],
num_shreds_sent: [usize; 3],
num_shreds_received: [usize; MAX_NUM_TURBINE_HOPS],
num_shreds_sent: [usize; MAX_NUM_TURBINE_HOPS],
}

struct RetransmitStats {
Expand Down Expand Up @@ -300,8 +297,9 @@ fn retransmit_shred(
stats: &RetransmitStats,
) -> (/*root_distance:*/ usize, /*num_nodes:*/ usize) {
let mut compute_turbine_peers = Measure::start("turbine_start");
let data_plane_fanout = cluster_nodes::get_data_plane_fanout(key.slot(), root_bank);
let (root_distance, addrs) =
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, DATA_PLANE_FANOUT);
cluster_nodes.get_retransmit_addrs(slot_leader, key, root_bank, data_plane_fanout);
let addrs: Vec<_> = addrs
.into_iter()
.filter(|addr| ContactInfo::is_valid_address(addr, socket_addr_space))
Expand Down Expand Up @@ -441,7 +439,7 @@ impl AddAssign for RetransmitSlotStats {
} else {
self.outset.min(outset)
};
for k in 0..3 {
for k in 0..MAX_NUM_TURBINE_HOPS {
self.num_shreds_received[k] += num_shreds_received[k];
self.num_shreds_sent[k] += num_shreds_sent[k];
}
Expand Down Expand Up @@ -555,9 +553,15 @@ impl RetransmitSlotStats {
self.num_shreds_received[2],
i64
),
(
"num_shreds_received_3rd_layer",
self.num_shreds_received[3],
i64
),
("num_shreds_sent_root", self.num_shreds_sent[0], i64),
("num_shreds_sent_1st_layer", self.num_shreds_sent[1], i64),
("num_shreds_sent_2nd_layer", self.num_shreds_sent[2], i64),
("num_shreds_sent_3rd_layer", self.num_shreds_sent[3], i64),
);
}
}
Expand Down
10 changes: 10 additions & 0 deletions sdk/src/feature_set.rs
Expand Up @@ -558,6 +558,14 @@ pub mod commission_updates_only_allowed_in_first_half_of_epoch {
solana_sdk::declare_id!("noRuG2kzACwgaY7TVmLRnUNPLKNVQE1fb7X55YWBehp");
}

pub mod enable_turbine_fanout_experiments {
solana_sdk::declare_id!("D31EFnLgdiysi84Woo3of4JMu7VmasUS3Z7j9HYXCeLY");
}

pub mod disable_turbine_fanout_experiments {
solana_sdk::declare_id!("Gz1aLrbeQ4Q6PTSafCZcGWZXz91yVRi7ASFzFEr1U4sa");
}

lazy_static! {
/// Map of feature identifiers to user-visible description
pub static ref FEATURE_NAMES: HashMap<Pubkey, &'static str> = [
Expand Down Expand Up @@ -692,6 +700,8 @@ lazy_static! {
(enable_alt_bn128_syscall::id(), "add alt_bn128 syscalls #27961"),
(enable_program_redeployment_cooldown::id(), "enable program redeployment cooldown #29135"),
(commission_updates_only_allowed_in_first_half_of_epoch::id(), "validator commission updates are only allowed in the first half of an epoch #29362"),
(enable_turbine_fanout_experiments::id(), "enable turbine fanout experiments #29393"),
(disable_turbine_fanout_experiments::id(), "disable turbine fanout experiments #29393"),
/*************** ADD NEW FEATURES HERE ***************/
]
.iter()
Expand Down

0 comments on commit 9effc44

Please sign in to comment.