Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: reduce the default value of initial_max_send_streams to 100 #732

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,19 @@ pub struct Builder {
reset_stream_duration: Duration,

/// Initial maximum number of locally initiated (send) streams.
///
/// The default value is 100; this value is derived from what the HTTP/2
/// spec recommends as the minimum value that endpoints advertise to their
/// peers, meaning that using this value will minimize the likelihood of the
/// failure where the local endpoint attempts to open too many streams
/// and gets rejected by the remote peer with the `REFUSED_STREAM` error.
/// <https://www.rfc-editor.org/rfc/rfc9113.html#section-6.5.2-2.6.1>
///
/// After receiving a Settings frame from the remote peer,
/// the connection will overwrite this value with the
/// MAX_CONCURRENT_STREAMS specified in the frame.
/// If no value is advertised by the remote peer, it will be
/// set to usize::MAX.
initial_max_send_streams: usize,

/// Initial target window size for new connections.
Expand Down Expand Up @@ -642,7 +652,7 @@ impl Builder {
reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
initial_target_connection_window_size: None,
initial_max_send_streams: usize::MAX,
initial_max_send_streams: 100,
settings: Default::default(),
stream_id: 1.into(),
}
Expand Down Expand Up @@ -843,10 +853,21 @@ impl Builder {

/// Sets the initial maximum of locally initiated (send) streams.
///
/// The default value is 100; this value is derived from what the HTTP/2
/// spec recommends as the minimum value that endpoints advertise to their
/// peers (see [Section 6.5.2]), meaning that using this value will minimize
/// the likelihood of the failure where the local endpoint attempts to open
/// too many streams and thus gets rejected by the remote peer with the
/// REFUSED_STREAM error.
///
/// The initial settings will be overwritten by the remote peer when
/// the Settings frame is received. The new value will be set to the
/// `max_concurrent_streams()` from the frame.
///
/// If no value is advertised in the initial SETTINGS sent from the remote
/// peer as part of [HTTP/2 Connection Preface], it will be overwritten to
/// `usize::MAX`.
///
/// This setting prevents the caller from exceeding this number of
/// streams that are counted towards the concurrency limit.
///
Expand All @@ -855,7 +876,9 @@ impl Builder {
///
/// See [Section 5.1.2] in the HTTP/2 spec for more details.
///
/// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
/// [Section 6.5.2]: https://www.rfc-editor.org/rfc/rfc9113.html#section-6.5.2-2.6.1
/// [HTTP/2 Connection Preface]: https://www.rfc-editor.org/rfc/rfc9113.html#name-http-2-connection-preface
/// [Section 5.1.2]: https://www.rfc-editor.org/rfc/rfc9113.html#name-stream-concurrency
///
/// # Examples
///
Expand Down
18 changes: 16 additions & 2 deletions src/proto/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub(crate) struct Settings {
/// the socket first then the settings applied **before** receiving any
/// further frames.
remote: Option<frame::Settings>,
/// Whether the connection has received the initial SETTINGS frame from the
/// remote peer.
has_received_remote_initial_settings: bool,
}

#[derive(Debug)]
Expand All @@ -32,6 +35,7 @@ impl Settings {
// the handshake process.
local: Local::WaitingAck(local),
remote: None,
has_received_remote_initial_settings: false,
}
}

Expand Down Expand Up @@ -96,6 +100,15 @@ impl Settings {
}
}

/// Sets `true` to `self.has_received_remote_initial_settings`.
/// Returns `true` if this method is called for the first time.
/// (i.e. it is the initial SETTINGS frame from the remote peer)
fn mark_remote_initial_settings_as_received(&mut self) -> bool {
let has_received = self.has_received_remote_initial_settings;
self.has_received_remote_initial_settings = true;
!has_received
}

pub(crate) fn poll_send<T, B, C, P>(
&mut self,
cx: &mut Context,
Expand All @@ -108,7 +121,7 @@ impl Settings {
C: Buf,
P: Peer,
{
if let Some(settings) = &self.remote {
if let Some(settings) = self.remote.clone() {
if !dst.poll_ready(cx)?.is_ready() {
return Poll::Pending;
}
Expand All @@ -121,7 +134,8 @@ impl Settings {

tracing::trace!("ACK sent; applying settings");

streams.apply_remote_settings(settings)?;
let is_initial = self.mark_remote_initial_settings_as_received();
streams.apply_remote_settings(&settings, is_initial)?;

if let Some(val) = settings.header_table_size() {
dst.set_send_header_table_size(val as usize);
Expand Down
8 changes: 5 additions & 3 deletions src/proto/streams/counts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,11 @@ impl Counts {
self.num_remote_reset_streams -= 1;
}

pub fn apply_remote_settings(&mut self, settings: &frame::Settings) {
if let Some(val) = settings.max_concurrent_streams() {
self.max_send_streams = val as usize;
pub fn apply_remote_settings(&mut self, settings: &frame::Settings, is_initial: bool) {
match settings.max_concurrent_streams() {
Some(val) => self.max_send_streams = val as usize,
None if is_initial => self.max_send_streams = usize::MAX,
None => {}
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/proto/streams/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,14 +186,18 @@ where
me.poll_complete(&self.send_buffer, cx, dst)
}

pub fn apply_remote_settings(&mut self, frame: &frame::Settings) -> Result<(), Error> {
pub fn apply_remote_settings(
&mut self,
frame: &frame::Settings,
is_initial: bool,
) -> Result<(), Error> {
let mut me = self.inner.lock().unwrap();
let me = &mut *me;

let mut send_buffer = self.send_buffer.inner.lock().unwrap();
let send_buffer = &mut *send_buffer;

me.counts.apply_remote_settings(frame);
me.counts.apply_remote_settings(frame, is_initial);

me.actions.send.apply_remote_settings(
frame,
Expand Down
152 changes: 152 additions & 0 deletions tests/h2-tests/tests/client_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1661,6 +1661,158 @@ async fn client_builder_header_table_size() {
join(srv, h2).await;
}

#[tokio::test]
async fn default_max_concurrent_send_streams_and_update_it_based_on_empty_settings_frame() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send empty SETTINGS frame (i.e. no MAX_CONCURRENT_STREAMS is set)
srv.send_frame(frames::settings()).await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
// The default value is 100
assert_eq!(h2.max_concurrent_send_streams(), 100);
h2.as_mut().await.unwrap();
// If the server's initial SETTINGS frame does not include MAX_CONCURRENT_STREAMS,
// this should be updated to usize::MAX.
assert_eq!(h2.max_concurrent_send_streams(), usize::MAX);
};

join(srv, h2).await;
}

#[tokio::test]
async fn configured_max_concurrent_send_streams_and_update_it_based_on_non_empty_settings_frame() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;
};

let h2 = async move {
let (_client, h2) = client::Builder::new()
// Configure the initial value to 2024
.initial_max_send_streams(2024)
.handshake::<_, bytes::Bytes>(io)
.await
.unwrap();
let mut h2 = std::pin::pin!(h2);
// It should be pre-configured value before it receives the initial
// SETTINGS frame from the server
assert_eq!(h2.max_concurrent_send_streams(), 2024);
h2.as_mut().await.unwrap();
// Now the client has received the initial SETTINGS frame from the
// server, which should update the value accordingly
assert_eq!(h2.max_concurrent_send_streams(), 42);
};

join(srv, h2).await;
}

#[tokio::test]
async fn receive_settings_frame_twice_with_second_one_empty() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;

// Handle the client's connection preface
srv.read_preface().await.unwrap();
match srv.next().await {
Some(frame) => match frame.unwrap() {
h2::frame::Frame::Settings(_) => {
let ack = frame::Settings::ack();
srv.send(ack.into()).await.unwrap();
}
frame => {
panic!("unexpected frame: {:?}", frame);
}
},
None => {
panic!("unexpected EOF");
}
}

// Should receive the ack for the server's initial SETTINGS frame
let frame = assert_settings!(srv.next().await.unwrap().unwrap());
assert!(frame.is_ack());

// Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS
// This should not update the max_concurrent_send_streams value that
// the client manages.
srv.send_frame(frames::settings()).await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
assert_eq!(h2.max_concurrent_send_streams(), 100);
h2.as_mut().await.unwrap();
assert_eq!(h2.max_concurrent_send_streams(), 42);
};

join(srv, h2).await;
}

#[tokio::test]
async fn receive_settings_frame_twice_with_second_one_non_empty() {
h2_support::trace_init!();
let (io, mut srv) = mock::new();

let srv = async move {
// Send the initial SETTINGS frame with MAX_CONCURRENT_STREAMS set to 42
srv.send_frame(frames::settings().max_concurrent_streams(42))
.await;

// Handle the client's connection preface
srv.read_preface().await.unwrap();
match srv.next().await {
Some(frame) => match frame.unwrap() {
h2::frame::Frame::Settings(_) => {
let ack = frame::Settings::ack();
srv.send(ack.into()).await.unwrap();
}
frame => {
panic!("unexpected frame: {:?}", frame);
}
},
None => {
panic!("unexpected EOF");
}
}

// Should receive the ack for the server's initial SETTINGS frame
let frame = assert_settings!(srv.next().await.unwrap().unwrap());
assert!(frame.is_ack());

// Send another SETTINGS frame with no MAX_CONCURRENT_STREAMS
// This should not update the max_concurrent_send_streams value that
// the client manages.
srv.send_frame(frames::settings().max_concurrent_streams(2024))
.await;
};

let h2 = async move {
let (_client, h2) = client::handshake(io).await.unwrap();
let mut h2 = std::pin::pin!(h2);
assert_eq!(h2.max_concurrent_send_streams(), 100);
h2.as_mut().await.unwrap();
assert_eq!(h2.max_concurrent_send_streams(), 2024);
};

join(srv, h2).await;
}

const SETTINGS: &[u8] = &[0, 0, 0, 4, 0, 0, 0, 0, 0];
const SETTINGS_ACK: &[u8] = &[0, 0, 0, 4, 1, 0, 0, 0, 0];

Expand Down