From afc9bfa1df34c69113cc9cdf04edc530c6837539 Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 4 Jul 2025 13:47:13 -0400 Subject: [PATCH 1/3] feat(rt): introduce quic traits --- Cargo.toml | 3 +- src/rt/mod.rs | 4 +++ src/rt/quic.rs | 91 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 97 insertions(+), 1 deletion(-) create mode 100644 src/rt/quic.rs diff --git a/Cargo.toml b/Cargo.toml index 8334230937..5435142c7f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -101,7 +101,8 @@ nightly = [] level = "warn" check-cfg = [ 'cfg(hyper_unstable_tracing)', - 'cfg(hyper_unstable_ffi)' + 'cfg(hyper_unstable_ffi)', + 'cfg(hyper_unstable_quic)', ] [package.metadata.docs.rs] diff --git a/src/rt/mod.rs b/src/rt/mod.rs index 0eb266974f..d1b2f81792 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -17,6 +17,10 @@ pub mod bounds; mod io; mod timer; +#[cfg(hyper_unstable_quic)] +#[cfg_attr(docsrs, doc(cfg(hyper_unstable_quic)))] +pub mod quic; + pub use self::io::{Read, ReadBuf, ReadBufCursor, Write}; pub use self::timer::{Sleep, Timer}; diff --git a/src/rt/quic.rs b/src/rt/quic.rs new file mode 100644 index 0000000000..18271d34a8 --- /dev/null +++ b/src/rt/quic.rs @@ -0,0 +1,91 @@ +//! Generic QUIC support + +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::Buf; + +// TODO: Should this be gated by an `http3` feature? + +/// A QUIC connection. +pub trait Connection { + /// Send streams that can be opened by this connection. + type SendStream: SendStream; + /// Receive streams that can be accepted by this connection. + type RecvStream: RecvStream; + /// Bidirectional streams that can be opened or accepted by this connection. + type BidiStream: SendStream + RecvStream; + /// Errors that may occur opening or accepting streams. + type Error; + + // Accepting streams + + // Q: shorten to bidi? + /// Accept a bidirection stream. + fn poll_accept_bidirectional_stream( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>; + + /// Accept a unidirectional receive stream. + fn poll_accept_recv_stream( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll, Self::Error>>; + + // Creating streams + + // Q: shorten to bidi? + /// Open a bidirectional stream. + fn poll_open_bidirectional_stream( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; + + /// Open a unidirectional send stream. + fn poll_open_send_stream( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>; +} + +/// The send portion of a QUIC stream. +pub trait SendStream { + /// Errors that may happen trying to send data. + type Error; // bounds? + /// Polls that the stream is ready to send more data. + // Q: Should this be Pin<&mut Self>? + fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; + /// Send data on the stream. + fn send_data(&mut self, data: B) -> Result<(), Self::Error>; + // fn poll_flush? + /// finish? + fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll>; + /// Close the stream with an error code. + fn reset(&mut self, reset_code: u64); +} + +/// The receive portion of a QUIC stream. +pub trait RecvStream { + /// Buffers of data that can be received. + type Buf: Buf; + /// Errors that may be received. + type Error; // bounds? + + // Q: should this be Pin? + /// Poll for more data received from the remote on this stream. + fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error>>; + /// Signal to the remote peer to stop sending data. + fn stop_sending(&mut self, error_code: u64); +} + +/// An optional trait if a QUIC stream can be split into two sides. +pub trait BidiStream: SendStream + RecvStream { + /// The send side of a stream. + type SendStream: SendStream; + /// The receive side of a stream. + type RecvStream: RecvStream; + + /// Split this stream into two sides. + fn split(self) -> (Self::SendStream, Self::RecvStream); +} From 6bb179c916ecb0b46a46f44320a2bf437d8a65ca Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Fri, 4 Jul 2025 14:06:42 -0400 Subject: [PATCH 2/3] feat(server): introduce http3 connections --- Cargo.toml | 4 +- src/cfg.rs | 2 +- src/common/mod.rs | 1 + src/proto/h3/glue.rs | 165 +++++++++++++++++++++++++++++++++++++++ src/proto/h3/mod.rs | 34 ++++++++ src/proto/mod.rs | 3 + src/rt/mod.rs | 2 +- src/server/conn/http3.rs | 58 ++++++++++++++ src/server/conn/mod.rs | 2 + 9 files changed, 268 insertions(+), 3 deletions(-) create mode 100644 src/proto/h3/glue.rs create mode 100644 src/proto/h3/mod.rs create mode 100644 src/server/conn/http3.rs diff --git a/Cargo.toml b/Cargo.toml index 5435142c7f..b6079f40e4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,7 @@ futures-channel = { version = "0.3", optional = true } futures-core = { version = "0.3.31", optional = true } futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true } h2 = { version = "0.4.2", optional = true } +h3 = { version = "0.0.8", optional = true } http-body-util = { version = "0.1", optional = true } httparse = { version = "1.9", optional = true } httpdate = { version = "1.0", optional = true } @@ -69,7 +70,7 @@ tokio-util = "0.7.10" [features] # Nothing by default -default = [] +default = ["http3", "server"] # Easily turn it all on full = [ @@ -82,6 +83,7 @@ full = [ # HTTP versions http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"] http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"] +http3 = ["dep:h3"] # Client/Server client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"] diff --git a/src/cfg.rs b/src/cfg.rs index 71a5351d21..dbd52e1381 100644 --- a/src/cfg.rs +++ b/src/cfg.rs @@ -15,7 +15,7 @@ macro_rules! cfg_proto { ($($item:item)*) => { cfg_feature! { #![all( - any(feature = "http1", feature = "http2"), + any(feature = "http1", feature = "http2", feature = "http3"), any(feature = "client", feature = "server"), )] $($item)* diff --git a/src/common/mod.rs b/src/common/mod.rs index 4b73437203..4cc84cae75 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -7,6 +7,7 @@ pub(crate) mod either; #[cfg(any( all(feature = "client", any(feature = "http1", feature = "http2")), all(feature = "server", feature = "http1"), + all(feature = "server", feature = "http3"), ))] pub(crate) mod future; pub(crate) mod io; diff --git a/src/proto/h3/glue.rs b/src/proto/h3/glue.rs new file mode 100644 index 0000000000..a2335d5aed --- /dev/null +++ b/src/proto/h3/glue.rs @@ -0,0 +1,165 @@ +use std::task::{Context, Poll}; + +use bytes::Buf; + +pub(super) struct Conn(Q); + +pub(super) struct BidiStream(S); +pub(super) struct SendStream(S); +pub(super) struct RecvStream(S); + +impl h3::quic::Connection for Conn +where + Q: crate::rt::quic::Connection, + B: Buf, +{ + type RecvStream = RecvStream; + type OpenStreams = Self; + + fn poll_accept_recv(&mut self, _cx: &mut Context<'_>) + -> Poll> + { + todo!(); + } + + fn poll_accept_bidi(&mut self, _cx: &mut Context<'_>) + -> Poll> + { + todo!(); + } + + fn opener(&self) -> Self::OpenStreams { + todo!(); + } +} + +impl h3::quic::OpenStreams for Conn +where + Q: crate::rt::quic::Connection, + B: Buf, +{ + type BidiStream = BidiStream; + type SendStream = SendStream; + + fn poll_open_send(&mut self, _cx: &mut Context<'_>) + -> Poll> + { + todo!(); + } + + fn poll_open_bidi(&mut self, _cx: &mut Context<'_>) + -> Poll> + { + todo!(); + } + + fn close(&mut self, _: h3::error::Code, _: &[u8]) { + + } +} + +impl h3::quic::SendStream for BidiStream +where + S: crate::rt::quic::SendStream, + B: Buf, +{ + // Required methods + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + todo!(); + } + fn send_data>>( + &mut self, + data: T, + ) -> Result<(), h3::quic::StreamErrorIncoming> { + todo!(); + } + fn poll_finish( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + todo!(); + } + fn reset(&mut self, reset_code: u64) { + todo!(); + } + fn send_id(&self) -> h3::quic::StreamId { + todo!() + } +} + +impl h3::quic::SendStream for SendStream +where + S: crate::rt::quic::SendStream, + B: Buf, +{ + // Required methods + fn poll_ready( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + todo!(); + } + fn send_data>>( + &mut self, + data: T, + ) -> Result<(), h3::quic::StreamErrorIncoming> { + todo!(); + } + fn poll_finish( + &mut self, + cx: &mut Context<'_>, + ) -> Poll> { + todo!(); + } + fn reset(&mut self, reset_code: u64) { + todo!(); + } + fn send_id(&self) -> h3::quic::StreamId { + todo!() + } +} + +impl h3::quic::RecvStream for BidiStream +where + S: crate::rt::quic::RecvStream, +{ + type Buf = S::Buf; + + // Required methods + fn poll_data( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, h3::quic::StreamErrorIncoming>> { + todo!(); + } + fn stop_sending(&mut self, error_code: u64) { + todo!(); + } + fn recv_id(&self) -> h3::quic::StreamId { + todo!(); + } +} + +impl h3::quic::RecvStream for RecvStream +where + S: crate::rt::quic::RecvStream, +{ + type Buf = S::Buf; + + // Required methods + fn poll_data( + &mut self, + cx: &mut Context<'_>, + ) -> Poll, h3::quic::StreamErrorIncoming>> { + todo!(); + } + fn stop_sending(&mut self, error_code: u64) { + todo!(); + } + fn recv_id(&self) -> h3::quic::StreamId { + todo!(); + } +} diff --git a/src/proto/h3/mod.rs b/src/proto/h3/mod.rs new file mode 100644 index 0000000000..2138ed50b2 --- /dev/null +++ b/src/proto/h3/mod.rs @@ -0,0 +1,34 @@ +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use bytes::Buf; +use h3::server::Connection; + +use pin_project_lite::pin_project; + +mod glue; + +pin_project! { + pub(crate) struct Server + where + Q: crate::rt::quic::Connection, + B: Buf, + { + exec: E, + q: Connection, B>, + s: S, + } +} + +impl Future for Server +where + Q: crate::rt::quic::Connection, + B: Buf, +{ + type Output = crate::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + todo!() + } +} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index fcdf2b97c0..40acde179b 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -16,6 +16,9 @@ cfg_feature! { #[cfg(feature = "http2")] pub(crate) mod h2; +#[cfg(feature = "http3")] +pub(crate) mod h3; + /// An Incoming Message head. Includes request/status line, and headers. #[cfg(feature = "http1")] #[derive(Debug, Default)] diff --git a/src/rt/mod.rs b/src/rt/mod.rs index d1b2f81792..08dceb5813 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -17,7 +17,7 @@ pub mod bounds; mod io; mod timer; -#[cfg(hyper_unstable_quic)] +//#[cfg(hyper_unstable_quic)] #[cfg_attr(docsrs, doc(cfg(hyper_unstable_quic)))] pub mod quic; diff --git a/src/server/conn/http3.rs b/src/server/conn/http3.rs new file mode 100644 index 0000000000..20b149aea3 --- /dev/null +++ b/src/server/conn/http3.rs @@ -0,0 +1,58 @@ +//! HTTP/3 Server Connections + +use std::error::Error as StdError; + +use pin_project_lite::pin_project; + +use crate::body::{Body, Incoming as IncomingBody}; +use crate::rt::quic; +use crate::service::HttpService; + +pin_project! { + /// A Future representing an HTTP/3 connection. + #[must_use = "futures do nothing unless polled"] + pub struct Connection { + _i: (Q, S, E), + } +} + +/// A configuration builder for HTTP/3 server connections. +/// +/// **Note**: The default values of options are *not considered stable*. They +/// are subject to change at any time. +#[derive(Clone, Debug)] +pub struct Builder { + exec: E, +} + +// ===== impl Connection ===== + +// ===== impl Builder ===== + +impl Builder { + /// Create a new connection builder. + pub fn new(exec: E) -> Self { + Self { + exec, + } + } + + /// Bind a connection together with a [`Service`](crate::service::Service). + /// + /// This returns a Future that must be polled in order for HTTP to be + /// driven on the connection. + pub fn serve_connection(&self, quic: Q, service: S) -> Connection + where + S: HttpService, + S::Error: Into>, + Bd: Body + 'static, + Bd::Error: Into>, + Q: quic::Connection, + //E: Http2ServerConnExec, + E: Clone, + { + Connection { + _i: (quic, service, self.exec.clone()), + } + } +} diff --git a/src/server/conn/mod.rs b/src/server/conn/mod.rs index 54b309e88e..63f3089a9f 100644 --- a/src/server/conn/mod.rs +++ b/src/server/conn/mod.rs @@ -18,3 +18,5 @@ pub mod http1; #[cfg(feature = "http2")] pub mod http2; +//#[cfg(feature = "http3")] +pub mod http3; From bc12303134bbfc9a77aab6f94dbd07e4070c7733 Mon Sep 17 00:00:00 2001 From: Rafael de Conde Reis <82062045+Rafael-Conde@users.noreply.github.com> Date: Thu, 27 Nov 2025 12:25:51 -0300 Subject: [PATCH 3/3] QUIC glue impl + start server impl Signed-off-by: Rafael de Conde Reis <82062045+Rafael-Conde@users.noreply.github.com> --- Cargo.toml | 3 +- src/error.rs | 32 +++- src/proto/h3/glue.rs | 344 ++++++++++++++++++++++++++++++++++----- src/proto/h3/mod.rs | 39 ++++- src/rt/quic.rs | 52 +++++- src/server/conn/http3.rs | 51 +++++- 6 files changed, 460 insertions(+), 61 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 8ab0510b56..52658dae07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,13 +70,14 @@ tokio-util = "0.7.10" [features] # Nothing by default -default = ["http3", "server"] +default = ["http3", "http2", "server"] # Easily turn it all on full = [ "client", "http1", "http2", + "http3", "server", ] diff --git a/src/error.rs b/src/error.rs index 8b41f9c93d..8d2ae8581e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -83,6 +83,10 @@ pub(super) enum Kind { /// A general error from h2. #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] Http2, + + /// A general error from h3. + #[cfg(all(any(feature = "client", feature = "server"), feature = "http3"))] + Http3, } #[derive(Debug)] @@ -452,6 +456,16 @@ impl Error { } } + // Apparently the ::h3::Error isn't avaible in the version I'm working with + // #[cfg(all(any(feature = "client", feature = "server"), feature = "http3"))] + // pub(super) fn new_h3(cause: ::h3::Error) -> Error { + // if cause.is_io() { + // Error::new_io(cause.into_io().expect("h3::Error::is_io")) + // } else { + // Error::new(Kind::Http3).with(cause) + // } + // } + fn description(&self) -> &str { match self.inner.kind { Kind::Parse(Parse::Method) => "invalid HTTP method parsed", @@ -476,7 +490,7 @@ impl Error { Kind::Parse(Parse::Header(Header::TransferEncodingUnexpected)) => { "unexpected transfer-encoding parsed" } - #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(any(feature = "http1", feature = "http2", feature = "http3"))] Kind::Parse(Parse::TooLarge) => "message head is too large", Kind::Parse(Parse::Status) => "invalid HTTP status-code parsed", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] @@ -487,6 +501,7 @@ impl Error { Kind::IncompleteMessage => "connection closed before message completed", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] Kind::UnexpectedMessage => "received unexpected message from connection", + // TODO: check if this cfg needs to be changed #[cfg(any( all(feature = "http1", any(feature = "client", feature = "server")), all(feature = "http2", feature = "client") @@ -497,12 +512,12 @@ impl Error { Kind::HeaderTimeout => "read header from client timeout", #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::Body => "error reading a body from connection", #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::BodyWrite => "error writing a body to connection", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] @@ -511,13 +526,12 @@ impl Error { Kind::Http2 => "http2 error", #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::Io => "connection error", - #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::User(User::Body) => "error from user's Body stream", #[cfg(any( @@ -531,10 +545,10 @@ impl Error { } #[cfg(any( all(any(feature = "client", feature = "server"), feature = "http1"), - all(feature = "server", feature = "http2") + all(feature = "server", feature = "http2", feature = "http3") ))] Kind::User(User::Service) => "error from user's Service", - #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(any(feature = "http1", feature = "http2", feature = "http3"))] #[cfg(feature = "server")] Kind::User(User::UnexpectedHeader) => "user sent unexpected header", #[cfg(feature = "http1")] @@ -549,6 +563,8 @@ impl Error { Kind::User(User::DispatchGone) => "dispatch task is gone", #[cfg(feature = "ffi")] Kind::User(User::AbortedByCallback) => "operation aborted by an application callback", + #[cfg(all(any(feature = "client", feature = "server"), feature = "http3"))] + Kind::Http3 => "http3 error", } } } diff --git a/src/proto/h3/glue.rs b/src/proto/h3/glue.rs index a2335d5aed..4c35fa5f39 100644 --- a/src/proto/h3/glue.rs +++ b/src/proto/h3/glue.rs @@ -1,60 +1,175 @@ -use std::task::{Context, Poll}; +use std::{ + fmt::Display, + task::{Context, Poll}, +}; use bytes::Buf; +use futures_core::ready; pub(super) struct Conn(Q); +impl Clone for Conn +where + Q: Clone, +{ + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + pub(super) struct BidiStream(S); pub(super) struct SendStream(S); pub(super) struct RecvStream(S); impl h3::quic::Connection for Conn where - Q: crate::rt::quic::Connection, + Q: crate::rt::quic::Connection + Unpin + Clone, B: Buf, { type RecvStream = RecvStream; type OpenStreams = Self; - fn poll_accept_recv(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_accept_recv( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let recv_stream = match ready!(pinned_self.poll_accept_recv_stream(_cx)) { + Ok(Some(recv_stream)) => recv_stream, + Ok(None) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError:: { + description: String::from("Error accepting receive stream"), + source: None, + }), + ))); + } + Err(err) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }), + ))); + } + }; + + Poll::Ready(Ok(RecvStream(recv_stream))) } - fn poll_accept_bidi(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_accept_bidi( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let bidi_stream = match ready!(pinned_self.poll_accept_bidirectional_stream(_cx)) { + Ok(Some(bidi_stream)) => bidi_stream, + Ok(None) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError:: { + description: String::from("Error accepting receive stream"), + source: None, + }), + ))); + } + Err(err) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }), + ))); + } + }; + + Poll::Ready(Ok(BidiStream(bidi_stream))) } fn opener(&self) -> Self::OpenStreams { - todo!(); + self.clone() } } impl h3::quic::OpenStreams for Conn where - Q: crate::rt::quic::Connection, + Q: crate::rt::quic::Connection + Unpin, B: Buf, { type BidiStream = BidiStream; type SendStream = SendStream; - fn poll_open_send(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_open_send( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let send_stream = match ready!(pinned_self.poll_open_send_stream(_cx)) { + Ok(send_stream) => send_stream, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(SendStream(send_stream))) } - fn poll_open_bidi(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_open_bidi( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let bidi_stream = match ready!(pinned_self.poll_open_bidirectional_stream(_cx)) { + Ok(bidi_stream) => bidi_stream, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(BidiStream(bidi_stream))) + } + + fn close(&mut self, code: h3::error::Code, reason: &[u8]) { + self.0.close(code.value(), reason); } +} - fn close(&mut self, _: h3::error::Code, _: &[u8]) { +impl BidiStream { + fn send_data( + &mut self, + data: h3::quic::WriteBuf, + ) -> Result<(), h3::quic::StreamErrorIncoming> + where + S: crate::rt::quic::SendStream, + B: Buf, + Buff: Buf, + { + use crate::rt::quic::SendStream as SendStreamTrait; + match >::send_data(&mut self.0, data) { + Ok(()) => Ok(()), + Err(err) => { + return Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Sending data to the bidirectional stream"), + source: Some(err), + }, + ))); + } + } } } @@ -68,25 +183,57 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let ready = match ready!(self.0.poll_ready(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn send_data>>( &mut self, data: T, ) -> Result<(), h3::quic::StreamErrorIncoming> { - todo!(); + self.send_data(data.into()) } fn poll_finish( &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let done = match ready!(self.0.poll_finish(cx)) { + Ok(done) => done, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Getting finish state"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(done)) } fn reset(&mut self, reset_code: u64) { - todo!(); + self.0.reset(reset_code); } fn send_id(&self) -> h3::quic::StreamId { - todo!() + match crate::rt::quic::SendStreamID::u62_id(&self.0.send_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } } } @@ -100,25 +247,69 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let ready = match ready!(self.0.poll_ready(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn send_data>>( &mut self, data: T, ) -> Result<(), h3::quic::StreamErrorIncoming> { - todo!(); + use crate::rt::quic::SendStream as SendStreamTrait; + + match >::send_data(&mut self.0, data.into()) { + Ok(()) => Ok(()), + Err(err) => { + return Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Sending data to the bidirectional stream"), + source: Some(err), + }, + ))); + } + } } fn poll_finish( &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let done = match ready!(self.0.poll_finish(cx)) { + Ok(done) => done, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Getting finish state"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(done)) } fn reset(&mut self, reset_code: u64) { - todo!(); + self.0.reset(reset_code); } fn send_id(&self) -> h3::quic::StreamId { - todo!() + match crate::rt::quic::SendStreamID::u62_id(&self.0.send_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } } } @@ -133,13 +324,33 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll, h3::quic::StreamErrorIncoming>> { - todo!(); + let ready = match ready!(self.0.poll_data(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn stop_sending(&mut self, error_code: u64) { - todo!(); + self.0.stop_sending(error_code); } fn recv_id(&self) -> h3::quic::StreamId { - todo!(); + match crate::rt::quic::SendStreamID::u62_id(&self.0.recv_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } } } @@ -154,12 +365,71 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll, h3::quic::StreamErrorIncoming>> { - todo!(); + let ready = match ready!(self.0.poll_data(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn stop_sending(&mut self, error_code: u64) { - todo!(); + self.0.stop_sending(error_code); } fn recv_id(&self) -> h3::quic::StreamId { - todo!(); + match crate::rt::quic::SendStreamID::u62_id(&self.0.recv_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } + } +} + +#[derive(Debug)] +pub(super) struct GlueError { + description: String, + source: Option, +} + +impl std::error::Error for GlueError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + if let Some(ref value) = self.source { + return Some(value as &(dyn std::error::Error + 'static)); + } + + None + } + + fn description(&self) -> &str { + self.description.as_str() + } + + fn cause(&self) -> Option<&dyn std::error::Error> { + self.source() + } +} + +impl Display for GlueError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ConnectionError{{ description: {}, source: {} }}", + self.description, + if let Some(ref src) = self.source { + format!("{src}") + } else { + "None".to_string() + } + ) } } diff --git a/src/proto/h3/mod.rs b/src/proto/h3/mod.rs index 2138ed50b2..72361f3c1b 100644 --- a/src/proto/h3/mod.rs +++ b/src/proto/h3/mod.rs @@ -1,8 +1,10 @@ use std::future::Future; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use bytes::Buf; +use futures_core::ready; +use futures_util::FutureExt; use h3::server::Connection; use pin_project_lite::pin_project; @@ -13,6 +15,8 @@ pin_project! { pub(crate) struct Server where Q: crate::rt::quic::Connection, + Q: Unpin, + Q: Clone, B: Buf, { exec: E, @@ -21,14 +25,43 @@ pin_project! { } } +impl Server +where + Q: crate::rt::quic::Connection + Unpin + Clone, + B: Buf, +{ + pub fn new(quic: Q, service: S) -> Self { + todo!() + } +} + impl Future for Server where - Q: crate::rt::quic::Connection, + Q: crate::rt::quic::Connection + Unpin + Clone, B: Buf, { type Output = crate::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - todo!() + use crate::error::Kind; + loop { + let accept_fut = self.q.accept(); + let Some(resolver) = ready!(pin!(accept_fut).poll(cx)) + .map_err(|err| crate::Error::new(Kind::Http3).with(err))? + else { + // I believe this means completed + return Poll::Ready(Ok(())); + }; + + match ready!(pin!(resolver.resolve_request()).poll(cx)) { + Ok((request, request_stream)) => { + // process request + // request. + } + Err(err) => { + // process request error + } + }; + } } } diff --git a/src/rt/quic.rs b/src/rt/quic.rs index 18271d34a8..34489ce06b 100644 --- a/src/rt/quic.rs +++ b/src/rt/quic.rs @@ -1,5 +1,6 @@ //! Generic QUIC support +use std::error::Error; use std::pin::Pin; use std::task::{Context, Poll}; @@ -7,6 +8,9 @@ use bytes::Buf; // TODO: Should this be gated by an `http3` feature? +#[derive(Debug, Clone, Copy)] +pub struct ErrorCode(u64); + /// A QUIC connection. pub trait Connection { /// Send streams that can be opened by this connection. @@ -16,7 +20,7 @@ pub trait Connection { /// Bidirectional streams that can be opened or accepted by this connection. type BidiStream: SendStream + RecvStream; /// Errors that may occur opening or accepting streams. - type Error; + type Error: Error + Send + Sync + 'static; // Accepting streams @@ -47,22 +51,59 @@ pub trait Connection { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>; + + /// Close the stream + // This is an async function because on closing, the QUIC protocol could send `reason` + // to the other side of the connection(AFAIK, this should also immediatly close the connection + // locally, not receiving the closing ACK from the other side, which I believe to be OK on the + // protocol side of things). + fn close( + &mut self, + // cx: &mut Context<'_>, + code: u64, + reason: &[u8], + ) -> Result<(), Self::Error>; +} + +// accepting name suggestions here +pub enum InitiatorSide { + Client = 0, + Server = 1, +} + +// accepting name suggestions here +pub enum StreamDirection { + Unidirectional = 1, + Bidirectional = 0, +} + +pub trait SendStreamID { + fn index(&self) -> u64; + fn initiator_side(&self) -> InitiatorSide; + fn direction(&self) -> StreamDirection; + /// the u62 number that identifies the stream + fn u62_id(&self) -> u64; } /// The send portion of a QUIC stream. pub trait SendStream { /// Errors that may happen trying to send data. - type Error; // bounds? + type Error: std::error::Error + Send + Sync + 'static; + type SendStreamID: SendStreamID; /// Polls that the stream is ready to send more data. // Q: Should this be Pin<&mut Self>? fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; /// Send data on the stream. - fn send_data(&mut self, data: B) -> Result<(), Self::Error>; + // Added another generic parameter because this was restricting the type of `Buf` to `B` in the + // `SendStream`, which I believe shouldn't be a restriction + fn send_data(&mut self, data: Buff) -> Result<(), Self::Error>; // fn poll_flush? /// finish? fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll>; /// Close the stream with an error code. fn reset(&mut self, reset_code: u64); + /// Get QUIC send stream id + fn send_id(&self) -> Self::SendStreamID; } /// The receive portion of a QUIC stream. @@ -70,13 +111,16 @@ pub trait RecvStream { /// Buffers of data that can be received. type Buf: Buf; /// Errors that may be received. - type Error; // bounds? + type Error: std::error::Error + Send + Sync + 'static; // bounds? + type SendStreamID: SendStreamID; // Q: should this be Pin? /// Poll for more data received from the remote on this stream. fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error>>; /// Signal to the remote peer to stop sending data. fn stop_sending(&mut self, error_code: u64); + /// Get QUIC send stream id + fn recv_id(&self) -> Self::SendStreamID; } /// An optional trait if a QUIC stream can be split into two sides. diff --git a/src/server/conn/http3.rs b/src/server/conn/http3.rs index 20b149aea3..4e3127b5bb 100644 --- a/src/server/conn/http3.rs +++ b/src/server/conn/http3.rs @@ -1,18 +1,54 @@ //! HTTP/3 Server Connections use std::error::Error as StdError; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use futures_core::ready; use pin_project_lite::pin_project; use crate::body::{Body, Incoming as IncomingBody}; +use crate::proto; use crate::rt::quic; use crate::service::HttpService; pin_project! { /// A Future representing an HTTP/3 connection. #[must_use = "futures do nothing unless polled"] - pub struct Connection { - _i: (Q, S, E), + pub struct Connection + where + Q: crate::rt::quic::Connection, + Q: Clone, + Q: Unpin, + B: bytes::Buf, + { + // _i: (Q, S, E), + conn: proto::h3::Server + } +} + +impl Future for Connection +where + S: HttpService, + S::Error: Into>, + Q: crate::rt::quic::Connection + Unpin + Clone, + B: bytes::Buf, + Bd: Body + 'static, + Bd::Error: Into>, + // E: Http2ServerConnExec, +{ + type Output = crate::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match ready!(Pin::new(&mut self.conn).poll(cx)) { + Ok(_done) => { + //TODO: the proto::h2::Server no longer needs to return + //the Dispatched enum + Poll::Ready(Ok(())) + } + Err(e) => Poll::Ready(Err(e)), + } } } @@ -32,27 +68,26 @@ pub struct Builder { impl Builder { /// Create a new connection builder. pub fn new(exec: E) -> Self { - Self { - exec, - } + Self { exec } } /// Bind a connection together with a [`Service`](crate::service::Service). /// /// This returns a Future that must be polled in order for HTTP to be /// driven on the connection. - pub fn serve_connection(&self, quic: Q, service: S) -> Connection + pub fn serve_connection(&self, quic: Q, service: S) -> Connection where S: HttpService, S::Error: Into>, + B: bytes::Buf, Bd: Body + 'static, Bd::Error: Into>, - Q: quic::Connection, + Q: quic::Connection + Unpin + Clone, //E: Http2ServerConnExec, E: Clone, { Connection { - _i: (quic, service, self.exec.clone()), + conn: todo!(), // _i: (quic, service, self.exec.clone()), } } }