From e5ef519bae97c8e7563a03767d9aa17ec1f42fab Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Mon, 8 Jan 2024 04:33:24 +0900 Subject: [PATCH 1/3] test: add test cases for transition of max_concurrent_send_streams in client --- tests/h2-tests/tests/client_request.rs | 152 +++++++++++++++++++++++++ 1 file changed, 152 insertions(+) diff --git a/tests/h2-tests/tests/client_request.rs b/tests/h2-tests/tests/client_request.rs index 88c7df46..ef4e8dd4 100644 --- a/tests/h2-tests/tests/client_request.rs +++ b/tests/h2-tests/tests/client_request.rs @@ -1661,6 +1661,158 @@ async fn client_builder_header_table_size() { join(srv, h2).await; } +#[tokio::test] +async fn default_max_concurrent_send_streams_and_update_it_based_on_empty_settings_frame() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send empty SETTINGS frame (i.e. no MAX_CONCURRENT_STREAMS is set) + srv.send_frame(frames::settings()).await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + // The default value is 100 + assert_eq!(h2.max_concurrent_send_streams(), 100); + h2.as_mut().await.unwrap(); + // If the server's initial SETTINGS frame does not include MAX_CONCURRENT_STREAMS, + // this should be updated to usize::MAX. + assert_eq!(h2.max_concurrent_send_streams(), usize::MAX); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn configured_max_concurrent_send_streams_and_update_it_based_on_non_empty_settings_frame() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::Builder::new() + // Configure the initial value to 2024 + .initial_max_send_streams(2024) + .handshake::<_, bytes::Bytes>(io) + .await + .unwrap(); + let mut h2 = std::pin::pin!(h2); + // It should be pre-configured value before it receives the initial + // SETTINGS frame from the server + assert_eq!(h2.max_concurrent_send_streams(), 2024); + h2.as_mut().await.unwrap(); + // Now the client has received the initial SETTINGS frame from the + // server, which should update the value accordingly + assert_eq!(h2.max_concurrent_send_streams(), 42); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings()).await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), 100); + h2.as_mut().await.unwrap(); + assert_eq!(h2.max_concurrent_send_streams(), 42); + }; + + join(srv, h2).await; +} + +#[tokio::test] +async fn receive_settings_frame_twice_with_second_one_non_empty() { + h2_support::trace_init!(); + let (io, mut srv) = mock::new(); + + let srv = async move { + // Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42 + srv.send_frame(frames::settings().max_concurrent_streams(42)) + .await; + + // Handle the client's connection preface + srv.read_preface().await.unwrap(); + match srv.next().await { + Some(frame) => match frame.unwrap() { + h2::frame::Frame::Settings(_) => { + let ack = frame::Settings::ack(); + srv.send(ack.into()).await.unwrap(); + } + frame => { + panic!("unexpected frame: {:?}", frame); + } + }, + None => { + panic!("unexpected EOF"); + } + } + + // Should receive the ack for the server's initial SETTINGS frame + let frame = assert_settings!(srv.next().await.unwrap().unwrap()); + assert!(frame.is_ack()); + + // Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS + // This should not update the max_concurrent_send_streams value that + // the client manages. + srv.send_frame(frames::settings().max_concurrent_streams(2024)) + .await; + }; + + let h2 = async move { + let (_client, h2) = client::handshake(io).await.unwrap(); + let mut h2 = std::pin::pin!(h2); + assert_eq!(h2.max_concurrent_send_streams(), 100); + h2.as_mut().await.unwrap(); + assert_eq!(h2.max_concurrent_send_streams(), 2024); + }; + + join(srv, h2).await; +} + const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0]; const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0]; From c2ec817a931127066fcad6425339651e65f85c8e Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Mon, 8 Jan 2024 04:44:29 +0900 Subject: [PATCH 2/3] fix: change default initial_max_send_streams to 100 --- src/client.rs | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) diff --git a/src/client.rs b/src/client.rs index 35cfc141..ae0404f9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -312,9 +312,19 @@ pub struct Builder { reset_stream_duration: Duration, /// Initial maximum number of locally initiated (send) streams. + /// + /// The default value is 100; this value is derived from what the HTTP/2 + /// spec recommends as the minimum value that endpoints advertise to their + /// peers, meaning that using this value will minimize the likelihood of the + /// failure where the local endpoint attempts to open too many streams + /// and gets rejected by the remote peer with the `REFUSED_STREAM` error. + /// + /// /// After receiving a Settings frame from the remote peer, /// the connection will overwrite this value with the /// MAX_CONCURRENT_STREAMS specified in the frame. + /// If no value is advertised by the remote peer, it will be + /// set to usize::MAX. initial_max_send_streams: usize, /// Initial target window size for new connections. @@ -642,7 +652,7 @@ impl Builder { reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX, pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX, initial_target_connection_window_size: None, - initial_max_send_streams: usize::MAX, + initial_max_send_streams: 100, settings: Default::default(), stream_id: 1.into(), } @@ -843,10 +853,21 @@ impl Builder { /// Sets the initial maximum of locally initiated (send) streams. /// + /// The default value is 100; this value is derived from what the HTTP/2 + /// spec recommends as the minimum value that endpoints advertise to their + /// peers (see [Section 6.5.2]), meaning that using this value will minimize + /// the likelihood of the failure where the local endpoint attempts to open + /// too many streams and thus gets rejected by the remote peer with the + /// REFUSED_STREAM error. + /// /// The initial settings will be overwritten by the remote peer when /// the Settings frame is received. The new value will be set to the /// `max_concurrent_streams()` from the frame. /// + /// If no value is advertised in the initial SETTINGS sent from the remote + /// peer as part of [HTTP/2 Connection Preface], it will be overwritten to + /// `usize::MAX`. + /// /// This setting prevents the caller from exceeding this number of /// streams that are counted towards the concurrency limit. /// @@ -855,7 +876,9 @@ impl Builder { /// /// See [Section 5.1.2] in the HTTP/2 spec for more details. /// - /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2 + /// [Section 6.5.2]: https://www.rfc-editor.org/rfc/rfc9113.html#section-6.5.2-2.6.1 + /// [HTTP/2 Connection Preface]: https://www.rfc-editor.org/rfc/rfc9113.html#name-http-2-connection-preface + /// [Section 5.1.2]: https://www.rfc-editor.org/rfc/rfc9113.html#name-stream-concurrency /// /// # Examples /// From a228c826f9d973abc559a49ba456084331311e64 Mon Sep 17 00:00:00 2001 From: Yusuke Tanaka Date: Mon, 8 Jan 2024 04:46:58 +0900 Subject: [PATCH 3/3] fix: set max_send_streams to usize::MAX if the initial SETTINGS frame does not include MAX_CONCURRENT_STREAMS --- src/proto/settings.rs | 18 ++++++++++++++++-- src/proto/streams/counts.rs | 8 +++++--- src/proto/streams/streams.rs | 8 ++++++-- 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 28065cc6..72ba11fa 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -12,6 +12,9 @@ pub(crate) struct Settings { /// the socket first then the settings applied **before** receiving any /// further frames. remote: Option, + /// Whether the connection has received the initial SETTINGS frame from the + /// remote peer. + has_received_remote_initial_settings: bool, } #[derive(Debug)] @@ -32,6 +35,7 @@ impl Settings { // the handshake process. local: Local::WaitingAck(local), remote: None, + has_received_remote_initial_settings: false, } } @@ -96,6 +100,15 @@ impl Settings { } } + /// Sets `true` to `self.has_received_remote_initial_settings`. + /// Returns `true` if this method is called for the first time. + /// (i.e. it is the initial SETTINGS frame from the remote peer) + fn mark_remote_initial_settings_as_received(&mut self) -> bool { + let has_received = self.has_received_remote_initial_settings; + self.has_received_remote_initial_settings = true; + !has_received + } + pub(crate) fn poll_send( &mut self, cx: &mut Context, @@ -108,7 +121,7 @@ impl Settings { C: Buf, P: Peer, { - if let Some(settings) = &self.remote { + if let Some(settings) = self.remote.clone() { if !dst.poll_ready(cx)?.is_ready() { return Poll::Pending; } @@ -121,7 +134,8 @@ impl Settings { tracing::trace!("ACK sent; applying settings"); - streams.apply_remote_settings(settings)?; + let is_initial = self.mark_remote_initial_settings_as_received(); + streams.apply_remote_settings(&settings, is_initial)?; if let Some(val) = settings.header_table_size() { dst.set_send_header_table_size(val as usize); diff --git a/src/proto/streams/counts.rs b/src/proto/streams/counts.rs index add1312e..f8810333 100644 --- a/src/proto/streams/counts.rs +++ b/src/proto/streams/counts.rs @@ -147,9 +147,11 @@ impl Counts { self.num_remote_reset_streams -= 1; } - pub fn apply_remote_settings(&mut self, settings: &frame::Settings) { - if let Some(val) = settings.max_concurrent_streams() { - self.max_send_streams = val as usize; + pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) { + match settings.max_concurrent_streams() { + Some(val) => self.max_send_streams = val as usize, + None if is_initial => self.max_send_streams = usize::MAX, + None => {} } } diff --git a/src/proto/streams/streams.rs b/src/proto/streams/streams.rs index 274bf455..b6194ca0 100644 --- a/src/proto/streams/streams.rs +++ b/src/proto/streams/streams.rs @@ -186,14 +186,18 @@ where me.poll_complete(&self.send_buffer, cx, dst) } - pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> { + pub fn apply_remote_settings( + &mut self, + frame: &frame::Settings, + is_initial: bool, + ) -> Result<(), Error> { let mut me = self.inner.lock().unwrap(); let me = &mut *me; let mut send_buffer = self.send_buffer.inner.lock().unwrap(); let send_buffer = &mut *send_buffer; - me.counts.apply_remote_settings(frame); + me.counts.apply_remote_settings(frame, is_initial); me.actions.send.apply_remote_settings( frame,