From 365f7acbe55b5cfcf60c405eb30e5b3eab218440 Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Wed, 10 Apr 2024 12:51:38 +0800 Subject: [PATCH 01/13] Master v0.9.0 (#220) --- CHANGELOG.md | 23 +++++++++++++++++++++-- Cargo.toml | 2 +- tools/Cargo.toml | 4 ++-- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df54f946..d3ccf103 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,17 +11,35 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [v0.9.0] - 2024-04-10 + +### Added +- Improve FFI for quic_tls_config_t +- Update the handling of probe timeout to conform with RFC 9002 +- Update limit of the output buffer for Connection::send() +- Add plot tools for goodput and interop testing + +### Changed +- Change `quic_config_set_tls_config()` in FFI +- Change `quic_tls_config_select_methods_t` in FFI + +### Fixed +- Fix NewToken frame in qlog +- Fix the unit test case `conn_write_qlog` that fails with low probability + +### Security +- limit the number of queued RETIRE_CONNECTION_ID frames + + ## [v0.8.1] - 2024-03-18 ### Removed - - Remove the sfv feature flag from h3 (to resolve a build issue at docs.rs) ## [v0.8.0] - 2024-03-15 ### Added - - Support anti-amplification limit for server - Support customized config when initiating a connection - Add callback based FFI for writing the keylog and qlog @@ -164,6 +182,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Provide example clients and servers. 
+[v0.9.0]: https://github.com/tencent/tquic/compare/v0.8.1...v0.9.0 [v0.8.1]: https://github.com/tencent/tquic/compare/v0.8.0...v0.8.1 [v0.8.0]: https://github.com/tencent/tquic/compare/v0.7.0...v0.8.0 [v0.7.0]: https://github.com/tencent/tquic/compare/v0.6.0...v0.7.0 diff --git a/Cargo.toml b/Cargo.toml index 12d38b38..cd03ec04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic" -version = "0.8.1" +version = "0.9.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" diff --git a/tools/Cargo.toml b/tools/Cargo.toml index 4d895935..c6d244c1 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic_tools" -version = "0.8.1" +version = "0.9.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" @@ -23,7 +23,7 @@ rand = "0.8.5" statrs = "0.16" jemallocator = { version = "0.5", package = "tikv-jemallocator" } signal-hook = "0.3.17" -tquic = { path = "..", version = "0.8.1"} +tquic = { path = "..", version = "0.9.0"} [lib] crate-type = ["lib"] From 8d141d0a2a0968ba554a1326b5a9546b52f5919b Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Thu, 11 Apr 2024 20:25:16 +0800 Subject: [PATCH 02/13] Update codecov action to v4 to resolve uploader issue (#230) --- .github/workflows/rust.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 0a954b6a..91978153 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -136,7 +136,7 @@ jobs: - name: Generate code coverage run: cargo llvm-cov --lcov --output-path lcov.info - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} files: lcov.info From 6e3f7bef4cd606c48c5fbd9120fb7a398aa5b5e7 Mon Sep 17 00:00:00 2001 From: golf_at_tt Date: Thu, 11 Apr 2024 21:19:58 +0800 Subject: [PATCH 03/13] Fix bbr3 high loss rate cannot exit slow start (#229) --- 
src/congestion_control/bbr3.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/congestion_control/bbr3.rs b/src/congestion_control/bbr3.rs index e5f73807..150d1e02 100644 --- a/src/congestion_control/bbr3.rs +++ b/src/congestion_control/bbr3.rs @@ -1463,7 +1463,8 @@ impl Bbr3 { // // See . fn handle_lost_packet(&mut self, now: Instant, packet: &SentPacket) { - if !self.bw_probe_samples { + // In startup phase we need to update stats upon every ack reception + if !self.bw_probe_samples && !self.in_slow_start() { // not a packet sent while probing bandwidth. return; } From 103bf1e1c56da101d331a96a3f31433dfafd0f1c Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Fri, 12 Apr 2024 10:31:24 +0800 Subject: [PATCH 04/13] tquic_server: output stats when server connection is closed (#231) --- tools/src/bin/tquic_server.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/tools/src/bin/tquic_server.rs b/tools/src/bin/tquic_server.rs index 9f9c3fea..9c1c8607 100644 --- a/tools/src/bin/tquic_server.rs +++ b/tools/src/bin/tquic_server.rs @@ -736,7 +736,18 @@ impl TransportHandler for ServerHandler { } fn on_conn_closed(&mut self, conn: &mut Connection) { - log::debug!("connection[{:?}] is closed", conn.trace_id()); + let stats = conn.stats(); + log::debug!( + "{} connection is closed. recv pkts: {}, sent pkts: {}, \ + lost pkts: {}, recv bytes: {}, sent bytes: {}, lost bytes: {}", + conn.trace_id(), + stats.recv_count, + stats.sent_count, + stats.lost_count, + stats.recv_bytes, + stats.sent_bytes, + stats.lost_bytes + ); let index = conn.index().unwrap(); self.conns.remove(&index); @@ -803,6 +814,11 @@ fn main() -> Result<()> { let mut server = Server::new(&option)?; // Run event loop. 
+ debug!( + "{} listen on {:?}", + server.endpoint.trace_id(), + option.listen + ); let mut events = mio::Events::with_capacity(1024); loop { if let Err(e) = server.endpoint.process_connections() { @@ -813,8 +829,11 @@ fn main() -> Result<()> { .endpoint .timeout() .map(|v| cmp::max(v, TIMER_GRANULARITY)); - debug!("{} timeout: {:?}", server.endpoint.trace_id(), timeout); - + debug!( + "{} wait for io events, timeout: {:?}", + server.endpoint.trace_id(), + timeout + ); server.poll.poll(&mut events, timeout)?; // Process timeout events From eac8e72ab35b83d572380edb6756f9c9256adba3 Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Fri, 12 Apr 2024 11:35:41 +0800 Subject: [PATCH 05/13] Limit configuration value of type varint (#232) --- src/codec.rs | 4 ++++ src/lib.rs | 35 ++++++++++++++++++++++++----------- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/codec.rs b/src/codec.rs index 0c0cf836..a54cbc6e 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -21,6 +21,10 @@ use bytes::BufMut; use crate::error::Error; use crate::Result; +/// The maximum value for QUIC variable-length integer encoding +/// See RFC 9000 Section 16 +pub const VINT_MAX: u64 = 4_611_686_018_427_387_903; + /// Encoder for QUIC wire data pub trait Encoder { /// Write an unsigned 8 bit integer to self. diff --git a/src/lib.rs b/src/lib.rs index 5cdb6852..f3a6d17f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -69,6 +69,7 @@ use ring::aead::UnboundKey; use ring::hmac; use rustc_hash::FxHashSet; +use crate::codec::VINT_MAX; use crate::connection::stream; use crate::tls::TlsSession; use crate::token::ResetToken; @@ -410,7 +411,7 @@ impl Config { /// Set the `max_idle_timeout` transport parameter in milliseconds. /// Idle timeout is disabled by default. pub fn set_max_idle_timeout(&mut self, v: u64) { - self.local_transport_params.max_idle_timeout = v; + self.local_transport_params.max_idle_timeout = cmp::min(v, VINT_MAX); } /// Set handshake timeout in milliseconds. 
Zero turns the timeout off. @@ -422,7 +423,7 @@ impl Config { /// the size of UDP payloads that the endpoint is willing to receive. The /// default value is `65527`. pub fn set_recv_udp_payload_size(&mut self, v: u16) { - self.local_transport_params.max_udp_payload_size = v as u64; + self.local_transport_params.max_udp_payload_size = cmp::min(v as u64, VINT_MAX); } /// Set the initial maximum outgoing UDP payload size. @@ -438,51 +439,51 @@ impl Config { /// value for the maximum amount of data that can be sent on the connection. /// The default value is `10485760`. pub fn set_initial_max_data(&mut self, v: u64) { - self.local_transport_params.initial_max_data = v; + self.local_transport_params.initial_max_data = cmp::min(v, VINT_MAX); } /// Set the `initial_max_stream_data_bidi_local` transport parameter. /// The default value is `5242880`. pub fn set_initial_max_stream_data_bidi_local(&mut self, v: u64) { self.local_transport_params - .initial_max_stream_data_bidi_local = v; + .initial_max_stream_data_bidi_local = cmp::min(v, VINT_MAX); } /// Set the `initial_max_stream_data_bidi_remote` transport parameter. /// The default value is `2097152`. pub fn set_initial_max_stream_data_bidi_remote(&mut self, v: u64) { self.local_transport_params - .initial_max_stream_data_bidi_remote = v; + .initial_max_stream_data_bidi_remote = cmp::min(v, VINT_MAX); } /// Set the `initial_max_stream_data_uni` transport parameter. /// The default value is `1048576`. pub fn set_initial_max_stream_data_uni(&mut self, v: u64) { - self.local_transport_params.initial_max_stream_data_uni = v; + self.local_transport_params.initial_max_stream_data_uni = cmp::min(v, VINT_MAX); } /// Set the `initial_max_streams_bidi` transport parameter. /// The default value is `200`. 
pub fn set_initial_max_streams_bidi(&mut self, v: u64) { - self.local_transport_params.initial_max_streams_bidi = v; + self.local_transport_params.initial_max_streams_bidi = cmp::min(v, VINT_MAX); } /// Set the `initial_max_streams_uni` transport parameter. /// The default value is `100`. pub fn set_initial_max_streams_uni(&mut self, v: u64) { - self.local_transport_params.initial_max_streams_uni = v; + self.local_transport_params.initial_max_streams_uni = cmp::min(v, VINT_MAX); } /// Set the `ack_delay_exponent` transport parameter. /// The default value is `3`. pub fn set_ack_delay_exponent(&mut self, v: u64) { - self.local_transport_params.ack_delay_exponent = v; + self.local_transport_params.ack_delay_exponent = cmp::min(v, VINT_MAX); } /// Set the `max_ack_delay` transport parameter. /// The default value is `25`. pub fn set_max_ack_delay(&mut self, v: u64) { - self.local_transport_params.max_ack_delay = v; + self.local_transport_params.max_ack_delay = cmp::min(v, VINT_MAX); } /// Set congestion control algorithm that the connection would use. @@ -532,7 +533,7 @@ impl Config { /// The default value is `2`. Lower values will be ignored. 
pub fn set_active_connection_id_limit(&mut self, v: u64) { if v >= 2 { - self.local_transport_params.active_conn_id_limit = v; + self.local_transport_params.active_conn_id_limit = cmp::min(v, VINT_MAX); } } @@ -963,6 +964,18 @@ mod tests { Ok(()) } + + #[test] + fn initial_max_streams_bidi() -> Result<()> { + let mut config = Config::new()?; + config.set_initial_max_streams_bidi(u64::MAX); + assert_eq!( + config.local_transport_params.initial_max_streams_bidi, + VINT_MAX + ); + + Ok(()) + } } pub use crate::congestion_control::CongestionControlAlgorithm; From fb24e88dd8066dc8b2affec797818eeda66253a1 Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Fri, 12 Apr 2024 13:05:36 +0800 Subject: [PATCH 06/13] Improve API for stream creation (#233) --- include/tquic.h | 23 ++++++- src/connection/connection.rs | 31 ++++++--- src/connection/stream.rs | 124 ++++++++++++++++++++++++++++++++++- src/ffi.rs | 41 +++++++++++- 4 files changed, 208 insertions(+), 11 deletions(-) diff --git a/include/tquic.h b/include/tquic.h index 69878b61..f5c68f70 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -891,13 +891,34 @@ ssize_t quic_stream_write(struct quic_conn_t *conn, bool fin); /** - * Create a new quic transport stream with the given id and priority. + * Create a new quic stream with the given id and priority. + * This is a low-level API for stream creation. It is recommended to use + * `quic_stream_bidi_new` for bidirectional streams or `quic_stream_uni_new` + * for unidrectional streams. */ int quic_stream_new(struct quic_conn_t *conn, uint64_t stream_id, uint8_t urgency, bool incremental); +/** + * Create a new quic bidiectional stream with the given id and priority. + * If success, the output parameter `stream_id` carrys the id of the created stream. + */ +int quic_stream_bidi_new(struct quic_conn_t *conn, + uint8_t urgency, + bool incremental, + uint64_t *stream_id); + +/** + * Create a new quic uniectional stream with the given id and priority. 
+ * If success, the output parameter `stream_id` carrys the id of the created stream. + */ +int quic_stream_uni_new(struct quic_conn_t *conn, + uint8_t urgency, + bool incremental, + uint64_t *stream_id); + /** * Shutdown stream reading or writing. */ diff --git a/src/connection/connection.rs b/src/connection/connection.rs index c37f2792..2e1f9212 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -3515,10 +3515,25 @@ impl Connection { } /// Create a new stream with given stream id and priority. + /// This is a low-level API for stream creation. It is recommended to use + /// `stream_bidi_new` for bidirectional streams or `stream_uni_new` for + /// unidrectional streams. pub fn stream_new(&mut self, stream_id: u64, urgency: u8, incremental: bool) -> Result<()> { self.stream_set_priority(stream_id, urgency, incremental) } + /// Create a new bidirectional stream with given stream priority. + /// Return id of the created stream upon success. + pub fn stream_bidi_new(&mut self, urgency: u8, incremental: bool) -> Result { + self.streams.stream_bidi_new(urgency, incremental) + } + + /// Create a new unidrectional stream with given stream priority. + /// Return id of the created stream upon success. + pub fn stream_uni_new(&mut self, urgency: u8, incremental: bool) -> Result { + self.streams.stream_uni_new(urgency, incremental) + } + /// Shutdown stream reading or writing. 
pub fn stream_shutdown(&mut self, stream_id: u64, direction: Shutdown, err: u64) -> Result<()> { self.mark_tickable(true); @@ -6440,15 +6455,16 @@ pub(crate) mod tests { // Client create bidi streams let data = TestPair::new_test_data(5); - for i in 0..3 { + for _ in 0..3 { + let sid = test_pair.client.stream_bidi_new(0, false)?; assert_eq!( - test_pair.client.stream_write(i * 4, data.clone(), true)?, + test_pair.client.stream_write(sid, data.clone(), true)?, data.len() ); } // Client fail to create more streams assert_eq!( - test_pair.client.stream_write(16, data.clone(), true), + test_pair.client.stream_bidi_new(0, false), Err(Error::StreamLimitError) ); let packets = TestPair::conn_packets_out(&mut test_pair.client)?; @@ -6489,17 +6505,16 @@ pub(crate) mod tests { // Client create uni streams let data = TestPair::new_test_data(5); - for i in 0..2 { + for _ in 0..2 { + let sid = test_pair.client.stream_uni_new(0, false)?; assert_eq!( - test_pair - .client - .stream_write(2 + i * 4, data.clone(), true)?, + test_pair.client.stream_write(sid, data.clone(), true)?, data.len() ); } // Client fail to create more streams assert_eq!( - test_pair.client.stream_write(10, data.clone(), true), + test_pair.client.stream_uni_new(0, false), Err(Error::StreamLimitError) ); let packets = TestPair::conn_packets_out(&mut test_pair.client)?; diff --git a/src/connection/stream.rs b/src/connection/stream.rs index 0c1485d3..60277411 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -132,6 +132,12 @@ pub struct StreamMap { /// frame to the peer. pub rx_almost_full: bool, + /// Stream id for next bidirectional stream. + next_stream_id_bidi: u64, + + /// Stream id for next unidirectional stream. + next_stream_id_uni: u64, + /// Peer transport parameters. 
peer_transport_params: StreamTransportParams, @@ -175,6 +181,9 @@ impl StreamMap { max_stream_window, rx_almost_full: false, + next_stream_id_bidi: if is_server { 1 } else { 0 }, + next_stream_id_uni: if is_server { 3 } else { 2 }, + local_transport_params: local_params, peer_transport_params: StreamTransportParams::default(), @@ -198,6 +207,26 @@ impl StreamMap { self.streams.get_mut(&id) } + /// Create a new bidirectional stream with given stream priority. + /// Return id of the created stream upon success. + pub fn stream_bidi_new(&mut self, urgency: u8, incremental: bool) -> Result { + let stream_id = self.next_stream_id_bidi; + match self.stream_set_priority(stream_id, urgency, incremental) { + Ok(_) => Ok(stream_id), + Err(e) => Err(e), + } + } + + /// Create a new unidrectional stream with given stream priority. + /// Return id of the created stream upon success. + pub fn stream_uni_new(&mut self, urgency: u8, incremental: bool) -> Result { + let stream_id = self.next_stream_id_uni; + match self.stream_set_priority(stream_id, urgency, incremental) { + Ok(_) => Ok(stream_id), + Err(e) => Err(e), + } + } + /// Get the lowest offset of data to be read. pub fn stream_read_offset(&mut self, stream_id: u64) -> Option { match self.get_mut(stream_id) { @@ -612,6 +641,12 @@ impl StreamMap { /// Return a mutable reference to the stream with the given ID if it exists, /// or create a new one with given paras otherwise if it is allowed. fn get_or_create(&mut self, id: u64, local: bool) -> Result<&mut Stream> { + // A stream ID is a 62-bit integer (0 to 262-1) that is unique for all + // streams on a connection. + if id > crate::codec::VINT_MAX { + return Err(Error::ProtocolViolation); + } + match self.streams.entry(id) { // 1.Can not find any stream with the given stream ID. // It may not be created yet or it has been closed. @@ -656,6 +691,15 @@ impl StreamMap { self.writable.insert(id); } + // Update stream id for next bidirectional/unidirectional stream. 
+ if bidi { + self.next_stream_id_bidi = cmp::max(self.next_stream_id_bidi, id); + self.next_stream_id_bidi = self.next_stream_id_bidi.saturating_add(4); + } else { + self.next_stream_id_uni = cmp::max(self.next_stream_id_uni, id); + self.next_stream_id_uni = self.next_stream_id_uni.saturating_add(4); + } + self.events.add(Event::StreamCreated(id)); Ok(v.insert(new_stream)) } @@ -3097,7 +3141,7 @@ impl ConcurrencyControl { let n = std::cmp::max(self.local_opened_streams_bidi, stream_sequence); if n > self.peer_max_streams_bidi { - // Can't open more bididirectional streams than the peer allows, send + // Can't open more bidirectional streams than the peer allows, send // a STREAMS_BLOCKED(type: 0x16) frame to notify the peer update the // max_streams_bidi limit. self.update_streams_blocked_at(true, Some(self.peer_max_streams_bidi)); @@ -3220,6 +3264,84 @@ mod tests { use super::*; // StreamMap unit tests + #[test] + fn streams_new_client() { + let peer_tp = StreamTransportParams { + initial_max_streams_bidi: crate::codec::VINT_MAX, + initial_max_streams_uni: crate::codec::VINT_MAX, + ..StreamTransportParams::default() + }; + let mut map = StreamMap::new(false, 50, 50, StreamTransportParams::default()); + map.update_peer_stream_transport_params(peer_tp); + + // client initiated bidirectional streams + let id = map.stream_bidi_new(0, false); + assert_eq!(id, Ok(0)); + let id = map.stream_bidi_new(0, false); + assert_eq!(id, Ok(4)); + + assert_eq!(map.stream_set_priority(20, 0, false), Ok(())); + assert_eq!(map.stream_bidi_new(0, false), Ok(24)); + assert_eq!( + map.stream_set_priority(crate::codec::VINT_MAX - 3, 0, false), + Ok(()) + ); + assert_eq!(map.stream_bidi_new(0, false), Err(Error::ProtocolViolation)); + + // client initiated unidirectional streams + let id = map.stream_uni_new(0, false); + assert_eq!(id, Ok(2)); + let id = map.stream_uni_new(0, false); + assert_eq!(id, Ok(6)); + + assert_eq!(map.stream_set_priority(22, 0, false), Ok(())); + 
assert_eq!(map.stream_uni_new(0, false), Ok(26)); + assert_eq!( + map.stream_set_priority(crate::codec::VINT_MAX - 1, 0, false), + Ok(()) + ); + assert_eq!(map.stream_uni_new(0, false), Err(Error::ProtocolViolation)); + } + + #[test] + fn streams_new_server() { + let peer_tp = StreamTransportParams { + initial_max_streams_bidi: crate::codec::VINT_MAX, + initial_max_streams_uni: crate::codec::VINT_MAX, + ..StreamTransportParams::default() + }; + let mut map = StreamMap::new(true, 50, 50, StreamTransportParams::default()); + map.update_peer_stream_transport_params(peer_tp); + + // server initiated bidirectional streams + let id = map.stream_bidi_new(1, false); + assert_eq!(id, Ok(1)); + let id = map.stream_bidi_new(5, false); + assert_eq!(id, Ok(5)); + + assert_eq!(map.stream_set_priority(21, 0, false), Ok(())); + assert_eq!(map.stream_bidi_new(0, false), Ok(25)); + assert_eq!( + map.stream_set_priority(crate::codec::VINT_MAX - 2, 0, false), + Ok(()) + ); + assert_eq!(map.stream_bidi_new(0, false), Err(Error::ProtocolViolation)); + + // server initiated unidirectional streams + let id = map.stream_uni_new(0, false); + assert_eq!(id, Ok(3)); + let id = map.stream_uni_new(0, false); + assert_eq!(id, Ok(7)); + + assert_eq!(map.stream_set_priority(23, 0, false), Ok(())); + assert_eq!(map.stream_uni_new(0, false), Ok(27)); + assert_eq!( + map.stream_set_priority(crate::codec::VINT_MAX, 0, false), + Ok(()) + ); + assert_eq!(map.stream_uni_new(0, false), Err(Error::ProtocolViolation)); + } + // Test StreamMap::write #[test] fn stream_write_invalid_sid() { diff --git a/src/ffi.rs b/src/ffi.rs index 78782c4f..2facfcd1 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -1231,7 +1231,10 @@ pub extern "C" fn quic_stream_write( } } -/// Create a new quic transport stream with the given id and priority. +/// Create a new quic stream with the given id and priority. +/// This is a low-level API for stream creation. 
It is recommended to use +/// `quic_stream_bidi_new` for bidirectional streams or `quic_stream_uni_new` +/// for unidrectional streams. #[no_mangle] pub extern "C" fn quic_stream_new( conn: &mut Connection, @@ -1245,6 +1248,42 @@ pub extern "C" fn quic_stream_new( } } +/// Create a new quic bidiectional stream with the given id and priority. +/// If success, the output parameter `stream_id` carrys the id of the created stream. +#[no_mangle] +pub extern "C" fn quic_stream_bidi_new( + conn: &mut Connection, + urgency: u8, + incremental: bool, + stream_id: &mut u64, +) -> c_int { + match conn.stream_bidi_new(urgency, incremental) { + Ok(id) => { + *stream_id = id; + 0 + } + Err(e) => e.to_errno() as c_int, + } +} + +/// Create a new quic uniectional stream with the given id and priority. +/// If success, the output parameter `stream_id` carrys the id of the created stream. +#[no_mangle] +pub extern "C" fn quic_stream_uni_new( + conn: &mut Connection, + urgency: u8, + incremental: bool, + stream_id: &mut u64, +) -> c_int { + match conn.stream_uni_new(urgency, incremental) { + Ok(id) => { + *stream_id = id; + 0 + } + Err(e) => e.to_errno() as c_int, + } +} + /// Shutdown stream reading or writing. 
#[no_mangle] pub extern "C" fn quic_stream_shutdown( From ef8258594d5d17249ff708fbec59d955e6a3792e Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Mon, 15 Apr 2024 18:55:47 +0800 Subject: [PATCH 07/13] Add workflow and plot tools for fct testing (#235) --- .github/workflows/plot-fct.py | 65 ++++++++++++++++++++++++ .github/workflows/tquic-fct.yml | 90 +++++++++++++++++++++++++++++++++ include/tquic.h | 4 +- src/connection/stream.rs | 2 +- src/ffi.rs | 4 +- 5 files changed, 160 insertions(+), 5 deletions(-) create mode 100644 .github/workflows/plot-fct.py create mode 100644 .github/workflows/tquic-fct.yml diff --git a/.github/workflows/plot-fct.py b/.github/workflows/plot-fct.py new file mode 100644 index 00000000..60da15d8 --- /dev/null +++ b/.github/workflows/plot-fct.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 + +import os +import sys + +import json +import matplotlib.pyplot as plt +import numpy as np + +# QUIC implementes +IMPLS = ["tquic", "lsquic", "picoquic", "quiche"] + +# Different modes +MODES = ["1rtt", "0rtt"] + +# Different loss rates +RATES = [0, 1, 3, 5] + +# Running count for each test +COUNT = 30 + + +# Read a measurement file generated by fct testing +def read_data(data_dir, impl, cc, mode, loss): + dirname = "fct%s-%s-%s" % (mode, cc, impl) + filename = "fct%s-%s-%s-%s.json" % (mode, loss, cc, impl) + path = os.path.join(data_dir, dirname, filename) + try: + with open(path) as f: + data = json.load(f) + return data["measurements"][0][0]["data"] + except: + return None + + +# Plot the throughput graph for the specified CC algorithm under different file +# modes and packet loss rates. 
+def plot(data_dir, cc): + fig, axs = plt.subplots(len(MODES), len(RATES), figsize=(15,10)) + x = np.linspace(0, COUNT, COUNT) + for i in range(len(MODES)): + for j in range(len(RATES)): + for impl in IMPLS: + data = read_data(data_dir, impl, cc, MODES[i], RATES[j]) + if data is None or len(data) != COUNT: + continue + axs[i, j].plot(x, data, label=impl, marker=".") + axs[i, j].set_xlabel("Run #") + axs[i, j].set_ylabel("FCT") + axs[i, j].set_title("%s loss rate %s%%" % (MODES[i], RATES[j])) + axs[i, j].legend() + plt.suptitle(cc.upper()) + plt.tight_layout() + plt.savefig("fct-%s.png" % (cc), dpi=300) + + +if __name__ == '__main__': + if len(sys.argv) < 2: + print("Usage: %s [data_dir]" % (sys.argv[0])) + exit(1) + + data_dir= sys.argv[1] + plot(data_dir, "bbr") + plot(data_dir, "cubic") + diff --git a/.github/workflows/tquic-fct.yml b/.github/workflows/tquic-fct.yml new file mode 100644 index 00000000..31c2d3ef --- /dev/null +++ b/.github/workflows/tquic-fct.yml @@ -0,0 +1,90 @@ +name: FCT + +on: + schedule: + - cron: '30 1 * * *' + workflow_dispatch: + +env: + CARGO_TERM_COLOR: always + QUIC_IMAGES: lsquic=tquicgroup/qirls,picoquic=tquicgroup/qirpq,quiche=tquicgroup/qircq + +jobs: + measure: + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + impl: [tquic,lsquic,picoquic] + case: [fct1rtt,fct0rtt] + cc: [cubic, bbr] + + # The scheduled workflow only runs for the main repository. + # You can manually trigger it if necessary. + if: ${{ ( github.event_name == 'schedule' && github.repository == 'tencent/tquic' ) || github.event_name == 'workflow_dispatch' }} + steps: + - uses: actions/checkout@v4 + with: + submodules: 'recursive' + + - name: Build docker image + run: docker build -t tquic_interop:v1 -f interop/Dockerfile . 
+ + - name: Install quic-interop-runner + run: | + git clone https://github.com/iyangsj/quic-interop-runner.git + cd quic-interop-runner + pip3 install -r requirements.txt + + - name: Install dependences + run: | + sudo modprobe ip6table_filter + sudo add-apt-repository -y ppa:wireshark-dev/stable + sudo apt install -y tshark + + - name: Run the interop tests + run: | + cd quic-interop-runner + python3 run.py -r "$QUIC_IMAGES,tquic=tquic_interop:v1" -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=0 --rate_to_client=0" -j ${{ matrix.case }}-0-${{ matrix.cc }}-${{ matrix.impl }}.json + python3 run.py -r "$QUIC_IMAGES,tquic=tquic_interop:v1" -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=1 --rate_to_client=1" -j ${{ matrix.case }}-1-${{ matrix.cc }}-${{ matrix.impl }}.json + python3 run.py -r "$QUIC_IMAGES,tquic=tquic_interop:v1" -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=3 --rate_to_client=3" -j ${{ matrix.case }}-3-${{ matrix.cc }}-${{ matrix.impl }}.json + python3 run.py -r "$QUIC_IMAGES,tquic=tquic_interop:v1" -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=5 --rate_to_client=5" -j ${{ matrix.case }}-5-${{ matrix.cc }}-${{ matrix.impl }}.json + + - name: Store measurement results + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.case }}-${{ matrix.cc }}-${{ matrix.impl }} + path: quic-interop-runner/fct*.json + + result: + runs-on: ubuntu-latest + needs: measure + if: ${{ (( github.event_name == 'schedule' && github.repository == 'tencent/tquic' ) || github.event_name == 'workflow_dispatch') && 
!cancelled() }} + steps: + - name: Download all workflow run artifacts + uses: actions/download-artifact@v4 + + - name: Display structure of downloaded files + run: ls -R + + - name: Display all measurement details + run: grep "details.*" . -Ro + + - name: Download plot tools + uses: actions/checkout@v4 + with: + path: tools + + - name: Install dependences + run: | + sudo apt install python3-matplotlib + + - name: Plot all measurement results + run: python3 tools/.github/workflows/plot-fct.py . + + - name: Store all measurement results + uses: actions/upload-artifact@v4 + with: + name: fct-all-result + path: fct* diff --git a/include/tquic.h b/include/tquic.h index f5c68f70..f35be838 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -902,7 +902,7 @@ int quic_stream_new(struct quic_conn_t *conn, bool incremental); /** - * Create a new quic bidiectional stream with the given id and priority. + * Create a new quic bidiectional stream with the given priority. * If success, the output parameter `stream_id` carrys the id of the created stream. */ int quic_stream_bidi_new(struct quic_conn_t *conn, @@ -911,7 +911,7 @@ int quic_stream_bidi_new(struct quic_conn_t *conn, uint64_t *stream_id); /** - * Create a new quic uniectional stream with the given id and priority. + * Create a new quic uniectional stream with the given priority. * If success, the output parameter `stream_id` carrys the id of the created stream. */ int quic_stream_uni_new(struct quic_conn_t *conn, diff --git a/src/connection/stream.rs b/src/connection/stream.rs index 60277411..667b88c2 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -641,7 +641,7 @@ impl StreamMap { /// Return a mutable reference to the stream with the given ID if it exists, /// or create a new one with given paras otherwise if it is allowed. 
fn get_or_create(&mut self, id: u64, local: bool) -> Result<&mut Stream> { - // A stream ID is a 62-bit integer (0 to 262-1) that is unique for all + // A stream ID is a 62-bit integer (0 to 2^62-1) that is unique for all // streams on a connection. if id > crate::codec::VINT_MAX { return Err(Error::ProtocolViolation); diff --git a/src/ffi.rs b/src/ffi.rs index 2facfcd1..41cfd87e 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -1248,7 +1248,7 @@ pub extern "C" fn quic_stream_new( } } -/// Create a new quic bidiectional stream with the given id and priority. +/// Create a new quic bidiectional stream with the given priority. /// If success, the output parameter `stream_id` carrys the id of the created stream. #[no_mangle] pub extern "C" fn quic_stream_bidi_new( @@ -1266,7 +1266,7 @@ pub extern "C" fn quic_stream_bidi_new( } } -/// Create a new quic uniectional stream with the given id and priority. +/// Create a new quic uniectional stream with the given priority. /// If success, the output parameter `stream_id` carrys the id of the created stream. 
#[no_mangle] pub extern "C" fn quic_stream_uni_new( From 0a57812dc9ec697d08e1450ad470544f082305a0 Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Thu, 18 Apr 2024 11:46:38 +0800 Subject: [PATCH 08/13] Add pacing_rate to PathStats (#237) --- src/connection/path.rs | 2 +- src/connection/recovery.rs | 73 +----------------------------------- src/lib.rs | 76 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 79 insertions(+), 72 deletions(-) diff --git a/src/connection/path.rs b/src/connection/path.rs index 8f2c332d..495a027d 100644 --- a/src/connection/path.rs +++ b/src/connection/path.rs @@ -21,12 +21,12 @@ use std::time::Duration; use slab::Slab; -pub use super::recovery::PathStats; use super::recovery::Recovery; use super::timer; use crate::connection::SpaceId; use crate::error::Error; use crate::multipath_scheduler::MultipathScheduler; +use crate::PathStats; use crate::RecoveryConfig; use crate::Result; use crate::TIMER_GRANULARITY; diff --git a/src/connection/recovery.rs b/src/connection/recovery.rs index ef6281cf..1f7d474c 100644 --- a/src/connection/recovery.rs +++ b/src/connection/recovery.rs @@ -36,6 +36,7 @@ use crate::qlog; use crate::qlog::events::EventData; use crate::ranges::RangeSet; use crate::Error; +use crate::PathStats; use crate::RecoveryConfig; use crate::Result; use crate::TIMER_GRANULARITY; @@ -878,6 +879,7 @@ impl Recovery { self.stats.srtt = self.rtt.smoothed_rtt(); self.stats.rttvar = self.rtt.rttvar(); self.stats.in_slow_start = self.congestion.in_slow_start(); + self.stats.pacing_rate = self.congestion.pacing_rate().unwrap_or_default(); } /// Write a qlog RecoveryMetricsUpdated event if any recovery metric is updated. @@ -972,77 +974,6 @@ impl Recovery { } } -/// Statistics about a QUIC path. -#[derive(Default)] -pub struct PathStats { - /// The number of QUIC packets received. - pub recv_count: u64, - - /// The number of received bytes. - pub recv_bytes: u64, - - /// The number of QUIC packets sent. 
- pub sent_count: u64, - - /// The number of sent bytes. - pub sent_bytes: u64, - - /// The number of QUIC packets lost. - pub lost_count: u64, - - /// The number of lost bytes. - pub lost_bytes: u64, - - /// Total number of bytes acked. - pub acked_bytes: u64, - - /// Total number of packets acked. - pub acked_count: u64, - - /// Initial congestion window in bytes. - pub init_cwnd: u64, - - /// Final congestion window in bytes. - pub final_cwnd: u64, - - /// Maximum congestion window in bytes. - pub max_cwnd: u64, - - /// Minimum congestion window in bytes. - pub min_cwnd: u64, - - /// Maximum inflight data in bytes. - pub max_inflight: u64, - - /// Total loss events. - pub loss_event_count: u64, - - /// Total congestion window limited events. - pub cwnd_limited_count: u64, - - /// Total duration of congestion windowlimited events. - pub cwnd_limited_duration: Duration, - - /// The time for last congestion window event - last_cwnd_limited_time: Option, - - /* Note: the following fields are lazily updated from Recovery */ - /// Minimum roundtrip time. - pub min_rtt: Duration, - - /// Maximum roundtrip time. - pub max_rtt: Duration, - - /// Smoothed roundtrip time. - pub srtt: Duration, - - /// Roundtrip time variation. - pub rttvar: Duration, - - /// Whether the congestion controller is in slow start status. - pub in_slow_start: bool, -} - /// Metrics used for emitting qlog RecoveryMetricsUpdated event. #[derive(Default)] struct RecoveryMetrics { diff --git a/src/lib.rs b/src/lib.rs index f3a6d17f..d5d99f92 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -59,6 +59,7 @@ use std::net::SocketAddr; use std::sync::Arc; use std::time; use std::time::Duration; +use std::time::Instant; use bytes::Buf; use bytes::BufMut; @@ -896,6 +897,81 @@ pub enum PathEvent { Abandoned(usize), } +/// Statistics about path +#[repr(C)] +#[derive(Default)] +pub struct PathStats { + /// The number of QUIC packets received. + pub recv_count: u64, + + /// The number of received bytes. 
+ pub recv_bytes: u64, + + /// The number of QUIC packets sent. + pub sent_count: u64, + + /// The number of sent bytes. + pub sent_bytes: u64, + + /// The number of QUIC packets lost. + pub lost_count: u64, + + /// The number of lost bytes. + pub lost_bytes: u64, + + /// Total number of bytes acked. + pub acked_bytes: u64, + + /// Total number of packets acked. + pub acked_count: u64, + + /// Initial congestion window in bytes. + pub init_cwnd: u64, + + /// Final congestion window in bytes. + pub final_cwnd: u64, + + /// Maximum congestion window in bytes. + pub max_cwnd: u64, + + /// Minimum congestion window in bytes. + pub min_cwnd: u64, + + /// Maximum inflight data in bytes. + pub max_inflight: u64, + + /// Total loss events. + pub loss_event_count: u64, + + /// Total congestion window limited events. + pub cwnd_limited_count: u64, + + /// Total duration of congestion windowlimited events. + pub cwnd_limited_duration: Duration, + + /// The time for last congestion window event + last_cwnd_limited_time: Option, + + /* Note: the following fields are lazily updated from Recovery */ + /// Minimum roundtrip time. + pub min_rtt: Duration, + + /// Maximum roundtrip time. + pub max_rtt: Duration, + + /// Smoothed roundtrip time. + pub srtt: Duration, + + /// Roundtrip time variation. + pub rttvar: Duration, + + /// Whether the congestion controller is in slow start status. + pub in_slow_start: bool, + + /// Pacing rate estimated by congestion control algorithm. 
+ pub pacing_rate: u64, +} + #[cfg(test)] mod tests { use super::*; From a4a1f45808d913931709330d989d1e77d7b3d5b7 Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Mon, 22 Apr 2024 16:21:24 +0800 Subject: [PATCH 09/13] Update tquic-goodput.yml/tquic-fct.yml and add gquiche (#241) --- .github/workflows/plot-fct.py | 2 +- .github/workflows/plot-goodput.py | 4 ++-- .github/workflows/tquic-fct.yml | 4 ++-- .github/workflows/tquic-goodput.yml | 4 ++-- 4 files changed, 7 insertions(+), 7 deletions(-) diff --git a/.github/workflows/plot-fct.py b/.github/workflows/plot-fct.py index 60da15d8..67b5f504 100644 --- a/.github/workflows/plot-fct.py +++ b/.github/workflows/plot-fct.py @@ -8,7 +8,7 @@ import numpy as np # QUIC implementes -IMPLS = ["tquic", "lsquic", "picoquic", "quiche"] +IMPLS = ["tquic", "gquiche", "lsquic", "picoquic", "quiche"] # Different modes MODES = ["1rtt", "0rtt"] diff --git a/.github/workflows/plot-goodput.py b/.github/workflows/plot-goodput.py index 1116c115..b55dec6f 100644 --- a/.github/workflows/plot-goodput.py +++ b/.github/workflows/plot-goodput.py @@ -8,7 +8,7 @@ import numpy as np # QUIC implementes -IMPLS = ["tquic", "lsquic", "picoquic", "quiche"] +IMPLS = ["tquic", "gquiche", "lsquic", "picoquic", "quiche"] # Different file sizes SIZES = ["10m", "1m", "100k"] @@ -44,7 +44,7 @@ def plot(data_dir, cc): data = read_data(data_dir, impl, cc, SIZES[i], RATES[j]) if data is None or len(data) != COUNT: continue - axs[i, j].plot(x, data, label=impl, marker="o") + axs[i, j].plot(x, data, label=impl, marker=".") axs[i, j].set_xlabel("Run #") axs[i, j].set_ylabel("Goodput") axs[i, j].set_title("%s loss rate %s%%" % (SIZES[i], RATES[j])) diff --git a/.github/workflows/tquic-fct.yml b/.github/workflows/tquic-fct.yml index 31c2d3ef..1a9ff85c 100644 --- a/.github/workflows/tquic-fct.yml +++ b/.github/workflows/tquic-fct.yml @@ -7,7 +7,7 @@ on: env: CARGO_TERM_COLOR: always - QUIC_IMAGES: 
lsquic=tquicgroup/qirls,picoquic=tquicgroup/qirpq,quiche=tquicgroup/qircq + QUIC_IMAGES: gquiche=tquicgroup/qirgq,lsquic=tquicgroup/qirls,picoquic=tquicgroup/qirpq,quiche=tquicgroup/qircq jobs: measure: @@ -16,7 +16,7 @@ jobs: strategy: fail-fast: false matrix: - impl: [tquic,lsquic,picoquic] + impl: [tquic,gquiche,lsquic,picoquic] case: [fct1rtt,fct0rtt] cc: [cubic, bbr] diff --git a/.github/workflows/tquic-goodput.yml b/.github/workflows/tquic-goodput.yml index 0460cf97..20ef8a9b 100644 --- a/.github/workflows/tquic-goodput.yml +++ b/.github/workflows/tquic-goodput.yml @@ -7,7 +7,7 @@ on: env: CARGO_TERM_COLOR: always - QUIC_IMAGES: lsquic=tquicgroup/qirls,picoquic=tquicgroup/qirpq,quiche=tquicgroup/qircq + QUIC_IMAGES: gquiche=tquicgroup/qirgq,lsquic=tquicgroup/qirls,picoquic=tquicgroup/qirpq,quiche=tquicgroup/qircq jobs: measure: @@ -15,7 +15,7 @@ jobs: strategy: matrix: - impl: [tquic,lsquic,picoquic,quiche] + impl: [tquic,gquiche,lsquic,picoquic,quiche] case: [goodput100k,goodput1m,goodput10m] cc: [cubic, bbr] From ca5ca4f8813a9d22cd46ee963b10287d5e60785a Mon Sep 17 00:00:00 2001 From: xiaofei0800 <54530729+xiaofei0800@users.noreply.github.com> Date: Thu, 25 Apr 2024 15:46:53 +0800 Subject: [PATCH 10/13] Add support for responding to key updates (#244) --- src/connection/connection.rs | 208 ++++++++++++++++++++++++++++++++++- src/connection/path.rs | 7 ++ src/connection/space.rs | 8 ++ src/packet.rs | 8 +- src/tls/boringssl/crypto.rs | 147 ++++++++++++++++++------- src/tls/boringssl/tls.rs | 4 +- src/tls/key.rs | 4 +- src/tls/tls.rs | 172 ++++++++++++++++++++++++++++- 8 files changed, 505 insertions(+), 53 deletions(-) diff --git a/src/connection/connection.rs b/src/connection/connection.rs index 2e1f9212..58b7608b 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -53,7 +53,9 @@ use crate::packet::PacketType; use crate::qlog; use crate::qlog::events; use crate::tls; +use crate::tls::Keys; use crate::tls::Level; +use 
crate::tls::Open; use crate::tls::TlsSession; use crate::token::AddressToken; use crate::token::ResetToken; @@ -537,6 +539,7 @@ impl Connection { packet::decrypt_header(buf, pkt_num_offset, &mut hdr, key).map_err(|_| Error::Done)?; // Decode packet sequence number + let handshake_confirmed = self.is_confirmed(); let space_id = self.get_space_id(hdr.pkt_type, pid)?; let space = self.spaces.get_mut(space_id).ok_or(Error::InternalError)?; let largest_rx_pkt_num = space.largest_rx_pkt_num; @@ -552,7 +555,7 @@ impl Connection { return Err(Error::Done); } - // Decrypt packet payload + // Select key and decrypt packet payload. let payload_offset = pkt_num_offset + hdr.pkt_num_len; let payload_len = length.checked_sub(hdr.pkt_num_len).ok_or(Error::Done)?; let mut cid_seq = None; @@ -563,6 +566,13 @@ impl Connection { .ok_or(Error::InvalidState("unknown dcid".into()))?; cid_seq = Some(seq as u32) } + + let (key, attempt_key_update) = self.tls_session.select_key( + handshake_confirmed, + self.flags.contains(EnableMultipath), + &hdr, + space, + )?; let mut payload = packet::decrypt_payload(buf, payload_offset, payload_len, cid_seq, pkt_num, key) .map_err(|_| Error::Done)?; @@ -581,6 +591,16 @@ impl Connection { self.paths.get(pid)? ); + // Try to update key. 
+ self.tls_session.try_update_key( + &mut self.timers, + space, + attempt_key_update, + &hdr, + now, + self.paths.max_pto(), + )?; + // Update dcid for initial path self.try_set_dcid_for_initial_path(pid, &hdr)?; @@ -1497,7 +1517,7 @@ impl Connection { } else { None }, - key_phase: false, + key_phase: self.tls_session.current_key_phase(), }; let hdr_offset = hdr.to_bytes(out)?; @@ -1662,6 +1682,9 @@ impl Connection { if pkt_type == PacketType::OneRTT { let lowest_1rtt_pkt_num = space.lowest_1rtt_pkt_num; space.lowest_1rtt_pkt_num = cmp::min(lowest_1rtt_pkt_num, pkt_num); + if space.first_pkt_num_sent.is_none() { + space.first_pkt_num_sent = Some(pkt_num); + } } } @@ -2956,7 +2979,7 @@ impl Connection { Timer::Draining => self.flags.insert(Closed), - Timer::KeyDiscard => (), // TODO: support key discarding + Timer::KeyDiscard => self.tls_session.discard_prev_key(), Timer::KeepAlive => (), // TODO: schedule an outgoing Ping @@ -6841,6 +6864,183 @@ pub(crate) mod tests { Ok(()) } + + fn test_pair_for_key_update() -> Result { + let mut client_config = TestPair::new_test_config(false)?; + client_config.set_cid_len(crate::MAX_CID_LEN); + client_config.set_initial_max_data(10000); + client_config.set_initial_max_stream_data_bidi_local(10000); + client_config.set_initial_max_stream_data_bidi_remote(10000); + + let mut server_config = TestPair::new_test_config(true)?; + server_config.set_cid_len(crate::MAX_CID_LEN); + server_config.set_initial_max_data(10000); + server_config.set_initial_max_stream_data_bidi_local(10000); + server_config.set_initial_max_stream_data_bidi_remote(10000); + + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + assert_eq!(test_pair.handshake(), Ok(())); + + // Transfer some data. 
+ let data = Bytes::from_static(b"test data over quic"); + test_pair.client.stream_write(0, data.clone(), false)?; + let packets = TestPair::conn_packets_out(&mut test_pair.client)?; + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + let mut buf = vec![0; 2048]; + assert_eq!(test_pair.server.stream_read(0, &mut buf)?, (19, false)); + assert_eq!(&buf[..19], &data[..]); + + // Server reply ack. + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + assert!(!test_pair.client.tls_session.current_key_phase()); + assert!(!test_pair.server.tls_session.current_key_phase()); + + Ok(test_pair) + } + + #[test] + fn key_update() -> Result<()> { + let mut test_pair = test_pair_for_key_update()?; + + // Client init key update. + let space = test_pair + .client + .spaces + .get_mut(SpaceId::Data) + .ok_or(Error::InternalError)?; + test_pair + .client + .tls_session + .initiate_key_update(space, false)?; + + // Transfer some data. + let data = Bytes::from_static(b"test data over quic"); + test_pair.client.stream_write(0, data.clone(), false)?; + let packets = TestPair::conn_packets_out(&mut test_pair.client)?; + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + let mut buf = vec![0; 2048]; + assert_eq!(test_pair.server.stream_read(0, &mut buf)?, (19, false)); + assert_eq!(&buf[..19], &data[..]); + + // Server reply ack. + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + assert!(test_pair.client.tls_session.current_key_phase()); + assert!(test_pair.server.tls_session.current_key_phase()); + + Ok(()) + } + + #[test] + fn key_update_with_packet_reorder() -> Result<()> { + let mut test_pair = test_pair_for_key_update()?; + + // Client send data. 
+ let data = Bytes::from_static(b"test data over quic"); + test_pair.client.stream_write(0, data.clone(), false)?; + let prev_key_packets = TestPair::conn_packets_out(&mut test_pair.client)?; + + // Client init key update. + let space = test_pair + .client + .spaces + .get_mut(SpaceId::Data) + .ok_or(Error::InternalError)?; + test_pair + .client + .tls_session + .initiate_key_update(space, false)?; + + // Client send with new key. + let data = Bytes::from_static(b"test data over quic"); + test_pair.client.stream_write(0, data.clone(), true)?; + let new_key_packets = TestPair::conn_packets_out(&mut test_pair.client)?; + + // Server receive reordered packets. + TestPair::conn_packets_in(&mut test_pair.server, new_key_packets)?; + TestPair::conn_packets_in(&mut test_pair.server, prev_key_packets)?; + let mut buf = vec![0; 2048]; + assert_eq!(test_pair.server.stream_read(0, &mut buf)?, (38, true)); + + // Server reply ack. + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + assert!(test_pair.client.tls_session.current_key_phase()); + assert!(test_pair.server.tls_session.current_key_phase()); + + Ok(()) + } + + #[test] + fn key_update_with_previous_key_discard() -> Result<()> { + let mut test_pair = test_pair_for_key_update()?; + + // Client send data. + let data = Bytes::from_static(b"test data over quic"); + test_pair.client.stream_write(0, data.clone(), false)?; + let prev_key_packets = TestPair::conn_packets_out(&mut test_pair.client)?; + + // Client init key update. + let space = test_pair + .client + .spaces + .get_mut(SpaceId::Data) + .ok_or(Error::InternalError)?; + test_pair + .client + .tls_session + .initiate_key_update(space, false)?; + // Client send with new key. 
+ let data = Bytes::from_static(b"test data over quic"); + test_pair.client.stream_write(0, data.clone(), true)?; + let new_key_packets = TestPair::conn_packets_out(&mut test_pair.client)?; + + // Server discard previous key and receive reordered packets. + TestPair::conn_packets_in(&mut test_pair.server, new_key_packets)?; + + let timeout = test_pair.server.timers.get(Timer::KeyDiscard); + test_pair.server.on_timeout(timeout.unwrap()); + + TestPair::conn_packets_in(&mut test_pair.server, prev_key_packets)?; + let mut buf = vec![0; 2048]; + assert_eq!(test_pair.server.stream_read(0, &mut buf), Err(Error::Done)); + + // Server reply ack. + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + assert!(test_pair.client.tls_session.current_key_phase()); + assert!(test_pair.server.tls_session.current_key_phase()); + + Ok(()) + } + + #[test] + fn key_update_with_consecutive_update() -> Result<()> { + let mut test_pair = test_pair_for_key_update()?; + + // Client init key update. + let space = test_pair + .client + .spaces + .get_mut(SpaceId::Data) + .ok_or(Error::InternalError)?; + test_pair + .client + .tls_session + .initiate_key_update(space, false)?; + + // Client init another key update. + assert_eq!( + test_pair + .client + .tls_session + .initiate_key_update(space, false), + Err(Error::Done) + ); + + Ok(()) + } } mod cid; @@ -6850,4 +7050,4 @@ mod recovery; pub(crate) mod rtt; pub(crate) mod space; pub(crate) mod stream; -mod timer; +pub(crate) mod timer; diff --git a/src/connection/path.rs b/src/connection/path.rs index 495a027d..ac8673ce 100644 --- a/src/connection/path.rs +++ b/src/connection/path.rs @@ -531,6 +531,13 @@ impl PathMap { .map(|&(_, _, loss_time)| loss_time) } + /// Return the maximum PTO among all paths. 
+ pub fn max_pto(&self) -> Option { + self.iter() + .map(|(_, path)| path.recovery.rtt.pto_base()) + .max() + } + /// Increase send limit before address validation for server pub fn inc_anti_ampl_limit(&mut self, pid: usize, pkt_len: usize) { if !self.is_server { diff --git a/src/connection/space.rs b/src/connection/space.rs index 26a19fef..56ddc96e 100644 --- a/src/connection/space.rs +++ b/src/connection/space.rs @@ -77,6 +77,12 @@ pub struct PacketNumSpace { /// Highest received packet number. pub largest_rx_pkt_num: u64, + /// Track the first received packet in current key phase. + pub first_pkt_num_recv: Option, + + /// Track the first sent packet in current key phase. + pub first_pkt_num_sent: Option, + /// The time at which the packet of highest sequence number arrived. pub largest_rx_pkt_time: Instant, @@ -137,6 +143,8 @@ impl PacketNumSpace { consecutive_non_ack_eliciting_sent: 0, lowest_1rtt_pkt_num: std::u64::MAX, largest_rx_pkt_num: 0, + first_pkt_num_recv: None, + first_pkt_num_sent: None, largest_rx_pkt_time: Instant::now(), largest_rx_non_probing_pkt_num: 0, recv_pkt_num_need_ack: RangeSet::new(crate::MAX_ACK_RANGES), diff --git a/src/packet.rs b/src/packet.rs index ea9430ba..7db80681 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1378,7 +1378,7 @@ mod tests { b.write(&pkt_hdr_data)?; b.write(&pkt_payload)?; - let aead = Seal::new_with_secret(tls::Algorithm::ChaCha20Poly1305, &secret)?; + let aead = Seal::new_with_secret(tls::Algorithm::ChaCha20Poly1305, secret.to_vec())?; let written = encrypt_packet( out.as_mut_slice(), None, @@ -1409,7 +1409,7 @@ mod tests { }; let pkt_payload = [01, 02, 03, 04]; let cid_seq = Some(2); - let mut secret = [0u8, 32]; + let mut secret = [0u8; 32]; rand::thread_rng().fill_bytes(&mut secret); // encode the packet header and payload @@ -1419,7 +1419,7 @@ mod tests { out[payload_off..payload_end].copy_from_slice(&pkt_payload); // encrypt the packet header and payload - let seal = 
Seal::new_with_secret(tls::Algorithm::ChaCha20Poly1305, &secret)?; + let seal = Seal::new_with_secret(tls::Algorithm::ChaCha20Poly1305, secret.to_vec())?; let written = encrypt_packet( out.as_mut_slice(), cid_seq, @@ -1438,7 +1438,7 @@ mod tests { assert_eq!(hdr.dcid, pkt_hdr.dcid); assert_eq!(hdr.key_phase, pkt_hdr.key_phase); - let open = Open::new_with_secret(tls::Algorithm::ChaCha20Poly1305, &secret)?; + let open = Open::new_with_secret(tls::Algorithm::ChaCha20Poly1305, secret.to_vec())?; decrypt_header(&mut out, read, &mut hdr, &open)?; assert_eq!(hdr.pkt_num_len, pkt_hdr.pkt_num_len); assert_eq!(hdr.pkt_num, pkt_hdr.pkt_num); diff --git a/src/tls/boringssl/crypto.rs b/src/tls/boringssl/crypto.rs index 4ba7a676..9836d41e 100644 --- a/src/tls/boringssl/crypto.rs +++ b/src/tls/boringssl/crypto.rs @@ -79,36 +79,82 @@ impl Algorithm { } } +struct HeaderKey { + key: aead::quic::HeaderProtectionKey, + raw: Vec, +} + +impl HeaderKey { + fn new(algor: Algorithm, hp_key: Vec) -> Result { + Ok(Self { + key: aead::quic::HeaderProtectionKey::new(algor.hp_algor(), &hp_key) + .map_err(|_| Error::CryptoFail)?, + raw: hp_key, + }) + } +} + +struct PacketKey { + ctx: EvpAeadCtx, + nonce: Vec, +} + +impl PacketKey { + fn new(algor: Algorithm, key: Vec, iv: Vec) -> Result { + Ok(Self { + ctx: new_aead_ctx(algor, &key)?, + nonce: iv, + }) + } +} + /// AEAD encryption. pub struct Seal { algor: Algorithm, - ctx: EvpAeadCtx, - hp_key: aead::quic::HeaderProtectionKey, - nonce: Vec, + secret: Vec, + hdr_key: HeaderKey, + pkt_key: PacketKey, } impl Seal { /// Create a new Seal. 
- fn new(algor: Algorithm, key: &[u8], iv: &[u8], hp_key: &[u8]) -> Result { - Ok(Seal { + fn new( + algor: Algorithm, + secret: Vec, + hp_key: Vec, + key: Vec, + iv: Vec, + ) -> Result { + Ok(Self { algor, - ctx: new_aead_ctx(algor, key)?, - hp_key: aead::quic::HeaderProtectionKey::new(algor.hp_algor(), hp_key) - .map_err(|_| Error::CryptoFail)?, - nonce: Vec::from(iv), + secret, + hdr_key: HeaderKey::new(algor, hp_key)?, + pkt_key: PacketKey::new(algor, key, iv)?, }) } /// Create a new Seal with secret. - pub fn new_with_secret(algor: Algorithm, secret: &[u8]) -> Result { + pub fn new_with_secret(algor: Algorithm, secret: Vec) -> Result { let mut key = vec![0; algor.key_len()]; let mut iv = vec![0; algor.nonce_len()]; - let mut pn_key = vec![0; algor.key_len()]; - key::derive_pkt_key(algor.hkdf_algor(), secret, &mut key)?; - key::derive_pkt_iv(algor.hkdf_algor(), secret, &mut iv)?; - key::derive_hdr_key(algor.hkdf_algor(), secret, &mut pn_key)?; + let mut hp_key = vec![0; algor.key_len()]; + key::derive_pkt_key(algor.hkdf_algor(), &secret, &mut key)?; + key::derive_pkt_iv(algor.hkdf_algor(), &secret, &mut iv)?; + key::derive_hdr_key(algor.hkdf_algor(), &secret, &mut hp_key)?; + + Self::new(algor, secret, hp_key, key, iv) + } + + /// Derive next packet key. + pub fn derive_next_packet_key(&self) -> Result { + let mut next_secret = vec![0; self.secret.len()]; + key::derive_next_packet_key(self.algor.hkdf_algor(), &self.secret, &mut next_secret)?; + let mut next_key = Self::new_with_secret(self.algor, next_secret)?; + + // The header protection key is not updated. + next_key.hdr_key = HeaderKey::new(self.algor, self.hdr_key.raw.clone())?; - Self::new(algor, &key, &iv, &pn_key) + Ok(next_key) } /// Encrypt the plaintext and authenticate it in place. 
@@ -131,10 +177,10 @@ impl Seal { return Err(Error::CryptoFail); } - let nonce = build_nonce(&self.nonce, cid_seq, counter); + let nonce = build_nonce(&self.pkt_key.nonce, cid_seq, counter); let rc = unsafe { EVP_AEAD_CTX_seal_scatter( - &self.ctx, + &self.pkt_key.ctx, buf.as_mut_ptr(), buf[in_len..].as_mut_ptr(), &mut out_tag_len, @@ -158,7 +204,10 @@ impl Seal { /// Generate header protection mask. pub fn new_mask(&self, sample: &[u8]) -> Result<[u8; 5]> { - self.hp_key.new_mask(sample).map_err(|_| Error::CryptoFail) + self.hdr_key + .key + .new_mask(sample) + .map_err(|_| Error::CryptoFail) } pub fn algor(&self) -> Algorithm { @@ -169,33 +218,50 @@ impl Seal { /// AEAD decryption. pub struct Open { algor: Algorithm, - ctx: EvpAeadCtx, - hp_key: aead::quic::HeaderProtectionKey, - nonce: Vec, + secret: Vec, + hdr_key: HeaderKey, + pkt_key: PacketKey, } impl Open { /// Create a new Open. - fn new(algor: Algorithm, key: &[u8], iv: &[u8], hp_key: &[u8]) -> Result { - Ok(Open { + fn new( + algor: Algorithm, + secret: Vec, + hp_key: Vec, + key: Vec, + iv: Vec, + ) -> Result { + Ok(Self { algor, - ctx: new_aead_ctx(algor, key)?, - hp_key: aead::quic::HeaderProtectionKey::new(algor.hp_algor(), hp_key) - .map_err(|_| Error::CryptoFail)?, - nonce: Vec::from(iv), + secret, + hdr_key: HeaderKey::new(algor, hp_key)?, + pkt_key: PacketKey::new(algor, key, iv)?, }) } /// Create a new Open with secret. 
- pub fn new_with_secret(algor: Algorithm, secret: &[u8]) -> Result { + pub fn new_with_secret(algor: Algorithm, secret: Vec) -> Result { let mut key = vec![0; algor.key_len()]; let mut iv = vec![0; algor.nonce_len()]; let mut hp_key = vec![0; algor.key_len()]; - key::derive_pkt_key(algor.hkdf_algor(), secret, &mut key)?; - key::derive_pkt_iv(algor.hkdf_algor(), secret, &mut iv)?; - key::derive_hdr_key(algor.hkdf_algor(), secret, &mut hp_key)?; + key::derive_pkt_key(algor.hkdf_algor(), &secret, &mut key)?; + key::derive_pkt_iv(algor.hkdf_algor(), &secret, &mut iv)?; + key::derive_hdr_key(algor.hkdf_algor(), &secret, &mut hp_key)?; + + Self::new(algor, secret, hp_key, key, iv) + } + + /// Derive next packet key. + pub fn derive_next_packet_key(&self) -> Result { + let mut next_secret = vec![0; self.secret.len()]; + key::derive_next_packet_key(self.algor.hkdf_algor(), &self.secret, &mut next_secret)?; + let mut next_key = Self::new_with_secret(self.algor, next_secret)?; + + // The header protection key is not updated. + next_key.hdr_key = HeaderKey::new(self.algor, self.hdr_key.raw.clone())?; - Self::new(algor, &key, &iv, &hp_key) + Ok(next_key) } /// Decrypt the ciphertext into plaintext. @@ -217,10 +283,10 @@ impl Open { } let max_out_len = out_len; - let nonce = build_nonce(&self.nonce, cid_seq, counter); + let nonce = build_nonce(&self.pkt_key.nonce, cid_seq, counter); let rc = unsafe { EVP_AEAD_CTX_open( - &self.ctx, + &self.pkt_key.ctx, plaintext.as_mut_ptr(), &mut out_len, max_out_len, @@ -241,7 +307,10 @@ impl Open { /// Generate header protection mask. pub fn new_mask(&self, sample: &[u8]) -> Result<[u8; 5]> { - self.hp_key.new_mask(sample).map_err(|_| Error::CryptoFail) + self.hdr_key + .key + .new_mask(sample) + .map_err(|_| Error::CryptoFail) } /// Return the AEAD algorithm. 
@@ -278,14 +347,14 @@ pub fn derive_initial_secrets(cid: &[u8], version: u32, is_server: bool) -> Resu if is_server { return Ok(( - Open::new(aead, &client_key, &client_iv, &client_hp_key)?, - Seal::new(aead, &server_key, &server_iv, &server_hp_key)?, + Open::new(aead, secret.to_vec(), client_hp_key, client_key, client_iv)?, + Seal::new(aead, secret.to_vec(), server_hp_key, server_key, server_iv)?, )); } Ok(( - Open::new(aead, &server_key, &server_iv, &server_hp_key)?, - Seal::new(aead, &client_key, &client_iv, &client_hp_key)?, + Open::new(aead, secret.to_vec(), server_hp_key, server_key, server_iv)?, + Seal::new(aead, secret.to_vec(), client_hp_key, client_key, client_iv)?, )) } diff --git a/src/tls/boringssl/tls.rs b/src/tls/boringssl/tls.rs index 6bbe1b95..78f24ece 100644 --- a/src/tls/boringssl/tls.rs +++ b/src/tls/boringssl/tls.rs @@ -904,7 +904,7 @@ extern "C" fn set_read_secret( if level != tls::Level::ZeroRTT || session_data.is_server { let secret = unsafe { slice::from_raw_parts(secret, secret_len) }; - let open = match crypto::Open::new_with_secret(aead, secret) { + let open = match crypto::Open::new_with_secret(aead, secret.to_vec()) { Ok(v) => v, Err(_) => return 0, }; @@ -945,7 +945,7 @@ extern "C" fn set_write_secret( if level != tls::Level::ZeroRTT || !session_data.is_server { let secret = unsafe { slice::from_raw_parts(secret, secret_len) }; - let seal = match crypto::Seal::new_with_secret(aead, secret) { + let seal = match crypto::Seal::new_with_secret(aead, secret.to_vec()) { Ok(v) => v, Err(_) => return 0, }; diff --git a/src/tls/key.rs b/src/tls/key.rs index 630ee765..93c7e906 100644 --- a/src/tls/key.rs +++ b/src/tls/key.rs @@ -69,7 +69,7 @@ pub fn derive_hdr_key(algor: hkdf::Algorithm, secret: &[u8], out: &mut [u8]) -> hkdf_expand_label(&prk, b"quic hp", out) } -pub fn derive_updated_pkt_key(algor: hkdf::Algorithm, secret: &[u8], out: &mut [u8]) -> Result<()> { +pub fn derive_next_packet_key(algor: hkdf::Algorithm, secret: &[u8], out: &mut 
[u8]) -> Result<()> { let prk = hkdf::Prk::new_less_safe(algor, secret); hkdf_expand_label(&prk, b"quic ku", out) } @@ -194,7 +194,7 @@ mod tests { // Update packet key. let mut updated_pkt_key = [0; 32]; - assert!(derive_updated_pkt_key(algor, &secret, &mut updated_pkt_key).is_ok()); + assert!(derive_next_packet_key(algor, &secret, &mut updated_pkt_key).is_ok()); let expected_updated_pkt_key = [ 0x12, 0x23, 0x50, 0x47, 0x55, 0x03, 0x6d, 0x55, 0x63, 0x42, 0xee, 0x93, 0x61, 0xd2, 0x53, 0x42, 0x1a, 0x82, 0x6c, 0x9e, 0xcd, 0xf3, 0xc7, 0x14, 0x86, 0x84, 0xb3, 0x6b, diff --git a/src/tls/tls.rs b/src/tls/tls.rs index 790a5873..42165a6a 100644 --- a/src/tls/tls.rs +++ b/src/tls/tls.rs @@ -12,17 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::mem; use std::ops::Index; use std::ops::IndexMut; use std::path::Path; use std::sync::Arc; +use std::time::Duration; +use std::time::Instant; +use log::trace; use strum::EnumCount; use strum::IntoEnumIterator; use strum_macros::EnumCount; use strum_macros::EnumIter; use crate::codec::Decoder; +use crate::connection::space::PacketNumSpace; +use crate::connection::timer::Timer; +use crate::connection::timer::TimerTable; +use crate::packet::PacketHeader; +use crate::packet::PacketType; use crate::ConnectionId; use crate::Error; use crate::Result; @@ -169,7 +178,11 @@ impl TlsConfig { impl TlsConfig { /// Create new TlsSession. 
- pub fn new_session(&self, host_name: Option<&str>, is_server: bool) -> Result { + pub(crate) fn new_session( + &self, + host_name: Option<&str>, + is_server: bool, + ) -> Result { let mut session = self.tls_ctx.new_session()?; session.init(is_server)?; if !is_server { @@ -196,6 +209,9 @@ impl TlsConfig { conf_selector: None, early_data_rejected: false, }, + current_key_phase: false, + prev_key: None, + next_key: None, }) } } @@ -247,9 +263,21 @@ pub struct TlsSessionData { early_data_rejected: bool, } -pub struct TlsSession { +pub(crate) struct TlsSession { + /// Boringssl TLS session. session: boringssl::tls::Session, + + /// TLS session data. data: TlsSessionData, + + /// Current key phase. + current_key_phase: bool, + + /// Keys for previous key phase. + prev_key: Option, + + /// Keys for next key phase. + next_key: Option, } impl TlsSession { @@ -305,6 +333,146 @@ impl TlsSession { self.data.key_collection[level] = Keys::default(); } + /// Derive next keys. + fn derive_keys(&self) -> Result { + let key = &self.data.key_collection[Level::OneRTT]; + if key.open.is_none() || key.seal.is_none() { + return Err(Error::TlsFail("derive not available now".into())); + } + + Ok(Keys { + open: Some(key.open.as_ref().unwrap().derive_next_packet_key()?), + seal: Some(key.seal.as_ref().unwrap().derive_next_packet_key()?), + }) + } + + /// Select decryption key. 
+ pub fn select_key( + &mut self, + confirmed: bool, + enable_multipath: bool, + hdr: &PacketHeader, + space: &PacketNumSpace, + ) -> Result<(&Open, bool)> { + if !confirmed + || hdr.pkt_type != PacketType::OneRTT + || self.current_key_phase == hdr.key_phase + || enable_multipath + { + trace!("{} select current key", self.data.trace_id); + let key = self.get_keys(hdr.pkt_type.to_level()?); + return Ok((key.open.as_ref().ok_or(Error::InternalError)?, false)); + } + + if let Some(first_pkt_num_recv) = space.first_pkt_num_recv { + if hdr.pkt_num > first_pkt_num_recv { + trace!("{} select next key", self.data.trace_id); + + if self.next_key.is_none() { + self.next_key = Some(self.derive_keys()?); + } + let next_key = self.next_key.as_ref().unwrap(); + return Ok((next_key.open.as_ref().ok_or(Error::InternalError)?, true)); + } + } + + if let Some(prev_key) = &self.prev_key { + trace!("{} select previous key", self.data.trace_id); + + return Ok((prev_key.open.as_ref().ok_or(Error::InternalError)?, false)); + } + + trace!("{} previous key already discarded", self.data.trace_id); + Err(Error::Done) + } + + /// Update key. + fn update_key(&mut self, space: &mut PacketNumSpace) -> Result<()> { + if self.next_key.is_none() { + self.next_key = Some(self.derive_keys()?); + } + + self.current_key_phase = !self.current_key_phase; + self.prev_key = Some(mem::replace( + &mut self.data.key_collection[Level::OneRTT], + self.next_key.take().unwrap(), + )); + space.first_pkt_num_recv = None; + space.first_pkt_num_sent = None; + + Ok(()) + } + + /// Try to update key after receiving a packet. 
+ pub fn try_update_key( + &mut self, + timers: &mut TimerTable, + space: &mut PacketNumSpace, + attempt_key_update: bool, + hdr: &PacketHeader, + now: Instant, + max_pto: Option, + ) -> Result<()> { + if attempt_key_update { + self.update_key(space)?; + } + + if space.first_pkt_num_recv.is_none() && self.current_key_phase == hdr.key_phase { + space.first_pkt_num_recv = Some(hdr.pkt_num); + + if self.prev_key.is_some() { + if let Some(duration) = max_pto { + // An endpoint SHOULD retain old read keys for no more than three times the PTO after + // having received a packet protected using the new keys. After this period, old read + // keys and their corresponding secrets SHOULD be discarded. + // See RFC 9001 Section 6.5. + timers.set(Timer::KeyDiscard, now + duration * 3); + } + } + } + + Ok(()) + } + + /// If a key update is allowed to initiate. + fn key_update_allowed(&self, enable_multipath: bool, space: &PacketNumSpace) -> Result { + if enable_multipath { + // TODO: support key update in multipath scenario. + return Ok(false); + } + + if let Some(first_pkt_num_sent) = space.first_pkt_num_sent { + if first_pkt_num_sent <= space.largest_acked_pkt { + return Ok(true); + } + } + + Ok(false) + } + + /// Initiate a key update. + pub fn initiate_key_update( + &mut self, + space: &mut PacketNumSpace, + enable_multipath: bool, + ) -> Result<()> { + if !self.key_update_allowed(enable_multipath, space)? { + return Err(Error::Done); + } + + self.update_key(space) + } + + /// Discard the previous key. + pub fn discard_prev_key(&mut self) { + self.prev_key = None; + } + + /// Return the current key phase. 
+ pub fn current_key_phase(&self) -> bool { + self.current_key_phase + } + /// Get overhead size of Seal operation pub fn get_overhead(&self, level: Level) -> Option { self.data.key_collection[level] From b6173baeedc063d858a73e7176add1fd1cb5fdcf Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Thu, 25 Apr 2024 19:12:33 +0800 Subject: [PATCH 11/13] Add datagram packetization layer PMTU discovery (#245) --- include/tquic.h | 13 +- interop/run_endpoint.sh | 6 - src/connection/connection.rs | 297 +++++++++++++++++++++++++++++++---- src/connection/path.rs | 12 ++ src/connection/pmtu.rs | 253 +++++++++++++++++++++++++++++ src/connection/recovery.rs | 12 +- src/connection/space.rs | 11 +- src/ffi.rs | 15 +- src/frame.rs | 14 +- src/lib.rs | 26 +-- 10 files changed, 594 insertions(+), 65 deletions(-) create mode 100644 src/connection/pmtu.rs diff --git a/include/tquic.h b/include/tquic.h index f35be838..140ea562 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -318,12 +318,17 @@ void quic_config_set_max_handshake_timeout(struct quic_config_t *config, uint64_ */ void quic_config_set_recv_udp_payload_size(struct quic_config_t *config, uint16_t v); +/* + * Enable the Datagram Packetization Layer Path MTU Discovery + * default value is true. + */ +void enable_dplpmtud(struct quic_config_t *config, bool v); + /** - * Set the initial maximum outgoing UDP payload size. - * The default and minimum value is `1200`. + * Set the maximum outgoing UDP payload size in bytes. + * It corresponds to the maximum datagram size that DPLPMTUD tries to discovery. * - * The configuration should be changed with caution. The connection may - * not work properly if an inappropriate value is set. + * The default value is `1200` which means let DPLPMTUD choose a value. 
*/ void quic_config_set_send_udp_payload_size(struct quic_config_t *config, uintptr_t v); diff --git a/interop/run_endpoint.sh b/interop/run_endpoint.sh index 1e05edab..11aab412 100644 --- a/interop/run_endpoint.sh +++ b/interop/run_endpoint.sh @@ -80,9 +80,6 @@ if [ "$ROLE" == "client" ]; then zerortt) CLIENT_ARGS="$CLIENT_ARGS --session-file=session.bin --enable-early-data" ;; - transfer) - CLIENT_ARGS="$CLIENT_ARGS --send-udp-payload-size 1400" - ;; http3) CLIENT_ALPN="--alpn h3" ;; @@ -112,9 +109,6 @@ elif [ "$ROLE" == "server" ]; then retry) SERVER_ARGS="$SERVER_ARGS --enable-retry" ;; - transfer) - SERVER_ARGS="$SERVER_ARGS --send-udp-payload-size 1400" - ;; *) ;; esac diff --git a/src/connection/connection.rs b/src/connection/connection.rs index 58b7608b..9e7a163d 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -710,7 +710,7 @@ impl Connection { match frame { Frame::Paddings { .. } => (), // just ignore - Frame::Ping => (), // just ignore + Frame::Ping { .. 
} => (), // just ignore Frame::Ack { ack_delay, @@ -1318,6 +1318,18 @@ impl Connection { self.streams.on_reset_stream_frame_acked(stream_id); } + Frame::Ping { + pmtu_probe: Some((path_id, probe_size)), + } => { + if let Ok(path) = self.paths.get_mut(path_id) { + let peer_mds = self.peer_transport_params.max_udp_payload_size as usize; + path.dplpmtud.on_pmtu_probe_acked(probe_size, peer_mds); + let current = path.dplpmtud.get_current_size(); + path.recovery.update_max_datagram_size(current, false); + debug!("{} path {:?} MTU is {} now", self.trace_id, path, current); + } + } + _ => (), } } @@ -1352,7 +1364,7 @@ impl Connection { return crate::MIN_CLIENT_INITIAL_LEN; } - // Use the configured or validated max_datagram_size + // Use the validated max_datagram_size self.paths .get(pid) .ok() @@ -1403,20 +1415,20 @@ impl Connection { let mut left = cmp::min(out.len(), self.max_datagram_size(pid)); left = self.paths.cmp_anti_ampl_limit(pid, left); - let out = &mut out[..left]; let mut done = 0; // Write QUIC packets to the buffer let mut has_initial = false; while left > 0 { - let (pkt_type, written) = match self.send_packet(&mut out[done..], pid, has_initial) { - Ok(v) => v, - Err(Error::BufferTooShort) | Err(Error::Done) => break, - Err(e) => return Err(e), - }; + let (pkt_type, is_pmtu_probe, written) = + match self.send_packet(&mut out[done..], left, pid, done == 0, has_initial) { + Ok(v) => v, + Err(Error::BufferTooShort) | Err(Error::Done) => break, + Err(e) => return Err(e), + }; - left -= written; - done += written; + left = left.saturating_sub(written); + done = done.saturating_add(written); match pkt_type { PacketType::Initial => has_initial = true, @@ -1427,6 +1439,13 @@ impl Connection { _ => (), } + + // The PMTU probe is not coalesced with other packets, since packets + // that are larger than the current maximum datagram size are more + // likely to be dropped by the network. 
+ if is_pmtu_probe { + break; + } } if done == 0 { @@ -1455,6 +1474,17 @@ impl Connection { /// Write a QUIC packet to the given buffer. /// + /// The `out` is the write buffer with a size that must be no less than `left`. + /// The `left` is the upper limit for the write size when sending a non-PMTU + /// probe packet. + /// The `path_id` is the selected path for sending out packets. + /// The `first` indicates that it is the first packet being written to the UDP + /// datagram. + /// The `has_initial` indicates that a previous Initial packet has been written + /// the UDP datagram. + /// + /// Return a tuple consisting of the packet type, PMUT probe flag, and the + /// packet size upon success. /// Return `Error::BufferTooShort` if the input buffer is too small to /// write a single QUIC packet. /// Return `Error::Done` if no packet can be sent. @@ -1462,13 +1492,15 @@ impl Connection { fn send_packet( &mut self, out: &mut [u8], + mut left: usize, path_id: usize, + first: bool, has_initial: bool, - ) -> Result<(PacketType, usize)> { + ) -> Result<(PacketType, bool, usize)> { let now = time::Instant::now(); - if out.is_empty() { - return Err(Error::BufferTooShort); + if out.len() < left { + return Err(Error::InvalidState("buffer too short".into())); } if self.is_draining() { @@ -1479,8 +1511,6 @@ impl Connection { let pkt_type = self.select_send_packet_type(path_id)?; let level = pkt_type.to_level()?; - let mut left = out.len(); - // Prepare and encode packet header (except for the Length and Packet Number field) let space_id = self.get_space_id(pkt_type, path_id)?; let pkt_num = self @@ -1519,7 +1549,7 @@ impl Connection { }, key_phase: self.tls_session.current_key_phase(), }; - let hdr_offset = hdr.to_bytes(out)?; + let hdr_offset = hdr.to_bytes(&mut out[..left])?; // Check the size of remaining space of the buffer let mut pkt_num_offset = hdr_offset; @@ -1543,9 +1573,8 @@ impl Connection { } // Encode packet number - let len = packet::encode_packet_num(pkt_num, 
&mut out[pkt_num_offset..])?; + let len = packet::encode_packet_num(pkt_num, &mut out[pkt_num_offset..left])?; let payload_offset = pkt_num_offset + len; - let payload_end = out.len() - crypto_overhead; // Reserved for crypto overhead // Write frames into the packet payload let (ack_elicit_required, is_probe) = { @@ -1555,14 +1584,17 @@ impl Connection { let mut write_status = FrameWriteStatus { ack_elicit_required, is_probe, + overhead: total_overhead, ..FrameWriteStatus::default() }; match self.send_frames( - &mut out[payload_offset..payload_end], + &mut out[payload_offset..], + left, &mut write_status, pkt_type, path_id, + first, has_initial, ) { Ok(..) => (), @@ -1612,6 +1644,7 @@ impl Connection { ack_eliciting: write_status.ack_eliciting, in_flight: write_status.in_flight, has_data: write_status.has_data, + pmtu_probe: write_status.is_pmtu_probe, frames: write_status.frames, rate_sample_state: Default::default(), buffer_flags: write_status.buffer_flags, @@ -1668,6 +1701,13 @@ impl Connection { self.paths.on_path_chal_sent(path_id, data, written, now)?; } + if write_status.is_pmtu_probe { + self.paths + .get_mut(path_id)? + .dplpmtud + .on_pmtu_probe_sent(written); + } + // Update connection state and statistic metrics self.stats.sent_count += 1; self.stats.sent_bytes += written as u64; @@ -1709,7 +1749,7 @@ impl Connection { self.flags.insert(SentAckElicitingSinceRecvPkt); } - Ok((pkt_type, written)) + Ok((pkt_type, write_status.is_pmtu_probe, written)) } // Write QUIC frames to the payload of a QUIC packet. @@ -1717,19 +1757,22 @@ impl Connection { // The current write offset in the `out` buffer is recorded in `st.written` // Return Error::Done if there is no frame to send or no left room to write more frames. // Return other Error if found unexpected error. 
+ #[allow(clippy::too_many_arguments)] fn send_frames( &mut self, - out: &mut [u8], + buf: &mut [u8], + left: usize, st: &mut FrameWriteStatus, pkt_type: PacketType, path_id: usize, + first: bool, has_initial: bool, ) -> Result<()> { // Write an ACK frame - self.try_write_ack_frame(out, st, pkt_type, path_id)?; + self.try_write_ack_frame(&mut buf[..left], st, pkt_type, path_id)?; // Write a CONNECTION_CLOSE frame - self.try_write_close_frame(out, st, pkt_type, path_id)?; + self.try_write_close_frame(&mut buf[..left], st, pkt_type, path_id)?; let path = self.paths.get_mut(path_id)?; path.recovery.stat_cwnd_limited(); @@ -1743,6 +1786,15 @@ impl Connection { return Err(Error::Done); } + // Write PMTU probe frames + // Note: To probe the path MTU, the write size will exceed `left` but + // not surpass the length of `buf`. + self.try_write_pmut_probe_frames(buf, st, pkt_type, path_id, first)?; + + // Since it's not a PMTU probe packet, let's cap the buffer size for + // simplicity. + let out = &mut buf[..left]; + // Write PATH_CHALLENGE/PATH_RESPONSE frames self.try_write_path_validation_frames(out, st, pkt_type, path_id)?; @@ -1779,7 +1831,7 @@ impl Connection { // Write a PING frame if st.ack_elicit_required && !st.ack_eliciting && !self.is_closing() { - let frame = Frame::Ping; + let frame = Frame::Ping { pmtu_probe: None }; Connection::write_frame_to_packet(frame, out, st)?; st.ack_eliciting = true; st.in_flight = true; @@ -1863,6 +1915,55 @@ impl Connection { Ok(()) } + /// Write PMTU probe frames if needed. 
+ fn try_write_pmut_probe_frames( + &mut self, + buf: &mut [u8], + st: &mut FrameWriteStatus, + pkt_type: PacketType, + path_id: usize, + first: bool, + ) -> Result<()> { + if pkt_type != PacketType::OneRTT + || !self.flags.contains(HandshakeCompleted) + || self.is_closing() + || !first + || !st.frames.is_empty() + { + return Ok(()); + } + + let peer_mds = self.peer_transport_params.max_udp_payload_size as usize; + let path = self.paths.get_mut(path_id)?; + let probe_size = path.dplpmtud.get_probe_size(peer_mds); + if !path.validated() + || !path.dplpmtud.should_probe() + || probe_size > buf.len() + || (probe_size as u64) > path.recovery.congestion.congestion_window() + || path.recovery.congestion.in_recovery(time::Instant::now()) + { + return Ok(()); + } + + // The content of the PMTU probe is limited to PING and PADDING frames. + let frame = frame::Frame::Ping { + pmtu_probe: Some((path_id, probe_size)), + }; + Connection::write_frame_to_packet(frame, buf, st)?; + + let padding_len = probe_size - st.overhead - 1; + let frame = frame::Frame::Paddings { len: padding_len }; + Connection::write_frame_to_packet(frame, buf, st)?; + + st.ack_eliciting = true; + st.in_flight = true; + st.is_pmtu_probe = true; + + // Finish writing the datagram to prevent it from coalescing with other + // QUIC packets. + Err(Error::Done) + } + /// Populate Acknowledgement frame to packet payload buffer. fn try_write_ack_frame( &mut self, @@ -2647,6 +2748,22 @@ impl Connection { self.streams.on_max_streams_frame_lost(bidi, max); } + // A PING frame contain no information, so lost PING frames + // do not require repair. However, if it indicates the loss + // of a PMTU probe, we will try to schedule a new probe. 
+ Frame::Ping { + pmtu_probe: Some((path_id, probe_size)), + } => { + if let Ok(path) = self.paths.get_mut(path_id) { + let peer_mds = self.peer_transport_params.max_udp_payload_size as usize; + path.dplpmtud.on_pmtu_probe_lost(probe_size, peer_mds); + debug!( + "{} lost MTU probe on path {:?} size={}", + self.trace_id, path, probe_size + ); + } + } + _ => (), } } @@ -2793,6 +2910,7 @@ impl Connection { || self.need_send_new_token_frame() || self.local_error.as_ref().map_or(false, |e| e.is_app) || path.need_send_validation_frames(self.is_server) + || path.dplpmtud.should_probe() || self.cids.need_send_cid_control_frames() || self.streams.need_send_stream_frames() || self.spaces.need_send_buffered_frames()) @@ -4058,6 +4176,12 @@ struct FrameWriteStatus { /// Whether the congestion window should be ignored. is_probe: bool, + /// Whether it is a PMTU probe packet + is_pmtu_probe: bool, + + /// Packet overhead (i.e. packet header and crypto overhead) in bytes + overhead: usize, + /// Status about buffered frames written to the packet. 
buffer_flags: BufferFlags, } @@ -4209,7 +4333,7 @@ pub(crate) mod tests { pub fn conn_packets_out(conn: &mut Connection) -> Result, PacketInfo)>> { let mut packets = Vec::new(); loop { - let mut out = vec![0u8; 1350]; + let mut out = vec![0u8; 1500]; let info = match conn.send(&mut out) { Ok((written, info)) => { out.truncate(written); @@ -4371,6 +4495,7 @@ pub(crate) mod tests { conf.set_send_batch_size(2); conf.set_max_handshake_timeout(0); conf.enable_multipath(false); + conf.enable_dplpmtud(true); let application_protos = vec![b"h3".to_vec()]; let tls_config = if !is_server { @@ -5238,14 +5363,29 @@ pub(crate) mod tests { #[test] fn max_datagram_size() -> Result<()> { let mut client_config = TestPair::new_test_config(false)?; - client_config.set_send_udp_payload_size(2000); + client_config.set_send_udp_payload_size(1200); let mut server_config = TestPair::new_test_config(true)?; server_config.set_recv_udp_payload_size(1550); server_config.set_initial_max_data(10000); server_config.set_initial_max_stream_data_bidi_remote(10000); let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + assert_eq!( + test_pair.client.paths.get(0)?.recovery.max_datagram_size, + 1200, + ); + + // Handshake and discovery path MTU assert_eq!(test_pair.handshake(), Ok(())); + test_pair.move_forward()?; + // Check path MTU + let mds_ipv4 = 1472; + assert_eq!( + test_pair.client.paths.get(0)?.recovery.max_datagram_size, + mds_ipv4 + ); + + // Check outgoing packet size let mut buf = vec![0; 2000]; assert!(test_pair .client @@ -5253,7 +5393,7 @@ pub(crate) mod tests { .is_ok()); let r = test_pair.client.send(&mut buf); assert!(r.is_ok()); - assert_eq!(r.unwrap().0, 1550); + assert_eq!(r.unwrap().0, mds_ipv4); Ok(()) } @@ -5667,6 +5807,97 @@ pub(crate) mod tests { Ok(()) } + #[test] + fn path_mtu_discovery_max() -> Result<()> { + let cases = [ + // (cli_enable_dplpmtud, srv_enable_dplpmtud, cli_mtu , srv_mtu) + (false, false, 1200, 1200), + (false, true, 1200, 1472), + 
(true, false, 1472, 1200), + (true, true, 1472, 1472), + ]; + + for case in cases { + let mut client_config = TestPair::new_test_config(false)?; + client_config.enable_dplpmtud(case.0); + let mut server_config = TestPair::new_test_config(true)?; + server_config.enable_dplpmtud(case.1); + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + assert_eq!(test_pair.handshake(), Ok(())); + + test_pair.move_forward()?; + assert_eq!( + test_pair.client.paths.get(0)?.recovery.max_datagram_size, + case.2 + ); + assert_eq!( + test_pair.server.paths.get(0)?.recovery.max_datagram_size, + case.3 + ); + } + + Ok(()) + } + + #[test] + fn path_mtu_discovery_lost() -> Result<()> { + let cases = [ + // (router_mtu, searched_mtu) + (1472, 1463), + (1452, 1446), + (1432, 1429), + (1412, 1404), + (1392, 1387), + (1372, 1370), + ]; + + for case in cases { + let mut client_config = TestPair::new_test_config(false)?; + client_config.enable_dplpmtud(true); + let mut server_config = TestPair::new_test_config(true)?; + server_config.enable_dplpmtud(false); + server_config.set_initial_max_data(10240); + server_config.set_initial_max_stream_data_bidi_remote(10240); + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; + let router_mtu: usize = case.0; + + // Handshake + while !test_pair.client.is_established() || !test_pair.server.is_established() { + let mut packets = TestPair::conn_packets_out(&mut test_pair.client)?; + packets.retain(|p| p.0.len() < router_mtu); // fake dropping packets + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + } + + // Path MTU searching + let data = Bytes::from_static(b"data"); + for i in 0..30 { + let _ = test_pair.client.stream_write(0, data.clone(), false); + let mut packets = TestPair::conn_packets_out(&mut test_pair.client)?; + packets.retain(|p| p.0.len() < 
router_mtu); // fake dropping packets + + TestPair::conn_packets_in(&mut test_pair.server, packets)?; + let packets = TestPair::conn_packets_out(&mut test_pair.server)?; + TestPair::conn_packets_in(&mut test_pair.client, packets)?; + + if test_pair.client.timeout().is_some() { + let timeout = test_pair.client.timers.get(Timer::LossDetection); + test_pair.client.on_timeout(timeout.unwrap()); + } + } + + // Check final MTU + assert_eq!( + test_pair.client.paths.get(0)?.recovery.max_datagram_size, + case.1 + ); + } + + Ok(()) + } + #[test] fn conn_basic_operations() -> Result<()> { let mut test_pair = TestPair::new_with_zero_cid()?; @@ -5983,7 +6214,11 @@ pub(crate) mod tests { #[test] fn recv_packet_skipped_packet_number() -> Result<()> { - let mut test_pair = TestPair::new_with_test_config()?; + let mut client_config = TestPair::new_test_config(false)?; + client_config.enable_dplpmtud(false); + let mut server_config = TestPair::new_test_config(true)?; + server_config.enable_dplpmtud(false); + let mut test_pair = TestPair::new(&mut client_config, &mut server_config)?; assert_eq!(test_pair.handshake(), Ok(())); let info = TestPair::new_test_packet_info(false); @@ -5994,7 +6229,7 @@ pub(crate) mod tests { let packet = TestPair::conn_build_packet( &mut test_pair.client, PacketType::OneRTT, - &[frame::Frame::Ping], + &[frame::Frame::Ping { pmtu_probe: None }], )?; // Server recv OneRTT packet and send ack @@ -6028,7 +6263,7 @@ pub(crate) mod tests { TestPair::conn_build_packet( &mut test_pair.client, PacketType::OneRTT, - &[frame::Frame::Ping], + &[frame::Frame::Ping { pmtu_probe: None }], )?, info, )); @@ -6433,6 +6668,7 @@ pub(crate) mod tests { }; let mut client_config = TestPair::new_test_config(false)?; + client_config.enable_dplpmtud(false); let mut server_config = TestPair::new_test_config(true)?; server_config.local_transport_params = server_transport_params.clone(); @@ -7046,6 +7282,7 @@ pub(crate) mod tests { mod cid; mod flowcontrol; pub mod path; +mod pmtu; 
mod recovery; pub(crate) mod rtt; pub(crate) mod space; diff --git a/src/connection/path.rs b/src/connection/path.rs index ac8673ce..1750ddfe 100644 --- a/src/connection/path.rs +++ b/src/connection/path.rs @@ -21,6 +21,7 @@ use std::time::Duration; use slab::Slab; +use super::pmtu::Dplpmtud; use super::recovery::Recovery; use super::timer; use crate::connection::SpaceId; @@ -87,6 +88,9 @@ pub struct Path { /// Total bytes the server can send before the client's address is verified. pub(super) anti_ampl_limit: usize, + /// The current pmtu probing state of the path. + pub(super) dplpmtud: Dplpmtud, + /// Trace id. trace_id: String, @@ -112,6 +116,12 @@ impl Path { (PathState::Unknown, None, None) }; + let dplpmtud = Dplpmtud::new( + conf.enable_dplpmtud, + conf.max_datagram_size, + remote_addr.is_ipv6(), + ); + Self { local_addr, remote_addr, @@ -128,6 +138,7 @@ impl Path { verified_peer_address: false, peer_verified_local_address: false, anti_ampl_limit: 0, + dplpmtud, trace_id: trace_id.to_string(), space_id: SpaceId::Data, is_abandon: false, @@ -600,6 +611,7 @@ mod tests { initial_rtt: crate::INITIAL_RTT, pto_linear_factor: crate::DEFAULT_PTO_LINEAR_FACTOR, max_pto: crate::MAX_PTO, + ..RecoveryConfig::default() } } diff --git a/src/connection/pmtu.rs b/src/connection/pmtu.rs new file mode 100644 index 00000000..989e3632 --- /dev/null +++ b/src/connection/pmtu.rs @@ -0,0 +1,253 @@ +// Copyright (c) 2024 The TQUIC Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +use std::cmp; +use std::time; + +/// The size of UDP payloads over IPv4 (1500-20-8) +const MAX_PACKET_SIZE_IPV4: usize = 1472; + +/// The size of UDP payloads over IPv6 (1500-40-8) +const MAX_PACKET_SIZE_IPV6: usize = 1452; + +/// MAX_PROBES represents the limit for the number of consecutive probe +/// attempts of any size. +const MAX_PROBE_COUNT: u8 = 3; + +/// A simple implementation for Packetization Layer Path MTU Discovery for +/// Datagram Transports. +/// See RFC 9000 Section 14.3 and RFC 8899 +#[derive(Default)] +pub(super) struct Dplpmtud { + /// Whether a new probe packet should be sent. + should_probe: bool, + + /// The current path MTU. + current_size: usize, + + /// The size of the current probe packet, which is awaiting confirmation by + /// an acknowledgment. + probe_size: Option, + + /// The number of successive unsuccessful probe packets that have been sent. + /// Each time a probe packet is acknowledged, the value is set to zero. + probe_count: u8, + + /// The size of last probe packet which is declared as lost. + failed_size: usize, + + /// It corresponds to the maximum datagram size. + max_pmtu: usize, + + /// Whether it is an IPv6 path. 
+ is_ipv6: bool, +} + +impl Dplpmtud { + pub(super) fn new(enable: bool, mut max_pmtu: usize, is_ipv6: bool) -> Self { + if max_pmtu == crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE { + max_pmtu = if is_ipv6 { + MAX_PACKET_SIZE_IPV6 + } else { + MAX_PACKET_SIZE_IPV4 + }; + } + + Self { + should_probe: enable, + current_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE, + probe_size: None, + max_pmtu, + is_ipv6, + ..Self::default() + } + } + + /// Return whether a PMTU probe should be scheduled + pub(super) fn should_probe(&self) -> bool { + self.should_probe + } + + /// Return the current size of probe packet + pub(super) fn get_probe_size(&mut self, peer_max_udp_payload: usize) -> usize { + if let Some(probe_size) = self.probe_size { + return probe_size; + } + + let probe_size = self.cal_probe_size(peer_max_udp_payload); + self.probe_size = Some(probe_size); + probe_size + } + + /// Return the current validated PMTU + pub(super) fn get_current_size(&self) -> usize { + self.current_size + } + + /// Handle sent event of PMTU probe + pub(super) fn on_pmtu_probe_sent(&mut self, pkt_size: usize) { + self.should_probe = false; + } + + /// Handle acknowledgement of PMTU probe + pub(super) fn on_pmtu_probe_acked(&mut self, pkt_size: usize, peer_max_udp_payload: usize) { + self.current_size = cmp::max(self.current_size, pkt_size); + self.probe_count = 0; + self.probe_size = Some(self.cal_probe_size(peer_max_udp_payload)); + self.should_probe = !self.check_finish(peer_max_udp_payload); + } + + /// Handle loss of PMTU probe + pub(super) fn on_pmtu_probe_lost(&mut self, pkt_size: usize, peer_max_udp_payload: usize) { + if Some(pkt_size) != self.probe_size { + return; + } + + self.probe_count += 1; + if self.probe_count < MAX_PROBE_COUNT { + self.should_probe = true; + return; + } + + self.failed_size = pkt_size; + self.probe_size = Some(self.cal_probe_size(peer_max_udp_payload)); + self.probe_count = 0; + self.should_probe = !self.check_finish(peer_max_udp_payload); + } + + /// Calculate the 
size of probe packet + fn cal_probe_size(&self, peer_max_udp_payload: usize) -> usize { + let mtu_ceiling = self.cal_mtu_ceiling(peer_max_udp_payload); + + // Try the largest ethernet MTU immediately + if self.failed_size == 0 && mtu_ceiling < 1500 { + return mtu_ceiling; + } + + // Pick the half-way point + (self.current_size + mtu_ceiling) / 2 + } + + /// Calculate the upper limit of probe size + fn cal_mtu_ceiling(&self, peer_max_udp_payload: usize) -> usize { + let mut mtu_ceiling = if self.failed_size > 0 { + self.failed_size + } else { + self.max_pmtu + }; + if mtu_ceiling > peer_max_udp_payload { + mtu_ceiling = peer_max_udp_payload; + } + mtu_ceiling + } + + /// Check whether PMTU discovery should be stopped + fn check_finish(&self, peer_max_udp_payload: usize) -> bool { + let mtu_ceiling = self.cal_mtu_ceiling(peer_max_udp_payload); + self.current_size >= mtu_ceiling || self.current_size as f64 / mtu_ceiling as f64 >= 0.99 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn dplpmtud_default() { + let d = Dplpmtud::new(false, 1500, true); + assert_eq!(d.should_probe(), false); + assert_eq!(d.get_current_size(), crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE); + + let mut d = Dplpmtud::new(true, 1500, false); + let peer_max_udp_payload = 1400; + assert_eq!(d.should_probe(), true); + assert_eq!(d.get_current_size(), crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE); + assert_eq!(d.get_probe_size(peer_max_udp_payload), peer_max_udp_payload); + } + + #[test] + fn dplpmtud_max() { + let mut d = Dplpmtud::new(true, 1200, false); + let peer_max_udp_payload = 60000; + assert_eq!(d.should_probe(), true); + + // Fake sending a PMTU probe + let probe_size = d.get_probe_size(peer_max_udp_payload); + d.on_pmtu_probe_sent(probe_size); + assert_eq!(d.should_probe(), false); + + // Fake receiving its acknowledgement + d.on_pmtu_probe_acked(probe_size, peer_max_udp_payload); + assert_eq!(d.get_current_size(), 1472); + assert_eq!(d.should_probe(), false); + } + + #[test] + fn 
dplpmtud_min() { + let mut d = Dplpmtud::new(true, 1200, true); + let peer_max_udp_payload = 60000; + assert_eq!(d.should_probe(), true); + + for i in 0..10 { + let probe_size = d.get_probe_size(peer_max_udp_payload); + + // Fake failing to probe with size `probe_size` + for i in 0..MAX_PROBE_COUNT { + // Fake sending a PMTU probe + d.on_pmtu_probe_sent(probe_size); + // Fake lost the PMTU porbe + d.on_pmtu_probe_lost(probe_size, peer_max_udp_payload); + } + assert_eq!(d.failed_size, probe_size); + + if !d.should_probe() { + break; + } + } + + assert_eq!(d.get_current_size(), 1200); + assert_eq!(d.should_probe(), false); + } + + #[test] + fn dplpmtud_mid() { + let mut d = Dplpmtud::new(true, 1200, true); + let peer_max_udp_payload = 60000; + assert_eq!(d.should_probe(), true); + + let pmtu = 1350; + for i in 0..10 { + let probe_size = d.get_probe_size(peer_max_udp_payload); + + if probe_size > pmtu { + for i in 0..MAX_PROBE_COUNT { + d.on_pmtu_probe_sent(probe_size); + d.on_pmtu_probe_lost(probe_size, peer_max_udp_payload); + } + assert_eq!(d.failed_size, probe_size); + } else { + d.on_pmtu_probe_sent(probe_size); + d.on_pmtu_probe_acked(probe_size, peer_max_udp_payload); + assert_eq!(d.get_current_size(), probe_size); + } + + if !d.should_probe() { + break; + } + } + + assert_eq!(d.get_current_size(), 1349); + assert_eq!(d.should_probe(), false); + } +} diff --git a/src/connection/recovery.rs b/src/connection/recovery.rs index 1f7d474c..d0669b9a 100644 --- a/src/connection/recovery.rs +++ b/src/connection/recovery.rs @@ -55,7 +55,7 @@ pub struct Recovery { /// It is used for PTO calculation. pub max_ack_delay: Duration, - /// The maximum size of outgoing UDP payloads. + /// The validated maximum size of outgoing UDP payloads in bytes. pub max_datagram_size: usize, /// The endpoint do not backoff the first `pto_linear_factor` consecutive probe timeouts. 
@@ -109,7 +109,7 @@ impl Recovery { pub(super) fn new(conf: &RecoveryConfig) -> Self { Recovery { max_ack_delay: conf.max_ack_delay, - max_datagram_size: conf.max_datagram_size, + max_datagram_size: crate::DEFAULT_SEND_UDP_PAYLOAD_SIZE, pto_linear_factor: conf.pto_linear_factor, max_pto: conf.max_pto, pto_count: 0, @@ -438,7 +438,12 @@ impl Recovery { self.ack_eliciting_in_flight.saturating_sub(1); } } - latest_lost_packet = Some(unacked.clone()); + // Loss of a QUIC packet that is carried in a PMTU probe is not + // a reliable indication of congestion and SHOULD NOT trigger a + // congestion control reaction + if !unacked.pmtu_probe { + latest_lost_packet = Some(unacked.clone()); + } if let Some(qlog) = qlog.as_mut() { self.qlog_recovery_packet_lost(qlog, unacked); } @@ -1036,6 +1041,7 @@ mod tests { initial_rtt: crate::INITIAL_RTT, pto_linear_factor: crate::DEFAULT_PTO_LINEAR_FACTOR, max_pto: crate::MAX_PTO, + ..RecoveryConfig::default() } } diff --git a/src/connection/space.rs b/src/connection/space.rs index 56ddc96e..5a45c77d 100644 --- a/src/connection/space.rs +++ b/src/connection/space.rs @@ -340,6 +340,9 @@ pub struct SentPacket { /// Whether the packet contains CRYPTO or STREAM frame pub has_data: bool, + /// Whether it is a PMTU probe packet + pub pmtu_probe: bool, + /// The number of bytes sent in the packet, not including UDP or IP overhead, /// but including QUIC framing overhead. 
pub sent_size: usize, @@ -363,6 +366,7 @@ impl Default for SentPacket { ack_eliciting: false, in_flight: false, has_data: false, + pmtu_probe: false, sent_size: 0, rate_sample_state: RateSamplePacketState::default(), buffer_flags: BufferFlags::default(), @@ -544,7 +548,10 @@ mod tests { fn sent_packet() { let sent_pkt = SentPacket { pkt_num: 9, - frames: vec![frame::Frame::Ping, frame::Frame::Paddings { len: 200 }], + frames: vec![ + frame::Frame::Ping { pmtu_probe: None }, + frame::Frame::Paddings { len: 200 }, + ], time_sent: Instant::now(), time_acked: None, time_lost: None, @@ -585,7 +592,7 @@ mod tests { assert_eq!(queue.len(), 2); assert_eq!(queue.is_empty(), false); - let f3 = Frame::Ping; + let f3 = Frame::Ping { pmtu_probe: None }; queue.push_back(f3.clone(), BufferType::Low); assert_eq!(queue.pop_front(), Some((f2.clone(), BufferType::High))); diff --git a/src/ffi.rs b/src/ffi.rs index 41cfd87e..e943bfba 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -148,11 +148,16 @@ pub extern "C" fn quic_config_set_recv_udp_payload_size(config: &mut Config, v: config.set_recv_udp_payload_size(v); } -/// Set the initial maximum outgoing UDP payload size. -/// The default and minimum value is `1200`. -/// -/// The configuration should be changed with caution. The connection may -/// not work properly if an inappropriate value is set. +/// Enable the Datagram Packetization Layer Path MTU Discovery. +/// The default value is true. +#[no_mangle] +pub extern "C" fn enable_dplpmtud(config: &mut Config, v: bool) { + config.enable_dplpmtud(v); +} + +/// Set the maximum outgoing UDP payload size in bytes. +/// It corresponds to the maximum datagram size that DPLPMTUD tries to discover. +/// The default value is `1200` which means let DPLPMTUD choose a value. 
#[no_mangle] pub extern "C" fn quic_config_set_send_udp_payload_size(config: &mut Config, v: usize) { config.set_send_udp_payload_size(v); diff --git a/src/frame.rs b/src/frame.rs index 745a1037..52814dc7 100644 --- a/src/frame.rs +++ b/src/frame.rs @@ -49,7 +49,9 @@ pub enum Frame { /// PING frame (type=0x01) is used to verify that peers are still alive /// or to check reachability to the peer. - Ping, + /// The extra metadata `pmtu_probe` is solely for internal use and is not + /// transmitted over the network. + Ping { pmtu_probe: Option<(usize, usize)> }, /// ACK frame (types 0x02 and 0x03) is used to inform senders of packets /// they have received and processed. The ACK frame contains one or more @@ -192,7 +194,7 @@ impl Frame { Frame::Paddings { len } } - 0x01 => Frame::Ping, + 0x01 => Frame::Ping { pmtu_probe: None }, 0x02..=0x03 => { let (frame, len) = parse_ack_frame(frame_type, b)?; @@ -407,7 +409,7 @@ impl Frame { } } - Frame::Ping => { + Frame::Ping { .. } => { b.write_varint(0x01)?; } @@ -613,7 +615,7 @@ impl Frame { match self { Frame::Paddings { len } => *len, - Frame::Ping => 1, + Frame::Ping { .. } => 1, Frame::Ack { ack_delay, @@ -962,7 +964,7 @@ impl std::fmt::Debug for Frame { write!(f, "PADDINGS len={len}")?; } - Frame::Ping => { + Frame::Ping { .. } => { write!(f, "PING")?; } @@ -1273,7 +1275,7 @@ mod tests { #[test] fn ping() -> Result<()> { - let frame = Frame::Ping; + let frame = Frame::Ping { pmtu_probe: None }; assert_eq!(format!("{:?}", &frame), "PING"); let mut buf = [0; 128]; diff --git a/src/lib.rs b/src/lib.rs index d5d99f92..2f5deb2f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,7 +350,7 @@ pub struct Config { /// Maximum numbers of packets sent in a batch. send_batch_size: usize, - /// Recovery and congestion control configurations. + /// Configurations about loss recovery, congestion control, and pmtu discovery. recovery: RecoveryConfig, /// Multipath transport configurations. 
@@ -427,11 +427,15 @@ impl Config { self.local_transport_params.max_udp_payload_size = cmp::min(v as u64, VINT_MAX); } - /// Set the initial maximum outgoing UDP payload size. - /// The default and minimum value is `1200`. - /// - /// The configuration should be changed with caution. The connection may - /// not work properly if an inappropriate value is set. + /// Enable the Datagram Packetization Layer Path MTU Discovery. + /// The default value is true. + pub fn enable_dplpmtud(&mut self, v: bool) { + self.recovery.enable_dplpmtud = v; + } + + /// Set the maximum outgoing UDP payload size in bytes. + /// It corresponds to the maximum datagram size that DPLPMTUD tries to discover. + /// The default value is `1200` which means let DPLPMTUD choose a value. pub fn set_send_udp_payload_size(&mut self, v: usize) { self.recovery.max_datagram_size = cmp::max(v, DEFAULT_SEND_UDP_PAYLOAD_SIZE); } @@ -668,10 +672,13 @@ impl Config { } } -/// Configurations about loss recovery and congestion control. +/// Configurations about loss recovery, congestion control, and pmtu discovery. #[doc(hidden)] #[derive(Debug, Clone)] pub struct RecoveryConfig { + /// Enable Datagram Packetization Layer Path MTU Discovery. + pub enable_dplpmtud: bool, + /// The maximum size of outgoing UDP payloads. pub max_datagram_size: usize, @@ -700,14 +707,15 @@ pub struct RecoveryConfig { /// Linear factor for calculating the probe timeout. pub pto_linear_factor: u64, - // Upper limit of probe timeout. + /// Upper limit of probe timeout. 
pub max_pto: Duration, } impl Default for RecoveryConfig { fn default() -> RecoveryConfig { RecoveryConfig { - max_datagram_size: 1200, + enable_dplpmtud: true, + max_datagram_size: DEFAULT_SEND_UDP_PAYLOAD_SIZE, // The upper limit is determined by DPLPMTUD max_ack_delay: time::Duration::from_millis(0), congestion_control_algorithm: CongestionControlAlgorithm::Bbr, min_congestion_window: 2_u64, From f410451f8ff7a9e0f618025d292be20868a007e4 Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Fri, 26 Apr 2024 10:40:15 +0800 Subject: [PATCH 12/13] Limit memory consumption for tracking closed stream ids (#246) --- src/connection/stream.rs | 149 +++++++++++++++++++++++++++++++++++---- src/ranges.rs | 44 ++++++++++++ 2 files changed, 179 insertions(+), 14 deletions(-) diff --git a/src/connection/stream.rs b/src/connection/stream.rs index 667b88c2..6e2120bc 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -101,8 +101,9 @@ pub struct StreamMap { /// a STOP_SENDING frame. stopped: StreamIdHashMap, - /// Keep track of IDs of previously closed streams, to prevent peers from - /// re-creating them. + /// Keep track of IDs of previously closed streams. It can grow and use up a + /// lot of memory, so it is used only in unit tests. + #[cfg(test)] closed: StreamIdHashSet, /// Streams that peer are almost out of flow control capacity, and @@ -647,12 +648,13 @@ impl StreamMap { return Err(Error::ProtocolViolation); } + let closed = self.is_closed(id); match self.streams.entry(id) { // 1.Can not find any stream with the given stream ID. // It may not be created yet or it has been closed. hash_map::Entry::Vacant(v) => { // Stream has already been closed and collected into `closed`. 
- if self.closed.contains(&id) { + if closed { return Err(Error::Done); } @@ -700,6 +702,7 @@ impl StreamMap { self.next_stream_id_uni = self.next_stream_id_uni.saturating_add(4); } + self.concurrency_control.remove_avail_id(id, self.is_server); self.events.add(Event::StreamCreated(id)); Ok(v.insert(new_stream)) } @@ -871,7 +874,7 @@ impl StreamMap { /// Note that this method does not check if the stream id is complied with /// the role of the endpoint. fn mark_closed(&mut self, stream_id: u64, local: bool) { - if self.closed.contains(&stream_id) { + if self.is_closed(stream_id) { return; } @@ -888,6 +891,10 @@ impl StreamMap { self.mark_readable(stream_id, false); self.mark_writable(stream_id, false); + if let Some(stream) = self.get_mut(stream_id) { + stream.mark_closed(); + } + #[cfg(test)] self.closed.insert(stream_id); if self.events.add(Event::StreamClosed(stream_id)) { @@ -1093,9 +1100,23 @@ impl StreamMap { self.stopped.iter() } - /// Return true if the stream has been closed and collected to `closed`. + /// Return true if the stream has been closed. pub fn is_closed(&self, stream_id: u64) -> bool { - self.closed.contains(&stream_id) + // It is an existing stream + if let Some(stream) = self.get(stream_id) { + return stream.is_closed(); + } + + // It is a stream to be created + let is_server = self.is_server; + if self.concurrency_control.is_available(stream_id, is_server) + || self.concurrency_control.is_limited(stream_id, is_server) + { + return false; + } + + // It is a destroyed stream + true } /// Return true if there are any streams that have buffered data to send. @@ -1703,8 +1724,11 @@ enum StreamFlags { /// Upper layer want to read data from stream. WantRead = 1 << 0, - // Upper layer want to write data to stream. + /// Upper layer want to write data to stream. WantWrite = 1 << 1, + + /// The stream has been closed and is waiting to release its resources. 
+ Closed = 1 << 2, } #[derive(Default)] @@ -1903,6 +1927,16 @@ impl Stream { Ok(()) } + + /// Check whether the stream is closed. + pub fn is_closed(&self) -> bool { + self.flags.contains(Closed) + } + + /// Mark the stream as closed. + pub fn mark_closed(&mut self) { + self.flags.insert(Closed); + } } /// Return true if the stream was created locally. @@ -2976,8 +3010,8 @@ impl StreamTransportParams { } /// Concurrency control for streams. -// RFC9000 4.6 Controlling Concurrency -// https://www.rfc-editor.org/rfc/rfc9000.html#name-controlling-concurrency +/// RFC9000 4.6 Controlling Concurrency +/// https://www.rfc-editor.org/rfc/rfc9000.html#name-controlling-concurrency #[derive(Clone, Debug, PartialEq, Default)] struct ConcurrencyControl { /// Maximum bidirectional streams that the peer allow local endpoint to open. @@ -3017,15 +3051,34 @@ struct ConcurrencyControl { /// peer's concurrency control limit, we need to send a STREAMS_BLOCKED(type 0x17) /// frame to notify peer. streams_blocked_at_uni: Option, + + /// Available stream ids for peer initiated bidirectional streams. + peer_bidi_avail_ids: ranges::RangeSet, + + /// Available stream ids for peer initiated unidirectional streams. + peer_uni_avail_ids: ranges::RangeSet, + + /// Available stream ids for local initiated bidirectional streams. + local_bidi_avail_ids: ranges::RangeSet, + + /// Available stream ids for local initiated unidirectional streams. 
+ local_uni_avail_ids: ranges::RangeSet, } impl ConcurrencyControl { fn new(local_max_streams_bidi: u64, local_max_streams_uni: u64) -> ConcurrencyControl { + let mut peer_bidi_avail_ids = ranges::RangeSet::default(); + peer_bidi_avail_ids.insert(0..local_max_streams_bidi); + let mut peer_uni_avail_ids = ranges::RangeSet::default(); + peer_uni_avail_ids.insert(0..local_max_streams_uni); + ConcurrencyControl { local_max_streams_bidi, local_max_streams_bidi_next: local_max_streams_bidi, local_max_streams_uni, local_max_streams_uni_next: local_max_streams_uni, + peer_bidi_avail_ids, + peer_uni_avail_ids, ..ConcurrencyControl::default() } } @@ -3035,7 +3088,12 @@ impl ConcurrencyControl { fn update_peer_max_streams(&mut self, bidi: bool, max_streams: u64) { match bidi { true => { - self.peer_max_streams_bidi = cmp::max(self.peer_max_streams_bidi, max_streams); + if self.peer_max_streams_bidi < max_streams { + // insert available ids for local initiated bidi-streams + let ids = self.peer_max_streams_bidi..max_streams; + self.insert_avail_id(ids, true, true); + self.peer_max_streams_bidi = max_streams; + } // Cancel the concurrency control blocked state if the max_streams_bidi limit // is increased, avoid sending redundant STREAMS_BLOCKED(0x16) frames. @@ -3045,7 +3103,12 @@ impl ConcurrencyControl { } false => { - self.peer_max_streams_uni = cmp::max(self.peer_max_streams_uni, max_streams); + if self.peer_max_streams_uni < max_streams { + // insert available ids for local initiated uni-streams + let ids = self.peer_max_streams_uni..max_streams; + self.insert_avail_id(ids, true, false); + self.peer_max_streams_uni = max_streams; + } // Cancel the concurrency control blocked state if the max_streams_uni limit // is increased, avoid sending redundant STREAMS_BLOCKED(type: 0x17) frames. @@ -3058,9 +3121,16 @@ impl ConcurrencyControl { /// After sending a MAX_STREAMS(type: 0x12..0x13) frame, update local max_streams limit. 
fn update_local_max_streams(&mut self, bidi: bool) { - match bidi { - true => self.local_max_streams_bidi = self.local_max_streams_bidi_next, - false => self.local_max_streams_uni = self.local_max_streams_uni_next, + if bidi { + // insert available ids for peer initiated bidi-streams + let ids = self.local_max_streams_bidi..self.local_max_streams_bidi_next; + self.insert_avail_id(ids, false, true); + self.local_max_streams_bidi = self.local_max_streams_bidi_next; + } else { + // insert available ids for peer initiated uni-streams + let ids = self.local_max_streams_uni..self.local_max_streams_uni_next; + self.insert_avail_id(ids, false, false); + self.local_max_streams_uni = self.local_max_streams_uni_next; } } @@ -3188,6 +3258,49 @@ impl ConcurrencyControl { Ok(()) } + + /// Check whether the given stream ID exceeds stream limits. + fn is_limited(&self, stream_id: u64, is_server: bool) -> bool { + let seq = (stream_id >> 2) + 1; + match (is_local(stream_id, is_server), is_bidi(stream_id)) { + (true, true) => seq > self.peer_max_streams_bidi, + (true, false) => seq > self.peer_max_streams_uni, + (false, true) => seq > self.local_max_streams_bidi, + (false, false) => seq > self.local_max_streams_uni, + } + } + + /// Check whether the given stream id is available for stream creation. + fn is_available(&self, stream_id: u64, is_server: bool) -> bool { + let id = stream_id >> 2; + match (is_local(stream_id, is_server), is_bidi(stream_id)) { + (true, true) => self.local_bidi_avail_ids.contains(id), + (true, false) => self.local_uni_avail_ids.contains(id), + (false, true) => self.peer_bidi_avail_ids.contains(id), + (false, false) => self.peer_uni_avail_ids.contains(id), + } + } + + /// Insert the given stream ids into available set. 
+ fn insert_avail_id(&mut self, ids: Range, is_local: bool, is_bidi: bool) { + match (is_local, is_bidi) { + (true, true) => self.local_bidi_avail_ids.insert(ids), + (true, false) => self.local_uni_avail_ids.insert(ids), + (false, true) => self.peer_bidi_avail_ids.insert(ids), + (false, false) => self.peer_uni_avail_ids.insert(ids), + } + } + + /// Remove the given stream id from available set. + fn remove_avail_id(&mut self, stream_id: u64, is_server: bool) { + let id = stream_id >> 2; + match (is_local(stream_id, is_server), is_bidi(stream_id)) { + (true, true) => self.local_bidi_avail_ids.remove_elem(id), + (true, false) => self.local_uni_avail_ids.remove_elem(id), + (false, true) => self.peer_bidi_avail_ids.remove_elem(id), + (false, false) => self.peer_uni_avail_ids.remove_elem(id), + } + } } /// Connection-level send capacity for all streams @@ -6045,6 +6158,11 @@ mod tests { #[test] fn concurrency_control_new() { let cc = ConcurrencyControl::new(10, 3); + + let mut peer_bidi_avail_ids = ranges::RangeSet::default(); + peer_bidi_avail_ids.insert(0..10); + let mut peer_uni_avail_ids = ranges::RangeSet::default(); + peer_uni_avail_ids.insert(0..3); assert_eq!( cc, ConcurrencyControl { @@ -6060,6 +6178,9 @@ mod tests { peer_opened_streams_uni: 0, streams_blocked_at_bidi: None, streams_blocked_at_uni: None, + peer_bidi_avail_ids, + peer_uni_avail_ids, + ..ConcurrencyControl::default() } ); } diff --git a/src/ranges.rs b/src/ranges.rs index 401b92b2..f7218a55 100644 --- a/src/ranges.rs +++ b/src/ranges.rs @@ -124,6 +124,11 @@ impl RangeSet { } } + /// Remove `elem` from the set, i.e. remove range [elem, elem + 1) from the set. + pub fn remove_elem(&mut self, elem: u64) { + self.remove(elem..elem + 1); + } + /// Remove all ranges that are smaller or equal to `elem` from the set. 
pub fn remove_until(&mut self, elem: u64) { let ranges: Vec> = self @@ -227,6 +232,21 @@ impl RangeSet { .next() } + /// Check if the element exists or not + pub fn contains(&self, elem: u64) -> bool { + if let Some(prev) = self.prev_to(elem) { + if prev.contains(&elem) { + return true; + } + } + if let Some(next) = self.next_to(elem) { + if next.contains(&elem) { + return true; + } + } + false + } + /// Peek at the smallest range in the set. pub fn peek_min(&self) -> Option> { let (&start, &end) = self.set.iter().next()?; @@ -937,6 +957,30 @@ mod tests { } } + #[test] + fn contains() { + let mut r = RangeSet::default(); + // Insert ranges: [2..6), [8, 13) + r.insert(2..6); + r.insert(8..13); + + for i in [0, 1] { + assert_eq!(r.contains(i), false); + } + for i in 2..6 { + assert_eq!(r.contains(i), true); + } + for i in [6, 7] { + assert_eq!(r.contains(i), false); + } + for i in 8..13 { + assert_eq!(r.contains(i), true); + } + for i in 13..20 { + assert_eq!(r.contains(i), false); + } + } + #[test] fn flatten() { let mut r = RangeSet::default(); From 0d6aa2a41b1d0b85f6699e68a199c6fef768a02c Mon Sep 17 00:00:00 2001 From: Sijie Yang Date: Fri, 26 Apr 2024 15:20:41 +0800 Subject: [PATCH 13/13] Update VERSION and CHANGELOG --- CHANGELOG.md | 18 ++++++++++++++++++ Cargo.toml | 2 +- tools/Cargo.toml | 4 ++-- 3 files changed, 21 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d3ccf103..e766d3ab 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,23 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [v0.10.0] - 2024-04-26 + +### Added +- Add support for responding to key updates +- Add datagram packetization layer PMTU discovery +- Improve API for stream creation +- Limit configuration value of type varint +- Add pacing_rate to PathStats +- tquic_server: output stats when server connection is closed +- Add workflow and plot tools for fct testing + +### Fixed +- Fix the issue where bbr3 cannot exit slow start due to high packet loss rate + +### Security +- Limit memory consumption for tracking closed stream ids + ## [v0.9.0] - 2024-04-10 @@ -182,6 +199,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Provide example clients and servers. +[v0.10.0]: https://github.com/tencent/tquic/compare/v0.9.0...v0.10.0 [v0.9.0]: https://github.com/tencent/tquic/compare/v0.8.1...v0.9.0 [v0.8.1]: https://github.com/tencent/tquic/compare/v0.8.0...v0.8.1 [v0.8.0]: https://github.com/tencent/tquic/compare/v0.7.0...v0.8.0 diff --git a/Cargo.toml b/Cargo.toml index cd03ec04..6a7724c4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic" -version = "0.9.0" +version = "0.10.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" diff --git a/tools/Cargo.toml b/tools/Cargo.toml index c6d244c1..952d2a57 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic_tools" -version = "0.9.0" +version = "0.10.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" @@ -23,7 +23,7 @@ rand = "0.8.5" statrs = "0.16" jemallocator = { version = "0.5", package = "tikv-jemallocator" } signal-hook = "0.3.17" -tquic = { path = "..", version = "0.9.0"} +tquic = { path = "..", version = "0.10.0"} [lib] crate-type = ["lib"]