diff --git a/Cargo.lock b/Cargo.lock index 37dbdb8f26..8aa0c42b11 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7482,6 +7482,7 @@ dependencies = [ "clap", "compio", "compio-net", + "compio-quic", "console-subscriber", "crossbeam", "ctrlc", diff --git a/Cargo.toml b/Cargo.toml index 6c209860ba..8656406ad7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -133,6 +133,7 @@ cyper = { git = "https://github.com/krishvishal/cyper.git", rev = "cd75e266df6ab ], default-features = false } cyper-axum = { git = "https://github.com/krishvishal/cyper", rev = "cd75e266df6ab0a9b9474eb7dda1735650d17db6" } compio-net = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" } +compio-quic = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" } tokio-rustls = "0.26.2" toml = "0.9.4" tracing = "0.1.41" diff --git a/core/common/src/certificates.rs b/core/common/src/certificates.rs index f697b776d5..94febb6071 100644 --- a/core/common/src/certificates.rs +++ b/core/common/src/certificates.rs @@ -23,9 +23,12 @@ use rustls::pki_types::{CertificateDer, PrivateKeyDer}; pub fn generate_self_signed_certificate( domain: &str, ) -> Result<(Vec>, PrivateKeyDer<'static>), Box> { - let cert = rcgen::generate_simple_self_signed(vec![domain.to_string()])?; - let cert_der = cert.cert.der(); - let key_der = cert.signing_key.serialize_der(); + let certified_key = rcgen::generate_simple_self_signed(vec![domain.to_string()])?; + let cert_der = certified_key.cert.der().to_vec(); + let key_der = certified_key.signing_key.serialize_der(); + + let cert_der = CertificateDer::from(cert_der); let key = PrivateKeyDer::try_from(key_der)?; - Ok((vec![cert_der.clone()], key)) + + Ok((vec![cert_der], key)) } diff --git a/core/configs/server.toml b/core/configs/server.toml index 092dbcad28..8512831440 100644 --- a/core/configs/server.toml +++ b/core/configs/server.toml @@ -271,6 +271,20 @@ cert_file = "core/certs/iggy_cert.pem" # Path to the QUIC TLS key file. key_file = "core/certs/iggy_key.pem" +# Configuration for the QUIC socket +[quic.socket] +# Whether to override the OS-default socket parameters +override_defaults = false + +# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS +recv_buffer_size = "64 KB" + +# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS +send_buffer_size = "64 KB" + +# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection +keepalive = false + # Message cleaner configuration. [message_cleaner] # Enables or disables the background process for deleting expired messages. diff --git a/core/server/Cargo.toml b/core/server/Cargo.toml index 35693903eb..dfb1c8ad55 100644 --- a/core/server/Cargo.toml +++ b/core/server/Cargo.toml @@ -50,6 +50,7 @@ clap = { workspace = true } cyper = { workspace = true } cyper-axum = { workspace = true } compio-net = { workspace = true } +compio-quic = { workspace = true } console-subscriber = { workspace = true, optional = true } crossbeam = { workspace = true } ctrlc = { version = "3.4", features = ["termination"] } @@ -142,4 +143,4 @@ serial_test = { workspace = true } [[bin]] name = "iggy-server" -path = "src/main.rs" +path = "src/main.rs" \ No newline at end of file diff --git a/core/server/src/binary/sender.rs b/core/server/src/binary/sender.rs index b54e25fb72..4e94f73510 100644 --- a/core/server/src/binary/sender.rs +++ b/core/server/src/binary/sender.rs @@ -27,9 +27,9 @@ use bytes::BytesMut; use compio::buf::{IoBuf, IoBufMut}; use compio::io::{AsyncReadExt, AsyncWriteExt}; use compio::net::TcpStream; +use compio_quic::{RecvStream, SendStream}; use iggy_common::IggyError; use nix::libc; -use quinn::{RecvStream, SendStream}; macro_rules! forward_async_methods { ( diff --git a/core/server/src/configs/defaults.rs b/core/server/src/configs/defaults.rs index 01f7140b7d..1bdd5ff659 100644 --- a/core/server/src/configs/defaults.rs +++ b/core/server/src/configs/defaults.rs @@ -22,7 +22,7 @@ use super::tcp::TcpSocketConfig; use crate::configs::http::{ HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig, }; -use crate::configs::quic::{QuicCertificateConfig, QuicConfig}; +use crate::configs::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig}; use crate::configs::server::{ ArchiverConfig, DataMaintenanceConfig, HeartbeatConfig, MessageSaverConfig, MessagesMaintenanceConfig, PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig, @@ -124,6 +124,18 @@ impl Default for QuicConfig { keep_alive_interval: SERVER_CONFIG.quic.keep_alive_interval.parse().unwrap(), max_idle_timeout: SERVER_CONFIG.quic.max_idle_timeout.parse().unwrap(), certificate: QuicCertificateConfig::default(), + socket: QuicSocketConfig::default(), + } + } +} + +impl Default for QuicSocketConfig { + fn default() -> QuicSocketConfig { + QuicSocketConfig { + override_defaults: SERVER_CONFIG.quic.socket.override_defaults, + recv_buffer_size: SERVER_CONFIG.quic.socket.recv_buffer_size.parse().unwrap(), + send_buffer_size: SERVER_CONFIG.quic.socket.send_buffer_size.parse().unwrap(), + keepalive: SERVER_CONFIG.quic.socket.keepalive, } } } diff --git a/core/server/src/configs/quic.rs b/core/server/src/configs/quic.rs index 55a5920b04..f2cedd6dea 100644 --- a/core/server/src/configs/quic.rs +++ b/core/server/src/configs/quic.rs @@ -37,6 +37,15 @@ pub struct QuicConfig { #[serde_as(as = "DisplayFromStr")] pub max_idle_timeout: IggyDuration, pub certificate: QuicCertificateConfig, + pub socket: QuicSocketConfig, +} + +#[derive(Debug, Deserialize, Serialize, Clone)] +pub struct QuicSocketConfig { + pub override_defaults: bool, + pub recv_buffer_size: IggyByteSize, + pub send_buffer_size: IggyByteSize, + pub keepalive: bool, } #[derive(Debug, Deserialize, Serialize, Clone)] diff --git a/core/server/src/quic/listener.rs b/core/server/src/quic/listener.rs index f5cfc42952..f9dfa471f5 100644 --- a/core/server/src/quic/listener.rs +++ b/core/server/src/quic/listener.rs @@ -22,75 +22,88 @@ use crate::binary::command::{ServerCommand, ServerCommandHandler}; use crate::binary::sender::SenderKind; use crate::server_error::ConnectionError; use crate::shard::IggyShard; +use crate::shard::transmission::event::ShardEvent; use crate::streaming::clients::client_manager::Transport; use crate::streaming::session::Session; +use crate::{shard_debug, shard_info}; use anyhow::anyhow; +use compio_quic::{Connection, Endpoint, RecvStream, SendStream}; use iggy_common::IggyError; -use quinn::{Connection, Endpoint, RecvStream, SendStream}; use tracing::{error, info, trace}; -const LISTENERS_COUNT: u32 = 10; const INITIAL_BYTES_LENGTH: usize = 4; -//TODO: Fixme -/* - -pub fn start(endpoint: Endpoint, system: SharedSystem) { - for _ in 0..LISTENERS_COUNT { - let endpoint = endpoint.clone(); - let system = system.clone(); - //TODO: Fixme - /* - tokio::spawn(async move { - while let Some(incoming_connection) = endpoint.accept().await { - info!( - "Incoming connection from client: {}", - incoming_connection.remote_address() - ); - let system = system.clone(); - let incoming_connection = incoming_connection.accept(); - if incoming_connection.is_err() { + +pub async fn start(endpoint: Endpoint, shard: Rc) -> Result<(), IggyError> { + info!("Starting QUIC listener for shard {}", shard.id); + + // Since the QUIC Endpoint is internally Arc-wrapped and can be shared, + // we only need one worker per shard rather than multiple workers per endpoint. + // This avoids the N×workers multiplication when multiple shards are used. + while let Some(incoming_conn) = endpoint.wait_incoming().await { + let remote_addr = incoming_conn.remote_address(); + trace!("Incoming connection from client: {}", remote_addr); + let shard = shard.clone(); + + // Spawn each connection handler independently to maintain concurrency + compio::runtime::spawn(async move { + trace!("Accepting connection from {}", remote_addr); + match incoming_conn.await { + Ok(connection) => { + trace!("Connection established from {}", remote_addr); + if let Err(error) = handle_connection(connection, shard).await { + error!("QUIC connection from {} has failed: {error}", remote_addr); + } + } + Err(error) => { error!( - "Error when accepting incoming connection: {:?}", - incoming_connection + "Error when accepting incoming connection from {}: {:?}", + remote_addr, error ); - continue; } - let incoming_connection = incoming_connection.unwrap(); - tokio::spawn(async move { - if let Err(error) = handle_connection(incoming_connection, system).await { - error!("Connection has failed: {error}"); - } - }); } - }); - */ + }) + .detach(); } + + info!("QUIC listener for shard {} stopped", shard.id); + Ok(()) } async fn handle_connection( - incoming_connection: quinn::Connecting, + connection: Connection, shard: Rc, ) -> Result<(), ConnectionError> { - let connection = incoming_connection.await?; let address = connection.remote_address(); info!("Client has connected: {address}"); - let session = system - .read() - .await - .add_client(&address, Transport::Quic) - .await; + let session = shard.add_client(&address, Transport::Quic); let client_id = session.client_id; - while let Some(stream) = accept_stream(&connection, &system, client_id).await? { - let system = system.clone(); + shard_debug!( + shard.id, + "Added {} client with session: {} for IP address: {}", + Transport::Quic, + session, + address + ); + + // Add session to active sessions and broadcast to all shards + shard.add_active_session(session.clone()); + let event = ShardEvent::NewSession { + address, + transport: Transport::Quic, + }; + let _responses = shard.broadcast_event_to_all_shards(event.into()).await; + + while let Some(stream) = accept_stream(&connection, &shard, client_id).await? { + let shard = shard.clone(); let session = session.clone(); let handle_stream_task = async move { - if let Err(err) = handle_stream(stream, system, session).await { + if let Err(err) = handle_stream(stream, shard, session).await { error!("Error when handling QUIC stream: {:?}", err) } }; - let _handle = tokio::spawn(handle_stream_task); + let _handle = compio::runtime::spawn(handle_stream_task).detach(); } Ok(()) } @@ -99,18 +112,18 @@ type BiStream = (SendStream, RecvStream); async fn accept_stream( connection: &Connection, - system: &SharedSystem, + shard: &Rc, client_id: u32, ) -> Result, ConnectionError> { match connection.accept_bi().await { - Err(quinn::ConnectionError::ApplicationClosed { .. }) => { + Err(compio_quic::ConnectionError::ApplicationClosed { .. }) => { info!("Connection closed"); - system.read().await.delete_client(client_id).await; + shard.delete_client(client_id).await; Ok(None) } Err(error) => { error!("Error when handling QUIC stream: {:?}", error); - system.read().await.delete_client(client_id).await; + shard.delete_client(client_id).await; Err(error.into()) } Ok(stream) => Ok(Some(stream)), @@ -119,16 +132,16 @@ async fn accept_stream( async fn handle_stream( stream: BiStream, - system: SharedSystem, - session: impl AsRef + std::fmt::Debug, + shard: Rc, + session: Rc, ) -> anyhow::Result<()> { let (send_stream, mut recv_stream) = stream; let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH]; let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH]; - recv_stream.read_exact(&mut length_buffer).await?; - recv_stream.read_exact(&mut code_buffer).await?; + recv_stream.read_exact(&mut length_buffer[..]).await?; + recv_stream.read_exact(&mut code_buffer[..]).await?; let length = u32::from_le_bytes(length_buffer); let code = u32::from_le_bytes(code_buffer); @@ -145,17 +158,9 @@ async fn handle_stream( } }; - // if let Err(e) = command.validate() { - // sender.send_error_response(e.clone()).await?; - // return Err(anyhow!("Command validation failed: {e}")); - // } - trace!("Received a QUIC command: {command}, payload size: {length}"); - match command - .handle(&mut sender, length, session.as_ref(), &system) - .await - { + match command.handle(&mut sender, length, &session, &shard).await { Ok(_) => { trace!( "Command was handled successfully, session: {:?}. QUIC response was sent.", @@ -183,4 +188,3 @@ async fn handle_stream( } } } -*/ diff --git a/core/server/src/quic/mod.rs b/core/server/src/quic/mod.rs index 40db7fdfbf..27b597f684 100644 --- a/core/server/src/quic/mod.rs +++ b/core/server/src/quic/mod.rs @@ -19,5 +19,6 @@ mod listener; pub mod quic_sender; pub mod quic_server; +pub mod quic_socket; pub const COMPONENT: &str = "QUIC"; diff --git a/core/server/src/quic/quic_sender.rs b/core/server/src/quic/quic_sender.rs index 4c93e66ab7..f0a3cb0353 100644 --- a/core/server/src/quic/quic_sender.rs +++ b/core/server/src/quic/quic_sender.rs @@ -19,13 +19,12 @@ use crate::quic::COMPONENT; use crate::streaming::utils::PooledBuffer; use crate::{binary::sender::Sender, server_error::ServerError}; -use bytes::BytesMut; -use compio::buf::{IoBuf, IoBufMut}; +use compio::BufResult; +use compio::buf::IoBufMut; +use compio::io::AsyncReadExt; +use compio_quic::{RecvStream, SendStream}; use error_set::ErrContext; use iggy_common::IggyError; -use nix::libc; -use quinn::{RecvStream, SendStream}; -use std::io::IoSlice; use tracing::{debug, error}; const STATUS_OK: &[u8] = &[0; 4]; @@ -37,19 +36,17 @@ pub struct QuicSender { } impl Sender for QuicSender { + /// Reads data from the QUIC stream directly into the buffer. async fn read(&mut self, buffer: B) -> (Result<(), IggyError>, B) { - //TODO: Fixme - // Not-so-nice code because quinn recv stream has different API for read_exact - /* - let read_bytes = buffer.len(); - self.recv.read_exact(buffer).await.map_err(|error| { - error!("Failed to read from the stream: {:?}", error); - IggyError::QuicError - })?; - */ - - //Ok(read_bytes) - todo!(); + let BufResult(result, buffer) = + ::read_exact(&mut self.recv, buffer).await; + match (result, buffer) { + (Ok(_), buffer) => (Ok(()), buffer), + (Err(error), buffer) => { + error!("Failed to read from the stream: {:?}", error); + (Err(IggyError::QuicError), buffer) + } + } } async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> { @@ -87,8 +84,6 @@ impl Sender for QuicSender { let mut total_bytes_written = 0; - //TODO: Fixme - /* for slice in slices { let slice_data = &*slice; if !slice_data.is_empty() { @@ -103,7 +98,6 @@ impl Sender for QuicSender { total_bytes_written += slice_data.len(); } } - */ debug!( "Sent vectored response: {} bytes of payload", diff --git a/core/server/src/quic/quic_server.rs b/core/server/src/quic/quic_server.rs index 0a8020b88b..6a0255adc7 100644 --- a/core/server/src/quic/quic_server.rs +++ b/core/server/src/quic/quic_server.rs @@ -19,50 +19,104 @@ use std::fs::File; use std::io::BufReader; use std::net::SocketAddr; +use std::rc::Rc; use std::sync::Arc; use anyhow::Result; +use compio_quic::{ + Endpoint, EndpointConfig, IdleTimeout, ServerBuilder, ServerConfig, TransportConfig, VarInt, +}; use error_set::ErrContext; -use quinn::{Endpoint, IdleTimeout, VarInt}; +use rustls::crypto::ring::default_provider; use rustls::pki_types::{CertificateDer, PrivateKeyDer}; -use tracing::info; +use tracing::{error, info, trace, warn}; use crate::configs::quic::QuicConfig; -use crate::quic::COMPONENT; -use crate::quic::listener; +use crate::quic::{COMPONENT, listener, quic_socket}; use crate::server_error::QuicError; +use crate::shard::IggyShard; -//TODO: Fixme -/* /// Starts the QUIC server. /// Returns the address the server is listening on. -pub fn start(config: QuicConfig, system: SharedSystem) -> SocketAddr { - info!("Initializing Iggy QUIC server..."); - let address = config.address.parse().unwrap(); - let quic_config = configure_quic(config); - if let Err(error) = quic_config { - panic!("Error when configuring QUIC: {error:?}"); +pub async fn span_quic_server(shard: Rc) -> Result<(), iggy_common::IggyError> { + // Ensure rustls crypto provider is installed (thread-safe, idempotent) + if rustls::crypto::CryptoProvider::get_default().is_none() { + if let Err(e) = default_provider().install_default() { + warn!( + "Failed to install rustls crypto provider: {:?}. This may be normal if another thread installed it first.", + e + ); + } else { + trace!("Rustls crypto provider installed successfully"); + } + } else { + trace!("Rustls crypto provider already installed"); } - let endpoint = Endpoint::server(quic_config.unwrap(), address).unwrap(); - let addr = endpoint.local_addr().unwrap(); - listener::start(endpoint, system); - info!("Iggy QUIC server has started on: {:?}", addr); - addr + let config = shard.config.quic.clone(); + let addr: SocketAddr = config.address.parse().map_err(|e| { + error!("Failed to parse QUIC address '{}': {}", config.address, e); + iggy_common::IggyError::QuicError + })?; + info!( + "Initializing Iggy QUIC server on shard {} for address {}", + shard.id, addr + ); + + let server_config = configure_quic(&config).map_err(|e| { + error!("Failed to configure QUIC server: {:?}", e); + iggy_common::IggyError::QuicError + })?; + trace!("Building UDP socket for QUIC endpoint on {}", addr); + + let socket = quic_socket::build(&addr, &config.socket); + socket.bind(&addr.into()).map_err(|e| { + error!("Failed to bind socket: {}", e); + iggy_common::IggyError::CannotBindToSocket(addr.to_string()) + })?; + socket.set_nonblocking(true).map_err(|e| { + error!("Failed to set nonblocking: {}", e); + iggy_common::IggyError::QuicError + })?; + + let std_socket: std::net::UdpSocket = socket.into(); + let socket = compio_net::UdpSocket::from_std(std_socket).map_err(|e| { + error!("Failed to convert std socket to compio socket: {:?}", e); + iggy_common::IggyError::QuicError + })?; + trace!("Creating QUIC endpoint with server config"); + + let endpoint = Endpoint::new(socket, EndpointConfig::default(), Some(server_config), None) + .map_err(|e| { + error!("Failed to create QUIC endpoint: {:?}", e); + iggy_common::IggyError::QuicError + })?; + + let actual_addr = endpoint.local_addr().map_err(|e| { + error!("Failed to get local address: {e}"); + iggy_common::IggyError::CannotBindToSocket(addr.to_string()) + })?; + + info!( + "Iggy QUIC server has started for shard {} on {}", + shard.id, actual_addr + ); + shard.quic_bound_address.set(Some(actual_addr)); + listener::start(endpoint, shard).await } -fn configure_quic(config: QuicConfig) -> Result { - let (certificate, key) = match config.certificate.self_signed { +fn configure_quic(config: &QuicConfig) -> Result { + let (certificates, private_key) = match config.certificate.self_signed { true => generate_self_signed_cert()?, false => load_certificates(&config.certificate.cert_file, &config.certificate.key_file)?, }; - let mut server_config = quinn::ServerConfig::with_single_cert(certificate, key) + let mut builder = ServerBuilder::new_with_single_cert(certificates, private_key) .with_error_context(|error| { - format!("{COMPONENT} (error: {error}) - failed to create server config") + format!("{COMPONENT} (error: {error}) - failed to create QUIC server builder") }) .map_err(|_| QuicError::ConfigCreationError)?; - let mut transport = quinn::TransportConfig::default(); + let mut transport = TransportConfig::default(); transport.initial_mtu(config.initial_mtu.as_bytes_u64() as u16); transport.send_window(config.send_window.as_bytes_u64()); transport.receive_window( @@ -80,6 +134,7 @@ fn configure_quic(config: QuicConfig) -> Result }) .map_err(|_| QuicError::TransportConfigError)?, ); + if !config.keep_alive_interval.is_zero() { transport.keep_alive_interval(Some(config.keep_alive_interval.get_duration())); } @@ -92,6 +147,7 @@ fn configure_quic(config: QuicConfig) -> Result transport.max_idle_timeout(Some(max_idle_timeout)); } + let mut server_config = builder.build(); server_config.transport_config(Arc::new(transport)); Ok(server_config) } @@ -137,4 +193,3 @@ fn load_certificates( let key = keys.remove(0); Ok((certs, key)) } - */ diff --git a/core/server/src/quic/quic_socket.rs b/core/server/src/quic/quic_socket.rs new file mode 100644 index 0000000000..eeda103e9f --- /dev/null +++ b/core/server/src/quic/quic_socket.rs @@ -0,0 +1,66 @@ +/* Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::SocketAddr; +use std::num::TryFromIntError; + +use crate::configs::quic::QuicSocketConfig; + +/// Build a UDP socket for the given address and configure the options that are +/// required by the server +pub fn build(addr: &SocketAddr, config: &QuicSocketConfig) -> Socket { + // Choose the correct address family based on the target address + let socket = Socket::new(Domain::for_address(*addr), Type::DGRAM, Some(Protocol::UDP)) + .expect("Unable to create a UDP socket"); + + // Allow multiple sockets (shards) to bind to the same address + socket + .set_reuse_address(true) + .expect("Unable to set SO_REUSEADDR on socket"); + + // SO_REUSEPORT is only available on Unix-like systems + #[cfg(unix)] + socket + .set_reuse_port(true) + .expect("Unable to set SO_REUSEPORT on socket"); + + // Configure socket buffer sizes and keepalive if override is enabled + if config.override_defaults { + config + .recv_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size| socket.set_recv_buffer_size(size)) + .expect("Unable to set SO_RCVBUF on socket"); + + config + .send_buffer_size + .as_bytes_u64() + .try_into() + .map_err(|e: TryFromIntError| std::io::Error::other(e.to_string())) + .and_then(|size| socket.set_send_buffer_size(size)) + .expect("Unable to set SO_SNDBUF on socket"); + + socket + .set_keepalive(config.keepalive) + .expect("Unable to set SO_KEEPALIVE on socket"); + } + + socket +} diff --git a/core/server/src/server_error.rs b/core/server/src/server_error.rs index ca29a62753..6351d38725 100644 --- a/core/server/src/server_error.rs +++ b/core/server/src/server_error.rs @@ -16,8 +16,8 @@ * under the License. */ +use compio_quic::{ConnectionError as QuicConnectionError, ReadError, WriteError}; use error_set::error_set; -use quinn::{ConnectionError as QuicConnectionError, ReadToEndError, WriteError}; use rusty_s3::BucketError; use std::array::TryFromSliceError; use tokio::io; @@ -32,8 +32,8 @@ error_set!( #[display("Write error")] WriteError(WriteError), - #[display("Read to end error")] - ReadToEndError(ReadToEndError) + #[display("Read error")] + ReadToEndError(ReadError) }; ConfigError = { diff --git a/core/server/src/shard/builder.rs b/core/server/src/shard/builder.rs index 153a474d5c..a287e88dc2 100644 --- a/core/server/src/shard/builder.rs +++ b/core/server/src/shard/builder.rs @@ -148,6 +148,7 @@ impl IggyShardBuilder { task_registry: TaskRegistry::new(), is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), + quic_bound_address: Cell::new(None), streams2: Streams::init(), users: Default::default(), diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs index 14dc9101db..42d9af807b 100644 --- a/core/server/src/shard/mod.rs +++ b/core/server/src/shard/mod.rs @@ -167,6 +167,7 @@ pub struct IggyShard { pub(crate) task_registry: TaskRegistry, pub(crate) is_shutting_down: AtomicBool, pub(crate) tcp_bound_address: Cell>, + pub(crate) quic_bound_address: Cell>, } impl IggyShard { @@ -219,6 +220,7 @@ impl IggyShard { task_registry: TaskRegistry::new(), is_shutting_down: AtomicBool::new(false), tcp_bound_address: Cell::new(None), + quic_bound_address: Cell::new(None), }; let user = User::root(DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD); shard @@ -273,6 +275,10 @@ impl IggyShard { ))); } + if self.config.quic.enabled { + tasks.push(Box::pin(crate::quic::quic_server::span_quic_server(self.clone()))); + } + let stop_receiver = self.get_stop_receiver(); let shard_for_shutdown = self.clone(); diff --git a/foreign/cpp/tests/e2e/server.toml b/foreign/cpp/tests/e2e/server.toml index c541a43cf5..c41e86c174 100644 --- a/foreign/cpp/tests/e2e/server.toml +++ b/foreign/cpp/tests/e2e/server.toml @@ -195,6 +195,20 @@ cert_file = "core/certs/iggy_cert.pem" # Path to the QUIC TLS key file. key_file = "core/certs/iggy_key.pem" +# Configuration for the QUIC socket +[quic.socket] +# Whether to override the OS-default socket parameters +override_defaults = false + +# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS +recv_buffer_size = "64 KB" + +# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS +send_buffer_size = "64 KB" + +# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection +keepalive = false + # Message cleaner configuration. [message_cleaner] # Enables or disables the background process for deleting expired messages.