From 1e6f27cfb948170e157214a035d169499e1a4320 Mon Sep 17 00:00:00 2001 From: Vlad Krasnov Date: Fri, 8 Mar 2024 14:36:38 -0500 Subject: [PATCH 1/5] recovery: move congestion into module --- quiche/src/lib.rs | 2 +- quiche/src/recovery/congestion.rs | 314 ++++ .../src/recovery/{ => congestion}/bbr/init.rs | 0 .../src/recovery/{ => congestion}/bbr/mod.rs | 106 +- .../recovery/{ => congestion}/bbr/pacing.rs | 0 .../recovery/{ => congestion}/bbr/per_ack.rs | 0 .../{ => congestion}/bbr/per_transmit.rs | 0 .../recovery/{ => congestion}/bbr2/init.rs | 0 .../src/recovery/{ => congestion}/bbr2/mod.rs | 32 +- .../recovery/{ => congestion}/bbr2/pacing.rs | 0 .../recovery/{ => congestion}/bbr2/per_ack.rs | 0 .../{ => congestion}/bbr2/per_loss.rs | 0 .../{ => congestion}/bbr2/per_transmit.rs | 0 quiche/src/recovery/congestion/cubic.rs | 817 ++++++++++ .../{ => congestion}/delivery_rate.rs | 0 .../src/recovery/{ => congestion}/hystart.rs | 4 - quiche/src/recovery/{ => congestion}/pacer.rs | 0 quiche/src/recovery/{ => congestion}/prr.rs | 0 quiche/src/recovery/congestion/reno.rs | 434 ++++++ quiche/src/recovery/cubic.rs | 1334 ----------------- quiche/src/recovery/mod.rs | 433 +----- quiche/src/recovery/reno.rs | 434 ------ 22 files changed, 1638 insertions(+), 2272 deletions(-) create mode 100644 quiche/src/recovery/congestion.rs rename quiche/src/recovery/{ => congestion}/bbr/init.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr/mod.rs (89%) rename quiche/src/recovery/{ => congestion}/bbr/pacing.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr/per_ack.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr/per_transmit.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr2/init.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr2/mod.rs (97%) rename quiche/src/recovery/{ => congestion}/bbr2/pacing.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr2/per_ack.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr2/per_loss.rs (100%) rename quiche/src/recovery/{ => congestion}/bbr2/per_transmit.rs (100%) create mode 100644 quiche/src/recovery/congestion/cubic.rs rename quiche/src/recovery/{ => congestion}/delivery_rate.rs (100%) rename quiche/src/recovery/{ => congestion}/hystart.rs (98%) rename quiche/src/recovery/{ => congestion}/pacer.rs (100%) rename quiche/src/recovery/{ => congestion}/prr.rs (100%) create mode 100644 quiche/src/recovery/congestion/reno.rs delete mode 100644 quiche/src/recovery/cubic.rs delete mode 100644 quiche/src/recovery/reno.rs diff --git a/quiche/src/lib.rs b/quiche/src/lib.rs index a3e7aaa11a..5e6865a719 100644 --- a/quiche/src/lib.rs +++ b/quiche/src/lib.rs @@ -17288,7 +17288,7 @@ pub use crate::path::PathEvent; pub use crate::path::PathStats; pub use crate::path::SocketAddrIter; -pub use crate::recovery::CongestionControlAlgorithm; +pub use crate::recovery::congestion::CongestionControlAlgorithm; pub use crate::stream::StreamIter; diff --git a/quiche/src/recovery/congestion.rs b/quiche/src/recovery/congestion.rs new file mode 100644 index 0000000000..54f9765b8a --- /dev/null +++ b/quiche/src/recovery/congestion.rs @@ -0,0 +1,314 @@ +mod bbr; +mod bbr2; +mod cubic; +mod delivery_rate; +mod hystart; +pub(crate) mod pacer; +mod prr; +mod reno; + +use std::str::FromStr; +use std::time::Instant; + +use super::rtt::RttStats; +use super::Acked; +use super::RecoveryConfig; +use super::Sent; + +pub const PACING_MULTIPLIER: f64 = 1.25; +pub struct Congestion { + // Congestion control. + pub(crate) cc_ops: &'static CongestionControlOps, + + cubic_state: cubic::State, + + // HyStart++. + pub(crate) hystart: hystart::Hystart, + + // Pacing. + pub(crate) pacer: pacer::Pacer, + + // RFC6937 PRR. + pub(crate) prr: prr::PRR, + + // The maximum size of a data aggregate scheduled and + // transmitted together. + send_quantum: usize, + + // BBR state. + bbr_state: bbr::State, + + // BBRv2 state. + bbr2_state: bbr2::State, + + pub(crate) congestion_window: usize, + + pub(crate) ssthresh: usize, + + bytes_acked_sl: usize, + + bytes_acked_ca: usize, + + pub(crate) congestion_recovery_start_time: Option, + + pub(crate) app_limited: bool, + + pub(crate) delivery_rate: delivery_rate::Rate, + + /// Initial congestion window size in terms of packet count. + pub(crate) initial_congestion_window_packets: usize, + + max_datagram_size: usize, + + pub(crate) lost_count: usize, +} + +impl Congestion { + pub(crate) fn from_config(recovery_config: &RecoveryConfig) -> Self { + let initial_congestion_window = recovery_config.max_send_udp_payload_size * + recovery_config.initial_congestion_window_packets; + + let mut cc = Congestion { + congestion_window: initial_congestion_window, + + ssthresh: usize::MAX, + + bytes_acked_sl: 0, + + bytes_acked_ca: 0, + + congestion_recovery_start_time: None, + + cc_ops: recovery_config.cc_algorithm.into(), + + cubic_state: cubic::State::default(), + + app_limited: false, + + lost_count: 0, + + initial_congestion_window_packets: recovery_config + .initial_congestion_window_packets, + + max_datagram_size: recovery_config.max_send_udp_payload_size, + + send_quantum: initial_congestion_window, + + delivery_rate: delivery_rate::Rate::default(), + + hystart: hystart::Hystart::new(recovery_config.hystart), + + pacer: pacer::Pacer::new( + recovery_config.pacing, + initial_congestion_window, + 0, + recovery_config.max_send_udp_payload_size, + recovery_config.max_pacing_rate, + ), + + prr: prr::PRR::default(), + + bbr_state: bbr::State::new(), + + bbr2_state: bbr2::State::new(), + }; + + (cc.cc_ops.on_init)(&mut cc); + + cc + } + + pub(crate) fn in_congestion_recovery(&self, sent_time: Instant) -> bool { + match self.congestion_recovery_start_time { + Some(congestion_recovery_start_time) => + sent_time <= congestion_recovery_start_time, + + None => false, + } + } + + pub(crate) fn delivery_rate(&self) -> u64 { + self.delivery_rate.sample_delivery_rate() + } + + pub(crate) fn send_quantum(&self) -> usize { + self.send_quantum + } + + pub(crate) fn set_pacing_rate(&mut self, rate: u64, now: Instant) { + self.pacer.update(self.send_quantum, rate, now); + } + + pub(crate) fn congestion_window(&self) -> usize { + self.congestion_window + } + + fn update_app_limited(&mut self, v: bool) { + self.app_limited = v; + } + + #[allow(clippy::too_many_arguments)] + pub(crate) fn on_packet_sent( + &mut self, bytes_in_flight: usize, sent_bytes: usize, now: Instant, + pkt: &mut Sent, rtt_stats: &RttStats, bytes_lost: u64, in_flight: bool, + ) { + if in_flight { + self.update_app_limited( + (bytes_in_flight + sent_bytes) < self.congestion_window, + ); + + (self.cc_ops.on_packet_sent)(self, sent_bytes, bytes_in_flight, now); + + self.prr.on_packet_sent(sent_bytes); + + // HyStart++: Start of the round in a slow start. + if self.hystart.enabled() && self.congestion_window < self.ssthresh { + self.hystart.start_round(pkt.pkt_num); + } + } + + // Pacing: Set the pacing rate if CC doesn't do its own. + if !(self.cc_ops.has_custom_pacing)() && + rtt_stats.first_rtt_sample.is_some() + { + let rate = PACING_MULTIPLIER * self.congestion_window as f64 / + rtt_stats.smoothed_rtt.as_secs_f64(); + self.set_pacing_rate(rate as u64, now); + } + + self.schedule_next_packet(now, sent_bytes); + + pkt.time_sent = self.get_packet_send_time(); + + // bytes_in_flight is already updated. Use previous value. + self.delivery_rate + .on_packet_sent(pkt, bytes_in_flight, bytes_lost); + } + + pub(crate) fn on_packets_acked( + &mut self, bytes_in_flight: usize, acked: &mut Vec, + rtt_stats: &RttStats, now: Instant, + ) { + // Update delivery rate sample per acked packet. + for pkt in acked.iter() { + self.delivery_rate.update_rate_sample(pkt, now); + } + + // Fill in a rate sample. + self.delivery_rate.generate_rate_sample(*rtt_stats.min_rtt); + + // Call congestion control hooks. + (self.cc_ops.on_packets_acked)( + self, + bytes_in_flight, + acked, + now, + rtt_stats, + ); + } + + fn schedule_next_packet(&mut self, now: Instant, packet_size: usize) { + // Don't pace in any of these cases: + // * Packet contains no data. + // * The congestion window is within initcwnd. + + let in_initcwnd = self.congestion_window < + self.max_datagram_size * self.initial_congestion_window_packets; + + let sent_bytes = if !self.pacer.enabled() || in_initcwnd { + 0 + } else { + packet_size + }; + + self.pacer.send(sent_bytes, now); + } + + pub(crate) fn get_packet_send_time(&self) -> Instant { + self.pacer.next_time() + } +} + +/// Available congestion control algorithms. +/// +/// This enum provides currently available list of congestion control +/// algorithms. +#[derive(Debug, Copy, Clone, PartialEq, Eq)] +#[repr(C)] +pub enum CongestionControlAlgorithm { + /// Reno congestion control algorithm. `reno` in a string form. + Reno = 0, + /// CUBIC congestion control algorithm (default). `cubic` in a string form. + CUBIC = 1, + /// BBR congestion control algorithm. `bbr` in a string form. + BBR = 2, + /// BBRv2 congestion control algorithm. `bbr2` in a string form. + BBR2 = 3, +} + +impl FromStr for CongestionControlAlgorithm { + type Err = crate::Error; + + /// Converts a string to `CongestionControlAlgorithm`. + /// + /// If `name` is not valid, `Error::CongestionControl` is returned. + fn from_str(name: &str) -> std::result::Result { + match name { + "reno" => Ok(CongestionControlAlgorithm::Reno), + "cubic" => Ok(CongestionControlAlgorithm::CUBIC), + "bbr" => Ok(CongestionControlAlgorithm::BBR), + "bbr2" => Ok(CongestionControlAlgorithm::BBR2), + + _ => Err(crate::Error::CongestionControl), + } + } +} + +pub(crate) struct CongestionControlOps { + pub on_init: fn(r: &mut Congestion), + + pub on_packet_sent: fn( + r: &mut Congestion, + sent_bytes: usize, + bytes_in_flight: usize, + now: Instant, + ), + + pub on_packets_acked: fn( + r: &mut Congestion, + bytes_in_flight: usize, + packets: &mut Vec, + now: Instant, + rtt_stats: &RttStats, + ), + + pub congestion_event: fn( + r: &mut Congestion, + bytes_in_flight: usize, + lost_bytes: usize, + largest_lost_packet: &Sent, + now: Instant, + ), + + pub checkpoint: fn(r: &mut Congestion), + + pub rollback: fn(r: &mut Congestion) -> bool, + + pub has_custom_pacing: fn() -> bool, + + pub debug_fmt: fn( + r: &Congestion, + formatter: &mut std::fmt::Formatter, + ) -> std::fmt::Result, +} + +impl From for &'static CongestionControlOps { + fn from(algo: CongestionControlAlgorithm) -> Self { + match algo { + CongestionControlAlgorithm::Reno => &reno::RENO, + CongestionControlAlgorithm::CUBIC => &cubic::CUBIC, + CongestionControlAlgorithm::BBR => &bbr::BBR, + CongestionControlAlgorithm::BBR2 => &bbr2::BBR2, + } + } +} diff --git a/quiche/src/recovery/bbr/init.rs b/quiche/src/recovery/congestion/bbr/init.rs similarity index 100% rename from quiche/src/recovery/bbr/init.rs rename to quiche/src/recovery/congestion/bbr/init.rs diff --git a/quiche/src/recovery/bbr/mod.rs b/quiche/src/recovery/congestion/bbr/mod.rs similarity index 89% rename from quiche/src/recovery/bbr/mod.rs rename to quiche/src/recovery/congestion/bbr/mod.rs index 6adc009698..da3f4f94a6 100644 --- a/quiche/src/recovery/bbr/mod.rs +++ b/quiche/src/recovery/congestion/bbr/mod.rs @@ -34,13 +34,13 @@ use crate::recovery::*; use std::time::Duration; -pub static BBR: CongestionControlOps = CongestionControlOps { +use super::CongestionControlOps; + +pub(crate) static BBR: CongestionControlOps = CongestionControlOps { on_init, - reset, on_packet_sent, on_packets_acked, congestion_event, - collapse_cwnd, checkpoint, rollback, has_custom_pacing, @@ -281,12 +281,6 @@ fn on_init(r: &mut Congestion) { init::bbr_init(r); } -fn reset(r: &mut Congestion) { - r.bbr_state = State::new(); - - init::bbr_init(r); -} - fn on_packet_sent( r: &mut Congestion, _sent_bytes: usize, bytes_in_flight: usize, _now: Instant, ) { @@ -333,12 +327,6 @@ fn congestion_event( } } -fn collapse_cwnd(r: &mut Congestion, bytes_in_flight: usize) { - r.bbr_state.prior_cwnd = per_ack::bbr_save_cwnd(r); - - reno::collapse_cwnd(r, bytes_in_flight); -} - fn checkpoint(_r: &mut Congestion) {} fn rollback(_r: &mut Congestion) -> bool { @@ -361,12 +349,18 @@ fn debug_fmt(r: &Congestion, f: &mut std::fmt::Formatter) -> std::fmt::Result { #[cfg(test)] mod tests { + use self::congestion::reno::test_sender::TestSender; + use super::*; use crate::recovery; use smallvec::smallvec; + fn test_sender() -> TestSender { + TestSender::new(recovery::CongestionControlAlgorithm::BBR, false) + } + #[test] fn bbr_init() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); @@ -383,86 +377,34 @@ mod tests { assert_eq!(r.congestion.bbr_state.state, BBRStateMachine::Startup); } - #[test] - fn bbr_send() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - - r.on_packet_sent_cc(0, 1000, now); - - assert_eq!(r.bytes_in_flight, 1000); - } - #[test] fn bbr_startup() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR); + let mut sender = test_sender(); + let mss = sender.max_datagram_size; - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - let mss = r.max_datagram_size; + let rtt = Duration::from_millis(50); + sender.update_rtt(rtt); + sender.advance_time(rtt); // Send 5 packets. - for pn in 0..5 { - let pkt = Sent { - pkt_num: pn, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: mss, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - r.on_packet_sent( - pkt, - packet::Epoch::Application, - HandshakeStatus::default(), - now, - "", - ); + for _ in 0..5 { + sender.send_packet(mss); } - let rtt = Duration::from_millis(50); - let now = now + rtt; - let cwnd_prev = r.cwnd(); + sender.advance_time(rtt); - let mut acked = ranges::RangeSet::default(); - acked.insert(0..5); + let cwnd_prev = sender.congestion_window; - assert_eq!( - r.on_ack_received( - &acked, - 25, - packet::Epoch::Application, - HandshakeStatus::default(), - now, - "", - &mut Vec::new(), - ), - Ok((0, 0)), - ); + sender.ack_n_packets(5, mss); - assert_eq!(r.congestion.bbr_state.state, BBRStateMachine::Startup); - assert_eq!(r.cwnd(), cwnd_prev + mss * 5); - assert_eq!(r.bytes_in_flight, 0); + assert_eq!(sender.bbr_state.state, BBRStateMachine::Startup); + assert_eq!(sender.congestion_window, cwnd_prev + mss * 5); + assert_eq!(sender.bytes_in_flight, 0); assert_eq!( - r.delivery_rate(), + sender.delivery_rate(), ((mss * 5) as f64 / rtt.as_secs_f64()) as u64 ); - assert_eq!(r.congestion.bbr_state.btlbw, r.delivery_rate()); + assert_eq!(sender.bbr_state.btlbw, sender.delivery_rate()); } #[test] diff --git a/quiche/src/recovery/bbr/pacing.rs b/quiche/src/recovery/congestion/bbr/pacing.rs similarity index 100% rename from quiche/src/recovery/bbr/pacing.rs rename to quiche/src/recovery/congestion/bbr/pacing.rs diff --git a/quiche/src/recovery/bbr/per_ack.rs b/quiche/src/recovery/congestion/bbr/per_ack.rs similarity index 100% rename from quiche/src/recovery/bbr/per_ack.rs rename to quiche/src/recovery/congestion/bbr/per_ack.rs diff --git a/quiche/src/recovery/bbr/per_transmit.rs b/quiche/src/recovery/congestion/bbr/per_transmit.rs similarity index 100% rename from quiche/src/recovery/bbr/per_transmit.rs rename to quiche/src/recovery/congestion/bbr/per_transmit.rs diff --git a/quiche/src/recovery/bbr2/init.rs b/quiche/src/recovery/congestion/bbr2/init.rs similarity index 100% rename from quiche/src/recovery/bbr2/init.rs rename to quiche/src/recovery/congestion/bbr2/init.rs diff --git a/quiche/src/recovery/bbr2/mod.rs b/quiche/src/recovery/congestion/bbr2/mod.rs similarity index 97% rename from quiche/src/recovery/bbr2/mod.rs rename to quiche/src/recovery/congestion/bbr2/mod.rs index 0169637805..7b8e80bc2c 100644 --- a/quiche/src/recovery/bbr2/mod.rs +++ b/quiche/src/recovery/congestion/bbr2/mod.rs @@ -35,13 +35,13 @@ use crate::recovery::*; use std::time::Duration; use std::time::Instant; -pub static BBR2: CongestionControlOps = CongestionControlOps { +use super::CongestionControlOps; + +pub(crate) static BBR2: CongestionControlOps = CongestionControlOps { on_init, - reset, on_packet_sent, on_packets_acked, congestion_event, - collapse_cwnd, checkpoint, rollback, has_custom_pacing, @@ -548,12 +548,6 @@ fn on_init(r: &mut Congestion) { init::bbr2_init(r); } -fn reset(r: &mut Congestion) { - r.bbr2_state = State::new(); - - init::bbr2_init(r); -} - fn on_packet_sent( r: &mut Congestion, _sent_bytes: usize, bytes_in_flight: usize, now: Instant, ) { @@ -607,13 +601,6 @@ fn congestion_event( } } -fn collapse_cwnd(r: &mut Congestion, bytes_in_flight: usize) { - // BBROnEnterRTO() - r.bbr2_state.prior_cwnd = per_ack::bbr2_save_cwnd(r); - - r.congestion_window = bytes_in_flight + r.max_datagram_size; -} - fn checkpoint(_r: &mut Congestion) {} fn rollback(_r: &mut Congestion) -> bool { @@ -697,19 +684,6 @@ mod tests { assert_eq!(r.congestion.bbr2_state.state, BBR2StateMachine::Startup); } - #[test] - fn bbr2_send() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::BBR2); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - - r.on_packet_sent_cc(0, 1000, now); - - assert_eq!(r.bytes_in_flight, 1000); - } - #[test] fn bbr2_startup() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); diff --git a/quiche/src/recovery/bbr2/pacing.rs b/quiche/src/recovery/congestion/bbr2/pacing.rs similarity index 100% rename from quiche/src/recovery/bbr2/pacing.rs rename to quiche/src/recovery/congestion/bbr2/pacing.rs diff --git a/quiche/src/recovery/bbr2/per_ack.rs b/quiche/src/recovery/congestion/bbr2/per_ack.rs similarity index 100% rename from quiche/src/recovery/bbr2/per_ack.rs rename to quiche/src/recovery/congestion/bbr2/per_ack.rs diff --git a/quiche/src/recovery/bbr2/per_loss.rs b/quiche/src/recovery/congestion/bbr2/per_loss.rs similarity index 100% rename from quiche/src/recovery/bbr2/per_loss.rs rename to quiche/src/recovery/congestion/bbr2/per_loss.rs diff --git a/quiche/src/recovery/bbr2/per_transmit.rs b/quiche/src/recovery/congestion/bbr2/per_transmit.rs similarity index 100% rename from quiche/src/recovery/bbr2/per_transmit.rs rename to quiche/src/recovery/congestion/bbr2/per_transmit.rs diff --git a/quiche/src/recovery/congestion/cubic.rs b/quiche/src/recovery/congestion/cubic.rs new file mode 100644 index 0000000000..8005e30367 --- /dev/null +++ b/quiche/src/recovery/congestion/cubic.rs @@ -0,0 +1,817 @@ +// Copyright (C) 2019, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +//! CUBIC Congestion Control +//! +//! This implementation is based on the following draft: +//! +//! +//! Note that Slow Start can use HyStart++ when enabled. + +use std::cmp; + +use std::time::Duration; +use std::time::Instant; + +use crate::recovery; +use crate::recovery::rtt::RttStats; +use crate::recovery::Acked; +use crate::recovery::Sent; + +use super::reno; +use super::Congestion; +use super::CongestionControlOps; + +pub(crate) static CUBIC: CongestionControlOps = CongestionControlOps { + on_init, + on_packet_sent, + on_packets_acked, + congestion_event, + checkpoint, + rollback, + has_custom_pacing, + debug_fmt, +}; + +/// CUBIC Constants. +/// +/// These are recommended value in RFC8312. +const BETA_CUBIC: f64 = 0.7; + +const C: f64 = 0.4; + +/// Threshold for rolling back state, as percentage of lost packets relative to +/// cwnd. +const ROLLBACK_THRESHOLD_PERCENT: usize = 20; + +/// Minimum threshold for rolling back state, as number of packets. +const MIN_ROLLBACK_THRESHOLD: usize = 2; + +/// Default value of alpha_aimd in the beginning of congestion avoidance. +const ALPHA_AIMD: f64 = 3.0 * (1.0 - BETA_CUBIC) / (1.0 + BETA_CUBIC); + +/// CUBIC State Variables. +/// +/// We need to keep those variables across the connection. +/// k, w_max, w_est are described in the RFC. +#[derive(Debug, Default)] +pub struct State { + k: f64, + + w_max: f64, + + w_est: f64, + + alpha_aimd: f64, + + // Used in CUBIC fix (see on_packet_sent()) + last_sent_time: Option, + + // Store cwnd increment during congestion avoidance. + cwnd_inc: usize, + + // CUBIC state checkpoint preceding the last congestion event. + prior: PriorState, +} + +/// Stores the CUBIC state from before the last congestion event. +/// +/// +#[derive(Debug, Default)] +struct PriorState { + congestion_window: usize, + + ssthresh: usize, + + w_max: f64, + + k: f64, + + epoch_start: Option, + + lost_count: usize, +} + +/// CUBIC Functions. +/// +/// Note that these calculations are based on a count of cwnd as bytes, +/// not packets. +/// Unit of t (duration) and RTT are based on seconds (f64). +impl State { + // K = cubic_root ((w_max - cwnd) / C) (Eq. 2) + fn cubic_k(&self, cwnd: usize, max_datagram_size: usize) -> f64 { + let w_max = self.w_max / max_datagram_size as f64; + let cwnd = cwnd as f64 / max_datagram_size as f64; + + libm::cbrt((w_max - cwnd) / C) + } + + // W_cubic(t) = C * (t - K)^3 + w_max (Eq. 1) + fn w_cubic(&self, t: Duration, max_datagram_size: usize) -> f64 { + let w_max = self.w_max / max_datagram_size as f64; + + (C * (t.as_secs_f64() - self.k).powi(3) + w_max) * + max_datagram_size as f64 + } + + // W_est = W_est + alpha_aimd * (segments_acked / cwnd) (Eq. 4) + fn w_est_inc( + &self, acked: usize, cwnd: usize, max_datagram_size: usize, + ) -> f64 { + self.alpha_aimd * (acked as f64 / cwnd as f64) * max_datagram_size as f64 + } +} + +fn on_init(_r: &mut Congestion) {} + +fn on_packet_sent( + r: &mut Congestion, sent_bytes: usize, bytes_in_flight: usize, now: Instant, +) { + // See https://github.com/torvalds/linux/commit/30927520dbae297182990bb21d08762bcc35ce1d + // First transmit when no packets in flight + let cubic = &mut r.cubic_state; + + if let Some(last_sent_time) = cubic.last_sent_time { + if bytes_in_flight == 0 { + let delta = now - last_sent_time; + + // We were application limited (idle) for a while. + // Shift epoch start to keep cwnd growth to cubic curve. + if let Some(recovery_start_time) = r.congestion_recovery_start_time { + if delta.as_nanos() > 0 { + r.congestion_recovery_start_time = + Some(recovery_start_time + delta); + } + } + } + } + + cubic.last_sent_time = Some(now); + + reno::on_packet_sent(r, sent_bytes, bytes_in_flight, now); +} + +fn on_packets_acked( + r: &mut Congestion, bytes_in_flight: usize, packets: &mut Vec, + now: Instant, rtt_stats: &RttStats, +) { + for pkt in packets.drain(..) { + on_packet_acked(r, bytes_in_flight, &pkt, now, rtt_stats); + } +} + +fn on_packet_acked( + r: &mut Congestion, bytes_in_flight: usize, packet: &Acked, now: Instant, + rtt_stats: &RttStats, +) { + let in_congestion_recovery = r.in_congestion_recovery(packet.time_sent); + + if in_congestion_recovery { + r.prr.on_packet_acked( + packet.size, + bytes_in_flight, + r.ssthresh, + r.max_datagram_size, + ); + + return; + } + + if r.app_limited { + return; + } + + // Detecting spurious congestion events. + // + // + // When the recovery episode ends with recovering + // a few packets (less than cwnd / mss * ROLLBACK_THRESHOLD_PERCENT(%)), it's + // considered as spurious and restore to the previous state. + if r.congestion_recovery_start_time.is_some() { + let new_lost = r.lost_count - r.cubic_state.prior.lost_count; + + let rollback_threshold = (r.congestion_window / r.max_datagram_size) * + ROLLBACK_THRESHOLD_PERCENT / + 100; + + let rollback_threshold = rollback_threshold.max(MIN_ROLLBACK_THRESHOLD); + + if new_lost < rollback_threshold { + let did_rollback = rollback(r); + if did_rollback { + return; + } + } + } + + if r.congestion_window < r.ssthresh { + // In Slow slart, bytes_acked_sl is used for counting + // acknowledged bytes. + r.bytes_acked_sl += packet.size; + + if r.bytes_acked_sl >= r.max_datagram_size { + if r.hystart.in_css() { + r.congestion_window += + r.hystart.css_cwnd_inc(r.max_datagram_size); + } else { + r.congestion_window += r.max_datagram_size; + } + + r.bytes_acked_sl -= r.max_datagram_size; + } + + if r.hystart.on_packet_acked(packet, rtt_stats.latest_rtt, now) { + // Exit to congestion avoidance if CSS ends. + r.ssthresh = r.congestion_window; + } + } else { + // Congestion avoidance. + let ca_start_time; + + // In CSS, use css_start_time instead of congestion_recovery_start_time. + if r.hystart.in_css() { + ca_start_time = r.hystart.css_start_time().unwrap(); + + // Reset w_max and k when CSS started. + if r.cubic_state.w_max == 0.0 { + r.cubic_state.w_max = r.congestion_window as f64; + r.cubic_state.k = 0.0; + + r.cubic_state.w_est = r.congestion_window as f64; + r.cubic_state.alpha_aimd = ALPHA_AIMD; + } + } else { + match r.congestion_recovery_start_time { + Some(t) => ca_start_time = t, + None => { + // When we come here without congestion_event() triggered, + // initialize congestion_recovery_start_time, w_max and k. + ca_start_time = now; + r.congestion_recovery_start_time = Some(now); + + r.cubic_state.w_max = r.congestion_window as f64; + r.cubic_state.k = 0.0; + + r.cubic_state.w_est = r.congestion_window as f64; + r.cubic_state.alpha_aimd = ALPHA_AIMD; + }, + } + } + + let t = now.saturating_duration_since(ca_start_time); + + // target = w_cubic(t + rtt) + let target = r + .cubic_state + .w_cubic(t + *rtt_stats.min_rtt, r.max_datagram_size); + + // Clipping target to [cwnd, 1.5 x cwnd] + let target = f64::max(target, r.congestion_window as f64); + let target = f64::min(target, r.congestion_window as f64 * 1.5); + + // Update w_est. + let w_est_inc = r.cubic_state.w_est_inc( + packet.size, + r.congestion_window, + r.max_datagram_size, + ); + r.cubic_state.w_est += w_est_inc; + + if r.cubic_state.w_est >= r.cubic_state.w_max { + r.cubic_state.alpha_aimd = 1.0; + } + + let mut cubic_cwnd = r.congestion_window; + + if r.cubic_state.w_cubic(t, r.max_datagram_size) < r.cubic_state.w_est { + // AIMD friendly region (W_cubic(t) < W_est) + cubic_cwnd = cmp::max(cubic_cwnd, r.cubic_state.w_est as usize); + } else { + // Concave region or convex region use same increment. + let cubic_inc = + r.max_datagram_size * (target as usize - cubic_cwnd) / cubic_cwnd; + + cubic_cwnd += cubic_inc; + } + + // Update the increment and increase cwnd by MSS. + r.cubic_state.cwnd_inc += cubic_cwnd - r.congestion_window; + + if r.cubic_state.cwnd_inc >= r.max_datagram_size { + r.congestion_window += r.max_datagram_size; + r.cubic_state.cwnd_inc -= r.max_datagram_size; + } + } +} + +fn congestion_event( + r: &mut Congestion, bytes_in_flight: usize, _lost_bytes: usize, + largest_lost_pkt: &Sent, now: Instant, +) { + let time_sent = largest_lost_pkt.time_sent; + let in_congestion_recovery = r.in_congestion_recovery(time_sent); + + // Start a new congestion event if packet was sent after the + // start of the previous congestion recovery period. + if !in_congestion_recovery { + r.congestion_recovery_start_time = Some(now); + + // Fast convergence + if (r.congestion_window as f64) < r.cubic_state.w_max { + r.cubic_state.w_max = + r.congestion_window as f64 * (1.0 + BETA_CUBIC) / 2.0; + } else { + r.cubic_state.w_max = r.congestion_window as f64; + } + + r.ssthresh = (r.congestion_window as f64 * BETA_CUBIC) as usize; + r.ssthresh = cmp::max( + r.ssthresh, + r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS, + ); + r.congestion_window = r.ssthresh; + + r.cubic_state.k = if r.cubic_state.w_max < r.congestion_window as f64 { + 0.0 + } else { + r.cubic_state + .cubic_k(r.congestion_window, r.max_datagram_size) + }; + + r.cubic_state.cwnd_inc = + (r.cubic_state.cwnd_inc as f64 * BETA_CUBIC) as usize; + + r.cubic_state.w_est = r.congestion_window as f64; + r.cubic_state.alpha_aimd = ALPHA_AIMD; + + if r.hystart.in_css() { + r.hystart.congestion_event(); + } + + r.prr.congestion_event(bytes_in_flight); + } +} + +fn checkpoint(r: &mut Congestion) { + r.cubic_state.prior.congestion_window = r.congestion_window; + r.cubic_state.prior.ssthresh = r.ssthresh; + r.cubic_state.prior.w_max = r.cubic_state.w_max; + r.cubic_state.prior.k = r.cubic_state.k; + r.cubic_state.prior.epoch_start = r.congestion_recovery_start_time; + r.cubic_state.prior.lost_count = r.lost_count; +} + +fn rollback(r: &mut Congestion) -> bool { + // Don't go back to slow start. + if r.cubic_state.prior.congestion_window < r.cubic_state.prior.ssthresh { + return false; + } + + if r.congestion_window >= r.cubic_state.prior.congestion_window { + return false; + } + + r.congestion_window = r.cubic_state.prior.congestion_window; + r.ssthresh = r.cubic_state.prior.ssthresh; + r.cubic_state.w_max = r.cubic_state.prior.w_max; + r.cubic_state.k = r.cubic_state.prior.k; + r.congestion_recovery_start_time = r.cubic_state.prior.epoch_start; + + true +} + +fn has_custom_pacing() -> bool { + false +} + +fn debug_fmt(r: &Congestion, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!( + f, + "cubic={{ k={} w_max={} }} ", + r.cubic_state.k, r.cubic_state.w_max + ) +} + +#[cfg(test)] +mod tests { + use self::recovery::congestion::reno::test_sender::TestSender; + use super::*; + use crate::recovery::congestion::hystart; + use crate::recovery::Recovery; + use crate::CongestionControlAlgorithm; + + fn test_sender() -> TestSender { + TestSender::new(recovery::CongestionControlAlgorithm::CUBIC, false) + } + + fn hystart_test_sender() -> TestSender { + TestSender::new(recovery::CongestionControlAlgorithm::CUBIC, true) + } + + #[test] + fn cubic_init() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); + + let r = Recovery::new(&cfg); + + assert!(r.cwnd() > 0); + assert_eq!(r.bytes_in_flight, 0); + } + + #[test] + fn cubic_slow_start() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..sender.initial_congestion_window_packets { + sender.send_packet(size); + } + + let cwnd_prev = sender.congestion_window; + + sender.ack_n_packets(1, size); + + // Check if cwnd increased by packet size (slow start) + assert_eq!(sender.congestion_window, cwnd_prev + size); + } + + #[test] + fn cubic_slow_start_multi_acks() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..sender.initial_congestion_window_packets { + sender.send_packet(size); + } + + let cwnd_prev = sender.congestion_window; + + sender.ack_n_packets(3, size); + + // Acked 3 packets. + assert_eq!(sender.congestion_window, cwnd_prev + size * 3); + } + + #[test] + fn cubic_congestion_event() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + sender.send_packet(size); + + let cwnd_prev = sender.congestion_window; + + sender.lose_n_packets(1, size, None); + + // In CUBIC, after congestion event, cwnd will be reduced by (1 - + // CUBIC_BETA) + assert_eq!( + cwnd_prev as f64 * BETA_CUBIC, + sender.congestion_window as f64 + ); + } + + #[test] + fn cubic_congestion_avoidance() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + let prev_cwnd = sender.congestion_window; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..sender.initial_congestion_window_packets { + sender.send_packet(size); + } + + // Trigger congestion event to update ssthresh + sender.lose_n_packets(1, size, None); + + // After congestion event, cwnd will be reduced. + let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; + assert_eq!(sender.congestion_window, cur_cwnd); + + // Shift current time by 1 RTT. + let rtt = Duration::from_millis(100); + sender.update_rtt(rtt); + // Exit from the recovery. + sender.advance_time(rtt); + + // During Congestion Avoidance, it will take + // 5 ACKs to increase cwnd by 1 MSS. + for _ in 0..5 { + sender.ack_n_packets(1, size); + sender.advance_time(rtt); + } + + assert_eq!(sender.congestion_window, cur_cwnd + size); + } + + #[test] + fn cubic_hystart_css_to_ss() { + let mut sender = hystart_test_sender(); + let size = sender.max_datagram_size; + + // 1st round. + let n_rtt_sample = hystart::N_RTT_SAMPLE; + + let rtt_1st = Duration::from_millis(50); + + let next_rnd = sender.next_pkt + n_rtt_sample as u64 - 1; + sender.hystart.start_round(next_rnd); + // Send 1st round packets. + for _ in 0..n_rtt_sample { + sender.send_packet(size); + } + sender.update_app_limited(false); + + // Receiving Acks. + sender.advance_time(rtt_1st); + sender.update_rtt(rtt_1st); + sender.ack_n_packets(n_rtt_sample, size); + + // Not in CSS yet. + assert!(sender.hystart.css_start_time().is_none()); + + // 2nd round. + let mut rtt_2nd = Duration::from_millis(100); + + sender.advance_time(rtt_2nd); + + let next_rnd = sender.next_pkt + n_rtt_sample as u64 - 1; + sender.hystart.start_round(next_rnd); + // Send 2nd round packets. + for _ in 0..n_rtt_sample { + sender.send_packet(size); + } + sender.update_app_limited(false); + + // Receiving Acks. + // Last ack will cause to exit to CSS. + let mut cwnd_prev = sender.congestion_window(); + + for _ in 0..n_rtt_sample { + cwnd_prev = sender.congestion_window(); + sender.update_rtt(rtt_2nd); + sender.ack_n_packets(1, size); + // Keep increasing RTT so that hystart exits to CSS. + rtt_2nd += rtt_2nd.saturating_add(Duration::from_millis(4)); + } + + // Now we are in CSS. + assert!(sender.hystart.css_start_time().is_some()); + assert_eq!(sender.congestion_window(), cwnd_prev + size); + + // 3rd round, which RTT is less than previous round to + // trigger back to Slow Start. + let rtt_3rd = Duration::from_millis(80); + sender.advance_time(rtt_3rd); + cwnd_prev = sender.congestion_window(); + + let next_rnd = sender.next_pkt + n_rtt_sample as u64 - 1; + sender.hystart.start_round(next_rnd); + // Send 3nd round packets. + for _ in 0..n_rtt_sample { + sender.send_packet(size); + } + sender.update_app_limited(false); + + // Receiving Acks. + // Last ack will cause to exit to SS. + sender.update_rtt(rtt_3rd); + sender.ack_n_packets(n_rtt_sample, size); + + // Now we are back in Slow Start. + assert!(sender.hystart.css_start_time().is_none()); + assert_eq!( + sender.congestion_window(), + cwnd_prev + + size / hystart::CSS_GROWTH_DIVISOR * hystart::N_RTT_SAMPLE + ); + } + + #[test] + fn cubic_hystart_css_to_ca() { + let mut sender = hystart_test_sender(); + let size = sender.max_datagram_size; + + // 1st round. + let n_rtt_sample = hystart::N_RTT_SAMPLE; + + let rtt_1st = Duration::from_millis(50); + + let next_rnd = sender.next_pkt + n_rtt_sample as u64 - 1; + sender.hystart.start_round(next_rnd); + // Send 1st round packets. + for _ in 0..n_rtt_sample { + sender.send_packet(size); + } + sender.update_app_limited(false); + + // Receiving Acks. + sender.advance_time(rtt_1st); + sender.update_rtt(rtt_1st); + sender.ack_n_packets(n_rtt_sample, size); + + // Not in CSS yet. + assert!(sender.hystart.css_start_time().is_none()); + + // 2nd round. + let mut rtt_2nd = Duration::from_millis(100); + sender.advance_time(rtt_2nd); + // Send 2nd round packets. + let next_rnd = sender.next_pkt + n_rtt_sample as u64 - 1; + sender.hystart.start_round(next_rnd); + for _ in 0..n_rtt_sample { + sender.send_packet(size); + } + sender.update_app_limited(false); + + // Receiving Acks. + // Last ack will cause to exit to CSS. + let mut cwnd_prev = sender.congestion_window(); + + for _ in 0..n_rtt_sample { + cwnd_prev = sender.congestion_window(); + sender.update_rtt(rtt_2nd); + sender.ack_n_packets(1, size); + // Keep increasing RTT so that hystart exits to CSS. + rtt_2nd += rtt_2nd.saturating_add(Duration::from_millis(4)); + } + + // Now we are in CSS. + assert!(sender.hystart.css_start_time().is_some()); + assert_eq!(sender.congestion_window(), cwnd_prev + size); + + // Run 5 (CSS_ROUNDS) in CSS, to exit to congestion avoidance. + let rtt_css = Duration::from_millis(100); + sender.advance_time(rtt_css); + + for _ in 0..hystart::CSS_ROUNDS { + // Send a round of packets. + let next_rnd = sender.next_pkt + n_rtt_sample as u64 - 1; + sender.hystart.start_round(next_rnd); + for _ in 0..n_rtt_sample { + sender.send_packet(size); + } + sender.update_app_limited(false); + + // Receiving Acks. + sender.update_rtt(rtt_css); + sender.ack_n_packets(n_rtt_sample, size); + } + // Now we are in congestion avoidance. + assert_eq!(sender.congestion_window(), sender.ssthresh); + } + + #[test] + fn cubic_spurious_congestion_event() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + let prev_cwnd = sender.congestion_window(); + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..sender.initial_congestion_window_packets { + sender.send_packet(size); + } + sender.lose_n_packets(1, size, None); + + // After congestion event, cwnd will be reduced. + let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; + assert_eq!(sender.congestion_window(), cur_cwnd); + + // Ack more than cwnd bytes with rtt=100ms + let rtt = Duration::from_millis(100); + sender.update_rtt(rtt); + + let acked = Acked { + pkt_num: 0, + // To exit from recovery + time_sent: sender.time + rtt, + size, + delivered: 0, + delivered_time: sender.time, + first_sent_time: sender.time, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }; + + // Trigger detecting spurious congestion event + sender.inject_ack(acked, sender.time + rtt + Duration::from_millis(5)); + + // This is from slow start, no rollback. + assert_eq!(sender.congestion_window(), cur_cwnd); + + sender.advance_time(rtt); + + let prev_cwnd = sender.congestion_window(); + + sender.lose_n_packets(1, size, Some(sender.time)); + + // After congestion event, cwnd will be reduced. + let cur_cwnd = (cur_cwnd as f64 * BETA_CUBIC) as usize; + assert_eq!(sender.congestion_window(), cur_cwnd); + + sender.advance_time(rtt + Duration::from_millis(5)); + + let acked = Acked { + pkt_num: 0, + // To exit from recovery + time_sent: sender.time + rtt, + size, + delivered: 0, + delivered_time: sender.time, + first_sent_time: sender.time, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + rtt: Duration::ZERO, + }; + + // Trigger detecting spurious congestion event. + sender.inject_ack(acked, sender.time + rtt + Duration::from_millis(5)); + + // cwnd is rolled back to the previous one. + assert_eq!(sender.congestion_window(), prev_cwnd); + } + + #[test] + fn cubic_fast_convergence() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + let prev_cwnd = sender.congestion_window; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..sender.initial_congestion_window_packets { + sender.send_packet(size); + } + + // Trigger congestion event to update ssthresh + sender.lose_n_packets(1, size, None); + + // After 1st congestion event, cwnd will be reduced. + let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; + assert_eq!(sender.congestion_window, cur_cwnd); + + // Shift current time by 1 RTT. + let rtt = Duration::from_millis(100); + sender.update_rtt(rtt); + // Exit from the recovery. + sender.advance_time(rtt); + + // During Congestion Avoidance, it will take + // 5 ACKs to increase cwnd by 1 MSS. + for _ in 0..5 { + sender.ack_n_packets(1, size); + sender.advance_time(rtt); + } + + assert_eq!(sender.congestion_window, cur_cwnd + size); + + let prev_cwnd = sender.congestion_window; + + // Fast convergence: now there is 2nd congestion event and + // cwnd is not fully recovered to w_max, w_max will be + // further reduced. + sender.lose_n_packets(1, size, None); + + // After 2nd congestion event, cwnd will be reduced. + let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; + assert_eq!(sender.congestion_window, cur_cwnd); + + // w_max will be further reduced, not prev_cwnd + assert_eq!( + sender.cubic_state.w_max, + prev_cwnd as f64 * (1.0 + BETA_CUBIC) / 2.0 + ); + } +} diff --git a/quiche/src/recovery/delivery_rate.rs b/quiche/src/recovery/congestion/delivery_rate.rs similarity index 100% rename from quiche/src/recovery/delivery_rate.rs rename to quiche/src/recovery/congestion/delivery_rate.rs diff --git a/quiche/src/recovery/hystart.rs b/quiche/src/recovery/congestion/hystart.rs similarity index 98% rename from quiche/src/recovery/hystart.rs rename to quiche/src/recovery/congestion/hystart.rs index 2fdea159e0..591071f268 100644 --- a/quiche/src/recovery/hystart.rs +++ b/quiche/src/recovery/congestion/hystart.rs @@ -95,10 +95,6 @@ impl Hystart { } } - pub fn reset(&mut self) { - *self = Self::new(self.enabled); - } - pub fn enabled(&self) -> bool { self.enabled } diff --git a/quiche/src/recovery/pacer.rs b/quiche/src/recovery/congestion/pacer.rs similarity index 100% rename from quiche/src/recovery/pacer.rs rename to quiche/src/recovery/congestion/pacer.rs diff --git a/quiche/src/recovery/prr.rs b/quiche/src/recovery/congestion/prr.rs similarity index 100% rename from quiche/src/recovery/prr.rs rename to quiche/src/recovery/congestion/prr.rs diff --git a/quiche/src/recovery/congestion/reno.rs b/quiche/src/recovery/congestion/reno.rs new file mode 100644 index 0000000000..62f5ab9bd4 --- /dev/null +++ b/quiche/src/recovery/congestion/reno.rs @@ -0,0 +1,434 @@ +// Copyright (C) 2019, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +//! Reno Congestion Control +//! +//! Note that Slow Start can use HyStart++ when enabled. + +use std::cmp; +use std::time::Instant; + +use crate::recovery; + +use crate::recovery::rtt::RttStats; +use crate::recovery::Acked; +use crate::recovery::Sent; + +use super::Congestion; +use super::CongestionControlOps; + +pub(crate) static RENO: CongestionControlOps = CongestionControlOps { + on_init, + on_packet_sent, + on_packets_acked, + congestion_event, + checkpoint, + rollback, + has_custom_pacing, + debug_fmt, +}; + +pub fn on_init(_r: &mut Congestion) {} + +pub fn on_packet_sent( + _r: &mut Congestion, _sent_bytes: usize, _bytes_in_flight: usize, + _now: Instant, +) { +} + +fn on_packets_acked( + r: &mut Congestion, _bytes_in_flight: usize, packets: &mut Vec, + now: Instant, rtt_stats: &RttStats, +) { + for pkt in packets.drain(..) { + on_packet_acked(r, &pkt, now, rtt_stats); + } +} + +fn on_packet_acked( + r: &mut Congestion, packet: &Acked, now: Instant, rtt_stats: &RttStats, +) { + if r.in_congestion_recovery(packet.time_sent) { + return; + } + + if r.app_limited { + return; + } + + if r.congestion_window < r.ssthresh { + // In Slow slart, bytes_acked_sl is used for counting + // acknowledged bytes. + r.bytes_acked_sl += packet.size; + + if r.hystart.in_css() { + r.congestion_window += r.hystart.css_cwnd_inc(r.max_datagram_size); + } else { + r.congestion_window += r.max_datagram_size; + } + + if r.hystart.on_packet_acked(packet, rtt_stats.latest_rtt, now) { + // Exit to congestion avoidance if CSS ends. + r.ssthresh = r.congestion_window; + } + } else { + // Congestion avoidance. + r.bytes_acked_ca += packet.size; + + if r.bytes_acked_ca >= r.congestion_window { + r.bytes_acked_ca -= r.congestion_window; + r.congestion_window += r.max_datagram_size; + } + } +} + +fn congestion_event( + r: &mut Congestion, _bytes_in_flight: usize, _lost_bytes: usize, + largest_lost_pkt: &Sent, now: Instant, +) { + // Start a new congestion event if packet was sent after the + // start of the previous congestion recovery period. + let time_sent = largest_lost_pkt.time_sent; + + if !r.in_congestion_recovery(time_sent) { + r.congestion_recovery_start_time = Some(now); + + r.congestion_window = (r.congestion_window as f64 * + recovery::LOSS_REDUCTION_FACTOR) + as usize; + + r.congestion_window = cmp::max( + r.congestion_window, + r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS, + ); + + r.bytes_acked_ca = (r.congestion_window as f64 * + recovery::LOSS_REDUCTION_FACTOR) as usize; + + r.ssthresh = r.congestion_window; + + if r.hystart.in_css() { + r.hystart.congestion_event(); + } + } +} + +fn checkpoint(_r: &mut Congestion) {} + +fn rollback(_r: &mut Congestion) -> bool { + true +} + +fn has_custom_pacing() -> bool { + false +} + +fn debug_fmt(_r: &Congestion, _f: &mut std::fmt::Formatter) -> std::fmt::Result { + Ok(()) +} + +#[cfg(test)] +pub(crate) mod test_sender { + use std::collections::VecDeque; + use std::ops::Deref; + use std::ops::DerefMut; + use std::time::Duration; + use std::time::Instant; + + use crate::recovery::congestion::Congestion; + use crate::recovery::rtt::RttStats; + use crate::recovery::Acked; + use crate::recovery::RecoveryConfig; + use crate::recovery::Sent; + use crate::CongestionControlAlgorithm; + + pub(crate) struct TestSender { + cc: Congestion, + pub(crate) next_pkt: u64, + pub(crate) next_ack: u64, + pub(crate) bytes_in_flight: usize, + pub(crate) time: Instant, + rtt_stats: RttStats, + sent_packets: VecDeque, + } + + impl TestSender { + pub(crate) fn new( + algo: CongestionControlAlgorithm, hystart: bool, + ) -> Self { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(algo); + cfg.enable_hystart(hystart); + + TestSender { + next_pkt: 0, + next_ack: 0, + bytes_in_flight: 0, + time: Instant::now(), + rtt_stats: RttStats::new(Duration::from_micros(0)), + cc: Congestion::from_config(&RecoveryConfig::from_config(&cfg)), + sent_packets: VecDeque::new(), + } + } + + pub(crate) fn send_packet(&mut self, bytes: usize) { + let mut sent = Sent { + pkt_num: self.next_pkt, + frames: Default::default(), + time_sent: self.time, + time_acked: None, + time_lost: None, + size: bytes, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: self.time, + first_sent_time: self.time, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + pmtud: false, + }; + + self.cc.on_packet_sent( + self.bytes_in_flight, + bytes, + self.time, + &mut sent, + &self.rtt_stats, + 0, + true, + ); + + self.sent_packets.push_back(sent); + + self.bytes_in_flight += bytes; + self.next_pkt += 1; + } + + pub(crate) fn inject_ack(&mut self, acked: Acked, now: Instant) { + let _ = self.sent_packets.pop_front().unwrap(); + + self.cc.on_packets_acked( + self.bytes_in_flight, + &mut vec![acked], + &self.rtt_stats, + now, + ); + } + + pub(crate) fn ack_n_packets(&mut self, n: usize, bytes: usize) { + let mut acked = Vec::new(); + + for _ in 0..n { + let unacked = self.sent_packets.pop_front().unwrap(); + + acked.push(Acked { + pkt_num: unacked.pkt_num, + time_sent: unacked.time_sent, + size: unacked.size, + + rtt: self.time.saturating_duration_since(unacked.time_sent), + delivered: unacked.delivered, + delivered_time: unacked.delivered_time, + first_sent_time: unacked.first_sent_time, + is_app_limited: unacked.is_app_limited, + tx_in_flight: unacked.tx_in_flight, + lost: unacked.lost, + }); + + self.next_ack += 1; + } + + self.cc.on_packets_acked( + self.bytes_in_flight, + &mut acked, + &self.rtt_stats, + self.time, + ); + + self.bytes_in_flight -= n * bytes; + } + + pub(crate) fn lose_n_packets( + &mut self, n: usize, bytes: usize, time_sent: Option, + ) { + let mut unacked = None; + + for _ in 0..n { + self.next_ack += 1; + unacked = self.sent_packets.pop_front(); + } + + let mut unacked = unacked.unwrap(); + if let Some(time) = time_sent { + unacked.time_sent = time; + } + + if !self.cc.in_congestion_recovery(unacked.time_sent) { + (self.cc.cc_ops.checkpoint)(&mut self.cc); + } + + (self.cc_ops.congestion_event)( + &mut self.cc, + self.bytes_in_flight, + n * bytes, + &unacked, + self.time, + ); + + self.cc.lost_count += n; + self.bytes_in_flight -= n * bytes; + } + + pub(crate) fn update_rtt(&mut self, rtt: Duration) { + self.rtt_stats + .update_rtt(rtt, Duration::ZERO, self.time, true) + } + + pub(crate) fn advance_time(&mut self, period: Duration) { + self.time += period; + } + } + + impl Deref for TestSender { + type Target = Congestion; + + fn deref(&self) -> &Self::Target { + &self.cc + } + } + + impl DerefMut for TestSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.cc + } + } +} + +#[cfg(test)] +mod tests { + use self::test_sender::TestSender; + + use super::*; + + use crate::recovery::Recovery; + use std::time::Duration; + + fn test_sender() -> TestSender { + TestSender::new(recovery::CongestionControlAlgorithm::Reno, false) + } + + #[test] + fn reno_init() { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Reno); + + let r = Recovery::new(&cfg); + + assert!(r.cwnd() > 0); + assert_eq!(r.bytes_in_flight, 0); + } + + #[test] + fn reno_slow_start() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..sender.initial_congestion_window_packets { + sender.send_packet(size); + } + + let cwnd_prev = sender.congestion_window; + + sender.ack_n_packets(1, size); + + // Check if cwnd increased by packet size (slow start). + assert_eq!(sender.congestion_window, cwnd_prev + size); + } + + #[test] + fn reno_slow_start_multi_acks() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..sender.initial_congestion_window_packets { + sender.send_packet(size); + } + + let cwnd_prev = sender.congestion_window; + + sender.ack_n_packets(3, size); + + // Acked 3 packets. + assert_eq!(sender.congestion_window, cwnd_prev + size * 3); + } + + #[test] + fn reno_congestion_event() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + let prev_cwnd = sender.congestion_window; + + sender.send_packet(size); + sender.lose_n_packets(1, size, None); + + // In Reno, after congestion event, cwnd will be cut in half. + assert_eq!(prev_cwnd / 2, sender.congestion_window); + } + + #[test] + fn reno_congestion_avoidance() { + let mut sender = test_sender(); + let size = sender.max_datagram_size; + + // Send initcwnd full MSS packets to become no longer app limited + for _ in 0..14 { + sender.send_packet(size); + } + + let prev_cwnd = sender.congestion_window; + + sender.lose_n_packets(1, size, None); + + // After congestion event, cwnd will be reduced. + let cur_cwnd = + (prev_cwnd as f64 * recovery::LOSS_REDUCTION_FACTOR) as usize; + assert_eq!(sender.congestion_window, cur_cwnd); + + let rtt = Duration::from_millis(100); + sender.update_rtt(rtt); + sender.advance_time(2 * rtt); + + sender.ack_n_packets(8, size); + // After acking more than cwnd, expect cwnd increased by MSS + assert_eq!(sender.congestion_window, cur_cwnd + size); + } +} diff --git a/quiche/src/recovery/cubic.rs b/quiche/src/recovery/cubic.rs deleted file mode 100644 index 56f8b11fed..0000000000 --- a/quiche/src/recovery/cubic.rs +++ /dev/null @@ -1,1334 +0,0 @@ -// Copyright (C) 2019, Cloudflare, Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -//! CUBIC Congestion Control -//! -//! This implementation is based on the following draft: -//! -//! -//! Note that Slow Start can use HyStart++ when enabled. - -use std::cmp; - -use std::time::Duration; -use std::time::Instant; - -use crate::recovery; -use crate::recovery::reno; - -use crate::recovery::Acked; -use crate::recovery::CongestionControlOps; -use crate::recovery::Sent; - -use super::rtt::RttStats; -use super::Congestion; - -pub static CUBIC: CongestionControlOps = CongestionControlOps { - on_init, - reset, - on_packet_sent, - on_packets_acked, - congestion_event, - collapse_cwnd, - checkpoint, - rollback, - has_custom_pacing, - debug_fmt, -}; - -/// CUBIC Constants. -/// -/// These are recommended value in RFC8312. -const BETA_CUBIC: f64 = 0.7; - -const C: f64 = 0.4; - -/// Threshold for rolling back state, as percentage of lost packets relative to -/// cwnd. -const ROLLBACK_THRESHOLD_PERCENT: usize = 20; - -/// Minimum threshold for rolling back state, as number of packets. -const MIN_ROLLBACK_THRESHOLD: usize = 2; - -/// Default value of alpha_aimd in the beginning of congestion avoidance. -const ALPHA_AIMD: f64 = 3.0 * (1.0 - BETA_CUBIC) / (1.0 + BETA_CUBIC); - -/// CUBIC State Variables. -/// -/// We need to keep those variables across the connection. -/// k, w_max, w_est are described in the RFC. -#[derive(Debug, Default)] -pub struct State { - k: f64, - - w_max: f64, - - w_est: f64, - - alpha_aimd: f64, - - // Used in CUBIC fix (see on_packet_sent()) - last_sent_time: Option, - - // Store cwnd increment during congestion avoidance. - cwnd_inc: usize, - - // CUBIC state checkpoint preceding the last congestion event. - prior: PriorState, -} - -/// Stores the CUBIC state from before the last congestion event. -/// -/// -#[derive(Debug, Default)] -struct PriorState { - congestion_window: usize, - - ssthresh: usize, - - w_max: f64, - - k: f64, - - epoch_start: Option, - - lost_count: usize, -} - -/// CUBIC Functions. -/// -/// Note that these calculations are based on a count of cwnd as bytes, -/// not packets. -/// Unit of t (duration) and RTT are based on seconds (f64). -impl State { - // K = cubic_root ((w_max - cwnd) / C) (Eq. 2) - fn cubic_k(&self, cwnd: usize, max_datagram_size: usize) -> f64 { - let w_max = self.w_max / max_datagram_size as f64; - let cwnd = cwnd as f64 / max_datagram_size as f64; - - libm::cbrt((w_max - cwnd) / C) - } - - // W_cubic(t) = C * (t - K)^3 + w_max (Eq. 1) - fn w_cubic(&self, t: Duration, max_datagram_size: usize) -> f64 { - let w_max = self.w_max / max_datagram_size as f64; - - (C * (t.as_secs_f64() - self.k).powi(3) + w_max) * - max_datagram_size as f64 - } - - // W_est = W_est + alpha_aimd * (segments_acked / cwnd) (Eq. 4) - fn w_est_inc( - &self, acked: usize, cwnd: usize, max_datagram_size: usize, - ) -> f64 { - self.alpha_aimd * (acked as f64 / cwnd as f64) * max_datagram_size as f64 - } -} - -fn on_init(_r: &mut Congestion) {} - -fn reset(r: &mut Congestion) { - r.cubic_state = State::default(); -} - -fn collapse_cwnd(r: &mut Congestion, bytes_in_flight: usize) { - let cubic = &mut r.cubic_state; - - r.congestion_recovery_start_time = None; - - cubic.w_max = r.congestion_window as f64; - - // 4.7 Timeout - reduce ssthresh based on BETA_CUBIC - r.ssthresh = (r.congestion_window as f64 * BETA_CUBIC) as usize; - r.ssthresh = cmp::max( - r.ssthresh, - r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS, - ); - - cubic.cwnd_inc = 0; - - reno::collapse_cwnd(r, bytes_in_flight); -} - -fn on_packet_sent( - r: &mut Congestion, sent_bytes: usize, bytes_in_flight: usize, now: Instant, -) { - // See https://github.com/torvalds/linux/commit/30927520dbae297182990bb21d08762bcc35ce1d - // First transmit when no packets in flight - let cubic = &mut r.cubic_state; - - if let Some(last_sent_time) = cubic.last_sent_time { - if bytes_in_flight == 0 { - let delta = now - last_sent_time; - - // We were application limited (idle) for a while. - // Shift epoch start to keep cwnd growth to cubic curve. - if let Some(recovery_start_time) = r.congestion_recovery_start_time { - if delta.as_nanos() > 0 { - r.congestion_recovery_start_time = - Some(recovery_start_time + delta); - } - } - } - } - - cubic.last_sent_time = Some(now); - - reno::on_packet_sent(r, sent_bytes, bytes_in_flight, now); -} - -fn on_packets_acked( - r: &mut Congestion, bytes_in_flight: usize, packets: &mut Vec, - now: Instant, rtt_stats: &RttStats, -) { - for pkt in packets.drain(..) { - on_packet_acked(r, bytes_in_flight, &pkt, now, rtt_stats); - } -} - -fn on_packet_acked( - r: &mut Congestion, bytes_in_flight: usize, packet: &Acked, now: Instant, - rtt_stats: &RttStats, -) { - let in_congestion_recovery = r.in_congestion_recovery(packet.time_sent); - - if in_congestion_recovery { - r.prr.on_packet_acked( - packet.size, - bytes_in_flight, - r.ssthresh, - r.max_datagram_size, - ); - - return; - } - - if r.app_limited { - return; - } - - // Detecting spurious congestion events. - // - // - // When the recovery episode ends with recovering - // a few packets (less than cwnd / mss * ROLLBACK_THRESHOLD_PERCENT(%)), it's - // considered as spurious and restore to the previous state. - if r.congestion_recovery_start_time.is_some() { - let new_lost = r.lost_count - r.cubic_state.prior.lost_count; - let rollback_threshold = (r.congestion_window / r.max_datagram_size) * - ROLLBACK_THRESHOLD_PERCENT / - 100; - let rollback_threshold = rollback_threshold.max(MIN_ROLLBACK_THRESHOLD); - - if new_lost < rollback_threshold { - let did_rollback = rollback(r); - - if did_rollback { - return; - } - } - } - - if r.congestion_window < r.ssthresh { - // In Slow slart, bytes_acked_sl is used for counting - // acknowledged bytes. - r.bytes_acked_sl += packet.size; - - if r.bytes_acked_sl >= r.max_datagram_size { - if r.hystart.in_css() { - r.congestion_window += - r.hystart.css_cwnd_inc(r.max_datagram_size); - } else { - r.congestion_window += r.max_datagram_size; - } - - r.bytes_acked_sl -= r.max_datagram_size; - } - - if r.hystart.on_packet_acked(packet, rtt_stats.latest_rtt, now) { - // Exit to congestion avoidance if CSS ends. - r.ssthresh = r.congestion_window; - } - } else { - // Congestion avoidance. - let ca_start_time; - - // In CSS, use css_start_time instead of congestion_recovery_start_time. - if r.hystart.in_css() { - ca_start_time = r.hystart.css_start_time().unwrap(); - - // Reset w_max and k when CSS started. - if r.cubic_state.w_max == 0.0 { - r.cubic_state.w_max = r.congestion_window as f64; - r.cubic_state.k = 0.0; - - r.cubic_state.w_est = r.congestion_window as f64; - r.cubic_state.alpha_aimd = ALPHA_AIMD; - } - } else { - match r.congestion_recovery_start_time { - Some(t) => ca_start_time = t, - None => { - // When we come here without congestion_event() triggered, - // initialize congestion_recovery_start_time, w_max and k. - ca_start_time = now; - r.congestion_recovery_start_time = Some(now); - - r.cubic_state.w_max = r.congestion_window as f64; - r.cubic_state.k = 0.0; - - r.cubic_state.w_est = r.congestion_window as f64; - r.cubic_state.alpha_aimd = ALPHA_AIMD; - }, - } - } - - let t = now.saturating_duration_since(ca_start_time); - - // target = w_cubic(t + rtt) - let target = r - .cubic_state - .w_cubic(t + *rtt_stats.min_rtt, r.max_datagram_size); - - // Clipping target to [cwnd, 1.5 x cwnd] - let target = f64::max(target, r.congestion_window as f64); - let target = f64::min(target, r.congestion_window as f64 * 1.5); - - // Update w_est. - let w_est_inc = r.cubic_state.w_est_inc( - packet.size, - r.congestion_window, - r.max_datagram_size, - ); - r.cubic_state.w_est += w_est_inc; - - if r.cubic_state.w_est >= r.cubic_state.w_max { - r.cubic_state.alpha_aimd = 1.0; - } - - let mut cubic_cwnd = r.congestion_window; - - if r.cubic_state.w_cubic(t, r.max_datagram_size) < r.cubic_state.w_est { - // AIMD friendly region (W_cubic(t) < W_est) - cubic_cwnd = cmp::max(cubic_cwnd, r.cubic_state.w_est as usize); - } else { - // Concave region or convex region use same increment. - let cubic_inc = - r.max_datagram_size * (target as usize - cubic_cwnd) / cubic_cwnd; - - cubic_cwnd += cubic_inc; - } - - // Update the increment and increase cwnd by MSS. - r.cubic_state.cwnd_inc += cubic_cwnd - r.congestion_window; - - if r.cubic_state.cwnd_inc >= r.max_datagram_size { - r.congestion_window += r.max_datagram_size; - r.cubic_state.cwnd_inc -= r.max_datagram_size; - } - } -} - -fn congestion_event( - r: &mut Congestion, bytes_in_flight: usize, _lost_bytes: usize, - largest_lost_pkt: &Sent, now: Instant, -) { - let time_sent = largest_lost_pkt.time_sent; - let in_congestion_recovery = r.in_congestion_recovery(time_sent); - - // Start a new congestion event if packet was sent after the - // start of the previous congestion recovery period. - if !in_congestion_recovery { - r.congestion_recovery_start_time = Some(now); - - // Fast convergence - if (r.congestion_window as f64) < r.cubic_state.w_max { - r.cubic_state.w_max = - r.congestion_window as f64 * (1.0 + BETA_CUBIC) / 2.0; - } else { - r.cubic_state.w_max = r.congestion_window as f64; - } - - r.ssthresh = (r.congestion_window as f64 * BETA_CUBIC) as usize; - r.ssthresh = cmp::max( - r.ssthresh, - r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS, - ); - r.congestion_window = r.ssthresh; - - r.cubic_state.k = if r.cubic_state.w_max < r.congestion_window as f64 { - 0.0 - } else { - r.cubic_state - .cubic_k(r.congestion_window, r.max_datagram_size) - }; - - r.cubic_state.cwnd_inc = - (r.cubic_state.cwnd_inc as f64 * BETA_CUBIC) as usize; - - r.cubic_state.w_est = r.congestion_window as f64; - r.cubic_state.alpha_aimd = ALPHA_AIMD; - - if r.hystart.in_css() { - r.hystart.congestion_event(); - } - - r.prr.congestion_event(bytes_in_flight); - } -} - -fn checkpoint(r: &mut Congestion) { - r.cubic_state.prior.congestion_window = r.congestion_window; - r.cubic_state.prior.ssthresh = r.ssthresh; - r.cubic_state.prior.w_max = r.cubic_state.w_max; - r.cubic_state.prior.k = r.cubic_state.k; - r.cubic_state.prior.epoch_start = r.congestion_recovery_start_time; - r.cubic_state.prior.lost_count = r.lost_count; -} - -fn rollback(r: &mut Congestion) -> bool { - // Don't go back to slow start. - if r.cubic_state.prior.congestion_window < r.cubic_state.prior.ssthresh { - return false; - } - - if r.congestion_window >= r.cubic_state.prior.congestion_window { - return false; - } - - r.congestion_window = r.cubic_state.prior.congestion_window; - r.ssthresh = r.cubic_state.prior.ssthresh; - r.cubic_state.w_max = r.cubic_state.prior.w_max; - r.cubic_state.k = r.cubic_state.prior.k; - r.congestion_recovery_start_time = r.cubic_state.prior.epoch_start; - - true -} - -fn has_custom_pacing() -> bool { - false -} - -fn debug_fmt(r: &Congestion, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!( - f, - "cubic={{ k={} w_max={} }} ", - r.cubic_state.k, r.cubic_state.w_max - ) -} - -#[cfg(test)] -mod tests { - use crate::recovery::hystart; - use crate::CongestionControlAlgorithm; - - use super::*; - - use crate::recovery::Recovery; - use smallvec::smallvec; - - #[test] - fn cubic_init() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let r = Recovery::new(&cfg); - - assert!(r.cwnd() > 0); - assert_eq!(r.bytes_in_flight, 0); - } - - #[test] - fn cubic_send() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - - r.on_packet_sent_cc(0, 1000, Instant::now()); - - assert_eq!(r.bytes_in_flight, 1000); - } - - #[test] - fn cubic_slow_start() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - // Send initcwnd full MSS packets to become no longer app limited - for pn in 0..r.congestion.initial_congestion_window_packets { - r.on_packet_sent_cc(pn as _, p.size, now); - } - - let cwnd_prev = r.cwnd(); - - let mut acked = vec![Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - - // Check if cwnd increased by packet size (slow start) - assert_eq!(r.cwnd(), cwnd_prev + p.size); - } - - #[test] - fn cubic_slow_start_multi_acks() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - // Send initcwnd full MSS packets to become no longer app limited - for pn in 0..r.congestion.initial_congestion_window_packets { - r.on_packet_sent_cc(pn as _, p.size, now); - } - - let cwnd_prev = r.cwnd(); - - let mut acked = vec![ - Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }, - Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }, - Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }, - ]; - - r.on_packets_acked(&mut acked, now); - - // Acked 3 packets. - assert_eq!(r.cwnd(), cwnd_prev + p.size * 3); - } - - #[test] - fn cubic_congestion_event() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - let prev_cwnd = r.cwnd(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - r.congestion_event(r.max_datagram_size, &p, now); - - // In CUBIC, after congestion event, cwnd will be reduced by (1 - - // CUBIC_BETA) - assert_eq!(prev_cwnd as f64 * BETA_CUBIC, r.cwnd() as f64); - } - - #[test] - fn cubic_congestion_avoidance() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - let mut now = Instant::now(); - let prev_cwnd = r.cwnd(); - - // Send initcwnd full MSS packets to become no longer app limited - for pn in 0..r.congestion.initial_congestion_window_packets { - r.on_packet_sent_cc(pn as _, r.max_datagram_size, now); - } - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - // Trigger congestion event to update ssthresh - r.congestion_event(r.max_datagram_size, &p, now); - - // After congestion event, cwnd will be reduced. - let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; - assert_eq!(r.cwnd(), cur_cwnd); - - // Shift current time by 1 RTT. - let rtt = Duration::from_millis(100); - - r.rtt_stats - .update_rtt(rtt, Duration::from_millis(0), now, true); - - // Exit from the recovery. - now += rtt; - - // During Congestion Avoidance, it will take - // 5 ACKs to increase cwnd by 1 MSS. - for _ in 0..5 { - let mut acked = vec![Acked { - pkt_num: 0, - time_sent: now, - size: r.max_datagram_size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - now += rtt; - } - - assert_eq!(r.cwnd(), cur_cwnd + r.max_datagram_size); - } - - #[test] - fn cubic_collapse_cwnd_and_restart() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - - // Fill up bytes_in_flight to avoid app_limited=true - r.on_packet_sent_cc(0, 30000, now); - - // Trigger congestion event to update ssthresh - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - r.congestion_event(r.max_datagram_size, &p, now); - - // After persistent congestion, cwnd should be the minimum window - r.collapse_cwnd(); - assert_eq!( - r.cwnd(), - r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS - ); - - let mut acked = vec![Acked { - pkt_num: 0, - // To exit from recovery - time_sent: now + Duration::from_millis(1), - size: r.max_datagram_size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - - // Slow start again - cwnd will be increased by 1 MSS - assert_eq!( - r.cwnd(), - r.max_datagram_size * (recovery::MINIMUM_WINDOW_PACKETS + 1) - ); - } - - #[test] - fn cubic_hystart_css_to_ss() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - cfg.enable_hystart(true); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - // 1st round. - let n_rtt_sample = hystart::N_RTT_SAMPLE; - let mut send_pn = 0; - let mut ack_pn = 0; - - let rtt_1st = Duration::from_millis(50); - - r.congestion - .hystart - .start_round(send_pn + n_rtt_sample as u64 - 1); - // Send 1st round packets. - for _ in 0..n_rtt_sample { - r.on_packet_sent_cc(send_pn, p.size, now); - send_pn += 1; - } - r.update_app_limited(false); - - // Receiving Acks. - let now = now + rtt_1st; - for _ in 0..n_rtt_sample { - r.rtt_stats - .update_rtt(rtt_1st, Duration::from_millis(0), now, true); - - let mut acked = vec![Acked { - pkt_num: ack_pn, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - ack_pn += 1; - } - - // Not in CSS yet. - assert!(r.congestion.hystart.css_start_time().is_none()); - - // 2nd round. - let mut rtt_2nd = Duration::from_millis(100); - let now = now + rtt_2nd; - - r.congestion - .hystart - .start_round(send_pn + n_rtt_sample as u64 - 1); - // Send 2nd round packets. - for _ in 0..n_rtt_sample { - r.on_packet_sent_cc(send_pn, p.size, now); - send_pn += 1; - } - r.update_app_limited(false); - - // Receiving Acks. - // Last ack will cause to exit to CSS. - let mut cwnd_prev = r.cwnd(); - - for _ in 0..n_rtt_sample { - cwnd_prev = r.cwnd(); - r.rtt_stats - .update_rtt(rtt_2nd, Duration::from_millis(0), now, true); - - let mut acked = vec![Acked { - pkt_num: ack_pn, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - ack_pn += 1; - - // Keep increasing RTT so that hystart exits to CSS. - rtt_2nd += rtt_2nd.saturating_add(Duration::from_millis(4)); - } - - // Now we are in CSS. - assert!(r.congestion.hystart.css_start_time().is_some()); - assert_eq!(r.cwnd(), cwnd_prev + r.max_datagram_size); - - // 3rd round, which RTT is less than previous round to - // trigger back to Slow Start. - let rtt_3rd = Duration::from_millis(80); - let now = now + rtt_3rd; - cwnd_prev = r.cwnd(); - - r.congestion - .hystart - .start_round(send_pn + n_rtt_sample as u64 - 1); - // Send 3nd round packets. - for _ in 0..n_rtt_sample { - r.on_packet_sent_cc(send_pn, p.size, now); - send_pn += 1; - } - r.update_app_limited(false); - - // Receiving Acks. - // Last ack will cause to exit to SS. - for _ in 0..n_rtt_sample { - r.rtt_stats - .update_rtt(rtt_3rd, Duration::from_millis(0), now, true); - - let mut acked = vec![Acked { - pkt_num: ack_pn, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - ack_pn += 1; - } - - // Now we are back in Slow Start. - assert!(r.congestion.hystart.css_start_time().is_none()); - assert_eq!( - r.cwnd(), - cwnd_prev + - r.max_datagram_size / hystart::CSS_GROWTH_DIVISOR * - hystart::N_RTT_SAMPLE - ); - } - - #[test] - fn cubic_hystart_css_to_ca() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - cfg.enable_hystart(true); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - // 1st round. - let n_rtt_sample = hystart::N_RTT_SAMPLE; - let mut send_pn = 0; - let mut ack_pn = 0; - - let rtt_1st = Duration::from_millis(50); - - r.congestion - .hystart - .start_round(send_pn + n_rtt_sample as u64 - 1); - // Send 1st round packets. - for _ in 0..n_rtt_sample { - r.on_packet_sent_cc(send_pn, p.size, now); - send_pn += 1; - } - r.update_app_limited(false); - - // Receiving Acks. - let now = now + rtt_1st; - for _ in 0..n_rtt_sample { - r.rtt_stats - .update_rtt(rtt_1st, Duration::from_millis(0), now, true); - - let mut acked = vec![Acked { - pkt_num: ack_pn, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - ack_pn += 1; - } - - // Not in CSS yet. - assert!(r.congestion.hystart.css_start_time().is_none()); - - // 2nd round. - let mut rtt_2nd = Duration::from_millis(100); - let now = now + rtt_2nd; - // Send 2nd round packets. - r.congestion - .hystart - .start_round(send_pn + n_rtt_sample as u64 - 1); - for _ in 0..n_rtt_sample { - r.on_packet_sent_cc(send_pn, p.size, now); - send_pn += 1; - } - r.update_app_limited(false); - - // Receiving Acks. - // Last ack will cause to exit to CSS. - let mut cwnd_prev = r.cwnd(); - - for _ in 0..n_rtt_sample { - cwnd_prev = r.cwnd(); - r.rtt_stats - .update_rtt(rtt_2nd, Duration::from_millis(0), now, true); - - let mut acked = vec![Acked { - pkt_num: ack_pn, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - r.on_packets_acked(&mut acked, now); - ack_pn += 1; - - // Keep increasing RTT so that hystart exits to CSS. - rtt_2nd += rtt_2nd.saturating_add(Duration::from_millis(4)); - } - - // Now we are in CSS. - assert!(r.congestion.hystart.css_start_time().is_some()); - assert_eq!(r.cwnd(), cwnd_prev + r.max_datagram_size); - - // Run 5 (CSS_ROUNDS) in CSS, to exit to congestion avoidance. - let rtt_css = Duration::from_millis(100); - let now = now + rtt_css; - - for _ in 0..hystart::CSS_ROUNDS { - // Send a round of packets. - r.congestion - .hystart - .start_round(send_pn + n_rtt_sample as u64 - 1); - for _ in 0..n_rtt_sample { - r.on_packet_sent_cc(send_pn, p.size, now); - send_pn += 1; - } - r.update_app_limited(false); - - // Receiving Acks. - for _ in 0..n_rtt_sample { - r.rtt_stats.update_rtt( - rtt_css, - Duration::from_millis(0), - now, - true, - ); - - let mut acked = vec![Acked { - pkt_num: ack_pn, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - ack_pn += 1; - } - } - // Now we are in congestion avoidance. - assert_eq!(r.cwnd(), r.congestion.ssthresh); - } - - #[test] - fn cubic_spurious_congestion_event() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - let prev_cwnd = r.cwnd(); - - // Send initcwnd full MSS packets to become no longer app limited - for pn in 0..r.congestion.initial_congestion_window_packets { - r.on_packet_sent_cc(pn as _, r.max_datagram_size, now); - } - - // Trigger congestion event to update ssthresh - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - r.congestion_event(r.max_datagram_size, &p, now); - - // After congestion event, cwnd will be reduced. - let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; - assert_eq!(r.cwnd(), cur_cwnd); - - let rtt = Duration::from_millis(100); - - let mut acked = vec![Acked { - pkt_num: 0, - // To exit from recovery - time_sent: now + rtt, - size: r.max_datagram_size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - // Ack more than cwnd bytes with rtt=100ms - r.rtt_stats - .update_rtt(rtt, Duration::from_millis(0), now, true); - - // Trigger detecting spurious congestion event - r.on_packets_acked(&mut acked, now + rtt + Duration::from_millis(5)); - - // This is from slow start, no rollback. - assert_eq!(r.cwnd(), cur_cwnd); - - let now = now + rtt; - - // Trigger another congestion event. - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - let prev_cwnd = r.cwnd(); - r.congestion_event(r.max_datagram_size, &p, now); - - // After congestion event, cwnd will be reduced. - let cur_cwnd = (cur_cwnd as f64 * BETA_CUBIC) as usize; - assert_eq!(r.cwnd(), cur_cwnd); - - let rtt = Duration::from_millis(100); - - let mut acked = vec![Acked { - pkt_num: 0, - // To exit from recovery - time_sent: now + rtt, - size: r.max_datagram_size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - // Ack more than cwnd bytes with rtt=100ms. - r.rtt_stats - .update_rtt(rtt, Duration::from_millis(0), now, true); - - // Trigger detecting spurious congestion event. - r.on_packets_acked(&mut acked, now + rtt + Duration::from_millis(5)); - - // cwnd is rolled back to the previous one. - assert_eq!(r.cwnd(), prev_cwnd); - } - - #[test] - fn cubic_fast_convergence() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::CUBIC); - - let mut r = Recovery::new(&cfg); - let mut now = Instant::now(); - let prev_cwnd = r.cwnd(); - - // Send initcwnd full MSS packets to become no longer app limited - for pn in 0..r.congestion.initial_congestion_window_packets { - r.on_packet_sent_cc(pn as _, r.max_datagram_size, now); - } - - // Trigger congestion event to update ssthresh - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - r.congestion_event(r.max_datagram_size, &p, now); - - // After 1st congestion event, cwnd will be reduced. - let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; - assert_eq!(r.cwnd(), cur_cwnd); - - // Shift current time by 1 RTT. - let rtt = Duration::from_millis(100); - r.rtt_stats - .update_rtt(rtt, Duration::from_millis(0), now, true); - - // Exit from the recovery. - now += rtt; - - // During Congestion Avoidance, it will take - // 5 ACKs to increase cwnd by 1 MSS. - for _ in 0..5 { - let mut acked = vec![Acked { - pkt_num: 0, - time_sent: now, - size: r.max_datagram_size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - now += rtt; - } - - assert_eq!(r.cwnd(), cur_cwnd + r.max_datagram_size); - - let prev_cwnd = r.cwnd(); - - // Fast convergence: now there is 2nd congestion event and - // cwnd is not fully recovered to w_max, w_max will be - // further reduced. - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - r.congestion_event(r.max_datagram_size, &p, now); - - // After 2nd congestion event, cwnd will be reduced. - let cur_cwnd = (prev_cwnd as f64 * BETA_CUBIC) as usize; - assert_eq!(r.cwnd(), cur_cwnd); - - // w_max will be further reduced, not prev_cwnd - assert_eq!( - r.congestion.cubic_state.w_max, - prev_cwnd as f64 * (1.0 + BETA_CUBIC) / 2.0 - ); - } -} diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index f9915e771a..4607a60d3c 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -26,8 +26,6 @@ use std::cmp; -use std::str::FromStr; - use std::time::Duration; use std::time::Instant; @@ -36,6 +34,7 @@ use std::collections::VecDeque; use crate::packet::Epoch; use crate::ranges::RangeSet; use crate::Config; +use crate::CongestionControlAlgorithm; use crate::Result; use crate::frame; @@ -47,6 +46,8 @@ use qlog::events::EventData; use smallvec::SmallVec; +use self::congestion::pacer; +use self::congestion::Congestion; use self::rtt::RttStats; // Loss Recovery @@ -58,16 +59,12 @@ const INITIAL_TIME_THRESHOLD: f64 = 9.0 / 8.0; const GRANULARITY: Duration = Duration::from_millis(1); -const PERSISTENT_CONGESTION_THRESHOLD: u32 = 3; - const MAX_PTO_PROBES_COUNT: usize = 2; const MINIMUM_WINDOW_PACKETS: usize = 2; const LOSS_REDUCTION_FACTOR: f64 = 0.5; -const PACING_MULTIPLIER: f64 = 1.25; - // How many non ACK eliciting packets we send before including a PING to solicit // an ACK. pub(super) const MAX_OUTSTANDING_NON_ACK_ELICITING: usize = 24; @@ -323,131 +320,6 @@ impl LossDetectionTimer { self.time = None; } } - -pub struct Congestion { - // Congestion control. - cc_ops: &'static CongestionControlOps, - - cubic_state: cubic::State, - - // HyStart++. - hystart: hystart::Hystart, - - // Pacing. - pacer: pacer::Pacer, - - // RFC6937 PRR. - prr: prr::PRR, - - // The maximum size of a data aggregate scheduled and - // transmitted together. - send_quantum: usize, - - // BBR state. - bbr_state: bbr::State, - - // BBRv2 state. - bbr2_state: bbr2::State, - - congestion_window: usize, - - ssthresh: usize, - - bytes_acked_sl: usize, - - bytes_acked_ca: usize, - - congestion_recovery_start_time: Option, - - app_limited: bool, - - delivery_rate: delivery_rate::Rate, - - /// Initial congestion window size in terms of packet count. - initial_congestion_window_packets: usize, - - max_datagram_size: usize, - - lost_count: usize, -} - -impl Congestion { - fn from_config(recovery_config: &RecoveryConfig) -> Self { - let initial_congestion_window = recovery_config.max_send_udp_payload_size * - recovery_config.initial_congestion_window_packets; - - let mut cc = Congestion { - congestion_window: initial_congestion_window, - - ssthresh: usize::MAX, - - bytes_acked_sl: 0, - - bytes_acked_ca: 0, - - congestion_recovery_start_time: None, - - cc_ops: recovery_config.cc_ops, - - cubic_state: cubic::State::default(), - - app_limited: false, - - initial_congestion_window_packets: recovery_config - .initial_congestion_window_packets, - - max_datagram_size: recovery_config.max_send_udp_payload_size, - - lost_count: 0, - - send_quantum: initial_congestion_window, - - delivery_rate: delivery_rate::Rate::default(), - - hystart: hystart::Hystart::new(recovery_config.hystart), - - pacer: pacer::Pacer::new( - recovery_config.pacing, - initial_congestion_window, - 0, - recovery_config.max_send_udp_payload_size, - recovery_config.max_pacing_rate, - ), - - prr: prr::PRR::default(), - - bbr_state: bbr::State::new(), - - bbr2_state: bbr2::State::new(), - }; - - (cc.cc_ops.on_init)(&mut cc); - - cc - } - - fn in_congestion_recovery(&self, sent_time: Instant) -> bool { - match self.congestion_recovery_start_time { - Some(congestion_recovery_start_time) => - sent_time <= congestion_recovery_start_time, - - None => false, - } - } - - fn delivery_rate(&self) -> u64 { - self.delivery_rate.sample_delivery_rate() - } - - fn send_quantum(&self) -> usize { - self.send_quantum - } - - fn set_pacing_rate(&mut self, rate: u64, now: Instant) { - self.pacer.update(self.send_quantum, rate, now); - } -} - pub struct Recovery { epochs: [RecoveryEpoch; packet::Epoch::count()], @@ -483,7 +355,7 @@ pub struct Recovery { pub struct RecoveryConfig { max_send_udp_payload_size: usize, pub max_ack_delay: Duration, - cc_ops: &'static CongestionControlOps, + cc_algorithm: CongestionControlAlgorithm, hystart: bool, pacing: bool, max_pacing_rate: Option, @@ -495,7 +367,7 @@ impl RecoveryConfig { Self { max_send_udp_payload_size: config.max_send_udp_payload_size, max_ack_delay: Duration::ZERO, - cc_ops: config.cc_algorithm.into(), + cc_algorithm: config.cc_algorithm, hystart: config.hystart, pacing: config.pacing, max_pacing_rate: config.max_pacing_rate, @@ -589,7 +461,6 @@ impl Recovery { let ack_eliciting = pkt.ack_eliciting; let in_flight = pkt.in_flight; let sent_bytes = pkt.size; - let pkt_num = pkt.pkt_num; if ack_eliciting { self.outstanding_non_ack_eliciting = 0; @@ -597,95 +468,36 @@ impl Recovery { self.outstanding_non_ack_eliciting += 1; } - if in_flight { - if ack_eliciting { - self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now); - } + if in_flight && ack_eliciting { + self.epochs[epoch].time_of_last_ack_eliciting_packet = Some(now); + } - self.on_packet_sent_cc(pkt_num, sent_bytes, now); + self.congestion.on_packet_sent( + self.bytes_in_flight, + sent_bytes, + now, + &mut pkt, + &self.rtt_stats, + self.bytes_lost, + in_flight, + ); + if in_flight { self.epochs[epoch].in_flight_count += 1; + self.bytes_in_flight += sent_bytes; self.set_loss_detection_timer(handshake_status, now); } self.bytes_sent += sent_bytes; - // Pacing: Set the pacing rate if CC doesn't do its own. - if !(self.congestion.cc_ops.has_custom_pacing)() && - self.rtt_stats.first_rtt_sample.is_some() - { - let rate = PACING_MULTIPLIER * self.cwnd() as f64 / - self.rtt_stats.smoothed_rtt.as_secs_f64(); - self.set_pacing_rate(rate as u64, now); - } - - self.schedule_next_packet(now, sent_bytes); - - pkt.time_sent = self.get_packet_send_time(); - - // bytes_in_flight is already updated. Use previous value. - self.congestion.delivery_rate.on_packet_sent( - &mut pkt, - self.bytes_in_flight - sent_bytes, - self.bytes_lost, - ); - self.epochs[epoch].sent_packets.push_back(pkt); trace!("{} {:?}", trace_id, self); } - fn on_packet_sent_cc( - &mut self, pkt_num: u64, sent_bytes: usize, now: Instant, - ) { - self.update_app_limited( - (self.bytes_in_flight + sent_bytes) < self.cwnd(), - ); - - (self.congestion.cc_ops.on_packet_sent)( - &mut self.congestion, - sent_bytes, - self.bytes_in_flight, - now, - ); - - self.congestion.prr.on_packet_sent(sent_bytes); - - // HyStart++: Start of the round in a slow start. - if self.congestion.hystart.enabled() && - self.cwnd() < self.congestion.ssthresh - { - self.congestion.hystart.start_round(pkt_num); - } - - self.bytes_in_flight += sent_bytes; - } - - pub fn set_pacing_rate(&mut self, rate: u64, now: Instant) { - self.congestion.set_pacing_rate(rate, now) - } - pub fn get_packet_send_time(&self) -> Instant { - self.congestion.pacer.next_time() - } - - fn schedule_next_packet(&mut self, now: Instant, packet_size: usize) { - // Don't pace in any of these cases: - // * Packet contains no data. - // * The congestion window is within initcwnd. - - let in_initcwnd = self.bytes_sent < - self.max_datagram_size * - self.congestion.initial_congestion_window_packets; - - let sent_bytes = if !self.congestion.pacer.enabled() || in_initcwnd { - 0 - } else { - packet_size - }; - - self.congestion.pacer.send(sent_bytes, now); + self.congestion.get_packet_send_time() } #[allow(clippy::too_many_arguments)] @@ -750,7 +562,12 @@ impl Recovery { // packets list. let loss = self.detect_lost_packets(epoch, now, trace_id); - self.on_packets_acked(newly_acked, now); + self.congestion.on_packets_acked( + self.bytes_in_flight, + newly_acked, + &self.rtt_stats, + now, + ); self.bytes_in_flight -= acked_bytes; @@ -871,7 +688,7 @@ impl Recovery { } pub fn cwnd(&self) -> usize { - self.congestion.congestion_window + self.congestion.congestion_window() } pub fn cwnd_available(&self) -> usize { @@ -881,9 +698,7 @@ impl Recovery { } // Open more space (snd_cnt) for PRR when allowed. - self.congestion - .congestion_window - .saturating_sub(self.bytes_in_flight) + + self.cwnd().saturating_sub(self.bytes_in_flight) + self.congestion.prr.snd_cnt } @@ -1047,7 +862,19 @@ impl Recovery { ); if let Some(pkt) = loss.largest_lost_pkt { - self.on_packets_lost(loss.lost_bytes, &pkt, now); + if !self.congestion.in_congestion_recovery(pkt.time_sent) { + (self.congestion.cc_ops.checkpoint)(&mut self.congestion); + } + + (self.congestion.cc_ops.congestion_event)( + &mut self.congestion, + self.bytes_in_flight, + loss.lost_bytes, + &pkt, + now, + ); + + self.bytes_in_flight -= loss.lost_bytes; }; self.bytes_in_flight -= loss.pmtud_lost_bytes; @@ -1060,71 +887,6 @@ impl Recovery { (loss.lost_packets, loss.lost_bytes) } - fn on_packets_acked(&mut self, acked: &mut Vec, now: Instant) { - // Update delivery rate sample per acked packet. - for pkt in acked.iter() { - self.congestion.delivery_rate.update_rate_sample(pkt, now); - } - - // Fill in a rate sample. - self.congestion - .delivery_rate - .generate_rate_sample(*self.rtt_stats.min_rtt); - - // Call congestion control hooks. - (self.congestion.cc_ops.on_packets_acked)( - &mut self.congestion, - self.bytes_in_flight, - acked, - now, - &self.rtt_stats, - ); - } - - fn in_persistent_congestion(&mut self, _largest_lost_pkt_num: u64) -> bool { - let _congestion_period = self.pto() * PERSISTENT_CONGESTION_THRESHOLD; - - // TODO: properly detect persistent congestion - false - } - - fn on_packets_lost( - &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, now: Instant, - ) { - self.congestion_event(lost_bytes, largest_lost_pkt, now); - - if self.in_persistent_congestion(largest_lost_pkt.pkt_num) { - self.collapse_cwnd(); - } - - self.bytes_in_flight -= lost_bytes; - } - - fn congestion_event( - &mut self, lost_bytes: usize, largest_lost_pkt: &Sent, now: Instant, - ) { - let time_sent = largest_lost_pkt.time_sent; - - if !self.congestion.in_congestion_recovery(time_sent) { - (self.congestion.cc_ops.checkpoint)(&mut self.congestion); - } - - (self.congestion.cc_ops.congestion_event)( - &mut self.congestion, - self.bytes_in_flight, - lost_bytes, - largest_lost_pkt, - now, - ); - } - - fn collapse_cwnd(&mut self) { - (self.congestion.cc_ops.collapse_cwnd)( - &mut self.congestion, - self.bytes_in_flight, - ); - } - pub fn update_app_limited(&mut self, v: bool) { self.congestion.app_limited = v; } @@ -1167,94 +929,6 @@ impl Recovery { } } -/// Available congestion control algorithms. -/// -/// This enum provides currently available list of congestion control -/// algorithms. -#[derive(Debug, Copy, Clone, PartialEq, Eq)] -#[repr(C)] -pub enum CongestionControlAlgorithm { - /// Reno congestion control algorithm. `reno` in a string form. - Reno = 0, - /// CUBIC congestion control algorithm (default). `cubic` in a string form. - CUBIC = 1, - /// BBR congestion control algorithm. `bbr` in a string form. - BBR = 2, - /// BBRv2 congestion control algorithm. `bbr2` in a string form. - BBR2 = 3, -} - -impl FromStr for CongestionControlAlgorithm { - type Err = crate::Error; - - /// Converts a string to `CongestionControlAlgorithm`. - /// - /// If `name` is not valid, `Error::CongestionControl` is returned. - fn from_str(name: &str) -> std::result::Result { - match name { - "reno" => Ok(CongestionControlAlgorithm::Reno), - "cubic" => Ok(CongestionControlAlgorithm::CUBIC), - "bbr" => Ok(CongestionControlAlgorithm::BBR), - "bbr2" => Ok(CongestionControlAlgorithm::BBR2), - - _ => Err(crate::Error::CongestionControl), - } - } -} - -pub struct CongestionControlOps { - pub on_init: fn(r: &mut Congestion), - - pub reset: fn(r: &mut Congestion), - - pub on_packet_sent: fn( - r: &mut Congestion, - sent_bytes: usize, - bytes_in_flight: usize, - now: Instant, - ), - - pub on_packets_acked: fn( - r: &mut Congestion, - bytes_in_flight: usize, - packets: &mut Vec, - now: Instant, - rtt_stats: &RttStats, - ), - - pub congestion_event: fn( - r: &mut Congestion, - bytes_in_flight: usize, - lost_bytes: usize, - largest_lost_packet: &Sent, - now: Instant, - ), - - pub collapse_cwnd: fn(r: &mut Congestion, bytes_in_flight: usize), - - pub checkpoint: fn(r: &mut Congestion), - - pub rollback: fn(r: &mut Congestion) -> bool, - - pub has_custom_pacing: fn() -> bool, - - pub debug_fmt: fn( - r: &Congestion, - formatter: &mut std::fmt::Formatter, - ) -> std::fmt::Result, -} - -impl From for &'static CongestionControlOps { - fn from(algo: CongestionControlAlgorithm) -> Self { - match algo { - CongestionControlAlgorithm::Reno => &reno::RENO, - CongestionControlAlgorithm::CUBIC => &cubic::CUBIC, - CongestionControlAlgorithm::BBR => &bbr::BBR, - CongestionControlAlgorithm::BBR2 => &bbr2::BBR2, - } - } -} - impl std::fmt::Debug for Recovery { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self.loss_timer.time { @@ -1517,6 +1191,7 @@ impl QlogMetrics { mod tests { use super::*; use smallvec::smallvec; + use std::str::FromStr; #[test] fn lookup_cc_algo_ok() { @@ -1532,18 +1207,6 @@ mod tests { ); } - #[test] - fn collapse_cwnd() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(CongestionControlAlgorithm::Reno); - - let mut r = Recovery::new(&cfg); - - // cwnd will be reset. - r.collapse_cwnd(); - assert_eq!(r.cwnd(), r.max_datagram_size * MINIMUM_WINDOW_PACKETS); - } - #[test] fn loss_on_pto() { let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); @@ -2328,7 +1991,8 @@ mod tests { // We pace this outgoing packet. as all conditions for pacing // are passed. - let pacing_rate = (r.cwnd() as f64 * PACING_MULTIPLIER / 0.05) as u64; + let pacing_rate = + (r.cwnd() as f64 * congestion::PACING_MULTIPLIER / 0.05) as u64; assert_eq!(r.congestion.pacer.rate(), pacing_rate); assert_eq!( @@ -2489,12 +2153,5 @@ mod tests { } } -mod bbr; -mod bbr2; -mod cubic; -mod delivery_rate; -mod hystart; -mod pacer; -mod prr; -mod reno; +pub mod congestion; mod rtt; diff --git a/quiche/src/recovery/reno.rs b/quiche/src/recovery/reno.rs deleted file mode 100644 index 64beb4f34f..0000000000 --- a/quiche/src/recovery/reno.rs +++ /dev/null @@ -1,434 +0,0 @@ -// Copyright (C) 2019, Cloudflare, Inc. -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are -// met: -// -// * Redistributions of source code must retain the above copyright notice, -// this list of conditions and the following disclaimer. -// -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS -// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, -// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR -// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR -// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, -// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR -// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF -// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING -// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -//! Reno Congestion Control -//! -//! Note that Slow Start can use HyStart++ when enabled. - -use std::cmp; -use std::time::Instant; - -use crate::recovery; - -use crate::recovery::Acked; -use crate::recovery::CongestionControlOps; -use crate::recovery::Sent; - -use super::rtt::RttStats; -use super::Congestion; - -pub static RENO: CongestionControlOps = CongestionControlOps { - on_init, - reset, - on_packet_sent, - on_packets_acked, - congestion_event, - collapse_cwnd, - checkpoint, - rollback, - has_custom_pacing, - debug_fmt, -}; - -pub fn on_init(_r: &mut Congestion) {} - -pub fn reset(_r: &mut Congestion) {} - -pub fn on_packet_sent( - _r: &mut Congestion, _sent_bytes: usize, _bytes_in_flight: usize, - _now: Instant, -) { -} - -fn on_packets_acked( - r: &mut Congestion, _bytes_in_flight: usize, packets: &mut Vec, - now: Instant, rtt_stats: &RttStats, -) { - for pkt in packets.drain(..) { - on_packet_acked(r, &pkt, now, rtt_stats); - } -} - -fn on_packet_acked( - r: &mut Congestion, packet: &Acked, now: Instant, rtt_stats: &RttStats, -) { - if r.in_congestion_recovery(packet.time_sent) { - return; - } - - if r.app_limited { - return; - } - - if r.congestion_window < r.ssthresh { - // In Slow slart, bytes_acked_sl is used for counting - // acknowledged bytes. - r.bytes_acked_sl += packet.size; - - if r.hystart.in_css() { - r.congestion_window += r.hystart.css_cwnd_inc(r.max_datagram_size); - } else { - r.congestion_window += r.max_datagram_size; - } - - if r.hystart.on_packet_acked(packet, rtt_stats.latest_rtt, now) { - // Exit to congestion avoidance if CSS ends. - r.ssthresh = r.congestion_window; - } - } else { - // Congestion avoidance. - r.bytes_acked_ca += packet.size; - - if r.bytes_acked_ca >= r.congestion_window { - r.bytes_acked_ca -= r.congestion_window; - r.congestion_window += r.max_datagram_size; - } - } -} - -fn congestion_event( - r: &mut Congestion, _bytes_in_flight: usize, _lost_bytes: usize, - largest_lost_pkt: &Sent, now: Instant, -) { - // Start a new congestion event if packet was sent after the - // start of the previous congestion recovery period. - let time_sent = largest_lost_pkt.time_sent; - - if !r.in_congestion_recovery(time_sent) { - r.congestion_recovery_start_time = Some(now); - - r.congestion_window = (r.congestion_window as f64 * - recovery::LOSS_REDUCTION_FACTOR) - as usize; - - r.congestion_window = cmp::max( - r.congestion_window, - r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS, - ); - - r.bytes_acked_ca = (r.congestion_window as f64 * - recovery::LOSS_REDUCTION_FACTOR) as usize; - - r.ssthresh = r.congestion_window; - - if r.hystart.in_css() { - r.hystart.congestion_event(); - } - } -} - -pub fn collapse_cwnd(r: &mut Congestion, _bytes_in_flight: usize) { - r.congestion_window = r.max_datagram_size * recovery::MINIMUM_WINDOW_PACKETS; - r.bytes_acked_sl = 0; - r.bytes_acked_ca = 0; - - if r.hystart.enabled() { - r.hystart.reset(); - } -} - -fn checkpoint(_r: &mut Congestion) {} - -fn rollback(_r: &mut Congestion) -> bool { - true -} - -fn has_custom_pacing() -> bool { - false -} - -fn debug_fmt(_r: &Congestion, _f: &mut std::fmt::Formatter) -> std::fmt::Result { - Ok(()) -} - -#[cfg(test)] -mod tests { - use super::*; - - use crate::recovery::Recovery; - use smallvec::smallvec; - use std::time::Duration; - - #[test] - fn reno_init() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Reno); - - let r = Recovery::new(&cfg); - - assert!(r.cwnd() > 0); - assert_eq!(r.bytes_in_flight, 0); - } - - #[test] - fn reno_send() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Reno); - - let mut r = Recovery::new(&cfg); - - let now = Instant::now(); - - r.on_packet_sent_cc(0, 1000, now); - - assert_eq!(r.bytes_in_flight, 1000); - } - - #[test] - fn reno_slow_start() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Reno); - - let mut r = Recovery::new(&cfg); - - let now = Instant::now(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: std::time::Instant::now(), - first_sent_time: std::time::Instant::now(), - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - // Send initcwnd full MSS packets to become no longer app limited - for pn in 0..r.congestion.initial_congestion_window_packets { - r.on_packet_sent_cc(pn as _, p.size, now); - } - - let cwnd_prev = r.cwnd(); - - let mut acked = vec![Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - r.on_packets_acked(&mut acked, now); - - // Check if cwnd increased by packet size (slow start). - assert_eq!(r.cwnd(), cwnd_prev + p.size); - } - - #[test] - fn reno_slow_start_multi_acks() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Reno); - - let mut r = Recovery::new(&cfg); - - let now = Instant::now(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: std::time::Instant::now(), - first_sent_time: std::time::Instant::now(), - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - // Send initcwnd full MSS packets to become no longer app limited - for pn in 0..r.congestion.initial_congestion_window_packets { - r.on_packet_sent_cc(pn as _, p.size, now); - } - - let cwnd_prev = r.cwnd(); - - let mut acked = vec![ - Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }, - Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }, - Acked { - pkt_num: p.pkt_num, - time_sent: p.time_sent, - size: p.size, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }, - ]; - - r.on_packets_acked(&mut acked, now); - - // Acked 3 packets. - assert_eq!(r.cwnd(), cwnd_prev + p.size * 3); - } - - #[test] - fn reno_congestion_event() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Reno); - - let mut r = Recovery::new(&cfg); - - let prev_cwnd = r.cwnd(); - - let now = Instant::now(); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: std::time::Instant::now(), - first_sent_time: std::time::Instant::now(), - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - r.congestion_event(r.max_datagram_size, &p, now); - - // In Reno, after congestion event, cwnd will be cut in half. - assert_eq!(prev_cwnd / 2, r.cwnd()); - } - - #[test] - fn reno_congestion_avoidance() { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(recovery::CongestionControlAlgorithm::Reno); - - let mut r = Recovery::new(&cfg); - let now = Instant::now(); - let prev_cwnd = r.cwnd(); - - // Fill up bytes_in_flight to avoid app_limited=true - r.on_packet_sent_cc(0, 20000, now); - - let p = recovery::Sent { - pkt_num: 0, - frames: smallvec![], - time_sent: now, - time_acked: None, - time_lost: None, - size: r.max_datagram_size, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: std::time::Instant::now(), - first_sent_time: std::time::Instant::now(), - is_app_limited: false, - has_data: false, - tx_in_flight: 0, - lost: 0, - pmtud: false, - }; - - // Trigger congestion event to update ssthresh - r.congestion_event(r.max_datagram_size, &p, now); - - // After congestion event, cwnd will be reduced. - let cur_cwnd = - (prev_cwnd as f64 * recovery::LOSS_REDUCTION_FACTOR) as usize; - assert_eq!(r.cwnd(), cur_cwnd); - - let rtt = Duration::from_millis(100); - - let mut acked = vec![Acked { - pkt_num: 0, - // To exit from recovery - time_sent: now + rtt, - // More than cur_cwnd to increase cwnd - size: 8000, - delivered: 0, - delivered_time: now, - first_sent_time: now, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - rtt: Duration::ZERO, - }]; - - // Ack more than cwnd bytes with rtt=100ms - r.rtt_stats - .update_rtt(rtt, Duration::from_millis(0), now, true); - r.on_packets_acked(&mut acked, now + rtt * 2); - - // After acking more than cwnd, expect cwnd increased by MSS - assert_eq!(r.cwnd(), cur_cwnd + r.max_datagram_size); - } -} From b5870768cb127bb5f5a0b3e897474187e3618094 Mon Sep 17 00:00:00 2001 From: Alessandro Ghedini Date: Wed, 8 May 2024 12:00:08 +0100 Subject: [PATCH 2/5] fixup! congestion.rs -> congestion/mod.rs --- quiche/src/recovery/{congestion.rs => congestion/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename quiche/src/recovery/{congestion.rs => congestion/mod.rs} (100%) diff --git a/quiche/src/recovery/congestion.rs b/quiche/src/recovery/congestion/mod.rs similarity index 100% rename from quiche/src/recovery/congestion.rs rename to quiche/src/recovery/congestion/mod.rs From 03328e8084c832fb875b748c298dfeb23d701653 Mon Sep 17 00:00:00 2001 From: Alessandro Ghedini Date: Wed, 8 May 2024 12:01:33 +0100 Subject: [PATCH 3/5] fixup! add copyright header, move mod to bottom --- quiche/src/recovery/congestion/mod.rs | 42 ++++++++++++++++++++++----- 1 file changed, 34 insertions(+), 8 deletions(-) diff --git a/quiche/src/recovery/congestion/mod.rs b/quiche/src/recovery/congestion/mod.rs index 54f9765b8a..50d6a2023d 100644 --- a/quiche/src/recovery/congestion/mod.rs +++ b/quiche/src/recovery/congestion/mod.rs @@ -1,11 +1,28 @@ -mod bbr; -mod bbr2; -mod cubic; -mod delivery_rate; -mod hystart; -pub(crate) mod pacer; -mod prr; -mod reno; +// Copyright (C) 2024, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::str::FromStr; use std::time::Instant; @@ -312,3 +329,12 @@ impl From for &'static CongestionControlOps { } } } + +mod bbr; +mod bbr2; +mod cubic; +mod delivery_rate; +mod hystart; +pub(crate) mod pacer; +mod prr; +mod reno; From 03030156b483e4ad25adf68c78501323bc24e942 Mon Sep 17 00:00:00 2001 From: Alessandro Ghedini Date: Wed, 8 May 2024 12:34:54 +0100 Subject: [PATCH 4/5] fixup! move TestSender to its own file --- quiche/src/recovery/congestion/bbr/mod.rs | 4 +- quiche/src/recovery/congestion/cubic.rs | 6 +- quiche/src/recovery/congestion/mod.rs | 3 + quiche/src/recovery/congestion/reno.rs | 184 +--------------- quiche/src/recovery/congestion/test_sender.rs | 200 ++++++++++++++++++ 5 files changed, 211 insertions(+), 186 deletions(-) create mode 100644 quiche/src/recovery/congestion/test_sender.rs diff --git a/quiche/src/recovery/congestion/bbr/mod.rs b/quiche/src/recovery/congestion/bbr/mod.rs index da3f4f94a6..17d17713d2 100644 --- a/quiche/src/recovery/congestion/bbr/mod.rs +++ b/quiche/src/recovery/congestion/bbr/mod.rs @@ -349,12 +349,12 @@ fn debug_fmt(r: &Congestion, f: &mut std::fmt::Formatter) -> std::fmt::Result { #[cfg(test)] mod tests { - use self::congestion::reno::test_sender::TestSender; - use super::*; use crate::recovery; + use self::congestion::test_sender::TestSender; + use smallvec::smallvec; fn test_sender() -> TestSender { diff --git a/quiche/src/recovery/congestion/cubic.rs b/quiche/src/recovery/congestion/cubic.rs index 8005e30367..38758e4576 100644 --- a/quiche/src/recovery/congestion/cubic.rs +++ b/quiche/src/recovery/congestion/cubic.rs @@ -417,11 +417,13 @@ fn debug_fmt(r: &Congestion, f: &mut std::fmt::Formatter) -> std::fmt::Result { #[cfg(test)] mod tests { - use self::recovery::congestion::reno::test_sender::TestSender; use super::*; + + use crate::CongestionControlAlgorithm; + use crate::recovery::congestion::hystart; + use crate::recovery::congestion::test_sender::TestSender; use crate::recovery::Recovery; - use crate::CongestionControlAlgorithm; fn test_sender() -> TestSender { TestSender::new(recovery::CongestionControlAlgorithm::CUBIC, false) diff --git a/quiche/src/recovery/congestion/mod.rs b/quiche/src/recovery/congestion/mod.rs index 50d6a2023d..492f12b1fc 100644 --- a/quiche/src/recovery/congestion/mod.rs +++ b/quiche/src/recovery/congestion/mod.rs @@ -338,3 +338,6 @@ mod hystart; pub(crate) mod pacer; mod prr; mod reno; + +#[cfg(test)] +mod test_sender; diff --git a/quiche/src/recovery/congestion/reno.rs b/quiche/src/recovery/congestion/reno.rs index 62f5ab9bd4..1780b2d39c 100644 --- a/quiche/src/recovery/congestion/reno.rs +++ b/quiche/src/recovery/congestion/reno.rs @@ -150,193 +150,13 @@ fn debug_fmt(_r: &Congestion, _f: &mut std::fmt::Formatter) -> std::fmt::Result Ok(()) } -#[cfg(test)] -pub(crate) mod test_sender { - use std::collections::VecDeque; - use std::ops::Deref; - use std::ops::DerefMut; - use std::time::Duration; - use std::time::Instant; - - use crate::recovery::congestion::Congestion; - use crate::recovery::rtt::RttStats; - use crate::recovery::Acked; - use crate::recovery::RecoveryConfig; - use crate::recovery::Sent; - use crate::CongestionControlAlgorithm; - - pub(crate) struct TestSender { - cc: Congestion, - pub(crate) next_pkt: u64, - pub(crate) next_ack: u64, - pub(crate) bytes_in_flight: usize, - pub(crate) time: Instant, - rtt_stats: RttStats, - sent_packets: VecDeque, - } - - impl TestSender { - pub(crate) fn new( - algo: CongestionControlAlgorithm, hystart: bool, - ) -> Self { - let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); - cfg.set_cc_algorithm(algo); - cfg.enable_hystart(hystart); - - TestSender { - next_pkt: 0, - next_ack: 0, - bytes_in_flight: 0, - time: Instant::now(), - rtt_stats: RttStats::new(Duration::from_micros(0)), - cc: Congestion::from_config(&RecoveryConfig::from_config(&cfg)), - sent_packets: VecDeque::new(), - } - } - - pub(crate) fn send_packet(&mut self, bytes: usize) { - let mut sent = Sent { - pkt_num: self.next_pkt, - frames: Default::default(), - time_sent: self.time, - time_acked: None, - time_lost: None, - size: bytes, - ack_eliciting: true, - in_flight: true, - delivered: 0, - delivered_time: self.time, - first_sent_time: self.time, - is_app_limited: false, - tx_in_flight: 0, - lost: 0, - has_data: false, - pmtud: false, - }; - - self.cc.on_packet_sent( - self.bytes_in_flight, - bytes, - self.time, - &mut sent, - &self.rtt_stats, - 0, - true, - ); - - self.sent_packets.push_back(sent); - - self.bytes_in_flight += bytes; - self.next_pkt += 1; - } - - pub(crate) fn inject_ack(&mut self, acked: Acked, now: Instant) { - let _ = self.sent_packets.pop_front().unwrap(); - - self.cc.on_packets_acked( - self.bytes_in_flight, - &mut vec![acked], - &self.rtt_stats, - now, - ); - } - - pub(crate) fn ack_n_packets(&mut self, n: usize, bytes: usize) { - let mut acked = Vec::new(); - - for _ in 0..n { - let unacked = self.sent_packets.pop_front().unwrap(); - - acked.push(Acked { - pkt_num: unacked.pkt_num, - time_sent: unacked.time_sent, - size: unacked.size, - - rtt: self.time.saturating_duration_since(unacked.time_sent), - delivered: unacked.delivered, - delivered_time: unacked.delivered_time, - first_sent_time: unacked.first_sent_time, - is_app_limited: unacked.is_app_limited, - tx_in_flight: unacked.tx_in_flight, - lost: unacked.lost, - }); - - self.next_ack += 1; - } - - self.cc.on_packets_acked( - self.bytes_in_flight, - &mut acked, - &self.rtt_stats, - self.time, - ); - - self.bytes_in_flight -= n * bytes; - } - - pub(crate) fn lose_n_packets( - &mut self, n: usize, bytes: usize, time_sent: Option, - ) { - let mut unacked = None; - - for _ in 0..n { - self.next_ack += 1; - unacked = self.sent_packets.pop_front(); - } - - let mut unacked = unacked.unwrap(); - if let Some(time) = time_sent { - unacked.time_sent = time; - } - - if !self.cc.in_congestion_recovery(unacked.time_sent) { - (self.cc.cc_ops.checkpoint)(&mut self.cc); - } - - (self.cc_ops.congestion_event)( - &mut self.cc, - self.bytes_in_flight, - n * bytes, - &unacked, - self.time, - ); - - self.cc.lost_count += n; - self.bytes_in_flight -= n * bytes; - } - - pub(crate) fn update_rtt(&mut self, rtt: Duration) { - self.rtt_stats - .update_rtt(rtt, Duration::ZERO, self.time, true) - } - - pub(crate) fn advance_time(&mut self, period: Duration) { - self.time += period; - } - } - - impl Deref for TestSender { - type Target = Congestion; - - fn deref(&self) -> &Self::Target { - &self.cc - } - } - - impl DerefMut for TestSender { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.cc - } - } -} - #[cfg(test)] mod tests { - use self::test_sender::TestSender; - use super::*; + use crate::recovery::congestion::test_sender::TestSender; use crate::recovery::Recovery; + use std::time::Duration; fn test_sender() -> TestSender { diff --git a/quiche/src/recovery/congestion/test_sender.rs b/quiche/src/recovery/congestion/test_sender.rs new file mode 100644 index 0000000000..0fe28eb8e7 --- /dev/null +++ b/quiche/src/recovery/congestion/test_sender.rs @@ -0,0 +1,200 @@ +// Copyright (C) 2024, Cloudflare, Inc. +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are +// met: +// +// * Redistributions of source code must retain the above copyright notice, +// this list of conditions and the following disclaimer. +// +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS +// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, +// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF +// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING +// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::collections::VecDeque; +use std::ops::Deref; +use std::ops::DerefMut; +use std::time::Duration; +use std::time::Instant; + +use crate::recovery::congestion::Congestion; +use crate::recovery::rtt::RttStats; +use crate::recovery::Acked; +use crate::recovery::RecoveryConfig; +use crate::recovery::Sent; +use crate::CongestionControlAlgorithm; + +pub(crate) struct TestSender { + cc: Congestion, + pub(crate) next_pkt: u64, + pub(crate) next_ack: u64, + pub(crate) bytes_in_flight: usize, + pub(crate) time: Instant, + rtt_stats: RttStats, + sent_packets: VecDeque, +} + +impl TestSender { + pub(crate) fn new(algo: CongestionControlAlgorithm, hystart: bool) -> Self { + let mut cfg = crate::Config::new(crate::PROTOCOL_VERSION).unwrap(); + cfg.set_cc_algorithm(algo); + cfg.enable_hystart(hystart); + + TestSender { + next_pkt: 0, + next_ack: 0, + bytes_in_flight: 0, + time: Instant::now(), + rtt_stats: RttStats::new(Duration::from_micros(0)), + cc: Congestion::from_config(&RecoveryConfig::from_config(&cfg)), + sent_packets: VecDeque::new(), + } + } + + pub(crate) fn send_packet(&mut self, bytes: usize) { + let mut sent = Sent { + pkt_num: self.next_pkt, + frames: Default::default(), + time_sent: self.time, + time_acked: None, + time_lost: None, + size: bytes, + ack_eliciting: true, + in_flight: true, + delivered: 0, + delivered_time: self.time, + first_sent_time: self.time, + is_app_limited: false, + tx_in_flight: 0, + lost: 0, + has_data: false, + pmtud: false, + }; + + self.cc.on_packet_sent( + self.bytes_in_flight, + bytes, + self.time, + &mut sent, + &self.rtt_stats, + 0, + true, + ); + + self.sent_packets.push_back(sent); + + self.bytes_in_flight += bytes; + self.next_pkt += 1; + } + + pub(crate) fn inject_ack(&mut self, acked: Acked, now: Instant) { + let _ = self.sent_packets.pop_front().unwrap(); + + self.cc.on_packets_acked( + self.bytes_in_flight, + &mut vec![acked], + &self.rtt_stats, + now, + ); + } + + pub(crate) fn ack_n_packets(&mut self, n: usize, bytes: usize) { + let mut acked = Vec::new(); + + for _ in 0..n { + let unacked = self.sent_packets.pop_front().unwrap(); + + acked.push(Acked { + pkt_num: unacked.pkt_num, + time_sent: unacked.time_sent, + size: unacked.size, + + rtt: self.time.saturating_duration_since(unacked.time_sent), + delivered: unacked.delivered, + delivered_time: unacked.delivered_time, + first_sent_time: unacked.first_sent_time, + is_app_limited: unacked.is_app_limited, + tx_in_flight: unacked.tx_in_flight, + lost: unacked.lost, + }); + + self.next_ack += 1; + } + + self.cc.on_packets_acked( + self.bytes_in_flight, + &mut acked, + &self.rtt_stats, + self.time, + ); + + self.bytes_in_flight -= n * bytes; + } + + pub(crate) fn lose_n_packets( + &mut self, n: usize, bytes: usize, time_sent: Option, + ) { + let mut unacked = None; + + for _ in 0..n { + self.next_ack += 1; + unacked = self.sent_packets.pop_front(); + } + + let mut unacked = unacked.unwrap(); + if let Some(time) = time_sent { + unacked.time_sent = time; + } + + if !self.cc.in_congestion_recovery(unacked.time_sent) { + (self.cc.cc_ops.checkpoint)(&mut self.cc); + } + + (self.cc_ops.congestion_event)( + &mut self.cc, + self.bytes_in_flight, + n * bytes, + &unacked, + self.time, + ); + + self.cc.lost_count += n; + self.bytes_in_flight -= n * bytes; + } + + pub(crate) fn update_rtt(&mut self, rtt: Duration) { + self.rtt_stats + .update_rtt(rtt, Duration::ZERO, self.time, true) + } + + pub(crate) fn advance_time(&mut self, period: Duration) { + self.time += period; + } +} + +impl Deref for TestSender { + type Target = Congestion; + + fn deref(&self) -> &Self::Target { + &self.cc + } +} + +impl DerefMut for TestSender { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.cc + } +} From 01529138d12236dd0e81994c4132d468f9c0f6ca Mon Sep 17 00:00:00 2001 From: Alessandro Ghedini Date: Wed, 8 May 2024 13:08:37 +0100 Subject: [PATCH 5/5] fixup! remove tx_in_flight and lost from Acked AFAICT these were never actually used (as in, read) anywhere. --- quiche/src/recovery/congestion/cubic.rs | 4 ---- quiche/src/recovery/congestion/delivery_rate.rs | 2 -- quiche/src/recovery/congestion/test_sender.rs | 2 -- quiche/src/recovery/mod.rs | 6 ------ 4 files changed, 14 deletions(-) diff --git a/quiche/src/recovery/congestion/cubic.rs b/quiche/src/recovery/congestion/cubic.rs index 38758e4576..13fafc2189 100644 --- a/quiche/src/recovery/congestion/cubic.rs +++ b/quiche/src/recovery/congestion/cubic.rs @@ -721,8 +721,6 @@ mod tests { delivered_time: sender.time, first_sent_time: sender.time, is_app_limited: false, - tx_in_flight: 0, - lost: 0, rtt: Duration::ZERO, }; @@ -753,8 +751,6 @@ mod tests { delivered_time: sender.time, first_sent_time: sender.time, is_app_limited: false, - tx_in_flight: 0, - lost: 0, rtt: Duration::ZERO, }; diff --git a/quiche/src/recovery/congestion/delivery_rate.rs b/quiche/src/recovery/congestion/delivery_rate.rs index ecb2d9eae2..3bcac6d3b9 100644 --- a/quiche/src/recovery/congestion/delivery_rate.rs +++ b/quiche/src/recovery/congestion/delivery_rate.rs @@ -270,8 +270,6 @@ mod tests { delivered_time: now, first_sent_time: now.checked_sub(rtt).unwrap(), is_app_limited: false, - tx_in_flight: 0, - lost: 0, }; r.congestion.delivery_rate.update_rate_sample(&acked, now); diff --git a/quiche/src/recovery/congestion/test_sender.rs b/quiche/src/recovery/congestion/test_sender.rs index 0fe28eb8e7..45b9c1ab7a 100644 --- a/quiche/src/recovery/congestion/test_sender.rs +++ b/quiche/src/recovery/congestion/test_sender.rs @@ -127,8 +127,6 @@ impl TestSender { delivered_time: unacked.delivered_time, first_sent_time: unacked.first_sent_time, is_app_limited: unacked.is_app_limited, - tx_in_flight: unacked.tx_in_flight, - lost: unacked.lost, }); self.next_ack += 1; diff --git a/quiche/src/recovery/mod.rs b/quiche/src/recovery/mod.rs index 4607a60d3c..96aaf2c15c 100644 --- a/quiche/src/recovery/mod.rs +++ b/quiche/src/recovery/mod.rs @@ -173,8 +173,6 @@ impl RecoveryEpoch { delivered_time: unacked.delivered_time, first_sent_time: unacked.first_sent_time, is_app_limited: unacked.is_app_limited, - tx_in_flight: unacked.tx_in_flight, - lost: unacked.lost, }); trace!("{} packet newly acked {}", trace_id, unacked.pkt_num); @@ -1045,10 +1043,6 @@ pub struct Acked { pub first_sent_time: Instant, pub is_app_limited: bool, - - pub tx_in_flight: usize, - - pub lost: u64, } #[derive(Clone, Copy, Debug)]