Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ cyper = { git = "https://github.com/krishvishal/cyper.git", rev = "cd75e266df6ab
], default-features = false }
cyper-axum = { git = "https://github.com/krishvishal/cyper", rev = "cd75e266df6ab0a9b9474eb7dda1735650d17db6" }
compio-net = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" }
compio-quic = { git = "https://github.com/compio-rs/compio.git", rev = "fe4243f0b6811ebc325afd081c9b087b4d9817be" }
tokio-rustls = "0.26.2"
toml = "0.9.4"
tracing = "0.1.41"
Expand Down
11 changes: 7 additions & 4 deletions core/common/src/certificates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,12 @@ use rustls::pki_types::{CertificateDer, PrivateKeyDer};
pub fn generate_self_signed_certificate(
domain: &str,
) -> Result<(Vec<CertificateDer<'static>>, PrivateKeyDer<'static>), Box<dyn std::error::Error>> {
let cert = rcgen::generate_simple_self_signed(vec![domain.to_string()])?;
let cert_der = cert.cert.der();
let key_der = cert.signing_key.serialize_der();
let certified_key = rcgen::generate_simple_self_signed(vec![domain.to_string()])?;
let cert_der = certified_key.cert.der().to_vec();
let key_der = certified_key.signing_key.serialize_der();

let cert_der = CertificateDer::from(cert_der);
let key = PrivateKeyDer::try_from(key_der)?;
Ok((vec![cert_der.clone()], key))

Ok((vec![cert_der], key))
}
14 changes: 14 additions & 0 deletions core/configs/server.toml
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,20 @@ cert_file = "core/certs/iggy_cert.pem"
# Path to the QUIC TLS key file.
key_file = "core/certs/iggy_key.pem"

# Configuration for the QUIC socket
[quic.socket]
# Whether to override the OS-default socket parameters
override_defaults = false

# SO_RCVBUF: maximum size of the receive buffer, can be clamped by the OS
recv_buffer_size = "64 KB"

# SO_SNDBUF: maximum size of the send buffer, can be clamped by the OS
send_buffer_size = "64 KB"

# SO_KEEPALIVE: whether to regularly send a keepalive packet maintaining the connection
keepalive = false

# Message cleaner configuration.
[message_cleaner]
# Enables or disables the background process for deleting expired messages.
Expand Down
3 changes: 2 additions & 1 deletion core/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ clap = { workspace = true }
cyper = { workspace = true }
cyper-axum = { workspace = true }
compio-net = { workspace = true }
compio-quic = { workspace = true }
console-subscriber = { workspace = true, optional = true }
crossbeam = { workspace = true }
ctrlc = { version = "3.4", features = ["termination"] }
Expand Down Expand Up @@ -142,4 +143,4 @@ serial_test = { workspace = true }

[[bin]]
name = "iggy-server"
path = "src/main.rs"
path = "src/main.rs"
2 changes: 1 addition & 1 deletion core/server/src/binary/sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use bytes::BytesMut;
use compio::buf::{IoBuf, IoBufMut};
use compio::io::{AsyncReadExt, AsyncWriteExt};
use compio::net::TcpStream;
use compio_quic::{RecvStream, SendStream};
use iggy_common::IggyError;
use nix::libc;
use quinn::{RecvStream, SendStream};

macro_rules! forward_async_methods {
(
Expand Down
14 changes: 13 additions & 1 deletion core/server/src/configs/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use super::tcp::TcpSocketConfig;
use crate::configs::http::{
HttpConfig, HttpCorsConfig, HttpJwtConfig, HttpMetricsConfig, HttpTlsConfig,
};
use crate::configs::quic::{QuicCertificateConfig, QuicConfig};
use crate::configs::quic::{QuicCertificateConfig, QuicConfig, QuicSocketConfig};
use crate::configs::server::{
ArchiverConfig, DataMaintenanceConfig, HeartbeatConfig, MessageSaverConfig,
MessagesMaintenanceConfig, PersonalAccessTokenCleanerConfig, PersonalAccessTokenConfig,
Expand Down Expand Up @@ -124,6 +124,18 @@ impl Default for QuicConfig {
keep_alive_interval: SERVER_CONFIG.quic.keep_alive_interval.parse().unwrap(),
max_idle_timeout: SERVER_CONFIG.quic.max_idle_timeout.parse().unwrap(),
certificate: QuicCertificateConfig::default(),
socket: QuicSocketConfig::default(),
}
}
}

impl Default for QuicSocketConfig {
fn default() -> QuicSocketConfig {
QuicSocketConfig {
override_defaults: SERVER_CONFIG.quic.socket.override_defaults,
recv_buffer_size: SERVER_CONFIG.quic.socket.recv_buffer_size.parse().unwrap(),
send_buffer_size: SERVER_CONFIG.quic.socket.send_buffer_size.parse().unwrap(),
keepalive: SERVER_CONFIG.quic.socket.keepalive,
}
}
}
Expand Down
9 changes: 9 additions & 0 deletions core/server/src/configs/quic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ pub struct QuicConfig {
#[serde_as(as = "DisplayFromStr")]
pub max_idle_timeout: IggyDuration,
pub certificate: QuicCertificateConfig,
pub socket: QuicSocketConfig,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct QuicSocketConfig {
pub override_defaults: bool,
pub recv_buffer_size: IggyByteSize,
pub send_buffer_size: IggyByteSize,
pub keepalive: bool,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
Expand Down
124 changes: 64 additions & 60 deletions core/server/src/quic/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,75 +22,88 @@ use crate::binary::command::{ServerCommand, ServerCommandHandler};
use crate::binary::sender::SenderKind;
use crate::server_error::ConnectionError;
use crate::shard::IggyShard;
use crate::shard::transmission::event::ShardEvent;
use crate::streaming::clients::client_manager::Transport;
use crate::streaming::session::Session;
use crate::{shard_debug, shard_info};
use anyhow::anyhow;
use compio_quic::{Connection, Endpoint, RecvStream, SendStream};
use iggy_common::IggyError;
use quinn::{Connection, Endpoint, RecvStream, SendStream};
use tracing::{error, info, trace};

const LISTENERS_COUNT: u32 = 10;
const INITIAL_BYTES_LENGTH: usize = 4;
//TODO: Fixme
/*

pub fn start(endpoint: Endpoint, system: SharedSystem) {
for _ in 0..LISTENERS_COUNT {
let endpoint = endpoint.clone();
let system = system.clone();
//TODO: Fixme
/*
tokio::spawn(async move {
while let Some(incoming_connection) = endpoint.accept().await {
info!(
"Incoming connection from client: {}",
incoming_connection.remote_address()
);
let system = system.clone();
let incoming_connection = incoming_connection.accept();
if incoming_connection.is_err() {

pub async fn start(endpoint: Endpoint, shard: Rc<IggyShard>) -> Result<(), IggyError> {
Comment thread
jadireddi marked this conversation as resolved.
info!("Starting QUIC listener for shard {}", shard.id);

// Since the QUIC Endpoint is internally Arc-wrapped and can be shared,
// we only need one worker per shard rather than multiple workers per endpoint.
// This avoids the N×workers multiplication when multiple shards are used.
while let Some(incoming_conn) = endpoint.wait_incoming().await {
let remote_addr = incoming_conn.remote_address();
trace!("Incoming connection from client: {}", remote_addr);
let shard = shard.clone();

// Spawn each connection handler independently to maintain concurrency
compio::runtime::spawn(async move {
trace!("Accepting connection from {}", remote_addr);
match incoming_conn.await {
Ok(connection) => {
trace!("Connection established from {}", remote_addr);
if let Err(error) = handle_connection(connection, shard).await {
error!("QUIC connection from {} has failed: {error}", remote_addr);
}
}
Err(error) => {
error!(
"Error when accepting incoming connection: {:?}",
incoming_connection
"Error when accepting incoming connection from {}: {:?}",
remote_addr, error
);
continue;
}
let incoming_connection = incoming_connection.unwrap();
tokio::spawn(async move {
if let Err(error) = handle_connection(incoming_connection, system).await {
error!("Connection has failed: {error}");
}
});
}
});
*/
})
.detach();
}

info!("QUIC listener for shard {} stopped", shard.id);
Ok(())
}

async fn handle_connection(
incoming_connection: quinn::Connecting,
connection: Connection,
shard: Rc<IggyShard>,
) -> Result<(), ConnectionError> {
let connection = incoming_connection.await?;
let address = connection.remote_address();
info!("Client has connected: {address}");
let session = system
.read()
.await
.add_client(&address, Transport::Quic)
.await;

let session = shard.add_client(&address, Transport::Quic);
let client_id = session.client_id;
while let Some(stream) = accept_stream(&connection, &system, client_id).await? {
let system = system.clone();
shard_debug!(
shard.id,
"Added {} client with session: {} for IP address: {}",
Transport::Quic,
session,
address
);

// Add session to active sessions and broadcast to all shards
shard.add_active_session(session.clone());
let event = ShardEvent::NewSession {
address,
transport: Transport::Quic,
};
let _responses = shard.broadcast_event_to_all_shards(event.into()).await;

while let Some(stream) = accept_stream(&connection, &shard, client_id).await? {
let shard = shard.clone();
let session = session.clone();

let handle_stream_task = async move {
if let Err(err) = handle_stream(stream, system, session).await {
if let Err(err) = handle_stream(stream, shard, session).await {
error!("Error when handling QUIC stream: {:?}", err)
}
};
let _handle = tokio::spawn(handle_stream_task);
let _handle = compio::runtime::spawn(handle_stream_task).detach();
}
Ok(())
}
Expand All @@ -99,18 +112,18 @@ type BiStream = (SendStream, RecvStream);

async fn accept_stream(
connection: &Connection,
system: &SharedSystem,
shard: &Rc<IggyShard>,
client_id: u32,
) -> Result<Option<BiStream>, ConnectionError> {
match connection.accept_bi().await {
Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
Err(compio_quic::ConnectionError::ApplicationClosed { .. }) => {
info!("Connection closed");
system.read().await.delete_client(client_id).await;
shard.delete_client(client_id).await;
Ok(None)
}
Err(error) => {
error!("Error when handling QUIC stream: {:?}", error);
system.read().await.delete_client(client_id).await;
shard.delete_client(client_id).await;
Err(error.into())
}
Ok(stream) => Ok(Some(stream)),
Expand All @@ -119,16 +132,16 @@ async fn accept_stream(

async fn handle_stream(
stream: BiStream,
system: SharedSystem,
session: impl AsRef<Session> + std::fmt::Debug,
shard: Rc<IggyShard>,
session: Rc<Session>,
) -> anyhow::Result<()> {
let (send_stream, mut recv_stream) = stream;

let mut length_buffer = [0u8; INITIAL_BYTES_LENGTH];
let mut code_buffer = [0u8; INITIAL_BYTES_LENGTH];

recv_stream.read_exact(&mut length_buffer).await?;
recv_stream.read_exact(&mut code_buffer).await?;
recv_stream.read_exact(&mut length_buffer[..]).await?;
recv_stream.read_exact(&mut code_buffer[..]).await?;

let length = u32::from_le_bytes(length_buffer);
let code = u32::from_le_bytes(code_buffer);
Expand All @@ -145,17 +158,9 @@ async fn handle_stream(
}
};

// if let Err(e) = command.validate() {
// sender.send_error_response(e.clone()).await?;
// return Err(anyhow!("Command validation failed: {e}"));
// }

trace!("Received a QUIC command: {command}, payload size: {length}");

match command
.handle(&mut sender, length, session.as_ref(), &system)
.await
{
match command.handle(&mut sender, length, &session, &shard).await {
Ok(_) => {
trace!(
"Command was handled successfully, session: {:?}. QUIC response was sent.",
Expand Down Expand Up @@ -183,4 +188,3 @@ async fn handle_stream(
}
}
}
*/
1 change: 1 addition & 0 deletions core/server/src/quic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@
mod listener;
pub mod quic_sender;
pub mod quic_server;
pub mod quic_socket;

pub const COMPONENT: &str = "QUIC";
34 changes: 14 additions & 20 deletions core/server/src/quic/quic_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,12 @@
use crate::quic::COMPONENT;
use crate::streaming::utils::PooledBuffer;
use crate::{binary::sender::Sender, server_error::ServerError};
use bytes::BytesMut;
use compio::buf::{IoBuf, IoBufMut};
use compio::BufResult;
use compio::buf::IoBufMut;
use compio::io::AsyncReadExt;
use compio_quic::{RecvStream, SendStream};
use error_set::ErrContext;
use iggy_common::IggyError;
use nix::libc;
use quinn::{RecvStream, SendStream};
use std::io::IoSlice;
use tracing::{debug, error};

const STATUS_OK: &[u8] = &[0; 4];
Expand All @@ -37,19 +36,17 @@ pub struct QuicSender {
}

impl Sender for QuicSender {
/// Reads data from the QUIC stream directly into the buffer.
async fn read<B: IoBufMut>(&mut self, buffer: B) -> (Result<(), IggyError>, B) {
//TODO: Fixme
// Not-so-nice code because quinn recv stream has different API for read_exact
/*
let read_bytes = buffer.len();
self.recv.read_exact(buffer).await.map_err(|error| {
error!("Failed to read from the stream: {:?}", error);
IggyError::QuicError
})?;
*/

//Ok(read_bytes)
todo!();
let BufResult(result, buffer) =
<RecvStream as AsyncReadExt>::read_exact(&mut self.recv, buffer).await;
match (result, buffer) {
(Ok(_), buffer) => (Ok(()), buffer),
(Err(error), buffer) => {
error!("Failed to read from the stream: {:?}", error);
(Err(IggyError::QuicError), buffer)
}
}
}

async fn send_empty_ok_response(&mut self) -> Result<(), IggyError> {
Expand Down Expand Up @@ -87,8 +84,6 @@ impl Sender for QuicSender {

let mut total_bytes_written = 0;

//TODO: Fixme
/*
for slice in slices {
let slice_data = &*slice;
if !slice_data.is_empty() {
Expand All @@ -103,7 +98,6 @@ impl Sender for QuicSender {
total_bytes_written += slice_data.len();
}
}
*/

debug!(
"Sent vectored response: {} bytes of payload",
Expand Down
Loading