Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 26 additions & 27 deletions src/adnl/src/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,8 +493,6 @@ pub struct QuicNode {
/// Shared subscriber list for all accept loops.
subscribers: Arc<Vec<Arc<dyn Subscriber>>>,
peer_keys: lockfree::map::Map<Arc<KeyId>, SocketAddr>,
/// Max concurrent in-flight streams per inbound connection.
max_streams_per_connection: usize,
/// Inbound connection maps, one per endpoint/accept-loop. Used by the stats dumper.
inbound_pools: Mutex<Vec<Arc<QuicInboundMap>>>,
/// Per-TL-tag message counters for the stats dumper.
Expand All @@ -514,7 +512,6 @@ impl QuicNode {
const CONNECTION_CHECK_INTERVAL: Duration = Duration::from_secs(5);
/// How often the stats dumper logs connection statistics.
const STATS_DUMP_INTERVAL: Duration = Duration::from_secs(60);
const DEFAULT_MAX_STREAMS_PER_CONNECTION: usize = 256;
const DEFAULT_QUERY_TIMEOUT_MS: u64 = 5000;
/// Maximum number of messages buffered per outbound peer
const SEND_QUEUE_CAPACITY: usize = 1024;
Expand Down Expand Up @@ -561,12 +558,9 @@ impl QuicNode {
pub fn new(
subscribers: Vec<Arc<dyn Subscriber>>,
cancellation_token: tokio_util::sync::CancellationToken,
max_streams_per_connection: Option<usize>,
runtime_handle: tokio::runtime::Handle,
rate_limit_config: Option<QuicRateLimitConfig>,
) -> Arc<Self> {
Comment thread
bvscd marked this conversation as resolved.
let max_streams_per_connection =
max_streams_per_connection.unwrap_or(Self::DEFAULT_MAX_STREAMS_PER_CONNECTION);
static CRYPTO_INIT: Once = Once::new();
CRYPTO_INIT.call_once(|| {
rustls::crypto::ring::default_provider()
Expand All @@ -580,7 +574,6 @@ impl QuicNode {
endpoints: Mutex::new(HashMap::new()),
subscribers: Arc::new(subscribers),
peer_keys: lockfree::map::Map::new(),
max_streams_per_connection,
inbound_pools: Mutex::new(Vec::new()),
msg_stats: MsgStats::new(),
key_cmd_tx,
Expand Down Expand Up @@ -697,6 +690,8 @@ impl QuicNode {
quinn::IdleTimeout::try_from(Duration::from_secs(15)).expect("15s fits in IdleTimeout"),
));
client_transport.keep_alive_interval(Some(Duration::from_secs(5)));
client_transport
.congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default()));
quinn_client_config.transport_config(Arc::new(client_transport));

let local_key_state = Arc::new(LocalKeyState {
Expand Down Expand Up @@ -1197,6 +1192,8 @@ impl QuicNode {
// Keep established connections alive so the idle timeout only fires on
// truly dead peers, not on connections that are just quiet between rounds.
transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
transport_config
.congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default()));
quinn_server_config.transport_config(Arc::new(transport_config));
Comment thread
bvscd marked this conversation as resolved.

// Create UDP socket with SO_REUSEADDR so the port can be reused immediately
Expand All @@ -1214,6 +1211,22 @@ impl QuicNode {
sock.set_nonblocking(true).map_err(|e| error!("Cannot set non-blocking: {e}"))?;
UdpSocket::from(sock)
};
// Probe the UDP socket's hardware/OS offload capabilities before moving
// it into the quinn endpoint. GSO/GRO status is useful for diagnosing
// throughput differences across hosts.
match quinn::udp::UdpSocketState::new((&udp_socket).into()) {
Ok(state) => log::info!(
target: TARGET,
"QUIC UDP caps on {bind_addr}: max_gso_segments={}, gro_segments={}, may_fragment={}",
state.max_gso_segments(),
state.gro_segments(),
state.may_fragment(),
),
Err(e) => log::warn!(
target: TARGET,
"QUIC UDP caps probe failed on {bind_addr}: {e}"
),
}
let runtime: Arc<dyn quinn::Runtime> = Arc::new(quinn::TokioRuntime);
let endpoint = quinn::Endpoint::new(
quinn::EndpointConfig::default(),
Expand Down Expand Up @@ -1253,7 +1266,6 @@ impl QuicNode {
server_cert_resolver,
self.subscribers.clone(),
bind_addr,
self.max_streams_per_connection,
self.cancellation_token.clone(),
inbound,
self.msg_stats.clone(),
Expand Down Expand Up @@ -1327,7 +1339,6 @@ impl QuicNode {
inbound: Arc<QuicInboundMap>,
subscribers: Arc<Vec<Arc<dyn Subscriber>>>,
bind_addr: SocketAddr,
max_streams_per_connection: usize,
msg_stats: Arc<MsgStats>,
fallback_config: Option<Arc<quinn::ServerConfig>>,
reconnect_tracker: Arc<Mutex<HashMap<IpAddr, ReconnectTracker>>>,
Expand Down Expand Up @@ -1416,17 +1427,15 @@ impl QuicNode {

let peers = AdnlPeers::with_keys(local_key_id, peer_key_id);
let conn_id = conn.stable_id();
// Limit concurrent in-flight streams per connection to bound memory usage.
// When the semaphore is full, accept stalls, applying QUIC-level backpressure.
let stream_semaphore = Arc::new(tokio::sync::Semaphore::new(max_streams_per_connection));

// Accept both bi-directional streams (queries + legacy messages) and
// uni-directional streams (fire-and-forget messages from the new sender).
// Concurrency is bounded at the QUIC layer via
// `TransportConfig::max_concurrent_bidi_streams` — no additional
// user-level semaphore is needed.
let streams_accepted = Arc::new(AtomicU64::new(0));
Comment thread
bvscd marked this conversation as resolved.
let conn_bi = conn.clone();
let conn_uni = conn.clone();
let sem_bi = stream_semaphore.clone();
let sem_uni = stream_semaphore;
let subs_bi = subscribers.clone();
let subs_uni = subscribers;
let peers_bi = peers.clone();
Expand All @@ -1446,15 +1455,10 @@ impl QuicNode {
}
};
streams_bi.fetch_add(1, Ordering::Relaxed);
let permit = match sem_bi.clone().acquire_owned().await {
Ok(p) => p,
Err(_) => break,
};
let subscribers = subs_bi.clone();
let peers = peers_bi.clone();
let stats = stats_bi.clone();
tokio::spawn(async move {
let _permit = permit;
if let Err(e) = Self::process_incoming_stream(
recv,
send,
Expand All @@ -1481,15 +1485,10 @@ impl QuicNode {
}
};
streams_uni.fetch_add(1, Ordering::Relaxed);
let permit = match sem_uni.clone().acquire_owned().await {
Ok(p) => p,
Err(_) => break,
};
let subscribers = subs_uni.clone();
let peers = peers_uni.clone();
let stats = stats_uni.clone();
tokio::spawn(async move {
let _permit = permit;
if let Err(e) =
Self::process_incoming_uni_stream(recv, &subscribers, &peers, addr, &stats)
.await
Expand Down Expand Up @@ -1563,6 +1562,8 @@ impl QuicNode {
quinn::IdleTimeout::try_from(Duration::from_secs(15)).expect("15s fits in IdleTimeout"),
));
transport_config.keep_alive_interval(Some(Duration::from_secs(5)));
transport_config
.congestion_controller_factory(Arc::new(quinn::congestion::BbrConfig::default()));
server_config.transport_config(Arc::new(transport_config));
Ok(server_config)
}
Expand Down Expand Up @@ -2107,7 +2108,6 @@ impl QuicNode {
server_cert_resolver: Arc<QuicServerCertResolver>,
subscribers: Arc<Vec<Arc<dyn Subscriber>>>,
bind_addr: SocketAddr,
max_streams_per_connection: usize,
cancellation_token: tokio_util::sync::CancellationToken,
inbound: Arc<QuicInboundMap>,
msg_stats: Arc<MsgStats>,
Expand Down Expand Up @@ -2187,8 +2187,7 @@ impl QuicNode {
}
_ = Self::handle_connection(
incoming, lkn, scr, ib, subs, bind_addr,
max_streams_per_connection, stats,
fallback_config, tracker,
stats, fallback_config, tracker,
) => {}
}
});
Expand Down
Loading