BankingStage Forwarding Filter (#685)
* add PacketFlags::FROM_STAKED_NODE

* Only forward packets from staked node

* fix local-cluster test forwarding

* review comment

* tpu_votes get marked as from_staked_node
apfitzge committed Apr 9, 2024
1 parent 592107a commit 1744e9e
Showing 11 changed files with 63 additions and 11 deletions.
1 change: 1 addition & 0 deletions bench-streamer/src/main.rs
@@ -117,6 +117,7 @@ fn main() -> Result<()> {
             Duration::from_millis(1), // coalesce
             true,
             None,
+            false,
         ));
     }

1 change: 1 addition & 0 deletions core/src/banking_stage/forwarder.rs
@@ -161,6 +161,7 @@ impl Forwarder {
         self.update_data_budget();
         let packet_vec: Vec<_> = forwardable_packets
             .filter(|p| !p.meta().forwarded())
+            .filter(|p| p.meta().is_from_staked_node())
             .filter(|p| self.data_budget.take(p.meta().size))
             .filter_map(|p| p.data(..).map(|data| data.to_vec()))
             .collect();
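
Read as a pipeline, the forwarder now drops any packet that is not marked as coming from a staked node before that packet can consume forwarding budget. Below is a minimal, self-contained sketch of the same filter chain; Meta, DataBudget, and the flag constant are simplified stand-ins for illustration, not the real solana_sdk/solana_core types:

const FROM_STAKED_NODE: u8 = 0b1000_0000;

struct Meta {
    flags: u8,
    size: usize,
    forwarded: bool,
}

impl Meta {
    fn forwarded(&self) -> bool {
        self.forwarded
    }
    fn is_from_staked_node(&self) -> bool {
        self.flags & FROM_STAKED_NODE != 0
    }
}

// Stand-in for the forwarder's data budget: take() charges bytes until exhausted.
struct DataBudget {
    remaining: usize,
}

impl DataBudget {
    fn take(&mut self, size: usize) -> bool {
        if self.remaining >= size {
            self.remaining -= size;
            true
        } else {
            false
        }
    }
}

fn main() {
    let packets = vec![
        Meta { flags: FROM_STAKED_NODE, size: 100, forwarded: false }, // kept
        Meta { flags: 0, size: 100, forwarded: false },                // dropped: not from a staked node
        Meta { flags: FROM_STAKED_NODE, size: 100, forwarded: true },  // dropped: already forwarded
    ];
    let mut budget = DataBudget { remaining: 150 };
    let kept: Vec<Meta> = packets
        .into_iter()
        .filter(|p| !p.forwarded())
        .filter(|p| p.is_from_staked_node())
        .filter(|p| budget.take(p.size))
        .collect();
    assert_eq!(kept.len(), 1);
}

Note the filter ordering: the staked-node check runs before data_budget.take(), so unstaked packets no longer drain budget that staked traffic could have used.
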
3 changes: 3 additions & 0 deletions core/src/fetch_stage.rs
@@ -171,6 +171,7 @@ impl FetchStage {
                     coalesce,
                     true,
                     in_vote_only_mode.clone(),
+                    false, // unstaked connections
                 )
             })
             .collect()
@@ -194,6 +195,7 @@ impl FetchStage {
                     coalesce,
                     true,
                     in_vote_only_mode.clone(),
+                    false, // unstaked connections
                 )
             })
             .collect()
@@ -216,6 +218,7 @@ impl FetchStage {
                     coalesce,
                     true,
                     None,
+                    true, // only staked connections should be voting
                 )
             })
             .collect();
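
Note the asymmetry across FetchStage's three receiver pools: the regular TPU and TPU-forwards UDP sockets pass false, since a plain UDP sender's stake is unknown, while the TPU-vote sockets pass true on the assumption stated in the inline comment that only staked validators should be voting. For ordinary transactions, the staked marking instead comes from an authenticated QUIC connection (see streamer/src/nonblocking/quic.rs below).
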
2 changes: 2 additions & 0 deletions core/src/repair/ancestor_hashes_service.rs
@@ -171,6 +171,7 @@ impl AncestorHashesService {
             Duration::from_millis(1), // coalesce
             false, // use_pinned_memory
             None, // in_vote_only_mode
+            false, // is_staked_service
         );

         let (quic_endpoint_response_sender, quic_endpoint_response_receiver) = unbounded();
@@ -1304,6 +1305,7 @@ mod test {
             Duration::from_millis(1), // coalesce
             false,
             None,
+            false,
         );
         let (remote_request_sender, remote_request_receiver) = unbounded();
         let t_packet_adapter = Builder::new()
1 change: 1 addition & 0 deletions core/src/repair/serve_repair_service.rs
@@ -47,6 +47,7 @@ impl ServeRepairService {
             Duration::from_millis(1), // coalesce
             false, // use_pinned_memory
             None, // in_vote_only_mode
+            false, // is_staked_service
         );
         let t_packet_adapter = Builder::new()
             .name(String::from("solServRAdapt"))
1 change: 1 addition & 0 deletions core/src/shred_fetch_stage.rs
@@ -175,6 +175,7 @@ impl ShredFetchStage {
                     PACKET_COALESCE_DURATION,
                     true, // use_pinned_memory
                     None, // in_vote_only_mode
+                    false,
                 )
             })
             .collect();
1 change: 1 addition & 0 deletions gossip/src/gossip_service.rs
@@ -63,6 +63,7 @@ impl GossipService {
             Duration::from_millis(1), // coalesce
             false,
             None,
+            false,
         );
         let (consume_sender, listen_receiver) = unbounded();
         let t_socket_consume = cluster_info.clone().start_socket_consume_thread(
42 changes: 32 additions & 10 deletions local-cluster/tests/local_cluster.rs
@@ -9,7 +9,7 @@ use {
     solana_accounts_db::{
         hardened_unpack::open_genesis_config, utils::create_accounts_run_and_snapshot_dirs,
     },
-    solana_client::thin_client::ThinClient,
+    solana_client::{connection_cache::ConnectionCache, thin_client::ThinClient},
     solana_core::{
         consensus::{
             tower_storage::FileTowerStorage, Tower, SWITCH_FORK_THRESHOLD, VOTE_THRESHOLD_DEPTH,
@@ -56,12 +56,9 @@
         response::RpcSignatureResult,
     },
     solana_runtime::{
-        commitment::VOTE_THRESHOLD_SIZE,
-        snapshot_archive_info::SnapshotArchiveInfoGetter,
-        snapshot_bank_utils,
-        snapshot_config::SnapshotConfig,
-        snapshot_package::SnapshotKind,
-        snapshot_utils::{self},
+        commitment::VOTE_THRESHOLD_SIZE, snapshot_archive_info::SnapshotArchiveInfoGetter,
+        snapshot_bank_utils, snapshot_config::SnapshotConfig, snapshot_package::SnapshotKind,
+        snapshot_utils,
     },
     solana_sdk::{
         account::AccountSharedData,
@@ -78,7 +75,7 @@
         system_program, system_transaction,
         vote::state::VoteStateUpdate,
     },
-    solana_streamer::socket::SocketAddrSpace,
+    solana_streamer::{socket::SocketAddrSpace, streamer::StakedNodes},
     solana_turbine::broadcast_stage::{
         broadcast_duplicates_run::{BroadcastDuplicatesConfig, ClusterPartition},
         BroadcastStageType,
@@ -90,11 +87,12 @@
         fs,
         io::Read,
         iter,
+        net::{IpAddr, Ipv4Addr},
         num::NonZeroUsize,
         path::Path,
         sync::{
             atomic::{AtomicBool, AtomicUsize, Ordering},
-            Arc, Mutex,
+            Arc, Mutex, RwLock,
         },
         thread::{sleep, Builder, JoinHandle},
         time::{Duration, Instant},
@@ -363,6 +361,13 @@ fn test_forwarding() {
         ),
         ..ClusterConfig::default()
     };
+
+    let client_keypair = Keypair::new();
+    let mut overrides = HashMap::new();
+    let stake = DEFAULT_NODE_STAKE * 10;
+    let total_stake = stake + config.node_stakes.iter().sum::<u64>();
+    overrides.insert(client_keypair.pubkey(), stake);
+    config.validator_configs[1].staked_nodes_overrides = Arc::new(RwLock::new(overrides));
     let cluster = LocalCluster::new(&mut config, SocketAddrSpace::Unspecified);

     let cluster_nodes = discover_cluster(
@@ -380,11 +385,28 @@
         .find(|c| c.pubkey() != &leader_pubkey)
         .unwrap();

+    let stakes = HashMap::from([
+        (client_keypair.pubkey(), stake),
+        (Pubkey::new_unique(), total_stake - stake),
+    ]);
+    let staked_nodes = Arc::new(RwLock::new(StakedNodes::new(
+        Arc::new(stakes),
+        HashMap::<Pubkey, u64>::default(), // overrides
+    )));
+
+    let client_connection_cache = Arc::new(ConnectionCache::new_with_client_options(
+        "client-connection-cache",
+        1,
+        None,
+        Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
+        Some((&staked_nodes, &client_keypair.pubkey())),
+    ));
+
     // Confirm that transactions were forwarded to and processed by the leader.
     cluster_tests::send_many_transactions(
         validator_info,
         &cluster.funding_keypair,
-        &cluster.connection_cache,
+        &client_connection_cache,
         10,
         20,
     );
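
The reworked test gives a fresh client keypair a large stake override on one of the validators (validator_configs[1]) and builds the client-side ConnectionCache around that same keypair plus a matching StakedNodes table. The client's QUIC connections are therefore classified as staked, so the transactions it submits still pass the new forwarding filter and reach the leader; the extra Pubkey::new_unique() entry holding total_stake - stake keeps the client's share of stake below 100%, making the stake table realistic.
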
12 changes: 12 additions & 0 deletions sdk/src/packet.rs
@@ -35,6 +35,8 @@ bitflags! {
         const ROUND_COMPUTE_UNIT_PRICE = 0b0010_0000;
         /// For tracking performance
         const PERF_TRACK_PACKET = 0b0100_0000;
+        /// For marking packets from staked nodes
+        const FROM_STAKED_NODE = 0b1000_0000;
     }
 }

@@ -215,6 +217,11 @@ impl Meta {
         self.port = socket_addr.port();
     }

+    pub fn set_from_staked_node(&mut self, from_staked_node: bool) {
+        self.flags
+            .set(PacketFlags::FROM_STAKED_NODE, from_staked_node);
+    }
+
     #[inline]
     pub fn discard(&self) -> bool {
         self.flags.contains(PacketFlags::DISCARD)
@@ -278,6 +285,11 @@
     pub fn round_compute_unit_price(&self) -> bool {
         self.flags.contains(PacketFlags::ROUND_COMPUTE_UNIT_PRICE)
     }
+
+    #[inline]
+    pub fn is_from_staked_node(&self) -> bool {
+        self.flags.contains(PacketFlags::FROM_STAKED_NODE)
+    }
 }

 impl Default for Meta {
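
FROM_STAKED_NODE takes the highest bit of the flags byte; the 0b1000_0000 value suggests PacketFlags is u8-backed, so this fills the last free bit. A quick sketch of the round trip the two new accessors provide, using plain bitmask arithmetic in place of the bitflags-generated type (the DISCARD value below is illustrative, not quoted from the sdk):

const DISCARD: u8 = 0b0000_0001; // illustrative stand-in for PacketFlags::DISCARD
const FROM_STAKED_NODE: u8 = 0b1000_0000; // the bit added by this commit

fn main() {
    let mut flags: u8 = 0;

    // set_from_staked_node(true), as done for a staked QUIC peer:
    flags |= FROM_STAKED_NODE;
    assert!(flags & FROM_STAKED_NODE != 0); // is_from_staked_node() == true
    assert_eq!(flags & DISCARD, 0); // other flags are untouched

    // set_from_staked_node(false) clears only this bit:
    flags &= !FROM_STAKED_NODE;
    assert_eq!(flags & FROM_STAKED_NODE, 0);
}
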
1 change: 1 addition & 0 deletions streamer/src/nonblocking/quic.rs
@@ -924,6 +924,7 @@ async fn handle_chunk(
     if packet_accum.is_none() {
         let mut meta = Meta::default();
         meta.set_socket_addr(remote_addr);
+        meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_)));
         *packet_accum = Some(PacketAccumulator {
             meta,
             chunks: Vec::new(),
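
On the QUIC path the flag is decided per connection rather than per socket: a packet is marked only if the server classified the peer as staked when the connection was established. A stand-in for the same match (ConnectionPeerType is modeled here for illustration, not imported from solana-streamer):

enum ConnectionPeerType {
    Unstaked,
    Staked(u64), // stake amount, as the Staked(_) pattern in the diff implies
}

// Mirrors meta.set_from_staked_node(matches!(peer_type, ConnectionPeerType::Staked(_)))
fn is_from_staked_peer(peer_type: &ConnectionPeerType) -> bool {
    matches!(peer_type, ConnectionPeerType::Staked(_))
}

fn main() {
    assert!(is_from_staked_peer(&ConnectionPeerType::Staked(42)));
    assert!(!is_from_staked_peer(&ConnectionPeerType::Unstaked));
}
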
9 changes: 8 additions & 1 deletion streamer/src/streamer.rs
@@ -110,6 +110,7 @@ fn recv_loop(
     coalesce: Duration,
     use_pinned_memory: bool,
     in_vote_only_mode: Option<Arc<AtomicBool>>,
+    is_staked_service: bool,
 ) -> Result<()> {
     loop {
         let mut packet_batch = if use_pinned_memory {
@@ -147,7 +148,9 @@
             if len == PACKETS_PER_BATCH {
                 full_packet_batches_count.fetch_add(1, Ordering::Relaxed);
             }
-
+            packet_batch
+                .iter_mut()
+                .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
             packet_batch_sender.send(packet_batch)?;
         }
         break;
@@ -156,6 +159,7 @@
     }
 }

+#[allow(clippy::too_many_arguments)]
 pub fn receiver(
     thread_name: String,
     socket: Arc<UdpSocket>,
@@ -166,6 +170,7 @@ pub fn receiver(
     coalesce: Duration,
     use_pinned_memory: bool,
     in_vote_only_mode: Option<Arc<AtomicBool>>,
+    is_staked_service: bool,
 ) -> JoinHandle<()> {
     let res = socket.set_read_timeout(Some(Duration::new(1, 0)));
     assert!(res.is_ok(), "streamer::receiver set_read_timeout error");
@@ -181,6 +186,7 @@
                 coalesce,
                 use_pinned_memory,
                 in_vote_only_mode,
+                is_staked_service,
             );
         })
         .unwrap()
@@ -490,6 +496,7 @@ mod test {
             Duration::from_millis(1), // coalesce
             true,
             None,
+            false,
         );
         const NUM_PACKETS: usize = 5;
         let t_responder = {
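
By contrast with the per-connection QUIC path, every UDP-based receiver stamps a whole batch with one constant answer: on this path staked-ness is a property of the service the socket belongs to (true only for the TPU-vote sockets in this commit), not of the individual sender. A sketch of the tagging step over stand-in Packet/Meta types (simplified, not the real solana_sdk types):

#[derive(Default)]
struct Meta {
    from_staked_node: bool,
}

impl Meta {
    fn set_from_staked_node(&mut self, from_staked_node: bool) {
        self.from_staked_node = from_staked_node;
    }
}

#[derive(Default)]
struct Packet {
    meta: Meta,
}

impl Packet {
    fn meta_mut(&mut self) -> &mut Meta {
        &mut self.meta
    }
}

fn main() {
    let is_staked_service = true; // constant for the lifetime of the receiver thread
    let mut packet_batch = vec![Packet::default(), Packet::default()];
    packet_batch
        .iter_mut()
        .for_each(|p| p.meta_mut().set_from_staked_node(is_staked_service));
    assert!(packet_batch.iter().all(|p| p.meta.from_staked_node));
}
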
