From c5d0eb105d2e21d3188f26224fe9ecc7fd47175a Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Sat, 2 Aug 2025 23:47:55 +0530 Subject: [PATCH 01/15] fix(server): Update quic server with compio. --- Cargo.lock | 1 + Cargo.toml | 1 + core/server/Cargo.toml | 1 + core/server/src/binary/sender.rs | 2 +- core/server/src/quic/listener.rs | 194 +++++++++++------- core/server/src/quic/quic_sender.rs | 31 +-- core/server/src/quic/quic_server.rs | 99 +++++---- core/server/src/server_error.rs | 4 + core/server/src/shard/builder.rs | 1 + core/server/src/shard/mod.rs | 2 + core/server/src/shard/system/messages.rs | 36 ++-- .../src/streaming/diagnostics/metrics.rs | 36 ++++ 12 files changed, 263 insertions(+), 145 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 3aa162b08d..a8dd8f67f1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7455,6 +7455,7 @@ dependencies = [ "clap", "compio", "compio-net", + "compio-quic", "console-subscriber", "crossbeam", "ctrlc", diff --git a/Cargo.toml b/Cargo.toml index cf5b7cd060..7d75a9342b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -135,6 +135,7 @@ cyper = { git = "https://github.com/krishvishal/cyper.git", rev = "cd75e266df6ab ], default-features = false } cyper-axum = { git = "https://github.com/krishvishal/cyper", rev = "cd75e266df6ab0a9b9474eb7dda1735650d17db6" } compio-net = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" } +compio-quic = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" } tokio-rustls = "0.26.2" toml = "0.9.2" tracing = "0.1.41" diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 0bb6da585b..3c65e7be9c 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -110,6 +110,7 @@ tempfile = { workspace = true } thiserror = { workspace = true } tokio = { workspace = true } compio = { workspace = true } +compio-quic = { workspace = true } tokio-rustls = { workspace = true } tokio-util = { workspace = true } toml = { workspace = true } diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index b54e25fb72..4e94f73510 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -27,9 +27,9 @@ use bytes::BytesMut; use compio::buf::{IoBuf, IoBufMut}; use compio::io::{AsyncReadExt, AsyncWriteExt}; use compio::net::TcpStream; +use compio_quic::{RecvStream, SendStream}; use iggy_common::IggyError; use nix::libc; -use quinn::{RecvStream, SendStream}; macro_rules! forward_async_methods { ( diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index f5cfc42952..45fabd67b0 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -24,73 +24,121 @@ use crate::server_error::ConnectionError; use crate::shard::IggyShard; use crate::streaming::clients::client_manager::Transport; use crate::streaming::session::Session; +use crate::streaming::utils::random_id; use anyhow::anyhow; +use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use iggy_common::IggyError; -use quinn::{Connection, Endpoint, RecvStream, SendStream}; -use tracing::{error, info, trace}; +use tracing::{debug, error, info, trace}; const LISTENERS_COUNT: u32 = 10; const INITIAL_BYTES_LENGTH: usize = 4; -//TODO: Fixme -/* -pub fn start(endpoint: Endpoint, system: SharedSystem) { - for _ in 0..LISTENERS_COUNT { +pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyError> { + info!("Starting QUIC listener with {} workers", LISTENERS_COUNT); + + for i in 0..LISTENERS_COUNT { let endpoint = endpoint.clone(); - let system = system.clone(); - //TODO: Fixme - /* - tokio::spawn(async move { - while let Some(incoming_connection) = endpoint.accept().await { - info!( - "Incoming connection from client: {}", - incoming_connection.remote_address() - ); - let system = system.clone(); - let incoming_connection = incoming_connection.accept(); - if incoming_connection.is_err() { - error!( - "Error when accepting incoming connection: {:?}", - incoming_connection - ); - continue; - } - let incoming_connection = incoming_connection.unwrap(); - tokio::spawn(async move { - if let Err(error) = handle_connection(incoming_connection, system).await { - error!("Connection has failed: {error}"); + let shard = shard.clone(); + let _ = compio::runtime::spawn(async move { + info!("Starting QUIC listener worker {}", i); + + loop { + match endpoint.wait_incoming().await { + Some(incoming_conn) => { + let shard = shard.clone(); + info!( + "Incoming connection from client: {}", + incoming_conn.remote_address() + ); + + let connection = match incoming_conn.await { + Ok(conn) => { + // Track successful connection establishment + shard.metrics.increment_quic_connections(); + conn + } + Err(error) => { + error!( + "QUIC connection acceptance failed on listener {}: {:?}", + i, error + ); + shard.metrics.increment_quic_errors(); + continue; + } + }; + + let remote_addr = connection.remote_address(); + let connection_id = random_id::get_ulid(); + + info!( + "QUIC connection {} established from {} on listener {}", + connection_id, remote_addr, i + ); + + // Spawn a task to handle this connection + let _ = compio::runtime::spawn(async move { + let start_time = std::time::Instant::now(); + + match handle_connection(connection, shard.clone()).await { + Ok(_) => { + let duration = start_time.elapsed(); + debug!( + "QUIC connection {} completed successfully in {} ms", + connection_id, + duration.as_millis() + ); + } + Err(error) => { + let duration = start_time.elapsed(); + error!( + "QUIC connection {} failed after {} ms: {error}", + connection_id, + duration.as_millis() + ); + shard.metrics.increment_quic_errors(); + } + } + + // Decrement connection count when connection ends + shard.metrics.decrement_quic_connections(); + debug!("QUIC connection {} closed", connection_id); + }); + } + None => { + // No incoming connection available, wait a bit before checking again + compio::time::sleep(std::time::Duration::from_millis(10)).await; } - }); + } } }); - */ + } + + // Keep the main task alive + loop { + compio::time::sleep(std::time::Duration::from_secs(1)).await; } } async fn handle_connection( - incoming_connection: quinn::Connecting, + connection: Connection, shard: Rc, ) -> Result<(), ConnectionError> { - let connection = incoming_connection.await?; let address = connection.remote_address(); info!("Client has connected: {address}"); - let session = system - .read() - .await - .add_client(&address, Transport::Quic) - .await; + let session = shard.add_client(&address, Transport::Quic); let client_id = session.client_id; - while let Some(stream) = accept_stream(&connection, &system, client_id).await? { - let system = system.clone(); + + while let Some(stream) = accept_stream(&connection, &shard, client_id).await? { + let shard = shard.clone(); let session = session.clone(); let handle_stream_task = async move { - if let Err(err) = handle_stream(stream, system, session).await { + if let Err(err) = handle_stream(stream, shard, session).await { error!("Error when handling QUIC stream: {:?}", err) } }; - let _handle = tokio::spawn(handle_stream_task); + let _ = compio::runtime::spawn(handle_stream_task); } Ok(()) } @@ -99,18 +147,18 @@ type BiStream = (SendStream, RecvStream); async fn accept_stream( connection: &Connection, - system: &SharedSystem, + shard: &Rc, client_id: u32, ) -> Result, ConnectionError> { match connection.accept_bi().await { - Err(quinn::ConnectionError::ApplicationClosed { .. }) => { + Err(compio_quic::ConnectionError::ApplicationClosed { .. }) => { info!("Connection closed"); - system.read().await.delete_client(client_id).await; + shard.delete_client(client_id).await; Ok(None) } Err(error) => { error!("Error when handling QUIC stream: {:?}", error); - system.read().await.delete_client(client_id).await; + shard.delete_client(client_id).await; Err(error.into()) } Ok(stream) => Ok(Some(stream)), @@ -119,21 +167,28 @@ async fn accept_stream( async fn handle_stream( stream: BiStream, - system: SharedSystem, - session: impl AsRef + std::fmt::Debug, + shard: Rc, + session: Rc, ) -> anyhow::Result<()> { let (send_stream, mut recv_stream) = stream; + let request_id = random_id::get_ulid(); + let start_time = std::time::Instant::now(); + + shard.metrics.increment_quic_requests(); let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; - recv_stream.read_exact(&mut length_buffer).await?; - recv_stream.read_exact(&mut code_buffer).await?; + recv_stream.read_exact(&mut length_buffer[..]).await?; + recv_stream.read_exact(&mut code_buffer[..]).await?; let length = u32::from_le_bytes(length_buffer); let code = u32::from_le_bytes(code_buffer); - trace!("Received a QUIC request, length: {length}, code: {code}"); + trace!( + "Processing QUIC request {} with code: {}, length: {}, session: {}", + request_id, code, length, session.client_id + ); let mut sender = SenderKind::get_quic_sender(send_stream, recv_stream); @@ -145,42 +200,43 @@ async fn handle_stream( } }; - // if let Err(e) = command.validate() { - // sender.send_error_response(e.clone()).await?; - // return Err(anyhow!("Command validation failed: {e}")); - // } - trace!("Received a QUIC command: {command}, payload size: {length}"); - match command - .handle(&mut sender, length, session.as_ref(), &system) - .await - { + match command.handle(&mut sender, length, &session, &shard).await { Ok(_) => { + let duration = start_time.elapsed(); trace!( - "Command was handled successfully, session: {:?}. QUIC response was sent.", - session + "QUIC request {} completed successfully in {} ms (session: {})", + request_id, + duration.as_millis(), + session.client_id ); Ok(()) } Err(e) => { + let duration = start_time.elapsed(); error!( - "Command was not handled successfully, session: {:?}, error: {e}.", - session + "QUIC request {} failed after {} ms (session: {}): {e}", + request_id, + duration.as_millis(), + session.client_id ); - // Only return a connection-terminating error for client not found + + shard.metrics.increment_quic_errors(); + if let IggyError::ClientNotFound(_) = e { sender.send_error_response(e.clone()).await?; - trace!("QUIC error response was sent."); - error!("Session will be deleted."); + trace!("QUIC error response sent for request {}", request_id); + error!( + "Session {} will be deleted due to client not found", + session.client_id + ); Err(anyhow!("Client not found: {e}")) } else { - // For all other errors, send response and continue the connection sender.send_error_response(e).await?; - trace!("QUIC error response was sent."); + trace!("QUIC error response sent for request {}", request_id); Ok(()) } } } } -*/ diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index 4c93e66ab7..b66ecd706e 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -19,13 +19,10 @@ use crate::quic::COMPONENT; use crate::streaming::utils::PooledBuffer; use crate::{binary::sender::Sender, server_error::ServerError}; -use bytes::BytesMut; -use compio::buf::{IoBuf, IoBufMut}; +use compio::buf::IoBufMut; +use compio_quic::{RecvStream, SendStream}; use error_set::ErrContext; use iggy_common::IggyError; -use nix::libc; -use quinn::{RecvStream, SendStream}; -use std::io::IoSlice; use tracing::{debug, error}; const STATUS_OK: &[u8] = &[0; 4]; @@ -37,19 +34,16 @@ pub struct QuicSender { } impl Sender for QuicSender { + /// Reads data from the QUIC stream directly into the buffer. async fn read(&mut self, buffer: B) -> (Result<(), IggyError>, B) { - //TODO: Fixme - // Not-so-nice code because quinn recv stream has different API for read_exact - /* - let read_bytes = buffer.len(); - self.recv.read_exact(buffer).await.map_err(|error| { - error!("Failed to read from the stream: {:?}", error); - IggyError::QuicError - })?; - */ - - //Ok(read_bytes) - todo!(); + let mut buffer = buffer; + match self.recv.read_exact(buffer.as_mut_slice()).await { + Ok(_) => (Ok(()), buffer), + Err(error) => { + error!("Failed to read from QUIC stream: {:?}", error); + (Err(IggyError::QuicError), buffer) + } + } } async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> { @@ -87,8 +81,6 @@ impl Sender for QuicSender { let mut total_bytes_written = 0; - //TODO: Fixme - /* for slice in slices { let slice_data = &*slice; if !slice_data.is_empty() { @@ -103,7 +95,6 @@ impl Sender for QuicSender { total_bytes_written += slice_data.len(); } } - */ debug!( "Sent vectored response: {} bytes of payload", diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 0a8020b88b..40352729aa 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -19,85 +19,103 @@ use std::fs::File; use std::io::BufReader; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::Arc; use anyhow::Result; +use compio_quic::{Endpoint, IdleTimeout, ServerConfig, TransportConfig, VarInt}; use error_set::ErrContext; -use quinn::{Endpoint, IdleTimeout, VarInt}; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use tracing::info; +use tracing::{error, info}; use crate::configs::quic::QuicConfig; use crate::quic::COMPONENT; use crate::quic::listener; use crate::server_error::QuicError; +use crate::shard::IggyShard; -//TODO: Fixme -/* /// Starts the QUIC server. /// Returns the address the server is listening on. -pub fn start(config: QuicConfig, system: SharedSystem) -> SocketAddr { - info!("Initializing Iggy QUIC server..."); - let address = config.address.parse().unwrap(); - let quic_config = configure_quic(config); - if let Err(error) = quic_config { - panic!("Error when configuring QUIC: {error:?}"); +pub async fn start( + server_name: &'static str, + addr: SocketAddr, + config: &QuicConfig, + shard: Rc, +) -> Result<(), iggy_common::IggyError> { + if shard.id != 0 { + info!( + "QUIC server restricted to shard 0, skipping on shard {}", + shard.id + ); + return Ok(()); } - let endpoint = Endpoint::server(quic_config.unwrap(), address).unwrap(); - let addr = endpoint.local_addr().unwrap(); - listener::start(endpoint, system); - info!("Iggy QUIC server has started on: {:?}", addr); - addr + info!("Initializing Iggy QUIC server on shard 0..."); + + // Configure QUIC server + let server_config = configure_quic(config).map_err(|e| { + error!("Failed to configure QUIC: {:?}", e); + iggy_common::IggyError::QuicError + })?; + + // Create QUIC endpoint + let endpoint = Endpoint::server(addr, server_config).await.map_err(|e| { + error!("Failed to create QUIC endpoint: {:?}", e); + iggy_common::IggyError::CannotBindToSocket(addr.to_string()) + })?; + + let actual_addr = endpoint.local_addr().map_err(|e| { + error!("Failed to get local address: {e}"); + iggy_common::IggyError::CannotBindToSocket(addr.to_string()) + })?; + + info!("{} server has started on: {:?}", server_name, actual_addr); + + // Store the bound address (only shard 0 runs QUIC server) + shard.quic_bound_address.set(Some(actual_addr)); + + listener::start(endpoint, shard).await } -fn configure_quic(config: QuicConfig) -> Result { - let (certificate, key) = match config.certificate.self_signed { +fn configure_quic(config: &QuicConfig) -> Result { + let (certificates, private_key) = match config.certificate.self_signed { true => generate_self_signed_cert()?, false => load_certificates(&config.certificate.cert_file, &config.certificate.key_file)?, }; - let mut server_config = quinn::ServerConfig::with_single_cert(certificate, key) + let mut server_config = ServerConfig::with_single_cert(certificates, private_key) .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to create server config") }) .map_err(|_| QuicError::ConfigCreationError)?; - let mut transport = quinn::TransportConfig::default(); + + let mut transport = TransportConfig::default(); + + // Configure transport parameters transport.initial_mtu(config.initial_mtu.as_bytes_u64() as u16); transport.send_window(config.send_window.as_bytes_u64()); - transport.receive_window( - VarInt::try_from(config.receive_window.as_bytes_u64()) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - invalid receive window") - }) - .map_err(|_| QuicError::TransportConfigError)?, - ); + transport.receive_window(VarInt::from_u64(config.receive_window.as_bytes_u64()).unwrap()); transport.datagram_send_buffer_size(config.datagram_send_buffer_size.as_bytes_u64() as usize); - transport.max_concurrent_bidi_streams( - VarInt::try_from(config.max_concurrent_bidi_streams) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - invalid bidi stream limit") - }) - .map_err(|_| QuicError::TransportConfigError)?, - ); + transport + .max_concurrent_bidi_streams(VarInt::from_u64(config.max_concurrent_bidi_streams).unwrap()); + if !config.keep_alive_interval.is_zero() { transport.keep_alive_interval(Some(config.keep_alive_interval.get_duration())); } + if !config.max_idle_timeout.is_zero() { - let max_idle_timeout = IdleTimeout::try_from(config.max_idle_timeout.get_duration()) - .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - invalid idle timeout") - }) + // Create IdleTimeout from Duration - different API than quinn + let idle_timeout = IdleTimeout::try_from(config.max_idle_timeout.get_duration()) .map_err(|_| QuicError::TransportConfigError)?; - transport.max_idle_timeout(Some(max_idle_timeout)); + transport.max_idle_timeout(Some(idle_timeout)); } server_config.transport_config(Arc::new(transport)); Ok(server_config) } -fn generate_self_signed_cert<'a>() -> Result<(Vec>, PrivateKeyDer<'a>), QuicError> -{ +fn generate_self_signed_cert() +-> Result<(Vec>, PrivateKeyDer<'static>), QuicError> { iggy_common::generate_self_signed_certificate("localhost") .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to generate self-signed certificate") @@ -137,4 +155,3 @@ fn load_certificates( let key = keys.remove(0); Ok((certs, key)) } - */ diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index ca29a62753..dce3584f54 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -16,6 +16,7 @@ * under the License. */ +use compio_quic::ConnectionError as CompioQuicConnectionError; use error_set::error_set; use quinn::{ConnectionError as QuicConnectionError, ReadToEndError, WriteError}; use rusty_s3::BucketError; @@ -74,6 +75,9 @@ error_set!( ConnectionError = { #[display("Connection error")] QuicConnectionError(QuicConnectionError), + + #[display("Compio QUIC connection error")] + CompioQuicConnectionError(CompioQuicConnectionError), } || IoError || CommonError; LogError = { diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 67a1704369..1723ae5587 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -135,6 +135,7 @@ impl IggyShardBuilder { task_registry: TaskRegistry::new(), is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), + quic_bound_address: Cell::new(None), streams2: Streams::init(), users: Default::default(), diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 19af5fc5fb..63977a5f74 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -168,6 +168,7 @@ pub struct IggyShard { pub(crate) task_registry: TaskRegistry, pub(crate) is_shutting_down: AtomicBool, pub(crate) tcp_bound_address: Cell>, + pub(crate) quic_bound_address: Cell>, } impl IggyShard { @@ -221,6 +222,7 @@ impl IggyShard { task_registry: TaskRegistry::new(), is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), + quic_bound_address: Cell::new(None), }; let user = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); shard diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index c5e527d0b7..114d5d005b 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -89,15 +89,19 @@ impl IggyShard { .await? { ShardSendRequestResult::Recoil(message) => { - if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message - && let ShardRequestPayload::SendMessages { batch } = payload - { - topic.append_messages(partition_id, batch).await.with_error_context(|error| { - format!("{COMPONENT}: Failed to append messages to stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") - }) + if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message { + if let ShardRequestPayload::SendMessages { batch } = payload { + topic.append_messages(partition_id, batch).await.with_error_context(|error| { + format!("{COMPONENT}: Failed to append messages to stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") + }) + } else { + unreachable!( + "Expected a SendMessages request inside of SendMessages handler, impossible state" + ); + } } else { unreachable!( - "Expected a SendMessages request inside of SendMessages handler, impossible state" + "Expected a ShardMessage request inside of ShardMessage handler, impossible state" ); } } @@ -172,15 +176,19 @@ impl IggyShard { .await? { ShardSendRequestResult::Recoil(message) => { - if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message - && let ShardRequestPayload::PollMessages { consumer, args } = payload - { - topic.get_messages(consumer, partition_id, args.strategy, args.count).await.with_error_context(|error| { - format!("{COMPONENT}: Failed to get messages for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") - }) + if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message { + if let ShardRequestPayload::PollMessages { consumer, args } = payload { + topic.get_messages(consumer, partition_id, args.strategy, args.count).await.with_error_context(|error| { + format!("{COMPONENT}: Failed to get messages for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") + }) + } else { + unreachable!( + "Expected a PollMessages request inside of PollMessages handler, impossible state" + ); + } } else { unreachable!( - "Expected a PollMessages request inside of PollMessages handler, impossible state" + "Expected a ShardMessage request inside of ShardMessage handler, impossible state" ); } } diff --git a/core/server/src/streaming/diagnostics/metrics.rs b/core/server/src/streaming/diagnostics/metrics.rs index b5a602c57e..7d6530246b 100644 --- a/core/server/src/streaming/diagnostics/metrics.rs +++ b/core/server/src/streaming/diagnostics/metrics.rs @@ -26,6 +26,10 @@ use tracing::error; pub(crate) struct Metrics { registry: Registry, http_requests: Counter, + quic_connections: Gauge, + quic_requests: Counter, + quic_errors: Counter, + tcp_connections: Gauge, streams: Gauge, topics: Gauge, partitions: Gauge, @@ -40,6 +44,10 @@ impl Metrics { let mut metrics = Metrics { registry: ::default(), http_requests: Counter::default(), + quic_connections: Gauge::default(), + quic_requests: Counter::default(), + quic_errors: Counter::default(), + tcp_connections: Gauge::default(), streams: Gauge::default(), topics: Gauge::default(), partitions: Gauge::default(), @@ -50,6 +58,10 @@ impl Metrics { }; metrics.register_counter("http_requests", metrics.http_requests.clone()); + metrics.register_gauge("quic_connections", metrics.quic_connections.clone()); + metrics.register_counter("quic_requests", metrics.quic_requests.clone()); + metrics.register_counter("quic_errors", metrics.quic_errors.clone()); + metrics.register_gauge("tcp_connections", metrics.tcp_connections.clone()); metrics.register_gauge("streams", metrics.streams.clone()); metrics.register_gauge("topics", metrics.topics.clone()); metrics.register_gauge("partitions", metrics.partitions.clone()); @@ -83,6 +95,30 @@ impl Metrics { self.http_requests.inc(); } + pub fn increment_quic_connections(&self) { + self.quic_connections.inc(); + } + + pub fn decrement_quic_connections(&self) { + self.quic_connections.dec(); + } + + pub fn increment_quic_requests(&self) { + self.quic_requests.inc(); + } + + pub fn increment_quic_errors(&self) { + self.quic_errors.inc(); + } + + pub fn increment_tcp_connections(&self) { + self.tcp_connections.inc(); + } + + pub fn decrement_tcp_connections(&self) { + self.tcp_connections.dec(); + } + pub fn increment_streams(&self, count: u32) { self.streams.inc_by(count as i64); } From ead3a153a0f6fabe899bc5d293a001d2ec7a88b9 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Sun, 3 Aug 2025 12:50:32 +0530 Subject: [PATCH 02/15] fix review comments. --- core/server/src/quic/listener.rs | 15 +------- core/server/src/quic/quic_server.rs | 16 +++++---- core/server/src/server_error.rs | 10 ++---- core/server/src/shard/mod.rs | 5 +++ core/server/src/shard/system/messages.rs | 36 ++++++++----------- .../src/streaming/diagnostics/metrics.rs | 36 ------------------- 6 files changed, 32 insertions(+), 86 deletions(-) diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 45fabd67b0..dbae9dcece 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -52,17 +52,12 @@ pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyE ); let connection = match incoming_conn.await { - Ok(conn) => { - // Track successful connection establishment - shard.metrics.increment_quic_connections(); - conn - } + Ok(conn) => conn, Err(error) => { error!( "QUIC connection acceptance failed on listener {}: {:?}", i, error ); - shard.metrics.increment_quic_errors(); continue; } }; @@ -95,12 +90,8 @@ pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyE connection_id, duration.as_millis() ); - shard.metrics.increment_quic_errors(); } } - - // Decrement connection count when connection ends - shard.metrics.decrement_quic_connections(); debug!("QUIC connection {} closed", connection_id); }); } @@ -174,8 +165,6 @@ async fn handle_stream( let request_id = random_id::get_ulid(); let start_time = std::time::Instant::now(); - shard.metrics.increment_quic_requests(); - let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; @@ -222,8 +211,6 @@ async fn handle_stream( session.client_id ); - shard.metrics.increment_quic_errors(); - if let IggyError::ClientNotFound(_) = e { sender.send_error_response(e.clone()).await?; trace!("QUIC error response sent for request {}", request_id); diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 40352729aa..b9941942c6 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -36,12 +36,7 @@ use crate::shard::IggyShard; /// Starts the QUIC server. /// Returns the address the server is listening on. -pub async fn start( - server_name: &'static str, - addr: SocketAddr, - config: &QuicConfig, - shard: Rc, -) -> Result<(), iggy_common::IggyError> { +pub async fn start(shard: Rc) -> Result<(), iggy_common::IggyError> { if shard.id != 0 { info!( "QUIC server restricted to shard 0, skipping on shard {}", @@ -50,10 +45,17 @@ pub async fn start( return Ok(()); } + let server_name = "Iggy QUIC"; + let config = shard.config.quic.clone(); + let addr: SocketAddr = config + .address + .parse() + .expect("Failed to parse QUIC address"); + info!("Initializing Iggy QUIC server on shard 0..."); // Configure QUIC server - let server_config = configure_quic(config).map_err(|e| { + let server_config = configure_quic(&config).map_err(|e| { error!("Failed to configure QUIC: {:?}", e); iggy_common::IggyError::QuicError })?; diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index dce3584f54..6351d38725 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -16,9 +16,8 @@ * under the License. */ -use compio_quic::ConnectionError as CompioQuicConnectionError; +use compio_quic::{ConnectionError as QuicConnectionError, ReadError, WriteError}; use error_set::error_set; -use quinn::{ConnectionError as QuicConnectionError, ReadToEndError, WriteError}; use rusty_s3::BucketError; use std::array::TryFromSliceError; use tokio::io; @@ -33,8 +32,8 @@ error_set!( #[display("Write error")] WriteError(WriteError), - #[display("Read to end error")] - ReadToEndError(ReadToEndError) + #[display("Read error")] + ReadToEndError(ReadError) }; ConfigError = { @@ -75,9 +74,6 @@ error_set!( ConnectionError = { #[display("Connection error")] QuicConnectionError(QuicConnectionError), - - #[display("Compio QUIC connection error")] - CompioQuicConnectionError(CompioQuicConnectionError), } || IoError || CommonError; LogError = { diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 63977a5f74..6f6f3226b5 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -277,6 +277,11 @@ impl IggyShard { ))); } + if self.config.quic.enabled && self.id == 0 { + shard_info!(self.id, "Starting QUIC server on shard"); + tasks.push(Box::pin(crate::quic::quic_server::start(self.clone()))); + } + let stop_receiver = self.get_stop_receiver(); let shard_for_shutdown = self.clone(); diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 114d5d005b..c5e527d0b7 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -89,19 +89,15 @@ impl IggyShard { .await? { ShardSendRequestResult::Recoil(message) => { - if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message { - if let ShardRequestPayload::SendMessages { batch } = payload { - topic.append_messages(partition_id, batch).await.with_error_context(|error| { - format!("{COMPONENT}: Failed to append messages to stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") - }) - } else { - unreachable!( - "Expected a SendMessages request inside of SendMessages handler, impossible state" - ); - } + if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message + && let ShardRequestPayload::SendMessages { batch } = payload + { + topic.append_messages(partition_id, batch).await.with_error_context(|error| { + format!("{COMPONENT}: Failed to append messages to stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") + }) } else { unreachable!( - "Expected a ShardMessage request inside of ShardMessage handler, impossible state" + "Expected a SendMessages request inside of SendMessages handler, impossible state" ); } } @@ -176,19 +172,15 @@ impl IggyShard { .await? { ShardSendRequestResult::Recoil(message) => { - if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message { - if let ShardRequestPayload::PollMessages { consumer, args } = payload { - topic.get_messages(consumer, partition_id, args.strategy, args.count).await.with_error_context(|error| { - format!("{COMPONENT}: Failed to get messages for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") - }) - } else { - unreachable!( - "Expected a PollMessages request inside of PollMessages handler, impossible state" - ); - } + if let ShardMessage::Request( ShardRequest { partition_id, payload, .. } ) = message + && let ShardRequestPayload::PollMessages { consumer, args } = payload + { + topic.get_messages(consumer, partition_id, args.strategy, args.count).await.with_error_context(|error| { + format!("{COMPONENT}: Failed to get messages for stream_id: {stream_id}, topic_id: {topic_id}, partition_id: {partition_id}, error: {error})") + }) } else { unreachable!( - "Expected a ShardMessage request inside of ShardMessage handler, impossible state" + "Expected a PollMessages request inside of PollMessages handler, impossible state" ); } } diff --git a/core/server/src/streaming/diagnostics/metrics.rs b/core/server/src/streaming/diagnostics/metrics.rs index 7d6530246b..b5a602c57e 100644 --- a/core/server/src/streaming/diagnostics/metrics.rs +++ b/core/server/src/streaming/diagnostics/metrics.rs @@ -26,10 +26,6 @@ use tracing::error; pub(crate) struct Metrics { registry: Registry, http_requests: Counter, - quic_connections: Gauge, - quic_requests: Counter, - quic_errors: Counter, - tcp_connections: Gauge, streams: Gauge, topics: Gauge, partitions: Gauge, @@ -44,10 +40,6 @@ impl Metrics { let mut metrics = Metrics { registry: ::default(), http_requests: Counter::default(), - quic_connections: Gauge::default(), - quic_requests: Counter::default(), - quic_errors: Counter::default(), - tcp_connections: Gauge::default(), streams: Gauge::default(), topics: Gauge::default(), partitions: Gauge::default(), @@ -58,10 +50,6 @@ impl Metrics { }; metrics.register_counter("http_requests", metrics.http_requests.clone()); - metrics.register_gauge("quic_connections", metrics.quic_connections.clone()); - metrics.register_counter("quic_requests", metrics.quic_requests.clone()); - metrics.register_counter("quic_errors", metrics.quic_errors.clone()); - metrics.register_gauge("tcp_connections", metrics.tcp_connections.clone()); metrics.register_gauge("streams", metrics.streams.clone()); metrics.register_gauge("topics", metrics.topics.clone()); metrics.register_gauge("partitions", metrics.partitions.clone()); @@ -95,30 +83,6 @@ impl Metrics { self.http_requests.inc(); } - pub fn increment_quic_connections(&self) { - self.quic_connections.inc(); - } - - pub fn decrement_quic_connections(&self) { - self.quic_connections.dec(); - } - - pub fn increment_quic_requests(&self) { - self.quic_requests.inc(); - } - - pub fn increment_quic_errors(&self) { - self.quic_errors.inc(); - } - - pub fn increment_tcp_connections(&self) { - self.tcp_connections.inc(); - } - - pub fn decrement_tcp_connections(&self) { - self.tcp_connections.dec(); - } - pub fn increment_streams(&self, count: u32) { self.streams.inc_by(count as i64); } From d9c11128b5a07feca8eba24f2858eb7e640d0a97 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Sun, 3 Aug 2025 12:54:36 +0530 Subject: [PATCH 03/15] remove quinn from server dependencies. --- Cargo.lock | 1 - core/server/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a8dd8f67f1..d586aeaf17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7486,7 +7486,6 @@ dependencies = [ "opentelemetry-semantic-conventions", "opentelemetry_sdk", "prometheus-client", - "quinn", "reqwest", "ring", "rustls", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 3c65e7be9c..320326eca0 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -94,7 +94,6 @@ opentelemetry_sdk = { version = "0.30.0", features = [ "experimental_trace_batch_span_processor_with_async_runtime", ] } prometheus-client = "0.23.1" -quinn = { workspace = true } reqwest = { workspace = true, features = ["rustls-tls-no-provider"] } ring = "0.17.14" rustls = { workspace = true } From 5bc0a2bb23f67c0b1fcd78fe4608a2eea2efefa4 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Sun, 3 Aug 2025 20:40:36 +0530 Subject: [PATCH 04/15] remove restriction of quic server only on shard 0. --- core/server/src/quic/quic_server.rs | 10 +--------- core/server/src/shard/mod.rs | 3 +-- 2 files changed, 2 insertions(+), 11 deletions(-) diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index b9941942c6..0a9ca5372e 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -37,14 +37,6 @@ use crate::shard::IggyShard; /// Starts the QUIC server. /// Returns the address the server is listening on. pub async fn start(shard: Rc) -> Result<(), iggy_common::IggyError> { - if shard.id != 0 { - info!( - "QUIC server restricted to shard 0, skipping on shard {}", - shard.id - ); - return Ok(()); - } - let server_name = "Iggy QUIC"; let config = shard.config.quic.clone(); let addr: SocketAddr = config @@ -52,7 +44,7 @@ pub async fn start(shard: Rc) -> Result<(), iggy_common::IggyError> { .parse() .expect("Failed to parse QUIC address"); - info!("Initializing Iggy QUIC server on shard 0..."); + info!("Initializing Iggy QUIC server on shard {}...", shard.id); // Configure QUIC server let server_config = configure_quic(&config).map_err(|e| { diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 6f6f3226b5..a512cd0eee 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -277,8 +277,7 @@ impl IggyShard { ))); } - if self.config.quic.enabled && self.id == 0 { - shard_info!(self.id, "Starting QUIC server on shard"); + if self.config.quic.enabled { tasks.push(Box::pin(crate::quic::quic_server::start(self.clone()))); } From 0dcfba08e0cf0e456e3fc84097c7051db01017db Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Mon, 4 Aug 2025 16:53:01 +0530 Subject: [PATCH 05/15] minor cleanup --- core/server/src/quic/listener.rs | 6 ---- core/server/src/quic/quic_server.rs | 46 ++++++++++++++--------------- 2 files changed, 22 insertions(+), 30 deletions(-) diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index dbae9dcece..6957cc8d85 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -41,7 +41,6 @@ pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyE let shard = shard.clone(); let _ = compio::runtime::spawn(async move { info!("Starting QUIC listener worker {}", i); - loop { match endpoint.wait_incoming().await { Some(incoming_conn) => { @@ -50,7 +49,6 @@ pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyE "Incoming connection from client: {}", incoming_conn.remote_address() ); - let connection = match incoming_conn.await { Ok(conn) => conn, Err(error) => { @@ -61,15 +59,12 @@ pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyE continue; } }; - let remote_addr = connection.remote_address(); let connection_id = random_id::get_ulid(); - info!( "QUIC connection {} established from {} on listener {}", connection_id, remote_addr, i ); - // Spawn a task to handle this connection let _ = compio::runtime::spawn(async move { let start_time = std::time::Instant::now(); @@ -103,7 +98,6 @@ pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyE } }); } - // Keep the main task alive loop { compio::time::sleep(std::time::Duration::from_secs(1)).await; diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 0a9ca5372e..d7a5f7f1c3 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -37,7 +37,6 @@ use crate::shard::IggyShard; /// Starts the QUIC server. /// Returns the address the server is listening on. pub async fn start(shard: Rc) -> Result<(), iggy_common::IggyError> { - let server_name = "Iggy QUIC"; let config = shard.config.quic.clone(); let addr: SocketAddr = config .address @@ -45,29 +44,20 @@ pub async fn start(shard: Rc) -> Result<(), iggy_common::IggyError> { .expect("Failed to parse QUIC address"); info!("Initializing Iggy QUIC server on shard {}...", shard.id); - - // Configure QUIC server let server_config = configure_quic(&config).map_err(|e| { error!("Failed to configure QUIC: {:?}", e); iggy_common::IggyError::QuicError })?; - - // Create QUIC endpoint let endpoint = Endpoint::server(addr, server_config).await.map_err(|e| { error!("Failed to create QUIC endpoint: {:?}", e); iggy_common::IggyError::CannotBindToSocket(addr.to_string()) })?; - let actual_addr = endpoint.local_addr().map_err(|e| { error!("Failed to get local address: {e}"); iggy_common::IggyError::CannotBindToSocket(addr.to_string()) })?; - - info!("{} server has started on: {:?}", server_name, actual_addr); - - // Store the bound address (only shard 0 runs QUIC server) + info!("Iggy QUIC server has started on: {:?}", actual_addr); shard.quic_bound_address.set(Some(actual_addr)); - listener::start(endpoint, shard).await } @@ -82,34 +72,42 @@ fn configure_quic(config: &QuicConfig) -> Result { format!("{COMPONENT} (error: {error}) - failed to create server config") }) .map_err(|_| QuicError::ConfigCreationError)?; - let mut transport = TransportConfig::default(); - - // Configure transport parameters transport.initial_mtu(config.initial_mtu.as_bytes_u64() as u16); transport.send_window(config.send_window.as_bytes_u64()); - transport.receive_window(VarInt::from_u64(config.receive_window.as_bytes_u64()).unwrap()); + transport.receive_window( + VarInt::try_from(config.receive_window.as_bytes_u64()) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - invalid receive window") + }) + .map_err(|_| QuicError::TransportConfigError)?, + ); transport.datagram_send_buffer_size(config.datagram_send_buffer_size.as_bytes_u64() as usize); - transport - .max_concurrent_bidi_streams(VarInt::from_u64(config.max_concurrent_bidi_streams).unwrap()); - + transport.max_concurrent_bidi_streams( + VarInt::try_from(config.max_concurrent_bidi_streams) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - invalid bidi stream limit") + }) + .map_err(|_| QuicError::TransportConfigError)?, + ); if !config.keep_alive_interval.is_zero() { transport.keep_alive_interval(Some(config.keep_alive_interval.get_duration())); } - if !config.max_idle_timeout.is_zero() { - // Create IdleTimeout from Duration - different API than quinn - let idle_timeout = IdleTimeout::try_from(config.max_idle_timeout.get_duration()) + let max_idle_timeout = IdleTimeout::try_from(config.max_idle_timeout.get_duration()) + .with_error_context(|error| { + format!("{COMPONENT} (error: {error}) - invalid idle timeout") + }) .map_err(|_| QuicError::TransportConfigError)?; - transport.max_idle_timeout(Some(idle_timeout)); + transport.max_idle_timeout(Some(max_idle_timeout)); } server_config.transport_config(Arc::new(transport)); Ok(server_config) } -fn generate_self_signed_cert() --> Result<(Vec>, PrivateKeyDer<'static>), QuicError> { +fn generate_self_signed_cert<'a>() -> Result<(Vec>, PrivateKeyDer<'a>), QuicError> +{ iggy_common::generate_self_signed_certificate("localhost") .with_error_context(|error| { format!("{COMPONENT} (error: {error}) - failed to generate self-signed certificate") From 730cbce430067a75a1bca33d34695810e9d5b575 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Wed, 6 Aug 2025 21:36:54 +0530 Subject: [PATCH 06/15] make quic listiner to orginal approach for while let instaed of loop --- core/server/src/quic/listener.rs | 121 +++++++++---------------------- 1 file changed, 36 insertions(+), 85 deletions(-) diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 6957cc8d85..c882c920f9 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -24,84 +24,43 @@ use crate::server_error::ConnectionError; use crate::shard::IggyShard; use crate::streaming::clients::client_manager::Transport; use crate::streaming::session::Session; -use crate::streaming::utils::random_id; use anyhow::anyhow; use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use iggy_common::IggyError; -use tracing::{debug, error, info, trace}; +use tracing::{error, info, trace}; const LISTENERS_COUNT: u32 = 10; const INITIAL_BYTES_LENGTH: usize = 4; pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyError> { - info!("Starting QUIC listener with {} workers", LISTENERS_COUNT); - - for i in 0..LISTENERS_COUNT { + for _ in 0..LISTENERS_COUNT { let endpoint = endpoint.clone(); let shard = shard.clone(); let _ = compio::runtime::spawn(async move { - info!("Starting QUIC listener worker {}", i); - loop { - match endpoint.wait_incoming().await { - Some(incoming_conn) => { - let shard = shard.clone(); - info!( - "Incoming connection from client: {}", - incoming_conn.remote_address() - ); - let connection = match incoming_conn.await { - Ok(conn) => conn, - Err(error) => { - error!( - "QUIC connection acceptance failed on listener {}: {:?}", - i, error - ); - continue; - } - }; - let remote_addr = connection.remote_address(); - let connection_id = random_id::get_ulid(); - info!( - "QUIC connection {} established from {} on listener {}", - connection_id, remote_addr, i + while let Some(incoming_conn) = endpoint.wait_incoming().await { + let remote_addr = incoming_conn.remote_address(); + info!("Incoming connection from client: {}", remote_addr); + let shard = shard.clone(); + let connection = match incoming_conn.await { + Ok(conn) => conn, + Err(error) => { + error!( + "Error when accepting incoming connection from {}: {:?}", + remote_addr, error ); - // Spawn a task to handle this connection - let _ = compio::runtime::spawn(async move { - let start_time = std::time::Instant::now(); - - match handle_connection(connection, shard.clone()).await { - Ok(_) => { - let duration = start_time.elapsed(); - debug!( - "QUIC connection {} completed successfully in {} ms", - connection_id, - duration.as_millis() - ); - } - Err(error) => { - let duration = start_time.elapsed(); - error!( - "QUIC connection {} failed after {} ms: {error}", - connection_id, - duration.as_millis() - ); - } - } - debug!("QUIC connection {} closed", connection_id); - }); + continue; } - None => { - // No incoming connection available, wait a bit before checking again - compio::time::sleep(std::time::Duration::from_millis(10)).await; + }; + let _ = compio::runtime::spawn(async move { + let remote_addr = connection.remote_address(); + if let Err(error) = handle_connection(connection, shard).await { + error!("QUIC connection from {} has failed: {error}", remote_addr); } - } + }); } }); } - // Keep the main task alive - loop { - compio::time::sleep(std::time::Duration::from_secs(1)).await; - } + Ok(()) } async fn handle_connection( @@ -123,7 +82,7 @@ async fn handle_connection( error!("Error when handling QUIC stream: {:?}", err) } }; - let _ = compio::runtime::spawn(handle_stream_task); + let _handle = compio::runtime::spawn(handle_stream_task); } Ok(()) } @@ -156,8 +115,6 @@ async fn handle_stream( session: Rc, ) -> anyhow::Result<()> { let (send_stream, mut recv_stream) = stream; - let request_id = random_id::get_ulid(); - let start_time = std::time::Instant::now(); let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; @@ -168,10 +125,7 @@ async fn handle_stream( let length = u32::from_le_bytes(length_buffer); let code = u32::from_le_bytes(code_buffer); - trace!( - "Processing QUIC request {} with code: {}, length: {}, session: {}", - request_id, code, length, session.client_id - ); + trace!("Received a QUIC request, length: {length}, code: {code}"); let mut sender = SenderKind::get_quic_sender(send_stream, recv_stream); @@ -183,39 +137,36 @@ async fn handle_stream( } }; + if let Err(e) = command.validate() { + sender.send_error_response(e.clone()).await?; + return Err(anyhow!("Command validation failed: {e}")); + } + trace!("Received a QUIC command: {command}, payload size: {length}"); match command.handle(&mut sender, length, &session, &shard).await { Ok(_) => { - let duration = start_time.elapsed(); trace!( - "QUIC request {} completed successfully in {} ms (session: {})", - request_id, - duration.as_millis(), - session.client_id + "Command was handled successfully, session: {:?}. QUIC response was sent.", + session ); Ok(()) } Err(e) => { - let duration = start_time.elapsed(); error!( - "QUIC request {} failed after {} ms (session: {}): {e}", - request_id, - duration.as_millis(), - session.client_id + "Command was not handled successfully, session: {:?}, error: {e}.", + session ); - + // Only return a connection-terminating error for client not found if let IggyError::ClientNotFound(_) = e { sender.send_error_response(e.clone()).await?; - trace!("QUIC error response sent for request {}", request_id); - error!( - "Session {} will be deleted due to client not found", - session.client_id - ); + trace!("QUIC error response was sent."); + error!("Session will be deleted."); Err(anyhow!("Client not found: {e}")) } else { + // For all other errors, send response and continue the connection sender.send_error_response(e).await?; - trace!("QUIC error response sent for request {}", request_id); + trace!("QUIC error response was sent."); Ok(()) } } From bcd45ad3d30d4b72fd0367e92f7a466dabb48235 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Mon, 11 Aug 2025 02:30:06 +0530 Subject: [PATCH 07/15] fix issues during quic server and client benachmarks --- core/common/src/certificates.rs | 11 +++--- core/server/src/quic/listener.rs | 47 ++++++++++++++----------- core/server/src/quic/quic_server.rs | 54 ++++++++++++++++++++++------- core/server/src/shard/mod.rs | 2 +- 4 files changed, 77 insertions(+), 37 deletions(-) diff --git a/core/common/src/certificates.rs b/core/common/src/certificates.rs index f697b776d5..94febb6071 100644 --- a/core/common/src/certificates.rs +++ b/core/common/src/certificates.rs @@ -23,9 +23,12 @@ use rustls::pki_types::{CertificateDer, PrivateKeyDer}; pub fn generate_self_signed_certificate( domain: &str, ) -> Result<(Vec>, PrivateKeyDer<'static>), Box> { - let cert = rcgen::generate_simple_self_signed(vec![domain.to_string()])?; - let cert_der = cert.cert.der(); - let key_der = cert.signing_key.serialize_der(); + let certified_key = rcgen::generate_simple_self_signed(vec![domain.to_string()])?; + let cert_der = certified_key.cert.der().to_vec(); + let key_der = certified_key.signing_key.serialize_der(); + + let cert_der = CertificateDer::from(cert_der); let key = PrivateKeyDer::try_from(key_der)?; - Ok((vec![cert_der.clone()], key)) + + Ok((vec![cert_der], key)) } diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index c882c920f9..f23368a280 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -33,32 +33,39 @@ const LISTENERS_COUNT: u32 = 10; const INITIAL_BYTES_LENGTH: usize = 4; pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyError> { - for _ in 0..LISTENERS_COUNT { + info!("Starting QUIC listener with {} workers", LISTENERS_COUNT); + for i in 0..LISTENERS_COUNT { let endpoint = endpoint.clone(); let shard = shard.clone(); - let _ = compio::runtime::spawn(async move { + + compio::runtime::spawn(async move { + trace!("QUIC listener worker {} waiting for incoming connections...",i); while let Some(incoming_conn) = endpoint.wait_incoming().await { let remote_addr = incoming_conn.remote_address(); - info!("Incoming connection from client: {}", remote_addr); + trace!("Incoming connection from client: {}", remote_addr); let shard = shard.clone(); - let connection = match incoming_conn.await { - Ok(conn) => conn, - Err(error) => { - error!( - "Error when accepting incoming connection from {}: {:?}", - remote_addr, error - ); - continue; - } - }; - let _ = compio::runtime::spawn(async move { - let remote_addr = connection.remote_address(); - if let Err(error) = handle_connection(connection, shard).await { - error!("QUIC connection from {} has failed: {error}", remote_addr); + compio::runtime::spawn(async move { + trace!("Accepting connection from {}", remote_addr); + match incoming_conn.await { + Ok(connection) => { + trace!("Connection established from {}", remote_addr); + if let Err(error) = handle_connection(connection, shard).await { + error!("QUIC connection from {} has failed: {error}", remote_addr); + } + } + Err(error) => { + error!( + "Error when accepting incoming connection from {}: {:?}", + remote_addr, error + ); + } } - }); + }) + .detach(); } - }); + info!("QUIC listener worker {} stopped", i); + }) + .detach(); } Ok(()) } @@ -82,7 +89,7 @@ async fn handle_connection( error!("Error when handling QUIC stream: {:?}", err) } }; - let _handle = compio::runtime::spawn(handle_stream_task); + let _handle = compio::runtime::spawn(handle_stream_task).detach(); } Ok(()) } diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index d7a5f7f1c3..0c3e826af5 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -23,10 +23,13 @@ use std::rc::Rc; use std::sync::Arc; use anyhow::Result; -use compio_quic::{Endpoint, IdleTimeout, ServerConfig, TransportConfig, VarInt}; +use compio_quic::{ + Endpoint, EndpointConfig, IdleTimeout, ServerBuilder, ServerConfig, TransportConfig, VarInt, +}; use error_set::ErrContext; +use rustls::crypto::ring::default_provider; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use tracing::{error, info}; +use tracing::{error, info, trace}; use crate::configs::quic::QuicConfig; use crate::quic::COMPONENT; @@ -36,26 +39,51 @@ use crate::shard::IggyShard; /// Starts the QUIC server. /// Returns the address the server is listening on. -pub async fn start(shard: Rc) -> Result<(), iggy_common::IggyError> { - let config = shard.config.quic.clone(); - let addr: SocketAddr = config - .address - .parse() - .expect("Failed to parse QUIC address"); +pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::IggyError> { + if let Err(e) = default_provider().install_default() { + error!("Failed to install crypto provider: {:?}", e); + return Err(iggy_common::IggyError::QuicError); + } + let config = shard.config.quic.clone(); + let addr: SocketAddr = config.address.parse().map_err(|e| { + error!("Failed to parse QUIC address '{}': {}", config.address, e); + iggy_common::IggyError::QuicError + })?; info!("Initializing Iggy QUIC server on shard {}...", shard.id); + let server_config = configure_quic(&config).map_err(|e| { error!("Failed to configure QUIC: {:?}", e); iggy_common::IggyError::QuicError })?; - let endpoint = Endpoint::server(addr, server_config).await.map_err(|e| { - error!("Failed to create QUIC endpoint: {:?}", e); + trace!("Binding UDP socket on {}", addr); + + let std_socket = std::net::UdpSocket::bind(addr).map_err(|e| { + error!("Failed to bind UDP socket: {}", e); iggy_common::IggyError::CannotBindToSocket(addr.to_string()) })?; + std_socket.set_nonblocking(true).map_err(|e| { + error!("Failed to set socket to nonblocking mode: {}", e); + iggy_common::IggyError::QuicError + })?; + + let socket = compio_net::UdpSocket::from_std(std_socket).map_err(|e| { + error!("Failed to convert std socket to compio socket: {:?}", e); + iggy_common::IggyError::QuicError + })?; + trace!("Creating QUIC endpoint with server config"); + + let endpoint = Endpoint::new(socket, EndpointConfig::default(), Some(server_config), None) + .map_err(|e| { + error!("Failed to create QUIC endpoint: {:?}", e); + iggy_common::IggyError::QuicError + })?; + let actual_addr = endpoint.local_addr().map_err(|e| { error!("Failed to get local address: {e}"); iggy_common::IggyError::CannotBindToSocket(addr.to_string()) })?; + info!("Iggy QUIC server has started on: {:?}", actual_addr); shard.quic_bound_address.set(Some(actual_addr)); listener::start(endpoint, shard).await @@ -67,9 +95,9 @@ fn configure_quic(config: &QuicConfig) -> Result { false => load_certificates(&config.certificate.cert_file, &config.certificate.key_file)?, }; - let mut server_config = ServerConfig::with_single_cert(certificates, private_key) + let mut builder = ServerBuilder::new_with_single_cert(certificates, private_key) .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to create server config") + format!("{COMPONENT} (error: {error}) - failed to create QUIC server builder") }) .map_err(|_| QuicError::ConfigCreationError)?; let mut transport = TransportConfig::default(); @@ -90,6 +118,7 @@ fn configure_quic(config: &QuicConfig) -> Result { }) .map_err(|_| QuicError::TransportConfigError)?, ); + if !config.keep_alive_interval.is_zero() { transport.keep_alive_interval(Some(config.keep_alive_interval.get_duration())); } @@ -102,6 +131,7 @@ fn configure_quic(config: &QuicConfig) -> Result { transport.max_idle_timeout(Some(max_idle_timeout)); } + let mut server_config = builder.build(); server_config.transport_config(Arc::new(transport)); Ok(server_config) } diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index a512cd0eee..6120b45ee1 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -278,7 +278,7 @@ impl IggyShard { } if self.config.quic.enabled { - tasks.push(Box::pin(crate::quic::quic_server::start(self.clone()))); + tasks.push(Box::pin(crate::quic::quic_server::span_quic_server(self.clone()))); } let stop_receiver = self.get_stop_receiver(); From 858247af00758172f8f6f4f65c31f53a2f2d9a67 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Mon, 11 Aug 2025 18:52:47 +0530 Subject: [PATCH 08/15] skip cryptoprovider installing if its already present. --- core/server/src/quic/quic_server.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 0c3e826af5..fd9256cae9 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -40,9 +40,15 @@ use crate::shard::IggyShard; /// Starts the QUIC server. /// Returns the address the server is listening on. pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::IggyError> { - if let Err(e) = default_provider().install_default() { - error!("Failed to install crypto provider: {:?}", e); - return Err(iggy_common::IggyError::QuicError); + // Ensure crypto provider is installed (ignore if already installed) + if rustls::crypto::CryptoProvider::get_default().is_none() { + default_provider().install_default() + .map_err(|e| { + error!("Failed to install crypto provider: {:?}", e); + iggy_common::IggyError::QuicError + })?; + } else { + trace!("Crypto provider already installed"); } let config = shard.config.quic.clone(); From 55c8c7f2d566ef043df3fb31e2fd8271c4f87c60 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 12 Aug 2025 00:28:27 +0530 Subject: [PATCH 09/15] improve logging message. --- core/server/src/quic/quic_server.rs | 25 +++++++++++++------------ 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index fd9256cae9..1e260f2e10 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -22,6 +22,11 @@ use std::net::SocketAddr; use std::rc::Rc; use std::sync::Arc; +use crate::configs::quic::QuicConfig; +use crate::quic::COMPONENT; +use crate::quic::listener; +use crate::server_error::QuicError; +use crate::shard::IggyShard; use anyhow::Result; use compio_quic::{ Endpoint, EndpointConfig, IdleTimeout, ServerBuilder, ServerConfig, TransportConfig, VarInt, @@ -29,24 +34,20 @@ use compio_quic::{ use error_set::ErrContext; use rustls::crypto::ring::default_provider; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; +use tracing::log::warn; use tracing::{error, info, trace}; -use crate::configs::quic::QuicConfig; -use crate::quic::COMPONENT; -use crate::quic::listener; -use crate::server_error::QuicError; -use crate::shard::IggyShard; - /// Starts the QUIC server. /// Returns the address the server is listening on. pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::IggyError> { - // Ensure crypto provider is installed (ignore if already installed) if rustls::crypto::CryptoProvider::get_default().is_none() { - default_provider().install_default() - .map_err(|e| { - error!("Failed to install crypto provider: {:?}", e); - iggy_common::IggyError::QuicError - })?; + if let Err(e) = default_provider().install_default() { + warn!( + "Failed to install rustls crypto provider. Error: {:?}. This may be normal if another \ + thread installed it first.", + e + ); + } } else { trace!("Crypto provider already installed"); } From e57e292ff9bff9569e8d7457d88c83bcdba96d21 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 12 Aug 2025 00:30:17 +0530 Subject: [PATCH 10/15] remove / in logging message. --- core/server/src/quic/quic_server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 1e260f2e10..59b7769e0c 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -43,7 +43,7 @@ pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::I if rustls::crypto::CryptoProvider::get_default().is_none() { if let Err(e) = default_provider().install_default() { warn!( - "Failed to install rustls crypto provider. Error: {:?}. This may be normal if another \ + "Failed to install rustls crypto provider. Error: {:?}. This may be normal if another thread installed it first.", e ); From 1b59a3473f7c142a035119ee213f14c9ec4916ab Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 12 Aug 2025 13:57:12 +0530 Subject: [PATCH 11/15] fix shards to share same udp and one worker per shard --- core/configs/server.toml | 14 ++++++++ core/server/src/configs/defaults.rs | 14 +++++++- core/server/src/configs/quic.rs | 9 +++++ core/server/src/quic/listener.rs | 51 ++++++++++++++--------------- core/server/src/quic/mod.rs | 1 + core/server/src/quic/quic_server.rs | 45 +++++++++++++++---------- foreign/cpp/tests/e2e/server.toml | 14 ++++++++ 7 files changed, 102 insertions(+), 46 deletions(-) diff --git a/core/configs/server.toml b/core/configs/server.toml index 092dbcad28..8512831440 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -271,6 +271,20 @@ cert_file = "core/certs/iggy_cert.pem" # Path to the QUIC TLS key file. key_file = "core/certs/iggy_key.pem" +# Configuration for the QUIC socket +[quic.socket] +# Whether to override the OS-default socket parameters +override_defaults = false + +# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS +recv_buffer_size = "64 KB" + +# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS +send_buffer_size = "64 KB" + +# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection +keepalive = false + # Message cleaner configuration. [message_cleaner] # Enables or disables the background process for deleting expired messages. diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index 01f7140b7d..1bdd5ff659 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -22,7 +22,7 @@ use super::tcp::TcpSocketConfig; use crate::configs::http::{ HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig, }; -use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; +use crate::configs::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig}; use crate::configs::server::{ ArchiverConfig, DataMaintenanceConfig, HeartbeatConfig, MessageSaverConfig, MessagesMaintenanceConfig, PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig, @@ -124,6 +124,18 @@ impl Default for QuicConfig { keep_alive_interval: SERVER_CONFIG.quic.keep_alive_interval.parse().unwrap(), max_idle_timeout: SERVER_CONFIG.quic.max_idle_timeout.parse().unwrap(), certificate: QuicCertificateConfig::default(), + socket: QuicSocketConfig::default(), + } + } +} + +impl Default for QuicSocketConfig { + fn default() -> QuicSocketConfig { + QuicSocketConfig { + override_defaults: SERVER_CONFIG.quic.socket.override_defaults, + recv_buffer_size: SERVER_CONFIG.quic.socket.recv_buffer_size.parse().unwrap(), + send_buffer_size: SERVER_CONFIG.quic.socket.send_buffer_size.parse().unwrap(), + keepalive: SERVER_CONFIG.quic.socket.keepalive, } } } diff --git a/core/server/src/configs/quic.rs b/core/server/src/configs/quic.rs index 55a5920b04..f2cedd6dea 100644 --- a/core/server/src/configs/quic.rs +++ b/core/server/src/configs/quic.rs @@ -37,6 +37,15 @@ pub struct QuicConfig { #[serde_as(as = "DisplayFromStr")] pub max_idle_timeout: IggyDuration, pub certificate: QuicCertificateConfig, + pub socket: QuicSocketConfig, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct QuicSocketConfig { + pub override_defaults: bool, + pub recv_buffer_size: IggyByteSize, + pub send_buffer_size: IggyByteSize, + pub keepalive: bool, } #[derive(Debug, Deserialize, Serialize, Clone)] diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index f23368a280..8fdf24931e 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -29,44 +29,41 @@ use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use iggy_common::IggyError; use tracing::{error, info, trace}; -const LISTENERS_COUNT: u32 = 10; const INITIAL_BYTES_LENGTH: usize = 4; pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyError> { - info!("Starting QUIC listener with {} workers", LISTENERS_COUNT); - for i in 0..LISTENERS_COUNT { - let endpoint = endpoint.clone(); + info!("Starting QUIC listener for shard {}", shard.id); + + // Since the QUIC Endpoint is internally Arc-wrapped and can be shared, + // we only need one worker per shard rather than multiple workers per endpoint. + // This avoids the N×workers multiplication when multiple shards are used. + while let Some(incoming_conn) = endpoint.wait_incoming().await { + let remote_addr = incoming_conn.remote_address(); + trace!("Incoming connection from client: {}", remote_addr); let shard = shard.clone(); + // Spawn each connection handler independently to maintain concurrency compio::runtime::spawn(async move { - trace!("QUIC listener worker {} waiting for incoming connections...",i); - while let Some(incoming_conn) = endpoint.wait_incoming().await { - let remote_addr = incoming_conn.remote_address(); - trace!("Incoming connection from client: {}", remote_addr); - let shard = shard.clone(); - compio::runtime::spawn(async move { - trace!("Accepting connection from {}", remote_addr); - match incoming_conn.await { - Ok(connection) => { - trace!("Connection established from {}", remote_addr); - if let Err(error) = handle_connection(connection, shard).await { - error!("QUIC connection from {} has failed: {error}", remote_addr); - } - } - Err(error) => { - error!( - "Error when accepting incoming connection from {}: {:?}", - remote_addr, error - ); - } + trace!("Accepting connection from {}", remote_addr); + match incoming_conn.await { + Ok(connection) => { + trace!("Connection established from {}", remote_addr); + if let Err(error) = handle_connection(connection, shard).await { + error!("QUIC connection from {} has failed: {error}", remote_addr); } - }) - .detach(); + } + Err(error) => { + error!( + "Error when accepting incoming connection from {}: {:?}", + remote_addr, error + ); + } } - info!("QUIC listener worker {} stopped", i); }) .detach(); } + + info!("QUIC listener for shard {} stopped", shard.id); Ok(()) } diff --git a/core/server/src/quic/mod.rs b/core/server/src/quic/mod.rs index 40db7fdfbf..27b597f684 100644 --- a/core/server/src/quic/mod.rs +++ b/core/server/src/quic/mod.rs @@ -19,5 +19,6 @@ mod listener; pub mod quic_sender; pub mod quic_server; +pub mod quic_socket; pub const COMPONENT: &str = "QUIC"; diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 59b7769e0c..6a0255adc7 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -22,11 +22,6 @@ use std::net::SocketAddr; use std::rc::Rc; use std::sync::Arc; -use crate::configs::quic::QuicConfig; -use crate::quic::COMPONENT; -use crate::quic::listener; -use crate::server_error::QuicError; -use crate::shard::IggyShard; use anyhow::Result; use compio_quic::{ Endpoint, EndpointConfig, IdleTimeout, ServerBuilder, ServerConfig, TransportConfig, VarInt, @@ -34,22 +29,28 @@ use compio_quic::{ use error_set::ErrContext; use rustls::crypto::ring::default_provider; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use tracing::log::warn; -use tracing::{error, info, trace}; +use tracing::{error, info, trace, warn}; + +use crate::configs::quic::QuicConfig; +use crate::quic::{COMPONENT, listener, quic_socket}; +use crate::server_error::QuicError; +use crate::shard::IggyShard; /// Starts the QUIC server. /// Returns the address the server is listening on. pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::IggyError> { + // Ensure rustls crypto provider is installed (thread-safe, idempotent) if rustls::crypto::CryptoProvider::get_default().is_none() { if let Err(e) = default_provider().install_default() { warn!( - "Failed to install rustls crypto provider. Error: {:?}. This may be normal if another - thread installed it first.", + "Failed to install rustls crypto provider: {:?}. This may be normal if another thread installed it first.", e ); + } else { + trace!("Rustls crypto provider installed successfully"); } } else { - trace!("Crypto provider already installed"); + trace!("Rustls crypto provider already installed"); } let config = shard.config.quic.clone(); @@ -57,23 +58,28 @@ pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::I error!("Failed to parse QUIC address '{}': {}", config.address, e); iggy_common::IggyError::QuicError })?; - info!("Initializing Iggy QUIC server on shard {}...", shard.id); + info!( + "Initializing Iggy QUIC server on shard {} for address {}", + shard.id, addr + ); let server_config = configure_quic(&config).map_err(|e| { - error!("Failed to configure QUIC: {:?}", e); + error!("Failed to configure QUIC server: {:?}", e); iggy_common::IggyError::QuicError })?; - trace!("Binding UDP socket on {}", addr); + trace!("Building UDP socket for QUIC endpoint on {}", addr); - let std_socket = std::net::UdpSocket::bind(addr).map_err(|e| { - error!("Failed to bind UDP socket: {}", e); + let socket = quic_socket::build(&addr, &config.socket); + socket.bind(&addr.into()).map_err(|e| { + error!("Failed to bind socket: {}", e); iggy_common::IggyError::CannotBindToSocket(addr.to_string()) })?; - std_socket.set_nonblocking(true).map_err(|e| { - error!("Failed to set socket to nonblocking mode: {}", e); + socket.set_nonblocking(true).map_err(|e| { + error!("Failed to set nonblocking: {}", e); iggy_common::IggyError::QuicError })?; + let std_socket: std::net::UdpSocket = socket.into(); let socket = compio_net::UdpSocket::from_std(std_socket).map_err(|e| { error!("Failed to convert std socket to compio socket: {:?}", e); iggy_common::IggyError::QuicError @@ -91,7 +97,10 @@ pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::I iggy_common::IggyError::CannotBindToSocket(addr.to_string()) })?; - info!("Iggy QUIC server has started on: {:?}", actual_addr); + info!( + "Iggy QUIC server has started for shard {} on {}", + shard.id, actual_addr + ); shard.quic_bound_address.set(Some(actual_addr)); listener::start(endpoint, shard).await } diff --git a/foreign/cpp/tests/e2e/server.toml b/foreign/cpp/tests/e2e/server.toml index cc8242dd10..01311d484d 100644 --- a/foreign/cpp/tests/e2e/server.toml +++ b/foreign/cpp/tests/e2e/server.toml @@ -178,6 +178,20 @@ cert_file = "core/certs/iggy_cert.pem" # Path to the QUIC TLS key file. key_file = "core/certs/iggy_key.pem" +# Configuration for the QUIC socket +[quic.socket] +# Whether to override the OS-default socket parameters +override_defaults = false + +# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS +recv_buffer_size = "64 KB" + +# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS +send_buffer_size = "64 KB" + +# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection +keepalive = false + # Message cleaner configuration. [message_cleaner] # Enables or disables the background process for deleting expired messages. From 350d1b441b3e49e4cfc29668145e78ff4cc767dc Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Wed, 13 Aug 2025 00:24:44 +0530 Subject: [PATCH 12/15] broadcast sessions to all shards. --- core/server/src/quic/listener.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 8fdf24931e..2c29a3a95e 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -22,8 +22,10 @@ use crate::binary::command::{ServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; use crate::server_error::ConnectionError; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::clients::client_manager::Transport; use crate::streaming::session::Session; +use crate::{shard_debug, shard_info}; use anyhow::anyhow; use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use iggy_common::IggyError; @@ -76,6 +78,21 @@ async fn handle_connection( let session = shard.add_client(&address, Transport::Quic); let client_id = session.client_id; + shard_debug!( + shard.id, + "Added {} client with session: {} for IP address: {}", + Transport::Quic, + session, + address + ); + + // Add session to active sessions and broadcast to all shards + shard.add_active_session(session.clone()); + let event = ShardEvent::NewSession { + address, + transport: Transport::Quic, + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; while let Some(stream) = accept_stream(&connection, &shard, client_id).await? { let shard = shard.clone(); From d4bbcf0ba90e47f6078f73a2f3b1338c22c21ee0 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Mon, 18 Aug 2025 00:42:09 +0530 Subject: [PATCH 13/15] add back compio module. dopped accidentally during merge conflict. --- Cargo.lock | 1 + core/server/Cargo.toml | 1 + 2 files changed, 2 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 28db9b2d95..8aa0c42b11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7514,6 +7514,7 @@ dependencies = [ "opentelemetry_sdk", "papaya", "prometheus-client", + "quinn", "reqwest", "ring", "rustls", diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 081c5d68b5..dfb1c8ad55 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -50,6 +50,7 @@ clap = { workspace = true } cyper = { workspace = true } cyper-axum = { workspace = true } compio-net = { workspace = true } +compio-quic = { workspace = true } console-subscriber = { workspace = true, optional = true } crossbeam = { workspace = true } ctrlc = { version = "3.4", features = ["termination"] } From 4625d451f6cf9c1bd49a172699273c2d4c947e2e Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Tue, 19 Aug 2025 11:57:21 +0530 Subject: [PATCH 14/15] add quic socket. --- core/server/src/quic/quic_socket.rs | 66 +++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 core/server/src/quic/quic_socket.rs diff --git a/core/server/src/quic/quic_socket.rs b/core/server/src/quic/quic_socket.rs new file mode 100644 index 0000000000..eeda103e9f --- /dev/null +++ b/core/server/src/quic/quic_socket.rs @@ -0,0 +1,66 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::SocketAddr; +use std::num::TryFromIntError; + +use crate::configs::quic::QuicSocketConfig; + +/// Build a UDP socket for the given address and configure the options that are +/// required by the server +pub fn build(addr: &SocketAddr, config: &QuicSocketConfig) -> Socket { + // Choose the correct address family based on the target address + let socket = Socket::new(Domain::for_address(*addr), Type::DGRAM, Some(Protocol::UDP)) + .expect("Unable to create a UDP socket"); + + // Allow multiple sockets (shards) to bind to the same address + socket + .set_reuse_address(true) + .expect("Unable to set SO_REUSEADDR on socket"); + + // SO_REUSEPORT is only available on Unix-like systems + #[cfg(unix)] + socket + .set_reuse_port(true) + .expect("Unable to set SO_REUSEPORT on socket"); + + // Configure socket buffer sizes and keepalive if override is enabled + if config.override_defaults { + config + .recv_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size| socket.set_recv_buffer_size(size)) + .expect("Unable to set SO_RCVBUF on socket"); + + config + .send_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size| socket.set_send_buffer_size(size)) + .expect("Unable to set SO_SNDBUF on socket"); + + socket + .set_keepalive(config.keepalive) + .expect("Unable to set SO_KEEPALIVE on socket"); + } + + socket +} From 99efef70bbe61d682624137f679fe0dd8f5a4d36 Mon Sep 17 00:00:00 2001 From: Jagadesh Adireddi Date: Wed, 20 Aug 2025 19:52:12 +0530 Subject: [PATCH 15/15] fix as per code revieew suggestion. --- core/server/src/quic/listener.rs | 5 ----- core/server/src/quic/quic_sender.rs | 13 ++++++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index 2c29a3a95e..f9dfa471f5 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -158,11 +158,6 @@ async fn handle_stream( } }; - if let Err(e) = command.validate() { - sender.send_error_response(e.clone()).await?; - return Err(anyhow!("Command validation failed: {e}")); - } - trace!("Received a QUIC command: {command}, payload size: {length}"); match command.handle(&mut sender, length, &session, &shard).await { diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index b66ecd706e..f0a3cb0353 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -19,7 +19,9 @@ use crate::quic::COMPONENT; use crate::streaming::utils::PooledBuffer; use crate::{binary::sender::Sender, server_error::ServerError}; +use compio::BufResult; use compio::buf::IoBufMut; +use compio::io::AsyncReadExt; use compio_quic::{RecvStream, SendStream}; use error_set::ErrContext; use iggy_common::IggyError; @@ -36,11 +38,12 @@ pub struct QuicSender { impl Sender for QuicSender { /// Reads data from the QUIC stream directly into the buffer. async fn read(&mut self, buffer: B) -> (Result<(), IggyError>, B) { - let mut buffer = buffer; - match self.recv.read_exact(buffer.as_mut_slice()).await { - Ok(_) => (Ok(()), buffer), - Err(error) => { - error!("Failed to read from QUIC stream: {:?}", error); + let BufResult(result, buffer) = + ::read_exact(&mut self.recv, buffer).await; + match (result, buffer) { + (Ok(_), buffer) => (Ok(()), buffer), + (Err(error), buffer) => { + error!("Failed to read from the stream: {:?}", error); (Err(IggyError::QuicError), buffer) } }