Skip to content

Commit

Permalink
Making using of tpu forwards customable
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Apr 3, 2024
1 parent 9369cdb commit 71820a9
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 17 deletions.
1 change: 1 addition & 0 deletions examples/custom-tpu-send-transactions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ async fn main() -> anyhow::Result<()> {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 5,
prioritization_heap_size,
enable_tpu_forwarding: None,
},
tpu_connection_path: TpuConnectionPath::QuicDirectPath,
};
Expand Down
4 changes: 4 additions & 0 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,5 +393,9 @@ fn quic_params_from_environment() -> Option<QuicConnectionParameters> {
quic_connection_parameters.unistreams_to_create_new_connection_in_percentage,
);

quic_connection_parameters.enable_tpu_forwarding = env::var("ENABLE_TPU_FORWARDING")
.map(|value| Some(value.parse::<bool>().unwrap()))
.unwrap_or(quic_connection_parameters.enable_tpu_forwarding);

Some(quic_connection_parameters)
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ const QUIC_CONNECTION_PARAMS: QuicConnectionParameters = QuicConnectionParameter
number_of_transactions_per_unistream: 10,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
};

#[test]
Expand Down Expand Up @@ -575,27 +576,27 @@ async fn start_literpc_client_proxy_mode(
QuicProxyConnectionManager::new(certificate, key, forward_proxy_address).await;

// this effectively controls how many connections we will have
let mut connections_to_keep: HashMap<Pubkey, SocketAddr> = HashMap::new();
let mut connections_to_keep: HashSet<(Pubkey, SocketAddr)> = HashSet::new();
let addr1 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111jepwNWbYG87sgwnBbUJnQHrPiUJzMpqJXZ")?,
addr1,
);
));

let addr2 = UdpSocket::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();
connections_to_keep.insert(
connections_to_keep.insert((
Pubkey::from_str("1111111k4AYMctpyJakWNvGcte6tR8BLyZw54R8qu")?,
addr2,
);
));

// this is the real streamer
connections_to_keep.insert(validator_identity.pubkey(), streamer_listen_addrs);
connections_to_keep.insert((validator_identity.pubkey(), streamer_listen_addrs));

// get information about the optional validator identity stake
// populated from get_stakes_for_identity()
Expand Down
2 changes: 2 additions & 0 deletions services/src/quic_connection_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub struct QuicConnectionParameters {
pub number_of_transactions_per_unistream: usize,
pub unistreams_to_create_new_connection_in_percentage: u8,
pub prioritization_heap_size: Option<usize>,
pub enable_tpu_forwarding: Option<bool>,
}

impl Default for QuicConnectionParameters {
Expand All @@ -123,6 +124,7 @@ impl Default for QuicConnectionParameters {
number_of_transactions_per_unistream: 1,
unistreams_to_create_new_connection_in_percentage: 10,
prioritization_heap_size: None,
enable_tpu_forwarding: None,
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use solana_lite_rpc_core::{
};
use solana_sdk::pubkey::Pubkey;
use solana_streamer::nonblocking::quic::compute_max_allowed_uni_streams;
use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration};
use std::{collections::HashSet, net::SocketAddr, sync::Arc, time::Duration};
use tokio::sync::{
broadcast::{self, Receiver, Sender},
Notify,
Expand Down Expand Up @@ -283,8 +283,8 @@ impl TpuConnectionManager {
// using mpsc as a oneshot channel/ because with one shot channel we cannot reuse the reciever
let broadcast_receiver = broadcast_sender.subscribe();
active_connection.start_listening(broadcast_receiver, identity_stakes);
self.identity_to_active_connection
.insert(*identity, active_connection);
self.active_connections
.insert((*identity, *socket_addr), active_connection);
}
}

Expand Down
36 changes: 28 additions & 8 deletions services/src/tpu_utils/tpu_service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_streamer::nonblocking::quic::ConnectionPeerType;

use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters;
Expand All @@ -15,7 +16,7 @@ use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::{quic::QUIC_PORT_OFFSET, signature::Keypair, slot_history::Slot};
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
use std::collections::HashMap;
use std::collections::HashSet;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
Expand Down Expand Up @@ -128,8 +129,21 @@ impl TpuService {
.leader_schedule
.get_slot_leaders(current_slot, last_slot)
.await?;

let identity_stakes = self.data_cache.identity_stakes.get_stakes().await;

let enable_tpu_forwards = {
match identity_stakes.peer_type {
ConnectionPeerType::Unstaked => false,
ConnectionPeerType::Staked => self
.config
.quic_connection_params
.enable_tpu_forwarding
.unwrap_or_default(),
}
};
// get next leader with its tpu port
let connections_to_keep: HashMap<_, _> = next_leaders
let connections_to_keep: HashSet<_, _> = next_leaders
.iter()
.map(|x| {
let contact_info = cluster_nodes.get(&x.pubkey);
Expand All @@ -141,16 +155,22 @@ impl TpuService {
})
.filter(|x| x.1.is_some())
.flat_map(|x| {
let mut addresses = vec![];
let mut tpu_addr = x.1.unwrap();
// add quic port offset
tpu_addr.set_port(tpu_addr.port() + QUIC_PORT_OFFSET);

// Technically the forwards port could be anywhere and unfortunately getClusterNodes
// does not report it. However it's nearly always directly after the tpu port.
let mut tpu_forwards_addr = tpu_addr.clone();
tpu_forwards_addr.set_port(tpu_addr.port() + 1);
addresses.push((x.0, tpu_addr));

if enable_tpu_forwards {
// Technically the forwards port could be anywhere and unfortunately getClusterNodes
// does not report it. However it's nearly always directly after the tpu port.
let mut tpu_forwards_addr = tpu_addr;
tpu_forwards_addr.set_port(tpu_addr.port() + 1);
addresses.push((x.0, tpu_forwards_addr));
}

[(x.0, tpu_addr), (x.0, tpu_forwards_addr)]
addresses
})
.collect();

Expand All @@ -162,7 +182,7 @@ impl TpuService {
.update_connections(
self.broadcast_sender.clone(),
connections_to_keep,
self.data_cache.identity_stakes.get_stakes().await,
identity_stakes,
self.data_cache.clone(),
self.config.quic_connection_params,
)
Expand Down

0 comments on commit 71820a9

Please sign in to comment.