From 4ed7eead8b35225305787ee85993f6047b685c69 Mon Sep 17 00:00:00 2001 From: galactus <96341601+godmodegalactus@users.noreply.github.com> Date: Tue, 16 Jan 2024 11:30:40 +0100 Subject: [PATCH] Adding prometheus metrics to follow quic states (#277) --- core/src/lib.rs | 3 +- core/src/network_utils.rs | 37 +++++++++ lite-rpc/src/main.rs | 2 +- .../tests/quic_proxy_tpu_integrationtest.rs | 2 +- .../src/inbound/proxy_listener.rs | 2 +- quic-forward-proxy/src/outbound/tx_forward.rs | 2 +- services/src/lib.rs | 2 + {core => services}/src/quic_connection.rs | 34 ++++++-- .../src/quic_connection_utils.rs | 82 ++++++++++--------- .../quic_proxy_connection_manager.rs | 5 +- .../src/tpu_utils/tpu_connection_manager.rs | 7 +- services/src/tpu_utils/tpu_service.rs | 6 +- 12 files changed, 124 insertions(+), 60 deletions(-) create mode 100644 core/src/network_utils.rs rename {core => services}/src/quic_connection.rs (83%) rename {core => services}/src/quic_connection_utils.rs (74%) diff --git a/core/src/lib.rs b/core/src/lib.rs index 12dc7df2..9f517722 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,8 +1,7 @@ pub mod commitment_utils; pub mod encoding; pub mod keypair_loader; -pub mod quic_connection; -pub mod quic_connection_utils; +pub mod network_utils; pub mod solana_utils; pub mod stores; pub mod structures; diff --git a/core/src/network_utils.rs b/core/src/network_utils.rs new file mode 100644 index 00000000..3933f3c3 --- /dev/null +++ b/core/src/network_utils.rs @@ -0,0 +1,37 @@ +use quinn::{Connection, TransportConfig}; + +// connection for sending proxy request: FrameStats { +// ACK: 2, CONNECTION_CLOSE: 0, CRYPTO: 3, DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1, +// MAX_DATA: 0, MAX_STREAM_DATA: 1, MAX_STREAMS_BIDI: 0, MAX_STREAMS_UNI: 0, NEW_CONNECTION_ID: 4, +// NEW_TOKEN: 0, PATH_CHALLENGE: 0, PATH_RESPONSE: 0, PING: 0, RESET_STREAM: 0, RETIRE_CONNECTION_ID: 1, +// STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0, STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, 
STREAM: 0 } +// rtt=1.08178ms +pub fn connection_stats(connection: &Connection) -> String { + // see https://www.rfc-editor.org/rfc/rfc9000.html#name-frame-types-and-formats + format!( + "stable_id {}, rtt={:?}, stats {:?}", + connection.stable_id(), + connection.stats().path.rtt, + connection.stats().frame_rx + ) +} + +/// env flag to optionally disable GSO (generic segmentation offload) on environments where Quinn cannot detect it properly +/// see https://github.com/quinn-rs/quinn/pull/1671 +pub fn apply_gso_workaround(tc: &mut TransportConfig) { + if disable_gso() { + tc.enable_segmentation_offload(false); + } +} + +pub fn log_gso_workaround() { + log::info!("GSO force-disabled? {}", disable_gso()); +} + +/// note: true means GSO is force-disabled; false (the default when DISABLE_GSO is unset) leaves the decision to quinn's heuristic for GSO detection +fn disable_gso() -> bool { + std::env::var("DISABLE_GSO") + .unwrap_or("false".to_string()) + .parse::() + .expect("flag must be true or false") +} diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index dc4580c8..2e5dacb2 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -17,7 +17,6 @@ use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{ use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter; use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription; use solana_lite_rpc_core::keypair_loader::load_identity_keypair; -use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_core::stores::{ block_information_store::{BlockInformation, BlockInformationStore}, cluster_info_store::ClusterInfo, @@ -37,6 +36,7 @@ use solana_lite_rpc_history::history::History; use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig; use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache; use solana_lite_rpc_services::data_caching_service::DataCachingService; +use
solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath; use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig}; use solana_lite_rpc_services::transaction_replayer::TransactionReplayer; diff --git a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs index 05e49c41..d4b45f49 100644 --- a/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs +++ b/quic-forward-proxy-integration-test/tests/quic_proxy_tpu_integrationtest.rs @@ -3,7 +3,6 @@ use crossbeam_channel::Sender; use log::{debug, error, info, trace, warn}; -use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_core::solana_utils::SerializableTransaction; use solana_lite_rpc_core::stores::data_cache::DataCache; use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData; @@ -39,6 +38,7 @@ use tracing_subscriber::EnvFilter; use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy; use solana_lite_rpc_quic_forward_proxy::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider; use solana_lite_rpc_quic_forward_proxy::validator_identity::ValidatorIdentity; +use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters; use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; use tracing_subscriber::fmt::format::FmtSpan; diff --git a/quic-forward-proxy/src/inbound/proxy_listener.rs b/quic-forward-proxy/src/inbound/proxy_listener.rs index 7460b77a..7442094e 100644 --- a/quic-forward-proxy/src/inbound/proxy_listener.rs +++ b/quic-forward-proxy/src/inbound/proxy_listener.rs @@ -7,7 +7,7 @@ use crate::util::FALLBACK_TIMEOUT; use anyhow::{anyhow, bail, Context}; use log::{debug, error, info, trace, warn}; use quinn::{Connecting, Endpoint, 
ServerConfig, VarInt}; -use solana_lite_rpc_core::quic_connection_utils::apply_gso_workaround; +use solana_lite_rpc_core::network_utils::apply_gso_workaround; use solana_sdk::packet::PACKET_DATA_SIZE; use std::net::SocketAddr; use std::sync::Arc; diff --git a/quic-forward-proxy/src/outbound/tx_forward.rs b/quic-forward-proxy/src/outbound/tx_forward.rs index a1c79fe3..46b225f8 100644 --- a/quic-forward-proxy/src/outbound/tx_forward.rs +++ b/quic-forward-proxy/src/outbound/tx_forward.rs @@ -11,7 +11,7 @@ use log::{debug, info, trace, warn}; use quinn::{ ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt, }; -use solana_lite_rpc_core::quic_connection_utils::apply_gso_workaround; +use solana_lite_rpc_core::network_utils::apply_gso_workaround; use solana_sdk::quic::QUIC_MAX_TIMEOUT; use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID; use solana_streamer::tls_certificates::new_self_signed_tls_certificate; diff --git a/services/src/lib.rs b/services/src/lib.rs index c6494060..8e6fe6ae 100644 --- a/services/src/lib.rs +++ b/services/src/lib.rs @@ -1,6 +1,8 @@ pub mod data_caching_service; pub mod metrics_capture; pub mod prometheus_sync; +pub mod quic_connection; +pub mod quic_connection_utils; pub mod tpu_utils; pub mod transaction_replayer; pub mod transaction_service; diff --git a/core/src/quic_connection.rs b/services/src/quic_connection.rs similarity index 83% rename from core/src/quic_connection.rs rename to services/src/quic_connection.rs index e102cd29..62a509a0 100644 --- a/core/src/quic_connection.rs +++ b/services/src/quic_connection.rs @@ -1,10 +1,11 @@ -use crate::{ - quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils}, - structures::rotating_queue::RotatingQueue, +use crate::quic_connection_utils::{ + QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils, }; use futures::FutureExt; use log::warn; +use prometheus::{core::GenericGauge, opts, 
register_int_gauge}; use quinn::{Connection, Endpoint}; +use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue; use solana_sdk::pubkey::Pubkey; use std::{ net::SocketAddr, @@ -17,6 +18,19 @@ use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore}; pub type EndpointPool = RotatingQueue; +lazy_static::lazy_static! { + static ref NB_QUIC_CONNECTION_RESET: GenericGauge = + register_int_gauge!(opts!("literpc_quic_nb_connection_reset", "Number of times connection was reset")).unwrap(); + static ref NB_QUIC_CONNECTION_REQUESTED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_nb_connection_requested", "Number of connections requested")).unwrap(); + static ref TRIED_SEND_TRANSCTION_TRIED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_nb_send_transaction_tried", "Number of times send transaction was tried")).unwrap(); + static ref SEND_TRANSCTION_SUCESSFUL: GenericGauge = + register_int_gauge!(opts!("literpc_quic_nb_send_transaction_successful", "Number of times send transaction was successful")).unwrap(); + static ref NB_QUIC_COULDNOT_ESTABLISH_CONNECTION: GenericGauge = + register_int_gauge!(opts!("literpc_quic_nb_couldnot_establish_connection", "Number of times quic connection could not be established")).unwrap(); +} + #[derive(Clone)] #[warn(clippy::rc_clone_in_vec_init)] pub struct QuicConnection { @@ -52,10 +66,10 @@ impl QuicConnection { } } - async fn connect(&self) -> Option { + async fn connect(&self, is_already_connected: bool) -> Option { QuicConnectionUtils::connect( self.identity, - true, + is_already_connected, self.endpoint.clone(), self.socket_address, self.connection_params.connection_timeout, @@ -80,7 +94,8 @@ impl QuicConnection { if connection.stable_id() != current_stable_id { Some(connection) } else { - let new_conn = self.connect().await; + NB_QUIC_CONNECTION_RESET.inc(); + let new_conn = self.connect(true).await; if let Some(new_conn) = new_conn { *conn = Some(new_conn); conn.clone() @@ -94,7 +109,8 @@ impl 
QuicConnection { } } None => { - let connection = self.connect().await; + NB_QUIC_CONNECTION_REQUESTED.inc(); + let connection = self.connect(false).await; *self.connection.write().await = connection.clone(); self.has_connected_once.store(true, Ordering::Relaxed); connection @@ -114,6 +130,7 @@ impl QuicConnection { let connection = self.get_connection().await; if let Some(connection) = connection { + TRIED_SEND_TRANSCTION_TRIED.inc(); let current_stable_id = connection.stable_id() as u64; match QuicConnectionUtils::open_unistream( connection, @@ -131,7 +148,7 @@ impl QuicConnection { .await { Ok(()) => { - // do nothing + SEND_TRANSCTION_SUCESSFUL.inc(); } Err(QuicConnectionError::ConnectionError { retry }) => { do_retry = retry; @@ -154,6 +171,7 @@ impl QuicConnection { break; } } else { + NB_QUIC_COULDNOT_ESTABLISH_CONNECTION.inc(); warn!( "Could not establish connection with {}", self.identity.to_string() diff --git a/core/src/quic_connection_utils.rs b/services/src/quic_connection_utils.rs similarity index 74% rename from core/src/quic_connection_utils.rs rename to services/src/quic_connection_utils.rs index 426ea874..c8f70ae1 100644 --- a/core/src/quic_connection_utils.rs +++ b/services/src/quic_connection_utils.rs @@ -1,8 +1,10 @@ -use log::{info, trace}; +use log::trace; +use prometheus::{core::GenericGauge, opts, register_int_gauge}; use quinn::{ ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream, TokioRuntime, TransportConfig, }; +use solana_lite_rpc_core::network_utils::apply_gso_workaround; use solana_sdk::pubkey::Pubkey; use std::{ net::{IpAddr, Ipv4Addr, SocketAddr}, @@ -14,6 +16,23 @@ use std::{ }; use tokio::time::timeout; +lazy_static::lazy_static! 
{ + static ref NB_QUIC_0RTT_TIMEOUT: GenericGauge = + register_int_gauge!(opts!("literpc_quic_0RTT_timedout", "Number of times 0RTT timedout")).unwrap(); + static ref NB_QUIC_CONNECTION_TIMEOUT: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_timedout", "Number of times connection timedout")).unwrap(); + static ref NB_QUIC_CONNECTION_ERRORED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_connection_errored", "Number of times connection errored")).unwrap(); + static ref NB_QUIC_WRITEALL_TIMEOUT: GenericGauge = + register_int_gauge!(opts!("literpc_quic_writeall_timedout", "Number of times writeall timedout")).unwrap(); + static ref NB_QUIC_WRITEALL_ERRORED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_writeall_errored", "Number of times writeall errored")).unwrap(); + static ref NB_QUIC_FINISH_TIMEOUT: GenericGauge = + register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap(); + static ref NB_QUIC_FINISH_ERRORED: GenericGauge = + register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap(); +} + const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu"; pub enum QuicConnectionError { @@ -74,8 +93,20 @@ impl QuicConnectionUtils { connection_timeout: Duration, ) -> anyhow::Result { let connecting = endpoint.connect(addr, "connect")?; - let res = timeout(connection_timeout, connecting).await??; - Ok(res) + match timeout(connection_timeout, connecting).await { + Ok(res) => match res { + Ok(connection) => Ok(connection), + Err(e) => { + NB_QUIC_CONNECTION_ERRORED.inc(); + Err(e.into()) + } + }, + Err(_) => { + // timed out + NB_QUIC_CONNECTION_TIMEOUT.inc(); + Err(ConnectionError::TimedOut.into()) + } + } } pub async fn make_connection_0rtt( @@ -89,13 +120,18 @@ impl QuicConnectionUtils { if (timeout(connection_timeout, zero_rtt).await).is_ok() { connection } else { + NB_QUIC_0RTT_TIMEOUT.inc(); return Err(ConnectionError::TimedOut.into()); } } 
Err(connecting) => { if let Ok(connecting_result) = timeout(connection_timeout, connecting).await { + if connecting_result.is_err() { + NB_QUIC_CONNECTION_ERRORED.inc(); + } connecting_result? } else { + NB_QUIC_CONNECTION_TIMEOUT.inc(); return Err(ConnectionError::TimedOut.into()); } } @@ -153,11 +189,13 @@ impl QuicConnectionUtils { identity, e ); + NB_QUIC_WRITEALL_ERRORED.inc(); return Err(QuicConnectionError::ConnectionError { retry: true }); } } Err(_) => { log::debug!("timeout while writing transaction for {}", identity); + NB_QUIC_WRITEALL_TIMEOUT.inc(); return Err(QuicConnectionError::TimeOut); } } @@ -172,11 +210,13 @@ impl QuicConnectionUtils { identity, e ); + NB_QUIC_FINISH_ERRORED.inc(); return Err(QuicConnectionError::ConnectionError { retry: false }); } } Err(_) => { log::debug!("timeout while finishing transaction for {}", identity); + NB_QUIC_FINISH_TIMEOUT.inc(); return Err(QuicConnectionError::TimeOut); } } @@ -217,39 +257,3 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification { Ok(rustls::client::ServerCertVerified::assertion()) } } - -// connection for sending proxy request: FrameStats { -// ACK: 2, CONNECTION_CLOSE: 0, CRYPTO: 3, DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1, -// MAX_DATA: 0, MAX_STREAM_DATA: 1, MAX_STREAMS_BIDI: 0, MAX_STREAMS_UNI: 0, NEW_CONNECTION_ID: 4, -// NEW_TOKEN: 0, PATH_CHALLENGE: 0, PATH_RESPONSE: 0, PING: 0, RESET_STREAM: 0, RETIRE_CONNECTION_ID: 1, -// STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0, STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 } -// rtt=1.08178ms -pub fn connection_stats(connection: &Connection) -> String { - // see https://www.rfc-editor.org/rfc/rfc9000.html#name-frame-types-and-formats - format!( - "stable_id {}, rtt={:?}, stats {:?}", - connection.stable_id(), - connection.stats().path.rtt, - connection.stats().frame_rx - ) -} - -/// env flag to optionally disable GSO (generic segmentation offload) on environments where Quinn cannot detect it properly -/// see 
https://github.com/quinn-rs/quinn/pull/1671 -pub fn apply_gso_workaround(tc: &mut TransportConfig) { - if disable_gso() { - tc.enable_segmentation_offload(false); - } -} - -pub fn log_gso_workaround() { - info!("GSO force-disabled? {}", disable_gso()); -} - -/// note: true means that quinn's heuristic for GSO detection is used to decide if GSO is used -fn disable_gso() -> bool { - std::env::var("DISABLE_GSO") - .unwrap_or("false".to_string()) - .parse::() - .expect("flag must be true or false") -} diff --git a/services/src/tpu_utils/quic_proxy_connection_manager.rs b/services/src/tpu_utils/quic_proxy_connection_manager.rs index 494fc99f..07ee1820 100644 --- a/services/src/tpu_utils/quic_proxy_connection_manager.rs +++ b/services/src/tpu_utils/quic_proxy_connection_manager.rs @@ -16,9 +16,8 @@ use solana_sdk::pubkey::Pubkey; use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::{broadcast::Receiver, RwLock}; -use solana_lite_rpc_core::quic_connection_utils::{ - apply_gso_workaround, QuicConnectionParameters, SkipServerVerification, -}; +use crate::quic_connection_utils::{QuicConnectionParameters, SkipServerVerification}; +use solana_lite_rpc_core::network_utils::apply_gso_workaround; use solana_lite_rpc_core::structures::proxy_request_format::{TpuForwardingRequest, TxData}; use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect; diff --git a/services/src/tpu_utils/tpu_connection_manager.rs b/services/src/tpu_utils/tpu_connection_manager.rs index 4e8f16c1..90e2f39f 100644 --- a/services/src/tpu_utils/tpu_connection_manager.rs +++ b/services/src/tpu_utils/tpu_connection_manager.rs @@ -3,8 +3,6 @@ use log::{error, trace}; use prometheus::{core::GenericGauge, opts, register_int_gauge}; use quinn::Endpoint; use solana_lite_rpc_core::{ - quic_connection::{PooledConnection, QuicConnectionPool}, - quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils}, stores::data_cache::DataCache, structures::{ identity_stakes::IdentityStakesData, 
rotating_queue::RotatingQueue, @@ -23,6 +21,11 @@ use std::{ }; use tokio::sync::{broadcast::Receiver, broadcast::Sender}; +use crate::{ + quic_connection::{PooledConnection, QuicConnectionPool}, + quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils}, +}; + lazy_static::lazy_static! { static ref NB_QUIC_CONNECTIONS: GenericGauge = register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap(); diff --git a/services/src/tpu_utils/tpu_service.rs b/services/src/tpu_utils/tpu_service.rs index d26af134..087a71f0 100644 --- a/services/src/tpu_utils/tpu_service.rs +++ b/services/src/tpu_utils/tpu_service.rs @@ -1,13 +1,15 @@ use anyhow::Context; use prometheus::{core::GenericGauge, opts, register_int_gauge}; -use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo; use super::tpu_connection_manager::TpuConnectionManager; +use crate::quic_connection_utils::QuicConnectionParameters; use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager; use crate::tpu_utils::tpu_connection_path::TpuConnectionPath; use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy}; -use solana_lite_rpc_core::quic_connection_utils::{log_gso_workaround, QuicConnectionParameters}; + +use solana_lite_rpc_core::network_utils::log_gso_workaround; use solana_lite_rpc_core::stores::data_cache::DataCache; +use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo; use solana_lite_rpc_core::traits::leaders_fetcher_interface::LeaderFetcherInterface; use solana_lite_rpc_core::types::SlotStream; use solana_lite_rpc_core::AnyhowJoinHandle;