Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into feature/block-priofees
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Jan 17, 2024
2 parents 5c73513 + 4ed7eea commit a15834d
Show file tree
Hide file tree
Showing 15 changed files with 321 additions and 305 deletions.
368 changes: 147 additions & 221 deletions Cargo.lock

Large diffs are not rendered by default.

61 changes: 47 additions & 14 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::grpc_subscription::map_block_update;
use crate::grpc_subscription::{create_block_processing_task, map_block_update};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcSourceConfig,
Expand All @@ -12,7 +13,7 @@ use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::collections::{BTreeSet, HashMap};
use std::time::Duration;
use tokio::sync::broadcast::Receiver;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
Expand Down Expand Up @@ -68,19 +69,51 @@ pub fn create_grpc_multiplex_blocks_subscription(
let jh_block_emitter_task = {
tokio::task::spawn(async move {
loop {
let confirmed_blocks_stream = {
let (confirmed_block_sender, mut confirmed_block_reciever) =
tokio::sync::mpsc::unbounded_channel::<ProducedBlock>();
let _confirmed_blocks_tasks = {
let commitment_config = CommitmentConfig::confirmed();

let mut streams = Vec::new();
let mut tasks = Vec::new();
let mut streams = vec![];
for grpc_source in &grpc_sources {
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
);
streams.push(stream);
let (block_sender, block_reciever) = async_channel::unbounded();
tasks.push(create_block_processing_task(
grpc_source.grpc_addr.clone(),
grpc_source.grpc_x_token.clone(),
block_sender,
yellowstone_grpc_proto::geyser::CommitmentLevel::Confirmed,
));
streams.push(block_reciever)
}

create_multiplexed_stream(streams, BlockExtractor(commitment_config))
let merging_streams: AnyhowJoinHandle = tokio::task::spawn(async move {
let mut slots_processed = BTreeSet::<u64>::new();
loop {
let block_message =
futures::stream::select_all(streams.clone()).next().await;
const MAX_SIZE: usize = 1024;
if let Some(block) = block_message {
let slot = block.slot;
// check if the slot is in the map, if not check if the container is half full and the slot in question is older than the lowest value
// it means that the slot is too old to process
if !slots_processed.contains(&slot)
&& (slots_processed.len() < MAX_SIZE / 2
|| slot
> slots_processed.first().cloned().unwrap_or_default())
{
confirmed_block_sender
.send(map_block_update(block, commitment_config))
.context("Issue to send confirmed block")?;
slots_processed.insert(slot);
if slots_processed.len() > MAX_SIZE {
slots_processed.pop_first();
}
}
}
}
});
tasks.push(merging_streams);
tasks
};

let finalized_blockmeta_stream = {
Expand All @@ -99,7 +132,6 @@ pub fn create_grpc_multiplex_blocks_subscription(

// by blockhash
let mut recent_confirmed_blocks = HashMap::<String, ProducedBlock>::new();
let mut confirmed_blocks_stream = std::pin::pin!(confirmed_blocks_stream);
let mut finalized_blockmeta_stream = std::pin::pin!(finalized_blockmeta_stream);

let mut cleanup_tick = tokio::time::interval(Duration::from_secs(5));
Expand All @@ -109,7 +141,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
const MAX_ALLOWED_CLEANUP_WITHOUT_RECV: u8 = 12; // 12*5 = 60s without recving data
loop {
tokio::select! {
confirmed_block = confirmed_blocks_stream.next() => {
confirmed_block = confirmed_block_reciever.recv() => {
cleanup_without_recv_blocks = 0;

let confirmed_block = confirmed_block.expect("confirmed block from stream");
Expand All @@ -134,7 +166,8 @@ pub fn create_grpc_multiplex_blocks_subscription(
continue;
}
} else {
debug!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
// this warning is ok for first few blocks when we start lrpc
warn!("finalized block meta received for blockhash {} which was never seen or already emitted", blockhash);
}
},
_ = cleanup_tick.tick() => {
Expand Down
13 changes: 3 additions & 10 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ use solana_sdk::{
};
use solana_transaction_status::{Reward, RewardType};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::broadcast::Sender;
use yellowstone_grpc_client::GeyserGrpcClient;

use yellowstone_grpc_proto::prelude::{
Expand Down Expand Up @@ -244,7 +243,7 @@ pub fn map_block_update(
pub fn create_block_processing_task(
grpc_addr: String,
grpc_x_token: Option<String>,
block_sx: Sender<ProducedBlock>,
block_sx: async_channel::Sender<SubscribeUpdateBlock>,
commitment_level: CommitmentLevel,
) -> AnyhowJoinHandle {
tokio::spawn(async move {
Expand All @@ -260,12 +259,6 @@ pub fn create_block_processing_task(
},
);

let commitment_config = match commitment_level {
CommitmentLevel::Confirmed => CommitmentConfig::confirmed(),
CommitmentLevel::Finalized => CommitmentConfig::finalized(),
CommitmentLevel::Processed => CommitmentConfig::processed(),
};

// connect to grpc
let mut client =
GeyserGrpcClient::connect(grpc_addr.clone(), grpc_x_token.clone(), None)?;
Expand All @@ -292,10 +285,10 @@ pub fn create_block_processing_task(

match update {
UpdateOneof::Block(block) => {
let block = map_block_update(block, commitment_config);
block_sx
.send(block)
.context("Grpc failed to send a block")?;
.await
.context("Problem sending on block channel")?;
}
UpdateOneof::Ping(_) => {
log::trace!("GRPC Ping");
Expand Down
3 changes: 1 addition & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
pub mod commitment_utils;
pub mod encoding;
pub mod keypair_loader;
pub mod quic_connection;
pub mod quic_connection_utils;
pub mod network_utils;
pub mod solana_utils;
pub mod stores;
pub mod structures;
Expand Down
37 changes: 37 additions & 0 deletions core/src/network_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use quinn::{Connection, TransportConfig};

// connection for sending proxy request: FrameStats {
// ACK: 2, CONNECTION_CLOSE: 0, CRYPTO: 3, DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1,
// MAX_DATA: 0, MAX_STREAM_DATA: 1, MAX_STREAMS_BIDI: 0, MAX_STREAMS_UNI: 0, NEW_CONNECTION_ID: 4,
// NEW_TOKEN: 0, PATH_CHALLENGE: 0, PATH_RESPONSE: 0, PING: 0, RESET_STREAM: 0, RETIRE_CONNECTION_ID: 1,
// STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0, STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 }
// rtt=1.08178ms
pub fn connection_stats(connection: &Connection) -> String {
// see https://www.rfc-editor.org/rfc/rfc9000.html#name-frame-types-and-formats
format!(
"stable_id {}, rtt={:?}, stats {:?}",
connection.stable_id(),
connection.stats().path.rtt,
connection.stats().frame_rx
)
}

/// env flag to optionally disable GSO (generic segmentation offload) on environments where Quinn cannot detect it properly
/// see https://github.com/quinn-rs/quinn/pull/1671
pub fn apply_gso_workaround(tc: &mut TransportConfig) {
if disable_gso() {
tc.enable_segmentation_offload(false);
}
}

pub fn log_gso_workaround() {
log::info!("GSO force-disabled? {}", disable_gso());
}

/// note: true means that quinn's heuristic for GSO detection is used to decide if GSO is used
fn disable_gso() -> bool {
std::env::var("DISABLE_GSO")
.unwrap_or("false".to_string())
.parse::<bool>()
.expect("flag must be true or false")
}
2 changes: 1 addition & 1 deletion lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
use solana_lite_rpc_core::keypair_loader::load_identity_keypair;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
Expand All @@ -37,6 +36,7 @@ use solana_lite_rpc_history::history::History;
use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crossbeam_channel::Sender;

use log::{debug, error, info, trace, warn};

use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::solana_utils::SerializableTransaction;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData;
Expand Down Expand Up @@ -39,6 +38,7 @@ use tracing_subscriber::EnvFilter;
use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy;
use solana_lite_rpc_quic_forward_proxy::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use solana_lite_rpc_quic_forward_proxy::validator_identity::ValidatorIdentity;
use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
use tracing_subscriber::fmt::format::FmtSpan;

Expand Down
2 changes: 1 addition & 1 deletion quic-forward-proxy/src/inbound/proxy_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::util::FALLBACK_TIMEOUT;
use anyhow::{anyhow, bail, Context};
use log::{debug, error, info, trace, warn};
use quinn::{Connecting, Endpoint, ServerConfig, VarInt};
use solana_lite_rpc_core::quic_connection_utils::apply_gso_workaround;
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::packet::PACKET_DATA_SIZE;
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion quic-forward-proxy/src/outbound/tx_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::{debug, info, trace, warn};
use quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt,
};
use solana_lite_rpc_core::quic_connection_utils::apply_gso_workaround;
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::quic::QUIC_MAX_TIMEOUT;
use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
Expand Down
2 changes: 2 additions & 0 deletions services/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod data_caching_service;
pub mod metrics_capture;
pub mod prometheus_sync;
pub mod quic_connection;
pub mod quic_connection_utils;
pub mod tpu_utils;
pub mod transaction_replayer;
pub mod transaction_service;
Expand Down
34 changes: 26 additions & 8 deletions core/src/quic_connection.rs → services/src/quic_connection.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{
quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils},
structures::rotating_queue::RotatingQueue,
use crate::quic_connection_utils::{
QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils,
};
use futures::FutureExt;
use log::warn;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Connection, Endpoint};
use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue;
use solana_sdk::pubkey::Pubkey;
use std::{
net::SocketAddr,
Expand All @@ -17,6 +18,19 @@ use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};

pub type EndpointPool = RotatingQueue<Endpoint>;

lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTION_RESET: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_connection_reset", "Number of times connection was reset")).unwrap();
static ref NB_QUIC_CONNECTION_REQUESTED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_connection_requested", "Number of connections requested")).unwrap();
static ref TRIED_SEND_TRANSCTION_TRIED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_send_transaction_tried", "Number of times send transaction was tried")).unwrap();
static ref SEND_TRANSCTION_SUCESSFUL: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_send_transaction_successful", "Number of times send transaction was successful")).unwrap();
static ref NB_QUIC_COULDNOT_ESTABLISH_CONNECTION: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_couldnot_establish_connection", "Number of times quic connection could not be established")).unwrap();
}

#[derive(Clone)]
#[warn(clippy::rc_clone_in_vec_init)]
pub struct QuicConnection {
Expand Down Expand Up @@ -52,10 +66,10 @@ impl QuicConnection {
}
}

async fn connect(&self) -> Option<Connection> {
async fn connect(&self, is_already_connected: bool) -> Option<Connection> {
QuicConnectionUtils::connect(
self.identity,
true,
is_already_connected,
self.endpoint.clone(),
self.socket_address,
self.connection_params.connection_timeout,
Expand All @@ -80,7 +94,8 @@ impl QuicConnection {
if connection.stable_id() != current_stable_id {
Some(connection)
} else {
let new_conn = self.connect().await;
NB_QUIC_CONNECTION_RESET.inc();
let new_conn = self.connect(true).await;
if let Some(new_conn) = new_conn {
*conn = Some(new_conn);
conn.clone()
Expand All @@ -94,7 +109,8 @@ impl QuicConnection {
}
}
None => {
let connection = self.connect().await;
NB_QUIC_CONNECTION_REQUESTED.inc();
let connection = self.connect(false).await;
*self.connection.write().await = connection.clone();
self.has_connected_once.store(true, Ordering::Relaxed);
connection
Expand All @@ -114,6 +130,7 @@ impl QuicConnection {
let connection = self.get_connection().await;

if let Some(connection) = connection {
TRIED_SEND_TRANSCTION_TRIED.inc();
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(
connection,
Expand All @@ -131,7 +148,7 @@ impl QuicConnection {
.await
{
Ok(()) => {
// do nothing
SEND_TRANSCTION_SUCESSFUL.inc();
}
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
Expand All @@ -154,6 +171,7 @@ impl QuicConnection {
break;
}
} else {
NB_QUIC_COULDNOT_ESTABLISH_CONNECTION.inc();
warn!(
"Could not establish connection with {}",
self.identity.to_string()
Expand Down
Loading

0 comments on commit a15834d

Please sign in to comment.