From e91a5e27444af47c63946a971244c1075bbc99bf Mon Sep 17 00:00:00 2001 From: Andrew Fitzgerald Date: Wed, 10 Apr 2024 15:33:07 -0500 Subject: [PATCH] default staked client in LocalCluster (#716) * default staked client in LocalCluster * fix underflow --- local-cluster/src/local_cluster.rs | 56 ++++++++++++++++++++++------ local-cluster/tests/local_cluster.rs | 32 ++-------------- 2 files changed, 48 insertions(+), 40 deletions(-) diff --git a/local-cluster/src/local_cluster.rs b/local-cluster/src/local_cluster.rs index fe27a4a1aed21c..8e43f1875ecf22 100644 --- a/local-cluster/src/local_cluster.rs +++ b/local-cluster/src/local_cluster.rs @@ -2,6 +2,7 @@ use { crate::{ cluster::{Cluster, ClusterValidatorInfo, QuicTpuClient, ValidatorInfo}, cluster_tests, + integration_tests::DEFAULT_NODE_STAKE, validator_configs::*, }, itertools::izip, @@ -47,7 +48,7 @@ use { transaction::Transaction, }, solana_stake_program::stake_state, - solana_streamer::socket::SocketAddrSpace, + solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, solana_tpu_client::tpu_client::{ TpuClient, TpuClientConfig, DEFAULT_TPU_CONNECTION_POOL_SIZE, DEFAULT_TPU_ENABLE_UDP, DEFAULT_TPU_USE_QUIC, @@ -60,7 +61,7 @@ use { collections::HashMap, io::{Error, ErrorKind, Result}, iter, - net::UdpSocket, + net::{IpAddr, Ipv4Addr, UdpSocket}, path::{Path, PathBuf}, sync::{Arc, RwLock}, }, @@ -189,6 +190,46 @@ impl LocalCluster { pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self { assert_eq!(config.validator_configs.len(), config.node_stakes.len()); + let connection_cache = match config.tpu_use_quic { + true => { + let client_keypair = Keypair::new(); + let stake = DEFAULT_NODE_STAKE; + + for validator_config in config.validator_configs.iter_mut() { + let mut overrides = HashMap::new(); + overrides.insert(client_keypair.pubkey(), stake); + validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides)); + } + + assert!( + config.tpu_use_quic, + "no support for staked override forwarding without quic" + ); + + let total_stake = config.node_stakes.iter().sum::(); + let stakes = HashMap::from([ + (client_keypair.pubkey(), stake), + (Pubkey::new_unique(), total_stake.saturating_sub(stake)), + ]); + let staked_nodes = Arc::new(RwLock::new(StakedNodes::new( + Arc::new(stakes), + HashMap::::default(), // overrides + ))); + + Arc::new(ConnectionCache::new_with_client_options( + "connection_cache_local_cluster_quic_staked", + config.tpu_connection_pool_size, + None, + Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), + Some((&staked_nodes, &client_keypair.pubkey())), + )) + } + false => Arc::new(ConnectionCache::with_udp( + "connection_cache_local_cluster_udp", + config.tpu_connection_pool_size, + )), + }; + let mut validator_keys = { if let Some(ref keys) = config.validator_keys { assert_eq!(config.validator_configs.len(), keys.len()); @@ -321,16 +362,7 @@ impl LocalCluster { entry_point_info: leader_contact_info, validators, genesis_config, - connection_cache: match config.tpu_use_quic { - true => Arc::new(ConnectionCache::new_quic( - "connection_cache_local_cluster_quic", - config.tpu_connection_pool_size, - )), - false => Arc::new(ConnectionCache::with_udp( - "connection_cache_local_cluster_udp", - config.tpu_connection_pool_size, - )), - }, + connection_cache, }; let node_pubkey_to_vote_key: HashMap> = keys_in_genesis diff --git a/local-cluster/tests/local_cluster.rs b/local-cluster/tests/local_cluster.rs index ed95bf85d6c056..804c0db7884371 100644 --- a/local-cluster/tests/local_cluster.rs +++ b/local-cluster/tests/local_cluster.rs @@ -9,7 +9,7 @@ use { solana_accounts_db::{ hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs, }, - solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient}, + solana_client::thin_client::ThinClient, solana_core::{ consensus::{ tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH, @@ -75,7 +75,7 @@ use { system_program, system_transaction, vote::state::VoteStateUpdate, }, - solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes}, + solana_streamer::socket::SocketAddrSpace, solana_turbine::broadcast_stage::{ broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition}, BroadcastStageType, @@ -87,12 +87,11 @@ use { fs, io::Read, iter, - net::{IpAddr, Ipv4Addr}, num::NonZeroUsize, path::Path, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, RwLock, + Arc, Mutex, }, thread::{sleep, Builder, JoinHandle}, time::{Duration, Instant}, @@ -362,12 +361,6 @@ fn test_forwarding() { ..ClusterConfig::default() }; - let client_keypair = Keypair::new(); - let mut overrides = HashMap::new(); - let stake = DEFAULT_NODE_STAKE * 10; - let total_stake = stake + config.node_stakes.iter().sum::(); - overrides.insert(client_keypair.pubkey(), stake); - config.validator_configs[1].staked_nodes_overrides = Arc::new(RwLock::new(overrides)); let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified); let cluster_nodes = discover_cluster( @@ -385,28 +378,11 @@ fn test_forwarding() { .find(|c| c.pubkey() != &leader_pubkey) .unwrap(); - let stakes = HashMap::from([ - (client_keypair.pubkey(), stake), - (Pubkey::new_unique(), total_stake - stake), - ]); - let staked_nodes = Arc::new(RwLock::new(StakedNodes::new( - Arc::new(stakes), - HashMap::::default(), // overrides - ))); - - let client_connection_cache = Arc::new(ConnectionCache::new_with_client_options( - "client-connection-cache", - 1, - None, - Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))), - Some((&staked_nodes, &client_keypair.pubkey())), - )); - // Confirm that transactions were forwarded to and processed by the leader. cluster_tests::send_many_transactions( validator_info, &cluster.funding_keypair, - &client_connection_cache, + &cluster.connection_cache, 10, 20, );