Skip to content

Commit

Permalink
default staked client in LocalCluster (#716)
Browse files Browse the repository at this point in the history
* default staked client in LocalCluster

* fix underflow
  • Loading branch information
apfitzge committed Apr 10, 2024
1 parent 51f9972 commit e91a5e2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 40 deletions.
56 changes: 44 additions & 12 deletions local-cluster/src/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use {
crate::{
cluster::{Cluster, ClusterValidatorInfo, QuicTpuClient, ValidatorInfo},
cluster_tests,
integration_tests::DEFAULT_NODE_STAKE,
validator_configs::*,
},
itertools::izip,
Expand Down Expand Up @@ -47,7 +48,7 @@ use {
transaction::Transaction,
},
solana_stake_program::stake_state,
solana_streamer::socket::SocketAddrSpace,
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_tpu_client::tpu_client::{
TpuClient, TpuClientConfig, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP,
DEFAULT_TPU_USE_QUIC,
Expand All @@ -60,7 +61,7 @@ use {
collections::HashMap,
io::{Error, ErrorKind, Result},
iter,
net::UdpSocket,
net::{IpAddr, Ipv4Addr, UdpSocket},
path::{Path, PathBuf},
sync::{Arc, RwLock},
},
Expand Down Expand Up @@ -189,6 +190,46 @@ impl LocalCluster {
pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self {
assert_eq!(config.validator_configs.len(), config.node_stakes.len());

let connection_cache = match config.tpu_use_quic {
true => {
let client_keypair = Keypair::new();
let stake = DEFAULT_NODE_STAKE;

for validator_config in config.validator_configs.iter_mut() {
let mut overrides = HashMap::new();
overrides.insert(client_keypair.pubkey(), stake);
validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides));
}

assert!(
config.tpu_use_quic,
"no support for staked override forwarding without quic"
);

let total_stake = config.node_stakes.iter().sum::<u64>();
let stakes = HashMap::from([
(client_keypair.pubkey(), stake),
(Pubkey::new_unique(), total_stake.saturating_sub(stake)),
]);
let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));

Arc::new(ConnectionCache::new_with_client_options(
"connection_cache_local_cluster_quic_staked",
config.tpu_connection_pool_size,
None,
Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
Some((&staked_nodes, &client_keypair.pubkey())),
))
}
false => Arc::new(ConnectionCache::with_udp(
"connection_cache_local_cluster_udp",
config.tpu_connection_pool_size,
)),
};

let mut validator_keys = {
if let Some(ref keys) = config.validator_keys {
assert_eq!(config.validator_configs.len(), keys.len());
Expand Down Expand Up @@ -321,16 +362,7 @@ impl LocalCluster {
entry_point_info: leader_contact_info,
validators,
genesis_config,
connection_cache: match config.tpu_use_quic {
true => Arc::new(ConnectionCache::new_quic(
"connection_cache_local_cluster_quic",
config.tpu_connection_pool_size,
)),
false => Arc::new(ConnectionCache::with_udp(
"connection_cache_local_cluster_udp",
config.tpu_connection_pool_size,
)),
},
connection_cache,
};

let node_pubkey_to_vote_key: HashMap<Pubkey, Arc<Keypair>> = keys_in_genesis
Expand Down
32 changes: 4 additions & 28 deletions local-cluster/tests/local_cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use {
solana_accounts_db::{
hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs,
},
solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
solana_client::thin_client::ThinClient,
solana_core::{
consensus::{
tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH,
Expand Down Expand Up @@ -75,7 +75,7 @@ use {
system_program, system_transaction,
vote::state::VoteStateUpdate,
},
solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
solana_streamer::socket::SocketAddrSpace,
solana_turbine::broadcast_stage::{
broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition},
BroadcastStageType,
Expand All @@ -87,12 +87,11 @@ use {
fs,
io::Read,
iter,
net::{IpAddr, Ipv4Addr},
num::NonZeroUsize,
path::Path,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc, Mutex, RwLock,
Arc, Mutex,
},
thread::{sleep, Builder, JoinHandle},
time::{Duration, Instant},
Expand Down Expand Up @@ -362,12 +361,6 @@ fn test_forwarding() {
..ClusterConfig::default()
};

let client_keypair = Keypair::new();
let mut overrides = HashMap::new();
let stake = DEFAULT_NODE_STAKE * 10;
let total_stake = stake + config.node_stakes.iter().sum::<u64>();
overrides.insert(client_keypair.pubkey(), stake);
config.validator_configs[1].staked_nodes_overrides = Arc::new(RwLock::new(overrides));
let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);

let cluster_nodes = discover_cluster(
Expand All @@ -385,28 +378,11 @@ fn test_forwarding() {
.find(|c| c.pubkey() != &leader_pubkey)
.unwrap();

let stakes = HashMap::from([
(client_keypair.pubkey(), stake),
(Pubkey::new_unique(), total_stake - stake),
]);
let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
Arc::new(stakes),
HashMap::<Pubkey, u64>::default(), // overrides
)));

let client_connection_cache = Arc::new(ConnectionCache::new_with_client_options(
"client-connection-cache",
1,
None,
Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
Some((&staked_nodes, &client_keypair.pubkey())),
));

// Confirm that transactions were forwarded to and processed by the leader.
cluster_tests::send_many_transactions(
validator_info,
&cluster.funding_keypair,
&client_connection_cache,
&cluster.connection_cache,
10,
20,
);
Expand Down

0 comments on commit e91a5e2

Please sign in to comment.