Skip to content

Commit

Permalink
Adding prometheus metrics to follow quic states (#277)
Browse files Browse the repository at this point in the history
  • Loading branch information
godmodegalactus committed Jan 16, 2024
1 parent 36c6276 commit 4ed7eea
Show file tree
Hide file tree
Showing 12 changed files with 124 additions and 60 deletions.
3 changes: 1 addition & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
pub mod commitment_utils;
pub mod encoding;
pub mod keypair_loader;
pub mod quic_connection;
pub mod quic_connection_utils;
pub mod network_utils;
pub mod solana_utils;
pub mod stores;
pub mod structures;
Expand Down
37 changes: 37 additions & 0 deletions core/src/network_utils.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use quinn::{Connection, TransportConfig};

// connection for sending proxy request: FrameStats {
// ACK: 2, CONNECTION_CLOSE: 0, CRYPTO: 3, DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1,
// MAX_DATA: 0, MAX_STREAM_DATA: 1, MAX_STREAMS_BIDI: 0, MAX_STREAMS_UNI: 0, NEW_CONNECTION_ID: 4,
// NEW_TOKEN: 0, PATH_CHALLENGE: 0, PATH_RESPONSE: 0, PING: 0, RESET_STREAM: 0, RETIRE_CONNECTION_ID: 1,
// STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0, STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 }
// rtt=1.08178ms
pub fn connection_stats(connection: &Connection) -> String {
// see https://www.rfc-editor.org/rfc/rfc9000.html#name-frame-types-and-formats
format!(
"stable_id {}, rtt={:?}, stats {:?}",
connection.stable_id(),
connection.stats().path.rtt,
connection.stats().frame_rx
)
}

/// env flag to optionally disable GSO (generic segmentation offload) on environments where Quinn cannot detect it properly
/// see https://github.com/quinn-rs/quinn/pull/1671
pub fn apply_gso_workaround(tc: &mut TransportConfig) {
if disable_gso() {
tc.enable_segmentation_offload(false);
}
}

pub fn log_gso_workaround() {
log::info!("GSO force-disabled? {}", disable_gso());
}

/// note: true means that quinn's heuristic for GSO detection is used to decide if GSO is used
fn disable_gso() -> bool {
std::env::var("DISABLE_GSO")
.unwrap_or("false".to_string())
.parse::<bool>()
.expect("flag must be true or false")
}
2 changes: 1 addition & 1 deletion lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
use solana_lite_rpc_core::keypair_loader::load_identity_keypair;
use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::stores::{
block_information_store::{BlockInformation, BlockInformationStore},
cluster_info_store::ClusterInfo,
Expand All @@ -37,6 +36,7 @@ use solana_lite_rpc_history::history::History;
use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;
use solana_lite_rpc_history::postgres::postgres_session::PostgresSessionCache;
use solana_lite_rpc_services::data_caching_service::DataCachingService;
use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_services::tpu_utils::tpu_connection_path::TpuConnectionPath;
use solana_lite_rpc_services::tpu_utils::tpu_service::{TpuService, TpuServiceConfig};
use solana_lite_rpc_services::transaction_replayer::TransactionReplayer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use crossbeam_channel::Sender;

use log::{debug, error, info, trace, warn};

use solana_lite_rpc_core::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_core::solana_utils::SerializableTransaction;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::identity_stakes::IdentityStakesData;
Expand Down Expand Up @@ -39,6 +38,7 @@ use tracing_subscriber::EnvFilter;
use solana_lite_rpc_quic_forward_proxy::proxy::QuicForwardProxy;
use solana_lite_rpc_quic_forward_proxy::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use solana_lite_rpc_quic_forward_proxy::validator_identity::ValidatorIdentity;
use solana_lite_rpc_services::quic_connection_utils::QuicConnectionParameters;
use solana_lite_rpc_services::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
use tracing_subscriber::fmt::format::FmtSpan;

Expand Down
2 changes: 1 addition & 1 deletion quic-forward-proxy/src/inbound/proxy_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::util::FALLBACK_TIMEOUT;
use anyhow::{anyhow, bail, Context};
use log::{debug, error, info, trace, warn};
use quinn::{Connecting, Endpoint, ServerConfig, VarInt};
use solana_lite_rpc_core::quic_connection_utils::apply_gso_workaround;
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::packet::PACKET_DATA_SIZE;
use std::net::SocketAddr;
use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion quic-forward-proxy/src/outbound/tx_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use log::{debug, info, trace, warn};
use quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt,
};
use solana_lite_rpc_core::quic_connection_utils::apply_gso_workaround;
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::quic::QUIC_MAX_TIMEOUT;
use solana_streamer::nonblocking::quic::ALPN_TPU_PROTOCOL_ID;
use solana_streamer::tls_certificates::new_self_signed_tls_certificate;
Expand Down
2 changes: 2 additions & 0 deletions services/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
pub mod data_caching_service;
pub mod metrics_capture;
pub mod prometheus_sync;
pub mod quic_connection;
pub mod quic_connection_utils;
pub mod tpu_utils;
pub mod transaction_replayer;
pub mod transaction_service;
Expand Down
34 changes: 26 additions & 8 deletions core/src/quic_connection.rs → services/src/quic_connection.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use crate::{
quic_connection_utils::{QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils},
structures::rotating_queue::RotatingQueue,
use crate::quic_connection_utils::{
QuicConnectionError, QuicConnectionParameters, QuicConnectionUtils,
};
use futures::FutureExt;
use log::warn;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{Connection, Endpoint};
use solana_lite_rpc_core::structures::rotating_queue::RotatingQueue;
use solana_sdk::pubkey::Pubkey;
use std::{
net::SocketAddr,
Expand All @@ -17,6 +18,19 @@ use tokio::sync::{OwnedSemaphorePermit, RwLock, Semaphore};

pub type EndpointPool = RotatingQueue<Endpoint>;

lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTION_RESET: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_connection_reset", "Number of times connection was reset")).unwrap();
static ref NB_QUIC_CONNECTION_REQUESTED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_connection_requested", "Number of connections requested")).unwrap();
static ref TRIED_SEND_TRANSCTION_TRIED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_send_transaction_tried", "Number of times send transaction was tried")).unwrap();
static ref SEND_TRANSCTION_SUCESSFUL: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_send_transaction_successful", "Number of times send transaction was successful")).unwrap();
static ref NB_QUIC_COULDNOT_ESTABLISH_CONNECTION: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_nb_couldnot_establish_connection", "Number of times quic connection could not be established")).unwrap();
}

#[derive(Clone)]
#[warn(clippy::rc_clone_in_vec_init)]
pub struct QuicConnection {
Expand Down Expand Up @@ -52,10 +66,10 @@ impl QuicConnection {
}
}

async fn connect(&self) -> Option<Connection> {
async fn connect(&self, is_already_connected: bool) -> Option<Connection> {
QuicConnectionUtils::connect(
self.identity,
true,
is_already_connected,
self.endpoint.clone(),
self.socket_address,
self.connection_params.connection_timeout,
Expand All @@ -80,7 +94,8 @@ impl QuicConnection {
if connection.stable_id() != current_stable_id {
Some(connection)
} else {
let new_conn = self.connect().await;
NB_QUIC_CONNECTION_RESET.inc();
let new_conn = self.connect(true).await;
if let Some(new_conn) = new_conn {
*conn = Some(new_conn);
conn.clone()
Expand All @@ -94,7 +109,8 @@ impl QuicConnection {
}
}
None => {
let connection = self.connect().await;
NB_QUIC_CONNECTION_REQUESTED.inc();
let connection = self.connect(false).await;
*self.connection.write().await = connection.clone();
self.has_connected_once.store(true, Ordering::Relaxed);
connection
Expand All @@ -114,6 +130,7 @@ impl QuicConnection {
let connection = self.get_connection().await;

if let Some(connection) = connection {
TRIED_SEND_TRANSCTION_TRIED.inc();
let current_stable_id = connection.stable_id() as u64;
match QuicConnectionUtils::open_unistream(
connection,
Expand All @@ -131,7 +148,7 @@ impl QuicConnection {
.await
{
Ok(()) => {
// do nothing
SEND_TRANSCTION_SUCESSFUL.inc();
}
Err(QuicConnectionError::ConnectionError { retry }) => {
do_retry = retry;
Expand All @@ -154,6 +171,7 @@ impl QuicConnection {
break;
}
} else {
NB_QUIC_COULDNOT_ESTABLISH_CONNECTION.inc();
warn!(
"Could not establish connection with {}",
self.identity.to_string()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use log::{info, trace};
use log::trace;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::{
ClientConfig, Connection, ConnectionError, Endpoint, EndpointConfig, IdleTimeout, SendStream,
TokioRuntime, TransportConfig,
};
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_sdk::pubkey::Pubkey;
use std::{
net::{IpAddr, Ipv4Addr, SocketAddr},
Expand All @@ -14,6 +16,23 @@ use std::{
};
use tokio::time::timeout;

lazy_static::lazy_static! {
static ref NB_QUIC_0RTT_TIMEOUT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_0RTT_timedout", "Number of times 0RTT timedout")).unwrap();
static ref NB_QUIC_CONNECTION_TIMEOUT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_timedout", "Number of times connection timedout")).unwrap();
static ref NB_QUIC_CONNECTION_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_connection_errored", "Number of times connection errored")).unwrap();
static ref NB_QUIC_WRITEALL_TIMEOUT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_writeall_timedout", "Number of times writeall timedout")).unwrap();
static ref NB_QUIC_WRITEALL_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_writeall_errored", "Number of times writeall errored")).unwrap();
static ref NB_QUIC_FINISH_TIMEOUT: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_finish_timedout", "Number of times finish timedout")).unwrap();
static ref NB_QUIC_FINISH_ERRORED: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_quic_finish_errored", "Number of times finish errored")).unwrap();
}

const ALPN_TPU_PROTOCOL_ID: &[u8] = b"solana-tpu";

pub enum QuicConnectionError {
Expand Down Expand Up @@ -74,8 +93,20 @@ impl QuicConnectionUtils {
connection_timeout: Duration,
) -> anyhow::Result<Connection> {
let connecting = endpoint.connect(addr, "connect")?;
let res = timeout(connection_timeout, connecting).await??;
Ok(res)
match timeout(connection_timeout, connecting).await {
Ok(res) => match res {
Ok(connection) => Ok(connection),
Err(e) => {
NB_QUIC_CONNECTION_ERRORED.inc();
Err(e.into())
}
},
Err(_) => {
// timed out
NB_QUIC_CONNECTION_TIMEOUT.inc();
Err(ConnectionError::TimedOut.into())
}
}
}

pub async fn make_connection_0rtt(
Expand All @@ -89,13 +120,18 @@ impl QuicConnectionUtils {
if (timeout(connection_timeout, zero_rtt).await).is_ok() {
connection
} else {
NB_QUIC_0RTT_TIMEOUT.inc();
return Err(ConnectionError::TimedOut.into());
}
}
Err(connecting) => {
if let Ok(connecting_result) = timeout(connection_timeout, connecting).await {
if connecting_result.is_err() {
NB_QUIC_CONNECTION_ERRORED.inc();
}
connecting_result?
} else {
NB_QUIC_CONNECTION_TIMEOUT.inc();
return Err(ConnectionError::TimedOut.into());
}
}
Expand Down Expand Up @@ -153,11 +189,13 @@ impl QuicConnectionUtils {
identity,
e
);
NB_QUIC_WRITEALL_ERRORED.inc();
return Err(QuicConnectionError::ConnectionError { retry: true });
}
}
Err(_) => {
log::debug!("timeout while writing transaction for {}", identity);
NB_QUIC_WRITEALL_TIMEOUT.inc();
return Err(QuicConnectionError::TimeOut);
}
}
Expand All @@ -172,11 +210,13 @@ impl QuicConnectionUtils {
identity,
e
);
NB_QUIC_FINISH_ERRORED.inc();
return Err(QuicConnectionError::ConnectionError { retry: false });
}
}
Err(_) => {
log::debug!("timeout while finishing transaction for {}", identity);
NB_QUIC_FINISH_TIMEOUT.inc();
return Err(QuicConnectionError::TimeOut);
}
}
Expand Down Expand Up @@ -217,39 +257,3 @@ impl rustls::client::ServerCertVerifier for SkipServerVerification {
Ok(rustls::client::ServerCertVerified::assertion())
}
}

// connection for sending proxy request: FrameStats {
// ACK: 2, CONNECTION_CLOSE: 0, CRYPTO: 3, DATA_BLOCKED: 0, DATAGRAM: 0, HANDSHAKE_DONE: 1,
// MAX_DATA: 0, MAX_STREAM_DATA: 1, MAX_STREAMS_BIDI: 0, MAX_STREAMS_UNI: 0, NEW_CONNECTION_ID: 4,
// NEW_TOKEN: 0, PATH_CHALLENGE: 0, PATH_RESPONSE: 0, PING: 0, RESET_STREAM: 0, RETIRE_CONNECTION_ID: 1,
// STREAM_DATA_BLOCKED: 0, STREAMS_BLOCKED_BIDI: 0, STREAMS_BLOCKED_UNI: 0, STOP_SENDING: 0, STREAM: 0 }
// rtt=1.08178ms
pub fn connection_stats(connection: &Connection) -> String {
// see https://www.rfc-editor.org/rfc/rfc9000.html#name-frame-types-and-formats
format!(
"stable_id {}, rtt={:?}, stats {:?}",
connection.stable_id(),
connection.stats().path.rtt,
connection.stats().frame_rx
)
}

/// env flag to optionally disable GSO (generic segmentation offload) on environments where Quinn cannot detect it properly
/// see https://github.com/quinn-rs/quinn/pull/1671
pub fn apply_gso_workaround(tc: &mut TransportConfig) {
if disable_gso() {
tc.enable_segmentation_offload(false);
}
}

pub fn log_gso_workaround() {
info!("GSO force-disabled? {}", disable_gso());
}

/// note: true means that quinn's heuristic for GSO detection is used to decide if GSO is used
fn disable_gso() -> bool {
std::env::var("DISABLE_GSO")
.unwrap_or("false".to_string())
.parse::<bool>()
.expect("flag must be true or false")
}
5 changes: 2 additions & 3 deletions services/src/tpu_utils/quic_proxy_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ use solana_sdk::pubkey::Pubkey;
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::{broadcast::Receiver, RwLock};

use solana_lite_rpc_core::quic_connection_utils::{
apply_gso_workaround, QuicConnectionParameters, SkipServerVerification,
};
use crate::quic_connection_utils::{QuicConnectionParameters, SkipServerVerification};
use solana_lite_rpc_core::network_utils::apply_gso_workaround;
use solana_lite_rpc_core::structures::proxy_request_format::{TpuForwardingRequest, TxData};

use crate::tpu_utils::quinn_auto_reconnect::AutoReconnect;
Expand Down
7 changes: 5 additions & 2 deletions services/src/tpu_utils/tpu_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use log::{error, trace};
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use quinn::Endpoint;
use solana_lite_rpc_core::{
quic_connection::{PooledConnection, QuicConnectionPool},
quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils},
stores::data_cache::DataCache,
structures::{
identity_stakes::IdentityStakesData, rotating_queue::RotatingQueue,
Expand All @@ -23,6 +21,11 @@ use std::{
};
use tokio::sync::{broadcast::Receiver, broadcast::Sender};

use crate::{
quic_connection::{PooledConnection, QuicConnectionPool},
quic_connection_utils::{QuicConnectionParameters, QuicConnectionUtils},
};

lazy_static::lazy_static! {
static ref NB_QUIC_CONNECTIONS: GenericGauge<prometheus::core::AtomicI64> =
register_int_gauge!(opts!("literpc_nb_active_quic_connections", "Number of quic connections open")).unwrap();
Expand Down
6 changes: 4 additions & 2 deletions services/src/tpu_utils/tpu_service.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
use anyhow::Context;
use prometheus::{core::GenericGauge, opts, register_int_gauge};
use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo;

use super::tpu_connection_manager::TpuConnectionManager;
use crate::quic_connection_utils::QuicConnectionParameters;
use crate::tpu_utils::quic_proxy_connection_manager::QuicProxyConnectionManager;
use crate::tpu_utils::tpu_connection_path::TpuConnectionPath;
use crate::tpu_utils::tpu_service::ConnectionManager::{DirectTpu, QuicProxy};
use solana_lite_rpc_core::quic_connection_utils::{log_gso_workaround, QuicConnectionParameters};

use solana_lite_rpc_core::network_utils::log_gso_workaround;
use solana_lite_rpc_core::stores::data_cache::DataCache;
use solana_lite_rpc_core::structures::transaction_sent_info::SentTransactionInfo;
use solana_lite_rpc_core::traits::leaders_fetcher_interface::LeaderFetcherInterface;
use solana_lite_rpc_core::types::SlotStream;
use solana_lite_rpc_core::AnyhowJoinHandle;
Expand Down

0 comments on commit 4ed7eea

Please sign in to comment.