diff --git a/Cargo.toml b/Cargo.toml index 8ab0510b56..52658dae07 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,13 +70,14 @@ tokio-util = "0.7.10" [features] # Nothing by default -default = ["http3", "server"] +default = ["http3", "http2", "server"] # Easily turn it all on full = [ "client", "http1", "http2", + "http3", "server", ] diff --git a/src/error.rs b/src/error.rs index 8b41f9c93d..8d2ae8581e 100644 --- a/src/error.rs +++ b/src/error.rs @@ -83,6 +83,10 @@ pub(super) enum Kind { /// A general error from h2. #[cfg(all(any(feature = "client", feature = "server"), feature = "http2"))] Http2, + + /// A general error from h3. + #[cfg(all(any(feature = "client", feature = "server"), feature = "http3"))] + Http3, } #[derive(Debug)] @@ -452,6 +456,16 @@ impl Error { } } + // Apparently the ::h3::Error isn't avaible in the version I'm working with + // #[cfg(all(any(feature = "client", feature = "server"), feature = "http3"))] + // pub(super) fn new_h3(cause: ::h3::Error) -> Error { + // if cause.is_io() { + // Error::new_io(cause.into_io().expect("h3::Error::is_io")) + // } else { + // Error::new(Kind::Http3).with(cause) + // } + // } + fn description(&self) -> &str { match self.inner.kind { Kind::Parse(Parse::Method) => "invalid HTTP method parsed", @@ -476,7 +490,7 @@ impl Error { Kind::Parse(Parse::Header(Header::TransferEncodingUnexpected)) => { "unexpected transfer-encoding parsed" } - #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(any(feature = "http1", feature = "http2", feature = "http3"))] Kind::Parse(Parse::TooLarge) => "message head is too large", Kind::Parse(Parse::Status) => "invalid HTTP status-code parsed", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] @@ -487,6 +501,7 @@ impl Error { Kind::IncompleteMessage => "connection closed before message completed", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] Kind::UnexpectedMessage => "received unexpected message from connection", + // TODO: check if this cfg needs to be changed #[cfg(any( all(feature = "http1", any(feature = "client", feature = "server")), all(feature = "http2", feature = "client") @@ -497,12 +512,12 @@ impl Error { Kind::HeaderTimeout => "read header from client timeout", #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::Body => "error reading a body from connection", #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::BodyWrite => "error writing a body to connection", #[cfg(all(any(feature = "client", feature = "server"), feature = "http1"))] @@ -511,13 +526,12 @@ impl Error { Kind::Http2 => "http2 error", #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::Io => "connection error", - #[cfg(all( any(feature = "client", feature = "server"), - any(feature = "http1", feature = "http2") + any(feature = "http1", feature = "http2", feature = "http3") ))] Kind::User(User::Body) => "error from user's Body stream", #[cfg(any( @@ -531,10 +545,10 @@ impl Error { } #[cfg(any( all(any(feature = "client", feature = "server"), feature = "http1"), - all(feature = "server", feature = "http2") + all(feature = "server", feature = "http2", feature = "http3") ))] Kind::User(User::Service) => "error from user's Service", - #[cfg(any(feature = "http1", feature = "http2"))] + #[cfg(any(feature = "http1", feature = "http2", feature = "http3"))] #[cfg(feature = "server")] Kind::User(User::UnexpectedHeader) => "user sent unexpected header", #[cfg(feature = "http1")] @@ -549,6 +563,8 @@ impl Error { Kind::User(User::DispatchGone) => "dispatch task is gone", #[cfg(feature = "ffi")] Kind::User(User::AbortedByCallback) => "operation aborted by an application callback", + #[cfg(all(any(feature = "client", feature = "server"), feature = "http3"))] + Kind::Http3 => "http3 error", } } } diff --git a/src/proto/h3/glue.rs b/src/proto/h3/glue.rs index a2335d5aed..4c35fa5f39 100644 --- a/src/proto/h3/glue.rs +++ b/src/proto/h3/glue.rs @@ -1,60 +1,175 @@ -use std::task::{Context, Poll}; +use std::{ + fmt::Display, + task::{Context, Poll}, +}; use bytes::Buf; +use futures_core::ready; pub(super) struct Conn(Q); +impl Clone for Conn +where + Q: Clone, +{ + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + pub(super) struct BidiStream(S); pub(super) struct SendStream(S); pub(super) struct RecvStream(S); impl h3::quic::Connection for Conn where - Q: crate::rt::quic::Connection, + Q: crate::rt::quic::Connection + Unpin + Clone, B: Buf, { type RecvStream = RecvStream; type OpenStreams = Self; - fn poll_accept_recv(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_accept_recv( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let recv_stream = match ready!(pinned_self.poll_accept_recv_stream(_cx)) { + Ok(Some(recv_stream)) => recv_stream, + Ok(None) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError:: { + description: String::from("Error accepting receive stream"), + source: None, + }), + ))); + } + Err(err) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }), + ))); + } + }; + + Poll::Ready(Ok(RecvStream(recv_stream))) } - fn poll_accept_bidi(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_accept_bidi( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let bidi_stream = match ready!(pinned_self.poll_accept_bidirectional_stream(_cx)) { + Ok(Some(bidi_stream)) => bidi_stream, + Ok(None) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError:: { + description: String::from("Error accepting receive stream"), + source: None, + }), + ))); + } + Err(err) => { + return Poll::Ready(Err(h3::quic::ConnectionErrorIncoming::Undefined( + ::std::sync::Arc::new(GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }), + ))); + } + }; + + Poll::Ready(Ok(BidiStream(bidi_stream))) } fn opener(&self) -> Self::OpenStreams { - todo!(); + self.clone() } } impl h3::quic::OpenStreams for Conn where - Q: crate::rt::quic::Connection, + Q: crate::rt::quic::Connection + Unpin, B: Buf, { type BidiStream = BidiStream; type SendStream = SendStream; - fn poll_open_send(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_open_send( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let send_stream = match ready!(pinned_self.poll_open_send_stream(_cx)) { + Ok(send_stream) => send_stream, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(SendStream(send_stream))) } - fn poll_open_bidi(&mut self, _cx: &mut Context<'_>) - -> Poll> - { - todo!(); + fn poll_open_bidi( + &mut self, + _cx: &mut Context<'_>, + ) -> Poll> { + let pinned_self = ::std::pin::Pin::new(&mut self.0); + + let bidi_stream = match ready!(pinned_self.poll_open_bidirectional_stream(_cx)) { + Ok(bidi_stream) => bidi_stream, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(BidiStream(bidi_stream))) + } + + fn close(&mut self, code: h3::error::Code, reason: &[u8]) { + self.0.close(code.value(), reason); } +} - fn close(&mut self, _: h3::error::Code, _: &[u8]) { +impl BidiStream { + fn send_data( + &mut self, + data: h3::quic::WriteBuf, + ) -> Result<(), h3::quic::StreamErrorIncoming> + where + S: crate::rt::quic::SendStream, + B: Buf, + Buff: Buf, + { + use crate::rt::quic::SendStream as SendStreamTrait; + match >::send_data(&mut self.0, data) { + Ok(()) => Ok(()), + Err(err) => { + return Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Sending data to the bidirectional stream"), + source: Some(err), + }, + ))); + } + } } } @@ -68,25 +183,57 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let ready = match ready!(self.0.poll_ready(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn send_data>>( &mut self, data: T, ) -> Result<(), h3::quic::StreamErrorIncoming> { - todo!(); + self.send_data(data.into()) } fn poll_finish( &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let done = match ready!(self.0.poll_finish(cx)) { + Ok(done) => done, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Getting finish state"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(done)) } fn reset(&mut self, reset_code: u64) { - todo!(); + self.0.reset(reset_code); } fn send_id(&self) -> h3::quic::StreamId { - todo!() + match crate::rt::quic::SendStreamID::u62_id(&self.0.send_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } } } @@ -100,25 +247,69 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let ready = match ready!(self.0.poll_ready(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn send_data>>( &mut self, data: T, ) -> Result<(), h3::quic::StreamErrorIncoming> { - todo!(); + use crate::rt::quic::SendStream as SendStreamTrait; + + match >::send_data(&mut self.0, data.into()) { + Ok(()) => Ok(()), + Err(err) => { + return Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Sending data to the bidirectional stream"), + source: Some(err), + }, + ))); + } + } } fn poll_finish( &mut self, cx: &mut Context<'_>, ) -> Poll> { - todo!(); + let done = match ready!(self.0.poll_finish(cx)) { + Ok(done) => done, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Getting finish state"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(done)) } fn reset(&mut self, reset_code: u64) { - todo!(); + self.0.reset(reset_code); } fn send_id(&self) -> h3::quic::StreamId { - todo!() + match crate::rt::quic::SendStreamID::u62_id(&self.0.send_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } } } @@ -133,13 +324,33 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll, h3::quic::StreamErrorIncoming>> { - todo!(); + let ready = match ready!(self.0.poll_data(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn stop_sending(&mut self, error_code: u64) { - todo!(); + self.0.stop_sending(error_code); } fn recv_id(&self) -> h3::quic::StreamId { - todo!(); + match crate::rt::quic::SendStreamID::u62_id(&self.0.recv_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } } } @@ -154,12 +365,71 @@ where &mut self, cx: &mut Context<'_>, ) -> Poll, h3::quic::StreamErrorIncoming>> { - todo!(); + let ready = match ready!(self.0.poll_data(cx)) { + Ok(ready) => ready, + Err(err) => { + return Poll::Ready(Err(h3::quic::StreamErrorIncoming::Unknown(Box::new( + GlueError { + description: String::from("Error accepting receive stream"), + source: Some(err), + }, + )))); + } + }; + + Poll::Ready(Ok(ready)) } fn stop_sending(&mut self, error_code: u64) { - todo!(); + self.0.stop_sending(error_code); } fn recv_id(&self) -> h3::quic::StreamId { - todo!(); + match crate::rt::quic::SendStreamID::u62_id(&self.0.recv_id()).try_into() { + Ok(id) => id, + Err(err) => { + // As there is no room for error in the API, this is the first solution that came + // to mind. Reconstructing the number from the other values, could result in the + // same place so this implementation seems cleaner to me + panic!("Invalid u62 QUIC stream ID: {err}"); + } + } + } +} + +#[derive(Debug)] +pub(super) struct GlueError { + description: String, + source: Option, +} + +impl std::error::Error for GlueError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + if let Some(ref value) = self.source { + return Some(value as &(dyn std::error::Error + 'static)); + } + + None + } + + fn description(&self) -> &str { + self.description.as_str() + } + + fn cause(&self) -> Option<&dyn std::error::Error> { + self.source() + } +} + +impl Display for GlueError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "ConnectionError{{ description: {}, source: {} }}", + self.description, + if let Some(ref src) = self.source { + format!("{src}") + } else { + "None".to_string() + } + ) } } diff --git a/src/proto/h3/mod.rs b/src/proto/h3/mod.rs index 2138ed50b2..72361f3c1b 100644 --- a/src/proto/h3/mod.rs +++ b/src/proto/h3/mod.rs @@ -1,8 +1,10 @@ use std::future::Future; -use std::pin::Pin; +use std::pin::{pin, Pin}; use std::task::{Context, Poll}; use bytes::Buf; +use futures_core::ready; +use futures_util::FutureExt; use h3::server::Connection; use pin_project_lite::pin_project; @@ -13,6 +15,8 @@ pin_project! { pub(crate) struct Server where Q: crate::rt::quic::Connection, + Q: Unpin, + Q: Clone, B: Buf, { exec: E, @@ -21,14 +25,43 @@ pin_project! { } } +impl Server +where + Q: crate::rt::quic::Connection + Unpin + Clone, + B: Buf, +{ + pub fn new(quic: Q, service: S) -> Self { + todo!() + } +} + impl Future for Server where - Q: crate::rt::quic::Connection, + Q: crate::rt::quic::Connection + Unpin + Clone, B: Buf, { type Output = crate::Result<()>; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - todo!() + use crate::error::Kind; + loop { + let accept_fut = self.q.accept(); + let Some(resolver) = ready!(pin!(accept_fut).poll(cx)) + .map_err(|err| crate::Error::new(Kind::Http3).with(err))? + else { + // I believe this means completed + return Poll::Ready(Ok(())); + }; + + match ready!(pin!(resolver.resolve_request()).poll(cx)) { + Ok((request, request_stream)) => { + // process request + // request. + } + Err(err) => { + // process request error + } + }; + } } } diff --git a/src/rt/quic.rs b/src/rt/quic.rs index 18271d34a8..34489ce06b 100644 --- a/src/rt/quic.rs +++ b/src/rt/quic.rs @@ -1,5 +1,6 @@ //! Generic QUIC support +use std::error::Error; use std::pin::Pin; use std::task::{Context, Poll}; @@ -7,6 +8,9 @@ use bytes::Buf; // TODO: Should this be gated by an `http3` feature? +#[derive(Debug, Clone, Copy)] +pub struct ErrorCode(u64); + /// A QUIC connection. pub trait Connection { /// Send streams that can be opened by this connection. @@ -16,7 +20,7 @@ pub trait Connection { /// Bidirectional streams that can be opened or accepted by this connection. type BidiStream: SendStream + RecvStream; /// Errors that may occur opening or accepting streams. - type Error; + type Error: Error + Send + Sync + 'static; // Accepting streams @@ -47,22 +51,59 @@ pub trait Connection { self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll>; + + /// Close the stream + // This is an async function because on closing, the QUIC protocol could send `reason` + // to the other side of the connection(AFAIK, this should also immediatly close the connection + // locally, not receiving the closing ACK from the other side, which I believe to be OK on the + // protocol side of things). + fn close( + &mut self, + // cx: &mut Context<'_>, + code: u64, + reason: &[u8], + ) -> Result<(), Self::Error>; +} + +// accepting name suggestions here +pub enum InitiatorSide { + Client = 0, + Server = 1, +} + +// accepting name suggestions here +pub enum StreamDirection { + Unidirectional = 1, + Bidirectional = 0, +} + +pub trait SendStreamID { + fn index(&self) -> u64; + fn initiator_side(&self) -> InitiatorSide; + fn direction(&self) -> StreamDirection; + /// the u62 number that identifies the stream + fn u62_id(&self) -> u64; } /// The send portion of a QUIC stream. pub trait SendStream { /// Errors that may happen trying to send data. - type Error; // bounds? + type Error: std::error::Error + Send + Sync + 'static; + type SendStreamID: SendStreamID; /// Polls that the stream is ready to send more data. // Q: Should this be Pin<&mut Self>? fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll>; /// Send data on the stream. - fn send_data(&mut self, data: B) -> Result<(), Self::Error>; + // Added another generic parameter because this was restricting the type of `Buf` to `B` in the + // `SendStream`, which I believe shouldn't be a restriction + fn send_data(&mut self, data: Buff) -> Result<(), Self::Error>; // fn poll_flush? /// finish? fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll>; /// Close the stream with an error code. fn reset(&mut self, reset_code: u64); + /// Get QUIC send stream id + fn send_id(&self) -> Self::SendStreamID; } /// The receive portion of a QUIC stream. @@ -70,13 +111,16 @@ pub trait RecvStream { /// Buffers of data that can be received. type Buf: Buf; /// Errors that may be received. - type Error; // bounds? + type Error: std::error::Error + Send + Sync + 'static; // bounds? + type SendStreamID: SendStreamID; // Q: should this be Pin? /// Poll for more data received from the remote on this stream. fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error>>; /// Signal to the remote peer to stop sending data. fn stop_sending(&mut self, error_code: u64); + /// Get QUIC send stream id + fn recv_id(&self) -> Self::SendStreamID; } /// An optional trait if a QUIC stream can be split into two sides. diff --git a/src/server/conn/http3.rs b/src/server/conn/http3.rs index 20b149aea3..4e3127b5bb 100644 --- a/src/server/conn/http3.rs +++ b/src/server/conn/http3.rs @@ -1,18 +1,54 @@ //! HTTP/3 Server Connections use std::error::Error as StdError; +use std::future::Future; +use std::pin::Pin; +use std::task::{Context, Poll}; +use futures_core::ready; use pin_project_lite::pin_project; use crate::body::{Body, Incoming as IncomingBody}; +use crate::proto; use crate::rt::quic; use crate::service::HttpService; pin_project! { /// A Future representing an HTTP/3 connection. #[must_use = "futures do nothing unless polled"] - pub struct Connection { - _i: (Q, S, E), + pub struct Connection + where + Q: crate::rt::quic::Connection, + Q: Clone, + Q: Unpin, + B: bytes::Buf, + { + // _i: (Q, S, E), + conn: proto::h3::Server + } +} + +impl Future for Connection +where + S: HttpService, + S::Error: Into>, + Q: crate::rt::quic::Connection + Unpin + Clone, + B: bytes::Buf, + Bd: Body + 'static, + Bd::Error: Into>, + // E: Http2ServerConnExec, +{ + type Output = crate::Result<()>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match ready!(Pin::new(&mut self.conn).poll(cx)) { + Ok(_done) => { + //TODO: the proto::h2::Server no longer needs to return + //the Dispatched enum + Poll::Ready(Ok(())) + } + Err(e) => Poll::Ready(Err(e)), + } } } @@ -32,27 +68,26 @@ pub struct Builder { impl Builder { /// Create a new connection builder. pub fn new(exec: E) -> Self { - Self { - exec, - } + Self { exec } } /// Bind a connection together with a [`Service`](crate::service::Service). /// /// This returns a Future that must be polled in order for HTTP to be /// driven on the connection. - pub fn serve_connection(&self, quic: Q, service: S) -> Connection + pub fn serve_connection(&self, quic: Q, service: S) -> Connection where S: HttpService, S::Error: Into>, + B: bytes::Buf, Bd: Body + 'static, Bd::Error: Into>, - Q: quic::Connection, + Q: quic::Connection + Unpin + Clone, //E: Http2ServerConnExec, E: Clone, { Connection { - _i: (quic, service, self.exec.clone()), + conn: todo!(), // _i: (quic, service, self.exec.clone()), } } }