diff --git a/src/adnl/src/quic/mod.rs b/src/adnl/src/quic/mod.rs index aaf67e4d..ab9d7222 100644 --- a/src/adnl/src/quic/mod.rs +++ b/src/adnl/src/quic/mod.rs @@ -493,8 +493,6 @@ pub struct QuicNode { /// Shared subscriber list for all accept loops. subscribers: Arc>>, peer_keys: lockfree::map::Map, SocketAddr>, - /// Max concurrent in-flight streams per inbound connection. - max_streams_per_connection: usize, /// Inbound connection maps, one per endpoint/accept-loop. Used by the stats dumper. inbound_pools: Mutex>>, /// Per-TL-tag message counters for the stats dumper. @@ -514,7 +512,6 @@ impl QuicNode { const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(5); /// How often the stats dumper logs connection statistics. const STATS_DUMP_INTERVAL: Duration = Duration::from_secs(60); - const DEFAULT_MAX_STREAMS_PER_CONNECTION: usize = 256; const DEFAULT_QUERY_TIMEOUT_MS: u64 = 5000; /// Maximum number of messages buffered per outbound peer const SEND_QUEUE_CAPACITY: usize = 1024; @@ -561,12 +558,9 @@ impl QuicNode { pub fn new( subscribers: Vec>, cancellation_token: tokio_util::sync::CancellationToken, - max_streams_per_connection: Option, runtime_handle: tokio::runtime::Handle, rate_limit_config: Option, ) -> Arc { - let max_streams_per_connection = - max_streams_per_connection.unwrap_or(Self::DEFAULT_MAX_STREAMS_PER_CONNECTION); static CRYPTO_INIT: Once = Once::new(); CRYPTO_INIT.call_once(|| { rustls::crypto::ring::default_provider() @@ -580,7 +574,6 @@ impl QuicNode { endpoints: Mutex::new(HashMap::new()), subscribers: Arc::new(subscribers), peer_keys: lockfree::map::Map::new(), - max_streams_per_connection, inbound_pools: Mutex::new(Vec::new()), msg_stats: MsgStats::new(), key_cmd_tx, @@ -697,6 +690,8 @@ impl QuicNode { quinn::IdleTimeout::try_from(Duration::from_secs(15)).expect("15s fits in IdleTimeout"), )); client_transport.keep_alive_interval(Some(Duration::from_secs(5))); + client_transport + 
.congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default())); quinn_client_config.transport_config(Arc::new(client_transport)); let local_key_state = Arc::new(LocalKeyState { @@ -1197,6 +1192,8 @@ impl QuicNode { // Keep established connections alive so the idle timeout only fires on // truly dead peers, not on connections that are just quiet between rounds. transport_config.keep_alive_interval(Some(Duration::from_secs(5))); + transport_config + .congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default())); quinn_server_config.transport_config(Arc::new(transport_config)); // Create UDP socket with SO_REUSEADDR so the port can be reused immediately @@ -1214,6 +1211,22 @@ impl QuicNode { sock.set_nonblocking(true).map_err(|e| error!("Cannot set non-blocking: {e}"))?; UdpSocket::from(sock) }; + // Probe the UDP socket's hardware/OS offload capabilities before moving + // it into the quinn endpoint. GSO/GRO status is useful for diagnosing + // throughput differences across hosts. 
+ match quinn::udp::UdpSocketState::new((&udp_socket).into()) { + Ok(state) => log::info!( + target: TARGET, + "QUIC UDP caps on {bind_addr}: max_gso_segments={}, gro_segments={}, may_fragment={}", + state.max_gso_segments(), + state.gro_segments(), + state.may_fragment(), + ), + Err(e) => log::warn!( + target: TARGET, + "QUIC UDP caps probe failed on {bind_addr}: {e}" + ), + } let runtime: Arc = Arc::new(quinn::TokioRuntime); let endpoint = quinn::Endpoint::new( quinn::EndpointConfig::default(), @@ -1253,7 +1266,6 @@ impl QuicNode { server_cert_resolver, self.subscribers.clone(), bind_addr, - self.max_streams_per_connection, self.cancellation_token.clone(), inbound, self.msg_stats.clone(), @@ -1327,7 +1339,6 @@ impl QuicNode { inbound: Arc, subscribers: Arc>>, bind_addr: SocketAddr, - max_streams_per_connection: usize, msg_stats: Arc, fallback_config: Option>, reconnect_tracker: Arc>>, @@ -1416,17 +1427,15 @@ impl QuicNode { let peers = AdnlPeers::with_keys(local_key_id, peer_key_id); let conn_id = conn.stable_id(); - // Limit concurrent in-flight streams per connection to bound memory usage. - // When the semaphore is full, accept stalls, applying QUIC-level backpressure. - let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(max_streams_per_connection)); // Accept both bi-directional streams (queries + legacy messages) and // uni-directional streams (fire-and-forget messages from the new sender). + // Concurrency is bounded at the QUIC layer via `TransportConfig`'s + // `max_concurrent_bidi_streams` / `max_concurrent_uni_streams` — no additional + // user-level semaphore is needed. 
let streams_accepted = Arc::new(AtomicU64::new(0)); let conn_bi = conn.clone(); let conn_uni = conn.clone(); - let sem_bi = stream_semaphore.clone(); - let sem_uni = stream_semaphore; let subs_bi = subscribers.clone(); let subs_uni = subscribers; let peers_bi = peers.clone(); @@ -1446,15 +1455,10 @@ impl QuicNode { } }; streams_bi.fetch_add(1, Ordering::Relaxed); - let permit = match sem_bi.clone().acquire_owned().await { - Ok(p) => p, - Err(_) => break, - }; let subscribers = subs_bi.clone(); let peers = peers_bi.clone(); let stats = stats_bi.clone(); tokio::spawn(async move { - let _permit = permit; if let Err(e) = Self::process_incoming_stream( recv, send, @@ -1481,15 +1485,10 @@ impl QuicNode { } }; streams_uni.fetch_add(1, Ordering::Relaxed); - let permit = match sem_uni.clone().acquire_owned().await { - Ok(p) => p, - Err(_) => break, - }; let subscribers = subs_uni.clone(); let peers = peers_uni.clone(); let stats = stats_uni.clone(); tokio::spawn(async move { - let _permit = permit; if let Err(e) = Self::process_incoming_uni_stream(recv, &subscribers, &peers, addr, &stats) .await @@ -1563,6 +1562,8 @@ impl QuicNode { quinn::IdleTimeout::try_from(Duration::from_secs(15)).expect("15s fits in IdleTimeout"), )); transport_config.keep_alive_interval(Some(Duration::from_secs(5))); + transport_config + .congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default())); server_config.transport_config(Arc::new(transport_config)); Ok(server_config) } @@ -2107,7 +2108,6 @@ impl QuicNode { server_cert_resolver: Arc, subscribers: Arc>>, bind_addr: SocketAddr, - max_streams_per_connection: usize, cancellation_token: tokio_util::sync::CancellationToken, inbound: Arc, msg_stats: Arc, @@ -2187,8 +2187,7 @@ impl QuicNode { } _ = Self::handle_connection( incoming, lkn, scr, ib, subs, bind_addr, - max_streams_per_connection, stats, - fallback_config, tracker, + stats, fallback_config, tracker, ) => {} } }); diff --git a/src/adnl/tests/test_quic.rs 
b/src/adnl/tests/test_quic.rs index 2eccf2fa..b7fdc70c 100644 --- a/src/adnl/tests/test_quic.rs +++ b/src/adnl/tests/test_quic.rs @@ -12,15 +12,7 @@ use adnl::{ node::{AdnlNode, IpAddress}, DhtNode, OverlayNode, QuicNode, QuicRateLimitConfig, }; -use std::{ - collections::HashSet, - net::Ipv4Addr, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - time::Duration, -}; +use std::{collections::HashSet, net::Ipv4Addr, sync::Arc, time::Duration}; use tokio_util::sync::CancellationToken; use ton_api::{ deserialize_boxed, serialize_boxed, @@ -156,7 +148,6 @@ fn test_quic_concurrent_accept() { let server = QuicNode::new( vec![server_sub], server_token.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -190,7 +181,6 @@ fn test_quic_concurrent_accept() { let quic = QuicNode::new( vec![sub], token.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -291,7 +281,6 @@ fn test_quic_session() { let quic_a = QuicNode::new( vec![sub_a], token_a.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -300,7 +289,6 @@ fn test_quic_session() { let quic_b = QuicNode::new( vec![sub_b], token_b.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -380,7 +368,6 @@ fn test_quic_reconnect_after_server_restart() { let client = QuicNode::new( vec![client_sub], client_token.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -405,7 +392,6 @@ fn test_quic_reconnect_after_server_restart() { let server1 = QuicNode::new( vec![server_sub1], server_token1.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -444,7 +430,6 @@ fn test_quic_reconnect_after_server_restart() { let server2 = QuicNode::new( vec![server_sub2], server_token2.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ 
-469,177 +454,6 @@ fn test_quic_reconnect_after_server_restart() { }); } -/// Subscriber that tracks concurrent processing count and holds streams open. -struct SlowSubscriber { - key_id: Arc, - current: Arc, - peak: Arc, - processed: Arc, - hold_duration: Duration, -} - -#[async_trait::async_trait] -impl Subscriber for SlowSubscriber { - async fn try_consume_custom(&self, _data: &[u8], peers: &AdnlPeers) -> Result { - if peers.local() != &self.key_id { - return Ok(false); - } - let prev = self.current.fetch_add(1, Ordering::SeqCst); - let concurrent = prev + 1; - self.peak.fetch_max(concurrent, Ordering::SeqCst); - tokio::time::sleep(self.hold_duration).await; - self.current.fetch_sub(1, Ordering::SeqCst); - self.processed.fetch_add(1, Ordering::SeqCst); - Ok(true) - } - - async fn try_consume_query(&self, object: TLObject, peers: &AdnlPeers) -> Result { - if peers.local() != &self.key_id { - return Ok(QueryResult::Rejected(object)); - } - // Answer pings normally so the client can establish the connection. - match object.downcast::() { - Ok(ping) => QueryResult::consume( - AdnlPong { value: ping.value }, - #[cfg(feature = "telemetry")] - None, - ), - Err(obj) => Ok(QueryResult::Rejected(obj)), - } - } -} - -/// Verify that the per-connection stream semaphore limits concurrent processing. -/// Server stream limit = 2, subscriber holds each stream for 1s. -/// Client fires 4 messages concurrently — peak concurrency must not exceed 2. 
-#[test] -fn test_quic_stream_limit() { - init_test_log(); - let rt = tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap(); - rt.block_on(async { - const STREAM_LIMIT: usize = 2; - const NUM_MESSAGES: usize = 4; - const HOLD: Duration = Duration::from_secs(1); - const SERVER_PORT: u16 = 5850; - const CLIENT_PORT: u16 = 5851; - const TIMEOUT: Duration = Duration::from_secs(15); - - // --- server with stream limit = 2 --- - let server_token = CancellationToken::new(); - let server_key = ed25519_generate_private_key().unwrap().to_bytes(); - let (_, server_cfg) = AdnlNodeConfig::from_ip_address_and_private_keys( - &format!("127.0.0.1:{SERVER_PORT}"), - vec![(server_key, KEY_TAG)], - ) - .unwrap(); - let server_key_id = server_cfg.key_by_tag(KEY_TAG).unwrap().id().clone(); - - let peak = Arc::new(AtomicUsize::new(0)); - let current = Arc::new(AtomicUsize::new(0)); - let processed = Arc::new(AtomicUsize::new(0)); - let server_sub = Arc::new(SlowSubscriber { - key_id: server_key_id.clone(), - current: current.clone(), - peak: peak.clone(), - processed: processed.clone(), - hold_duration: HOLD, - }) as Arc; - - let server_bind: SocketAddr = - format!("127.0.0.1:{}", SERVER_PORT + QuicNode::OFFSET_PORT).parse().unwrap(); - let server = QuicNode::new( - vec![server_sub], - server_token.clone(), - Some(STREAM_LIMIT), - tokio::runtime::Handle::current(), - Some(QuicRateLimitConfig::disabled()), - ); - server.add_key(&server_key, &server_key_id, server_bind).unwrap(); - - // --- client (normal limits) --- - let client_token = CancellationToken::new(); - let client_key = ed25519_generate_private_key().unwrap().to_bytes(); - let (_, client_cfg) = AdnlNodeConfig::from_ip_address_and_private_keys( - &format!("127.0.0.1:{CLIENT_PORT}"), - vec![(client_key, KEY_TAG)], - ) - .unwrap(); - let client_key_id = client_cfg.key_by_tag(KEY_TAG).unwrap().id().clone(); - - let (cli_tx, _cli_rx) = tokio::sync::mpsc::unbounded_channel(); - let client_sub = 
Arc::new(TestSubscriber { key_id: client_key_id.clone(), msg_tx: cli_tx }) - as Arc; - - let client_bind: SocketAddr = - format!("127.0.0.1:{}", CLIENT_PORT + QuicNode::OFFSET_PORT).parse().unwrap(); - let client = QuicNode::new( - vec![client_sub], - client_token.clone(), - None, - tokio::runtime::Handle::current(), - Some(QuicRateLimitConfig::disabled()), - ); - client.add_key(&client_key, &client_key_id, client_bind).unwrap(); - - // Register peers - client.add_peer_key(server_key_id.clone(), server_bind).unwrap(); - server.add_peer_key(client_key_id.clone(), client_bind).unwrap(); - let peers = AdnlPeers::with_keys(client_key_id.clone(), server_key_id.clone()); - - // Establish the connection with a ping/pong first - let resp = tokio::time::timeout( - Duration::from_secs(10), - client.query(make_ping_data(100500), None, &peers, None), - ) - .await - .expect("initial query timed out") - .expect("initial query failed"); - assert_eq!(parse_pong(resp.unwrap()), 100500, "warmup pong mismatch"); - - // --- fire NUM_MESSAGES concurrently --- - let mut handles = Vec::with_capacity(NUM_MESSAGES); - for i in 0..NUM_MESSAGES { - let quic = client.clone(); - let peers = peers.clone(); - handles.push(tokio::spawn(async move { - let payload = format!("msg-{i}"); - quic.message(payload.as_bytes().to_vec(), None, &peers) - .await - .unwrap_or_else(|e| panic!("message {i} failed: {e}")); - })); - } - - // Wait for all messages to be processed by the slow subscriber - let _ = tokio::time::timeout(TIMEOUT, async { - for h in handles { - let _ = h.await; - } - while processed.load(Ordering::SeqCst) < NUM_MESSAGES { - tokio::time::sleep(Duration::from_millis(100)).await; - } - }) - .await - .expect("stream limit test timed out"); - - let observed_peak = peak.load(Ordering::SeqCst); - println!( - "Stream limit test: limit={STREAM_LIMIT}, \ - messages={NUM_MESSAGES}, peak_concurrent={observed_peak}" - ); - assert!( - observed_peak <= STREAM_LIMIT, - "Peak concurrency {observed_peak} 
exceeded stream limit {STREAM_LIMIT}" - ); - assert!(observed_peak > 0, "No messages were processed — test is broken"); - - // --- cleanup --- - client.shutdown(); - server.shutdown(); - client_token.cancel(); - server_token.cancel(); - }); -} - // --------------------------------------------------------------------------- // Helper: create a QUIC endpoint with a fresh key on the given ADNL port. // --------------------------------------------------------------------------- @@ -668,13 +482,8 @@ fn make_endpoint_with_config( let (tx, _rx) = tokio::sync::mpsc::unbounded_channel(); let sub = Arc::new(TestSubscriber { key_id: key_id.clone(), msg_tx: tx }) as Arc; - let quic = QuicNode::new( - vec![sub], - token.clone(), - None, - tokio::runtime::Handle::current(), - Some(rl_config), - ); + let quic = + QuicNode::new(vec![sub], token.clone(), tokio::runtime::Handle::current(), Some(rl_config)); quic.add_key(&key, &key_id, bind).unwrap(); (quic, key, key_id, bind, token) } @@ -1610,7 +1419,6 @@ fn test_quic_connection_pool_exhaustion() { let quic = QuicNode::new( vec![sub], token.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -1720,7 +1528,6 @@ fn test_quic_message_burst_reconnect() { let client = QuicNode::new( vec![client_sub], client_token.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -1742,7 +1549,6 @@ fn test_quic_message_burst_reconnect() { let server1 = QuicNode::new( vec![srv_sub1], srv_token1.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -1788,7 +1594,6 @@ fn test_quic_message_burst_reconnect() { let server2 = QuicNode::new( vec![srv_sub2], srv_token2.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -1862,7 +1667,6 @@ fn test_quic_single_sender_invariant() { let client = QuicNode::new( vec![client_sub], client_token.clone(), - None, 
tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); @@ -1883,7 +1687,6 @@ fn test_quic_single_sender_invariant() { let server = QuicNode::new( vec![srv_sub], srv_token.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); diff --git a/src/node/consensus-common/src/node_test_network.rs b/src/node/consensus-common/src/node_test_network.rs index 055fcf03..d1685f23 100644 --- a/src/node/consensus-common/src/node_test_network.rs +++ b/src/node/consensus-common/src/node_test_network.rs @@ -248,7 +248,6 @@ impl<'a> NodeTestNetwork<'a> { let quic = QuicNode::new( vec![overlay.clone()], cancellation_token.clone(), - None, tokio::runtime::Handle::current(), Some(QuicRateLimitConfig::disabled()), ); diff --git a/src/node/src/network/node_network.rs b/src/node/src/network/node_network.rs index 088c9593..8ea37d7e 100644 --- a/src/node/src/network/node_network.rs +++ b/src/node/src/network/node_network.rs @@ -175,7 +175,6 @@ impl NodeNetwork { let quic = adnl::QuicNode::new( vec![overlay.clone()], cancellation_token.clone(), - None, tokio::runtime::Handle::current(), None, ); diff --git a/src/node/tests/compat_test/src/test_helpers.rs b/src/node/tests/compat_test/src/test_helpers.rs index 93e68bd9..495d1d7a 100644 --- a/src/node/tests/compat_test/src/test_helpers.rs +++ b/src/node/tests/compat_test/src/test_helpers.rs @@ -531,7 +531,6 @@ impl RustQuicTestNode { let quic = QuicNode::new( quic_subscribers, cancellation_token.clone(), - None, rt.handle().clone(), Some(QuicRateLimitConfig::disabled()), );