diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index a5e45581..0a954b6a 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -32,7 +32,26 @@ jobs: - name: Update rust run: rustup update - name: Build TQUIC library and tools - run: cargo build --all -F ffi --verbose + run: cargo build --all -F ffi --verbose && cargo test + + build_freebsd: + name: Build for FreeBSD + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + with: + submodules: 'recursive' + - name: Build in FreeBSD VM + uses: vmactions/freebsd-vm@v1 + with: + usesh: true + prepare: | + freebsd-version + pkg install -y curl gmake cmake + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- -y + run: | + . "$HOME/.cargo/env" + cargo build --all -F ffi --verbose && cargo test build_ios: name: Build for iOS diff --git a/.github/workflows/tquic-goodput.yml b/.github/workflows/tquic-goodput.yml index 17ba9d3a..c395fd25 100644 --- a/.github/workflows/tquic-goodput.yml +++ b/.github/workflows/tquic-goodput.yml @@ -41,9 +41,9 @@ jobs: python3 run.py -r $QUIC_IMAGES -s ${{ matrix.impl }} -c ${{ matrix.impl }} -t ${{ matrix.case }} -a ${{ matrix.cc }} -d -n "drop-rate --delay=15ms --bandwidth=10Mbps --queue=25 --rate_to_server=5 --rate_to_client=5" -j ${{ matrix.case }}-5-${{ matrix.cc }}-${{ matrix.impl }}.json - name: Store measurement results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: - name: ${{ matrix.case }}-${{ matrix.impl }} + name: ${{ matrix.impl }}-${{ matrix.case }}-${{ matrix.cc }} path: quic-interop-runner/goodput*.json result: @@ -51,11 +51,11 @@ jobs: needs: measure steps: - name: Download all workflow run artifacts - uses: actions/download-artifact@v3 + uses: actions/download-artifact@v4 - name: Display structure of downloaded files run: ls -R - name: Store all measurement results - uses: actions/upload-artifact@v3 + uses: actions/upload-artifact@v4 with: name: goodput-all-result path: goodput* diff --git a/.github/workflows/tquic-interop-all.yml b/.github/workflows/tquic-interop-all.yml index a1b38808..df606dea 100644 --- a/.github/workflows/tquic-interop-all.yml +++ b/.github/workflows/tquic-interop-all.yml @@ -101,4 +101,14 @@ jobs: - name: Run the interop tests run: | cd quic-interop-runner - python3 run.py -s ${{ matrix.server }} -c ${{ matrix.client }} -t handshake,retry,resumption,http3,ipv6,transfer,multiplexing,longrtt,blackhole,transferloss,transfercorruption,goodput,crosstraffic -d -r tquic=tquic_interop:v1 + python3 run.py -s ${{ matrix.server }} -c ${{ matrix.client }} -t handshake,handshakeloss,handshakecorruption,retry,resumption,zerortt,http3,ipv6,transfer,multiplexing,longrtt,blackhole,transferloss,transfercorruption,goodput,crosstraffic -d -r tquic=tquic_interop:v1 -l ${{ matrix.client }}-${{ matrix.server }}-logs + + - name: Store run logs + if: ${{ failure() }} + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.client }}-${{ matrix.server }} + path: | + quic-interop-runner/*logs/* + !quic-interop-runner/*logs/**/crosstraffic/ + !quic-interop-runner/*logs/**/goodput/ diff --git a/.github/workflows/tquic-interop.yml b/.github/workflows/tquic-interop.yml index 99fb5790..f37e1dc3 100644 --- a/.github/workflows/tquic-interop.yml +++ b/.github/workflows/tquic-interop.yml @@ -5,6 +5,7 @@ on: branches: [ "develop" ] pull_request: branches: [ "develop" ] + workflow_dispatch: env: CARGO_TERM_COLOR: always @@ -15,6 +16,7 @@ jobs: runs-on: ubuntu-latest strategy: + fail-fast: false matrix: server: 
[tquic] client: [tquic] @@ -47,4 +49,14 @@ jobs: - name: Run the interop tests run: | cd quic-interop-runner - python3 run.py -s ${{ matrix.server }} -c ${{ matrix.client }} -t handshake,retry,resumption,http3,ipv6,transfer,multiplexing,longrtt,blackhole,transferloss,transfercorruption,goodput,crosstraffic -d -r tquic=tquic_interop:v1 + python3 run.py -s ${{ matrix.server }} -c ${{ matrix.client }} -t handshake,handshakeloss,handshakecorruption,retry,resumption,zerortt,http3,ipv6,transfer,multiplexing,longrtt,blackhole,transferloss,transfercorruption,goodput,crosstraffic -d -r tquic=tquic_interop:v1 -l ${{ matrix.client }}-${{ matrix.server }}-logs + + - name: Store run logs + if: ${{ failure() }} + uses: actions/upload-artifact@v4 + with: + name: ${{ matrix.client }}-${{ matrix.server }} + path: | + quic-interop-runner/*logs/* + !quic-interop-runner/*logs/**/crosstraffic/ + !quic-interop-runner/*logs/**/goodput/ diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fa45e43..5a2a797e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,7 +11,32 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## [v0.6.0] - 2024-01-07 +## [v0.7.0] - 2024-02-02 + +### Added +- Add support for building on FreeBSD +- Add more path level metrics +- Add more quic and recovery events in qlog +- Add tquic_qvis.sh to convert qlog files to be compatible with qvis +- Update MultipathScheduler interface for some advanced schedulers +- Add tquic_tools_test.sh for additional end-to-end testing +- tquic_client: support early data +- tquic_tools: use millisecond precision for log timestamp +- tquic_tools: add `log-file` option to write logs to specified file +- tquic_tools: add `active-cid-limit` option to allow more paths + +### Changed +- tquic_tools: change the `qlog-log` option to the `qlog-dir` option +- tquic_tools: change the `dump-path` option to the `dump-dir` option +- tquic_tools: update default pto linear factor +- tquic_client: change the `local_addresses` option to allow the os to choose available ports +- tquic_client: use `local_addresses` option to specify the addresses to bind in both singlepath and multipath mode. + +### Fixed +- Fix record separator of qlog in json-seq format + + +## [v0.6.0] - 2024-01-17 ### Added - Support the latest version of qlog (v0.4) @@ -117,6 +142,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Provide example clients and servers. +[v0.7.0]: https://github.com/tencent/tquic/compare/v0.6.0...v0.7.0 [v0.6.0]: https://github.com/tencent/tquic/compare/v0.5.0...v0.6.0 [v0.5.0]: https://github.com/tencent/tquic/compare/v0.4.0...v0.5.0 [v0.4.0]: https://github.com/tencent/tquic/compare/v0.3.0...v0.4.0 diff --git a/Cargo.toml b/Cargo.toml index b2f53695..eb515dc8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic" -version = "0.6.0" +version = "0.7.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" diff --git a/include/tquic.h b/include/tquic.h index fffebc14..724b28c8 100644 --- a/include/tquic.h +++ b/include/tquic.h @@ -191,7 +191,7 @@ typedef struct quic_transport_methods_t { typedef void *quic_transport_context_t; /** - * Data and meta information of a outgoing packet. + * Data and meta information of an outgoing packet. 
*/ typedef struct quic_packet_out_spec_t { const struct iovec *iov; @@ -216,7 +216,7 @@ typedef struct quic_packet_send_methods_t { typedef void *quic_packet_send_context_t; /** - * Meta information of a incoming packet. + * Meta information of an incoming packet. */ typedef struct quic_packet_info_t { const struct sockaddr *src; diff --git a/interop/run_endpoint.sh b/interop/run_endpoint.sh index 3f6aa03f..3a72685c 100644 --- a/interop/run_endpoint.sh +++ b/interop/run_endpoint.sh @@ -21,14 +21,14 @@ set -x /setup.sh case "$TESTCASE" in -handshake|http3|resumption|ipv6|goodput|crosstraffic|transfer|transferloss|transfercorruption|multiplexing|longrtt|chacha20|blackhole|retry|handshakeloss|handshakecorruption|multiconnect) +handshake|http3|multiconnect|resumption|retry|transfer|zerortt) ;; -zerortt|chacha20) +chacha20) if [ "$ROLE" == "client" ]; then exit 127 fi ;; -keyupdate|ecn|amplificationlimit|v2) +keyupdate|ecn|v2) exit 127 ;; *) @@ -42,6 +42,7 @@ TQUIC_SERVER="tquic_server" ROOT_DIR="/www" DOWNLOAD_DIR="/downloads" LOG_DIR="/logs" +QLOG_DIR="/logs/qlog" CC_ALGOR="CUBIC" case ${CONGESTION^^} in @@ -58,7 +59,7 @@ COPA) ;; esac -COMMON_ARGS="--keylog-file $SSLKEYLOGFILE --log-level TRACE --idle-timeout 30000 --handshake-timeout 30000 --congestion-control-algor $CC_ALGOR" +COMMON_ARGS="--keylog-file $SSLKEYLOGFILE --qlog-dir $QLOG_DIR --log-level TRACE --log-file $LOG_DIR/$ROLE.log --idle-timeout 30000 --handshake-timeout 30000 --congestion-control-algor $CC_ALGOR" if [ "$ROLE" == "client" ]; then # Wait for the simulator to start up. @@ -66,7 +67,7 @@ if [ "$ROLE" == "client" ]; then REQS=($REQUESTS) - CLIENT_ARGS="$COMMON_ARGS --dump-path ${DOWNLOAD_DIR} --max-concurrent-requests ${#REQS[@]}" + CLIENT_ARGS="$COMMON_ARGS --dump-dir ${DOWNLOAD_DIR} --max-concurrent-requests ${#REQS[@]}" CLIENT_ALPN="--alpn hq-interop" case $TESTCASE in resumption) @@ -91,15 +92,15 @@ if [ "$ROLE" == "client" ]; then CLIENT_ARGS="$CLIENT_ARGS --initial-rtt 100" for REQ in $REQUESTS do - $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS $REQ >> $LOG_DIR/$ROLE.log 2>&1 + $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS $REQ done ;; zerortt) - $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS ${REQS[0]} > $LOG_DIR/$ROLE.log 2>&1 - $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS ${REQS[@]:1} >> $LOG_DIR/$ROLE.log 2>&1 + $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS ${REQS[0]} + $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS ${REQS[@]:1} ;; *) - $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS $REQUESTS > $LOG_DIR/$ROLE.log 2>&1 + $TQUIC_DIR/$TQUIC_CLIENT $CLIENT_ARGS $REQUESTS ;; esac elif [ "$ROLE" == "server" ]; then @@ -117,5 +118,5 @@ elif [ "$ROLE" == "server" ]; then *) ;; esac - $TQUIC_DIR/$TQUIC_SERVER $SERVER_ARGS > $LOG_DIR/$ROLE.log 2>&1 + $TQUIC_DIR/$TQUIC_SERVER $SERVER_ARGS fi diff --git a/src/codec.rs b/src/codec.rs index 12c7c8de..0c0cf836 100644 --- a/src/codec.rs +++ b/src/codec.rs @@ -313,7 +313,7 @@ pub fn decode_varint_len(first: u8) -> usize { } } -/// Return the encoding length of a int using variable-length integer encoding. +/// Return the encoding length of an int using variable-length integer encoding. /// /// See RFC 9000 Section 16 Table 4 Summary of Integer Encodings. 
pub fn encode_varint_len(n: u64) -> usize { diff --git a/src/congestion_control/cubic.rs b/src/congestion_control/cubic.rs index 7a4cbbf9..1bdbe9e4 100644 --- a/src/congestion_control/cubic.rs +++ b/src/congestion_control/cubic.rs @@ -579,7 +579,7 @@ mod tests { has_data: false, sent_size: pkt_size as usize, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() }); } @@ -644,7 +644,7 @@ mod tests { has_data: false, sent_size: pkt_size as usize, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() }); } diff --git a/src/congestion_control/delivery_rate.rs b/src/congestion_control/delivery_rate.rs index 277724ff..596e3fe1 100644 --- a/src/congestion_control/delivery_rate.rs +++ b/src/congestion_control/delivery_rate.rs @@ -285,7 +285,7 @@ mod tests { has_data: false, sent_size: 240, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() }; rate_estimator.on_packet_sent(&mut pkt_n1, bytes_in_flight, bytes_lost); @@ -308,7 +308,7 @@ mod tests { has_data: false, sent_size: 240, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() }; bytes_in_flight += pkt_n1.sent_size as u64; @@ -361,7 +361,6 @@ mod tests { let now = Instant::now(); let mut pkts_part1: Vec = Vec::new(); let mut pkts_part2: Vec = Vec::new(); - // let mut pkts_part3: Vec = Vec::new(); let bytes_lost = 0; let mut bytes_in_flight = 0; let pkt_size: u64 = 240; @@ -380,7 +379,7 @@ mod tests { has_data: false, sent_size: pkt_size as usize, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() }); } @@ -397,7 +396,7 @@ mod tests { has_data: false, sent_size: pkt_size as usize, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() }); } diff --git a/src/congestion_control/pacing.rs b/src/congestion_control/pacing.rs index 896f716b..f7d77ae3 100644 --- a/src/congestion_control/pacing.rs +++ b/src/congestion_control/pacing.rs @@ -27,7 +27,7 @@ const MIN_BURST_PACKET_NUM: u64 = 10; /// The upper bound of burst packet number. /// -/// Used to restrict capacity. A extremely large capacity is meaningless. +/// Used to restrict capacity. An extremely large capacity is meaningless. const MAX_BURST_PACKET_NUM: u64 = 128; /// Using a value for N that is small, but at least 1 (for example, 1.25) ensures diff --git a/src/connection/cid.rs b/src/connection/cid.rs index e9aea29a..435270cf 100644 --- a/src/connection/cid.rs +++ b/src/connection/cid.rs @@ -67,7 +67,7 @@ impl ConnectionIdDeque { /// Insert the given ConnectionIdItem. 
/// - /// The caller must not add a invalid duplicated cid + /// The caller must not add an invalid duplicated cid fn insert(&mut self, cid: ConnectionIdItem) -> Result<()> { if self.queue.len() == self.capacity { return Err(Error::ConnectionIdLimitError); @@ -585,7 +585,7 @@ mod tests { assert_eq!(cids.unused_scids(), 0); assert_eq!(cids.next_scid_to_advertise(), None); - // Add a invalid scid without reset token + // Add an invalid scid without reset token let scid1 = ConnectionId::random(); assert_eq!( cids.add_scid(scid1, None, true, None, false), @@ -786,7 +786,7 @@ mod tests { ); // Fake receiving of NEW_CONNECTION_ID that carrys a new issued CID with - // a invalid Retire Prior To field + // an invalid Retire Prior To field assert_eq!( cids.add_dcid(ConnectionId::random(), 1, 1, 2), Err(Error::ProtocolViolation) @@ -813,7 +813,7 @@ mod tests { // Fake receiving of RETIRE_CONNECTION_ID that carrys invalid sequence number assert_eq!(cids.retire_scid(2, &scid1), Err(Error::ProtocolViolation)); - // Fake receiving of RETIRE_CONNECTION_ID that use a unexpected path + // Fake receiving of RETIRE_CONNECTION_ID that use an unexpected path assert_eq!(cids.retire_scid(0, &scid0), Err(Error::ProtocolViolation)); Ok(()) diff --git a/src/connection/connection.rs b/src/connection/connection.rs index 8690cbec..7d0a878e 100644 --- a/src/connection/connection.rs +++ b/src/connection/connection.rs @@ -39,6 +39,8 @@ use self::ConnectionFlags::*; use crate::codec; use crate::codec::Decoder; use crate::codec::Encoder; +use crate::connection::space::BufferFlags; +use crate::connection::space::BufferType; use crate::connection::space::RateSamplePacketState; use crate::error::ConnectionError; use crate::error::Error; @@ -65,6 +67,7 @@ use crate::FourTuple; use crate::FourTupleIter; use crate::MultipathConfig; use crate::PacketInfo; +use crate::PathEvent; use crate::RecoveryConfig; use crate::Result; use crate::Shutdown; @@ -203,7 +206,7 @@ impl Connection { let mut path = path::Path::new(local, remote, true, &conf.recovery, &trace_id); if is_server { - // The server connection is created upon receiving a Initial packet + // The server connection is created upon receiving an Initial packet // with a valid token sent by the client. path.verified_peer_address = addr_token.is_some(); // The server connection assumes the peer has validate the server's @@ -388,7 +391,7 @@ impl Connection { /// Process an incoming UDP datagram from the peer. /// /// On success the number of bytes processed is returned. On error the - /// connection will be closed with a error code. + /// connection will be closed with an error code. #[doc(hidden)] pub fn recv(&mut self, buf: &mut [u8], info: &PacketInfo) -> Result { let len = buf.len(); @@ -583,6 +586,7 @@ impl Connection { // Process each QUIC frame in the QUIC packet let mut ack_eliciting_pkt = false; let mut probing_pkt = true; + let mut qframes = vec![]; while !payload.is_empty() { let (frame, len) = Frame::from_bytes(&mut payload, hdr.pkt_type)?; @@ -592,14 +596,23 @@ impl Connection { if !frame.probing() { probing_pkt = false; } + if self.qlog.is_some() { + qframes.push(frame.to_qlog()); + } self.recv_frame(frame, &hdr, pid, space_id, info.time)?; let _ = payload.split_to(len); } - // Write TransportPacketReceived event to qlog. + // Write events to qlog. if let Some(qlog) = &mut self.qlog { - Self::qlog_quic_packet_received(qlog, &hdr, pkt_num, read, payload_len); + // Write TransportPacketReceived event to qlog. 
+ Self::qlog_quic_packet_received(qlog, &hdr, pkt_num, read, payload_len, qframes); + + // Write RecoveryMetricsUpdate event to qlog. + if let Ok(path) = self.paths.get_mut(pid) { + path.recovery.qlog_recovery_metrics_updated(qlog); + } } // Process acknowledged frames. @@ -634,8 +647,10 @@ impl Connection { // Update statistic metrics self.stats.recv_count += 1; self.stats.recv_bytes += read as u64; - self.paths.get_mut(pid)?.stats.recv_count += 1; - self.paths.get_mut(pid)?.stats.recv_bytes += read as u64; + self.paths + .get_mut(pid)? + .recovery + .stat_recv_event(1, read as u64); // The successful use of Handshake packets indicates that no more // Initial packets need to be exchanged, as these keys can only be @@ -715,10 +730,11 @@ impl Connection { space_id, &mut self.spaces, handshake_status, + self.qlog.as_mut(), now, )?; self.stats.lost_count += lost_pkts; - self.stats.lost_bytes += lost_bytes as u64; + self.stats.lost_bytes += lost_bytes; // An endpoint MUST discard its Handshake keys when the TLS // handshake is confirmed. @@ -833,7 +849,12 @@ impl Connection { } Frame::PathResponse { data } => { - self.paths.on_path_resp_received(path_id, data); + if self.paths.on_path_resp_received(path_id, data) { + // Notify the path event to the multipath scheduler + if let Some(ref mut scheduler) = self.multipath_scheduler { + scheduler.on_path_updated(&mut self.paths, PathEvent::Validated(path_id)); + } + } } frame::Frame::PathAbandon { @@ -1557,6 +1578,7 @@ impl Connection { )?; let sent_pkt = space::SentPacket { + pkt_type, pkt_num, time_sent: now, time_acked: None, @@ -1567,7 +1589,7 @@ impl Connection { has_data: write_status.has_data, frames: write_status.frames, rate_sample_state: Default::default(), - reinjected: write_status.reinjected, + buffer_flags: write_status.buffer_flags, }; debug!( "{} sent packet {:?} {:?} {:?}", @@ -1577,9 +1599,19 @@ impl Connection { self.paths.get(path_id)? ); - // Write TransportPacketSent event to qlog. + // Write events to qlog. if let Some(qlog) = &mut self.qlog { - Self::qlog_quic_packet_sent(qlog, &hdr, pkt_num, written, payload_len); + // Write TransportPacketSent event to qlog. + let mut qframes = Vec::with_capacity(sent_pkt.frames.len()); + for frame in &sent_pkt.frames { + qframes.push(frame.to_qlog()); + } + Self::qlog_quic_packet_sent(qlog, &hdr, pkt_num, written, payload_len, qframes); + + // Write RecoveryMetricsUpdate event to qlog. + if let Ok(path) = self.paths.get_mut(path_id) { + path.recovery.qlog_recovery_metrics_updated(qlog); + } } // Notify the packet sent event to the multipath scheduler @@ -1614,8 +1646,10 @@ impl Connection { // Update connection state and statistic metrics self.stats.sent_count += 1; self.stats.sent_bytes += written as u64; - self.paths.get_mut(path_id)?.stats.sent_count += 1; - self.paths.get_mut(path_id)?.stats.sent_bytes += written as u64; + self.paths + .get_mut(path_id)? 
+ .recovery + .stat_sent_event(1, written as u64); { let space = self.spaces.get_mut(space_id).ok_or(Error::InternalError)?; space.next_pkt_num += 1; @@ -1662,18 +1696,21 @@ impl Connection { path_id: usize, has_initial: bool, ) -> Result<()> { - // Write a ACK frame + // Write an ACK frame self.try_write_ack_frame(out, st, pkt_type, path_id)?; // Write a CONNECTION_CLOSE frame self.try_write_close_frame(out, st, pkt_type, path_id)?; + let path = self.paths.get_mut(path_id)?; + path.recovery.stat_cwnd_limited(); + // Check the congestion window // - Packets containing frames besides ACK or CONNECTION_CLOSE frames // count toward congestion control limits. (RFC 9002 Section 3) // - Probe packets are allowed to temporarily exceed the congestion // window. (RFC 9002 Section 4.7) - if !st.is_probe && !self.paths.get(path_id)?.recovery.can_send() { + if !st.is_probe && !path.recovery.can_send() { return Err(Error::Done); } @@ -1702,8 +1739,8 @@ impl Connection { // Write a CRYPTO frame self.try_write_crypto_frame(out, st, pkt_type, path_id)?; - // Write reinjected frames - self.try_write_reinjected_frames(out, st, pkt_type, path_id)?; + // Write buffered frames + self.try_write_buffered_frames(out, st, pkt_type, path_id)?; // Write STREAM frames self.try_write_stream_frames(out, st, pkt_type, path_id)?; @@ -2199,9 +2236,9 @@ impl Connection { // Read stream data and write into the packet buffer directly. let (frame_data_len, fin) = stream.send.read(&mut out[len + frame_hdr_len..])?; - // Retain stream data for reinjection if needed. + // Retain stream data if needed. let data = if self.flags.contains(EnableMultipath) - && reinjection_required(self.multipath_conf.multipath_algorithm) + && buffer_required(self.multipath_conf.multipath_algorithm) { let start = len + frame_hdr_len; Bytes::copy_from_slice(&out[start..start + frame_data_len]) @@ -2277,8 +2314,8 @@ impl Connection { Ok(()) } - /// Populate reinjected frame to packet payload buffer. - fn try_write_reinjected_frames( + /// Populate buffered frame to packet payload buffer. + fn try_write_buffered_frames( &mut self, out: &mut [u8], st: &mut FrameWriteStatus, @@ -2289,33 +2326,31 @@ impl Connection { return Ok(()); } - let out = &mut out[st.written..]; let path = self.paths.get(path_id)?; if pkt_type != PacketType::OneRTT || self.is_closing() - || out.len() <= frame::MAX_STREAM_OVERHEAD + || out.len() - st.written <= frame::MAX_STREAM_OVERHEAD || !path.active() { return Ok(()); } - // Get reinjected frames on the path. + // Get buffered frames on the path. 
let space = self .spaces .get_mut(path.space_id) .ok_or(Error::InternalError)?; - let frames = &mut space.reinject.frames; + if space.buffered.is_empty() { + return Ok(()); + } debug!( - "{} try to write reinjected frames: path_id={} reinjected_frames={}", + "{} try to write buffered frames: path_id={} frames={}", self.trace_id, path_id, - frames.len() + space.buffered.len() ); - if frames.is_empty() { - return Ok(()); - } - while let Some(frame) = frames.pop_front() { + while let Some((frame, buffer_type)) = space.buffered.pop_front() { match frame { Frame::Stream { stream_id, @@ -2329,65 +2364,74 @@ impl Connection { _ => continue, }; - // Check acked range and injected the first non-acked subrange + // Check acked range and write the first non-acked subrange let range = offset..offset + length as u64; if let Some(r) = stream.send.filter_acked(range) { - let data_len = (r.end - r.start) as usize; - let frame_len = Self::reinjected_stream_frame_to_packet( + let data_len = Self::write_buffered_stream_frame_to_packet( stream_id, r.start, - data_len, - fin && data_len == length, + fin && r.end == offset + length as u64, data.slice((r.start - offset) as usize..(r.end - offset) as usize), out, + buffer_type, st, )?; // Processing the following subrange. - if r.end < offset + length as u64 { + if r.start + (data_len as u64) < offset + length as u64 { let frame = Frame::Stream { stream_id, - offset: r.end, - length: length - (r.end - offset) as usize, + offset: r.start + data_len as u64, + length: length - data_len, fin, - data: data.slice((r.end - offset) as usize..), + data: data.slice(data_len..), }; - frames.push_front(frame); + space.buffered.push_front(frame, buffer_type); + } + + if data_len == 0 { + break; } } } - // Ignore other reinjected frames. + // Ignore other buffered frames. _ => continue, } - - let out = &mut out[st.written..]; - if out.len() <= frame::MAX_STREAM_OVERHEAD { - break; - } } Ok(()) } - fn reinjected_stream_frame_to_packet( + fn write_buffered_stream_frame_to_packet( stream_id: u64, offset: u64, - length: usize, - fin: bool, - data: Bytes, + mut fin: bool, + mut data: Bytes, out: &mut [u8], + buffer_type: BufferType, st: &mut FrameWriteStatus, ) -> Result { - let len = frame::encode_stream_header(stream_id, offset, length as u64, fin, out)?; - out[len..len + data.len()].copy_from_slice(&data); + let out = &mut out[st.written..]; + if out.len() <= frame::MAX_STREAM_OVERHEAD { + return Ok(0); + } + + let hdr_len = frame::stream_header_wire_len(stream_id, offset); + let data_len = cmp::min(data.len(), out.len() - hdr_len); + if data_len < data.len() { + data.truncate(data_len); + fin = false; + } + + frame::encode_stream_header(stream_id, offset, data_len as u64, fin, out)?; + out[hdr_len..hdr_len + data.len()].copy_from_slice(&data); - let frame_len = len + data.len(); - st.written += frame_len; + st.written += hdr_len + data_len; st.ack_eliciting = true; st.in_flight = true; st.has_data = true; - st.reinjected = true; + st.buffer_flags.mark(buffer_type); st.frames.push(Frame::Stream { stream_id, offset, @@ -2395,7 +2439,7 @@ impl Connection { fin, data, }); - Ok(frame_len) + Ok(data_len) } /// Populate a QUIC frame to the give buffer. @@ -2561,12 +2605,12 @@ impl Connection { } } - /// Select a available path for sending packet + /// Select an available path for sending packet /// /// The selected path should have a packet that can be sent out, unless none /// of the paths are feasible. 
fn select_send_path(&mut self) -> Result { - // Select a unvalidated path with path probing packets to send + // Select an unvalidated path with path probing packets to send if self.is_established() { let mut probing = self .paths @@ -2594,7 +2638,7 @@ impl Connection { } } - // Select a validated path with ACK/PTO/Reinjected packets to send. + // Select a validated path with ACK/PTO/Buffered packets to send. for (pid, path) in self.paths.iter_mut() { if !path.active() { continue; @@ -2607,7 +2651,7 @@ impl Connection { if space.loss_probes > 0 { return Ok(pid); } - if space.need_send_reinjected_frames() && path.recovery.can_send() { + if space.need_send_buffered_frames() && path.recovery.can_send() { return Ok(pid); } continue; @@ -2703,7 +2747,7 @@ impl Connection { || path.need_send_validation_frames() || self.cids.need_send_cid_control_frames() || self.streams.need_send_stream_frames() - || self.spaces.need_send_reinjected_frames()) + || self.spaces.need_send_buffered_frames()) { if !self.is_server && self.tls_session.is_in_early_data() { return Ok(PacketType::ZeroRTT); @@ -2858,12 +2902,20 @@ impl Connection { if timer > now { continue; } - path.recovery.on_loss_detection_timeout( + let (lost_pkts, lost_bytes) = path.recovery.on_loss_detection_timeout( SpaceId::Data, // TODO: update for multipath &mut self.spaces, handshake_status, + self.qlog.as_mut(), now, ); + self.stats.lost_count += lost_pkts; + self.stats.lost_bytes += lost_bytes; + + // Write RecoveryMetricsUpdate event to qlog. + if let Some(qlog) = &mut self.qlog { + path.recovery.qlog_recovery_metrics_updated(qlog); + } } } } @@ -2878,7 +2930,7 @@ impl Connection { Timer::KeyDiscard => (), // TODO: support key discarding - Timer::KeepAlive => (), // TODO: schedule a outgoing Ping + Timer::KeepAlive => (), // TODO: schedule an outgoing Ping Timer::PathChallenge => self.paths.on_path_chal_timeout(now), @@ -3530,7 +3582,7 @@ impl Connection { } } - /// Return a endpoint-facing event. + /// Return an endpoint-facing event. pub(crate) fn poll(&mut self) -> Option { if let Some(event) = self.events.poll() { return Some(event); @@ -3637,6 +3689,7 @@ impl Connection { pkt_num: u64, pkt_len: usize, payload_len: usize, + qlog_frames: Vec, ) { let qlog_pkt_hdr = events::PacketHeader::new_with_type( hdr.pkt_type.to_qlog(), @@ -3652,6 +3705,7 @@ impl Connection { }; let ev_data = events::EventData::QuicPacketReceived { header: qlog_pkt_hdr, + frames: Some(qlog_frames.into()), is_coalesced: None, retry_token: None, stateless_reset_token: None, @@ -3670,6 +3724,7 @@ impl Connection { pkt_num: u64, pkt_len: usize, payload_len: usize, + qlog_frames: Vec, ) { let qlog_pkt_hdr = events::PacketHeader::new_with_type( hdr.pkt_type.to_qlog(), @@ -3687,6 +3742,7 @@ impl Connection { let ev_data = events::EventData::QuicPacketSent { header: qlog_pkt_hdr, + frames: Some(qlog_frames.into()), is_coalesced: None, retry_token: None, stateless_reset_token: None, @@ -3884,20 +3940,20 @@ enum ConnectionFlags { #[derive(Default)] pub struct ConnectionStats { /// Total number of received packets. - pub recv_count: usize, - - /// Total number of sent packets. - pub sent_count: usize, - - /// Total number of lost packets. - pub lost_count: usize, + pub recv_count: u64, /// Total number of bytes received on the connection. pub recv_bytes: u64, + /// Total number of sent packets. + pub sent_count: u64, + /// Total number of bytes sent on the connection. pub sent_bytes: u64, + /// Total number of lost packets. 
+ pub lost_count: u64, + /// Total number of bytes lost on the connection. pub lost_bytes: u64, } @@ -3915,7 +3971,7 @@ struct FrameWriteStatus { /// Whether it contains frames other than ACK, PADDING, and CONNECTION_CLOSE ack_eliciting: bool, - /// Whether it is a in-flight packet (ack-eliciting packet or contain a + /// Whether it is an in-flight packet (ack-eliciting packet or contain a /// PADDING frame) in_flight: bool, @@ -3931,8 +3987,8 @@ struct FrameWriteStatus { /// Whether the congestion window should be ignored. is_probe: bool, - /// Whether it is a reinjected packet. - reinjected: bool, + /// Status about buffered frames written to the packet. + buffer_flags: BufferFlags, } /// Handshake status for loss recovery @@ -6568,7 +6624,7 @@ pub(crate) mod tests { conn_multipath_transfer(&mut test_pair, blocks)?; - for (i, path) in test_pair.server.paths.iter() { + for (i, path) in test_pair.server.paths.iter_mut() { let s = path.stats(); assert!(s.sent_count > 3); assert!(s.recv_count > 3); @@ -6595,7 +6651,7 @@ pub(crate) mod tests { } conn_multipath_transfer(&mut test_pair, blocks)?; - for (i, path) in test_pair.server.paths.iter() { + for (i, path) in test_pair.server.paths.iter_mut() { let s = path.stats(); assert!(s.sent_count > 50); assert!(s.recv_count > 50); @@ -6611,29 +6667,38 @@ pub(crate) mod tests { let mut sfile = slog.reopen().unwrap(); let mut test_pair = TestPair::new_with_test_config()?; - assert_eq!(test_pair.handshake(), Ok(())); test_pair .client .set_qlog(Box::new(clog), "title".into(), "desc".into()); test_pair .server .set_qlog(Box::new(slog), "title".into(), "desc".into()); + assert_eq!(test_pair.handshake(), Ok(())); // Client create a stream and send data let data = Bytes::from_static(b"test data over quic"); - test_pair.client.stream_set_priority(0, 0, false)?; - test_pair.client.stream_write(0, data.clone(), true)?; - test_pair.client.stream_shutdown(0, Shutdown::Read, 0)?; + test_pair.client.stream_write(0, data.clone(), false)?; let packets = TestPair::conn_packets_out(&mut test_pair.client)?; + TestPair::conn_packets_in(&mut test_pair.server, packets)?; - // Server read data from the stream + // Client lost some packets + test_pair.client.stream_write(0, data.clone(), false)?; + let _ = TestPair::conn_packets_out(&mut test_pair.client)?; + test_pair.client.stream_write(0, data.clone(), false)?; + let packets = TestPair::conn_packets_out(&mut test_pair.client)?; TestPair::conn_packets_in(&mut test_pair.server, packets)?; + + // Server read data from the stream let mut buf = vec![0; data.len()]; test_pair.server.stream_read(0, &mut buf)?; - let packets = TestPair::conn_packets_out(&mut test_pair.server)?; TestPair::conn_packets_in(&mut test_pair.client, packets)?; + // Advance ticks until loss timeout + assert!(test_pair.client.timeout().is_some()); + let timeout = test_pair.client.timers.get(Timer::LossDetection); + test_pair.client.on_timeout(timeout.unwrap()); + // Check client qlog let mut clog_content = String::new(); cfile.read_to_string(&mut clog_content).unwrap(); @@ -6641,6 +6706,8 @@ pub(crate) mod tests { assert_eq!(clog_content.contains("quic:parameters_set"), true); assert_eq!(clog_content.contains("quic:stream_data_moved"), true); assert_eq!(clog_content.contains("quic:packet_sent"), true); + assert_eq!(clog_content.contains("recovery:metrics_updated"), true); + assert_eq!(clog_content.contains("recovery:packet_lost"), true); // Check server qlog let mut slog_content = String::new(); @@ -6649,6 +6716,7 @@ pub(crate) mod tests { 
assert_eq!(slog_content.contains("quic:parameters_set"), true); assert_eq!(slog_content.contains("quic:stream_data_moved"), true); assert_eq!(slog_content.contains("quic:packet_received"), true); + assert_eq!(slog_content.contains("recovery:metrics_updated"), true); Ok(()) } diff --git a/src/connection/path.rs b/src/connection/path.rs index 4fb6b6d9..7e00a48a 100644 --- a/src/connection/path.rs +++ b/src/connection/path.rs @@ -17,9 +17,11 @@ use std::collections::BTreeMap; use std::collections::VecDeque; use std::net::SocketAddr; use std::time; +use std::time::Duration; use slab::Slab; +use super::recovery::PathStats; use super::recovery::Recovery; use super::timer; use crate::connection::SpaceId; @@ -33,25 +35,6 @@ pub(crate) const INITIAL_CHAL_TIMEOUT: u64 = 25; pub(crate) const MAX_PROBING_TIMEOUTS: usize = 8; -/// The states about the path validation. -#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] -pub enum PathState { - /// The path validation failed - Failed, - - /// No path validation has been performed. - Unknown, - - /// The path is under validation. - Validating, - - /// The remote address has been validated, but not the path MTU. - ValidatingMTU, - - /// The path has been validated. - Validated, -} - /// A network path on which QUIC packets can be sent. pub struct Path { /// The local address. @@ -73,9 +56,6 @@ pub struct Path { /// Loss recovery and congestion control. pub(crate) recovery: Recovery, - /// Statistics about the path. - pub(super) stats: PathStats, - /// The current validation state of the path. state: PathState, @@ -135,7 +115,6 @@ impl Path { dcid_seq, active: false, recovery: Recovery::new(conf), - stats: PathStats::default(), state, recv_chals: VecDeque::new(), sent_chals: VecDeque::new(), @@ -175,9 +154,10 @@ impl Path { } /// Handle incoming PATH_RESPONSE data. - pub(super) fn on_path_resp_received(&mut self, data: [u8; 8], multipath: bool) { + /// Return true if the path status changes to `Validated`. + pub(super) fn on_path_resp_received(&mut self, data: [u8; 8], multipath: bool) -> bool { if self.state == PathState::Validated { - return; + return false; } self.verified_peer_address = true; @@ -201,11 +181,12 @@ impl Path { self.promote_to(PathState::Validated); self.set_active(multipath); self.sent_chals.clear(); - return; + return true; } // If the MTU was not validated, probe again. self.need_send_challenge = true; + false } /// Fetch a received challenge data item. @@ -300,9 +281,10 @@ impl Path { !self.active && self.dcid_seq.is_none() } - /// Return statistics about the path - pub fn stats(&self) -> &PathStats { - &self.stats + /// Update and return the latest statistics about the path + pub fn stats(&mut self) -> &PathStats { + self.recovery.stat_lazy_update(); + &self.recovery.stats } /// Return the validation state of the path @@ -319,26 +301,23 @@ impl std::fmt::Debug for Path { } } -/// Statistics about a path. -#[derive(Debug, Default)] -pub struct PathStats { - /// The number of QUIC packets received. - pub recv_count: usize, - - /// The number of QUIC packets sent. - pub sent_count: usize, +/// The states about the path validation. +#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub enum PathState { + /// The path validation failed + Failed, - /// The number of QUIC packets lost. - pub lost_count: usize, + /// No path validation has been performed. + Unknown, - /// The number of received bytes. - pub recv_bytes: u64, + /// The path is under validation. + Validating, - /// The number of sent bytes. 
- pub sent_bytes: u64, + /// The remote address has been validated, but not the path MTU. + ValidatingMTU, - /// The number of lost bytes. - pub lost_bytes: u64, + /// The path has been validated. + Validated, } /// Path manager for a QUIC connection @@ -451,7 +430,7 @@ impl PathMap { Ok(pid) } - /// Return a immutable iterator over all existing paths. + /// Return an immutable iterator over all existing paths. pub fn iter(&self) -> slab::Iter { self.paths.iter() } @@ -474,10 +453,11 @@ impl PathMap { } /// Process a PATH_RESPONSE frame on the give path - pub fn on_path_resp_received(&mut self, path_id: usize, data: [u8; 8]) { + pub fn on_path_resp_received(&mut self, path_id: usize, data: [u8; 8]) -> bool { if let Some(path) = self.paths.get_mut(path_id) { - path.on_path_resp_received(data, self.is_multipath); + return path.on_path_resp_received(data, self.is_multipath); } + false } /// Handle the sent event of PATH_CHALLENGE. @@ -584,8 +564,8 @@ mod tests { assert_eq!(path_mgr.get(pid)?.remote_addr(), server_addr); assert_eq!(path_mgr.get(pid)?.active(), true); assert_eq!(path_mgr.get(pid)?.unused(), false); - assert_eq!(path_mgr.get(pid)?.stats().recv_count, 0); - assert_eq!(path_mgr.get(pid)?.stats().sent_count, 0); + assert_eq!(path_mgr.get_mut(pid)?.stats().recv_count, 0); + assert_eq!(path_mgr.get_mut(pid)?.stats().sent_count, 0); assert_eq!(path_mgr.get_active()?.local_addr(), client_addr); assert_eq!(path_mgr.get_active_mut()?.remote_addr(), server_addr); assert_eq!(path_mgr.get_active_path_id()?, 0); @@ -623,11 +603,11 @@ mod tests { assert_eq!(path_mgr.get_mut(pid)?.state, PathState::Validating); // Fake receiving of unmatched PATH_RESPONSE - path_mgr.on_path_resp_received(pid, [0xab; 8]); + assert_eq!(path_mgr.on_path_resp_received(pid, [0xab; 8]), false); assert_eq!(path_mgr.get_mut(pid)?.state, PathState::ValidatingMTU); // Fake receiving of PATH_RESPONSE - path_mgr.on_path_resp_received(pid, data); + assert_eq!(path_mgr.on_path_resp_received(pid, data), false); assert_eq!(path_mgr.get_mut(pid)?.path_chal_initiated(), true); assert_eq!(path_mgr.get_mut(pid)?.validated(), false); assert_eq!(path_mgr.get_mut(pid)?.state, PathState::ValidatingMTU); @@ -636,14 +616,14 @@ mod tests { path_mgr.on_path_chal_sent(pid, data, 1300, now)?; // Fake receiving of PATH_RESPONSE - path_mgr.on_path_resp_received(pid, data); + assert_eq!(path_mgr.on_path_resp_received(pid, data), true); assert_eq!(path_mgr.get_mut(pid)?.path_chal_initiated(), false); assert_eq!(path_mgr.get_mut(pid)?.validated(), true); assert_eq!(path_mgr.get_mut(pid)?.state, PathState::Validated); assert_eq!(path_mgr.get_mut(pid)?.sent_chals.len(), 0); // Fake receiving of depulicated PATH_RESPONSE - path_mgr.on_path_resp_received(pid, data); + assert_eq!(path_mgr.on_path_resp_received(pid, data), false); assert_eq!(path_mgr.get_mut(pid)?.validated(), true); // Timeout event diff --git a/src/connection/recovery.rs b/src/connection/recovery.rs index 6128f486..e3bff209 100644 --- a/src/connection/recovery.rs +++ b/src/connection/recovery.rs @@ -32,6 +32,8 @@ use super::HandshakeStatus; use crate::congestion_control; use crate::congestion_control::CongestionController; use crate::frame; +use crate::qlog; +use crate::qlog::events::EventData; use crate::ranges::RangeSet; use crate::Error; use crate::RecoveryConfig; @@ -91,6 +93,13 @@ pub struct Recovery { /// Congestion controller for the corresponding path. pub congestion: Box, + /// Path level Statistics. 
+ pub stats: PathStats, + + /// It tracks the last metrics used for emitting qlog RecoveryMetricsUpdated + /// event. + last_metrics: RecoveryMetrics, + /// Trace id. trace_id: String, } @@ -110,6 +119,8 @@ impl Recovery { ack_eliciting_in_flight: 0, rtt: RttEstimator::new(conf.initial_rtt), congestion: congestion_control::build_congestion_controller(conf), + stats: PathStats::default(), + last_metrics: RecoveryMetrics::default(), trace_id: String::from(""), } } @@ -185,6 +196,7 @@ impl Recovery { /// Handle packet acknowledgment event. /// /// See RFC 9002 Section A.7. On Receiving an Acknowledgment. + #[allow(clippy::too_many_arguments)] pub(super) fn on_ack_received( &mut self, ranges: &RangeSet, @@ -192,8 +204,9 @@ impl Recovery { space_id: SpaceId, spaces: &mut PacketNumSpaceMap, handshake_status: HandshakeStatus, + qlog: Option<&mut qlog::QlogWriter>, now: Instant, - ) -> Result<(usize, usize)> { + ) -> Result<(u64, u64)> { let space = spaces.get_mut(space_id).ok_or(Error::InternalError)?; // Update the largest packet number acknowledged in the space @@ -242,7 +255,7 @@ impl Recovery { } // Detect lost packets - let (lost_packets, lost_bytes) = self.detect_lost_packets(space, now); + let (lost_packets, lost_bytes) = self.detect_lost_packets(space, qlog, now); // Remove acked or lost packets from sent queue in batch. self.drain_sent_packets(space, now, self.rtt.smoothed_rtt()); @@ -331,6 +344,7 @@ impl Recovery { self.bytes_in_flight, self.congestion.congestion_window() ); + self.stat_acked_event(1, sent_pkt.sent_size as u64); space.acked.append(&mut sent_pkt.frames); newly_acked.push(AckedPacket { @@ -374,7 +388,12 @@ impl Recovery { /// It is called every time an ACK is received or the time threshold loss /// detection timer expires. /// See RFC 9002 Section A.10. Detecting Lost Packets - fn detect_lost_packets(&mut self, space: &mut PacketNumSpace, now: Instant) -> (usize, usize) { + fn detect_lost_packets( + &mut self, + space: &mut PacketNumSpace, + mut qlog: Option<&mut qlog::QlogWriter>, + now: Instant, + ) -> (u64, u64) { space.loss_time = None; let mut lost_packets = 0; @@ -407,7 +426,7 @@ impl Recovery { lost_packets += 1; if unacked.in_flight { - lost_bytes += unacked.sent_size; + lost_bytes += unacked.sent_size as u64; space.bytes_in_flight = space.bytes_in_flight.saturating_sub(unacked.sent_size); self.bytes_in_flight = self.bytes_in_flight.saturating_sub(unacked.sent_size); @@ -419,6 +438,9 @@ impl Recovery { } } latest_lost_packet = Some(unacked.clone()); + if let Some(qlog) = qlog.as_mut() { + self.qlog_recovery_packet_lost(qlog, unacked); + } trace!( "now={:?} {} {} ON_LOST {:?} inflight={} cwnd={}", now, @@ -444,7 +466,7 @@ impl Recovery { now, &lost_packet, self.in_persistent_congestion(), - lost_bytes as u64, + lost_bytes, self.bytes_in_flight as u64, ); trace!( @@ -459,6 +481,7 @@ impl Recovery { } } + self.stat_lost_event(lost_packets, lost_bytes); (lost_packets, lost_bytes) } @@ -532,8 +555,9 @@ impl Recovery { space_id: SpaceId, spaces: &mut PacketNumSpaceMap, handshake_status: HandshakeStatus, + qlog: Option<&mut qlog::QlogWriter>, now: Instant, - ) -> (usize, usize) { + ) -> (u64, u64) { let (earliest_loss_time, sid) = self.get_loss_time_and_space(space_id, spaces); let space = match spaces.get_mut(sid) { Some(space) => space, @@ -543,7 +567,7 @@ impl Recovery { // Loss timer mode if earliest_loss_time.is_some() { // Time threshold loss detection. 
- let (lost_packets, lost_bytes) = self.detect_lost_packets(space, now); + let (lost_packets, lost_bytes) = self.detect_lost_packets(space, qlog, now); self.drain_sent_packets(space, now, self.rtt.smoothed_rtt()); self.set_loss_detection_timer(space_id, spaces, handshake_status, now); return (lost_packets, lost_bytes); @@ -783,6 +807,270 @@ impl Recovery { pub(crate) fn can_send(&self) -> bool { self.bytes_in_flight < self.congestion.congestion_window() as usize } + + /// Update statistics for the packet sent event + pub(crate) fn stat_sent_event(&mut self, sent_pkts: u64, sent_bytes: u64) { + self.stats.sent_count = self.stats.sent_count.saturating_add(sent_pkts); + self.stats.sent_bytes = self.stats.sent_bytes.saturating_add(sent_bytes); + self.stat_cwnd_updated(); + } + + /// Update statistics for the packet recv event + pub(crate) fn stat_recv_event(&mut self, recv_pkts: u64, recv_bytes: u64) { + self.stats.recv_count = self.stats.recv_count.saturating_add(recv_pkts); + self.stats.recv_bytes = self.stats.recv_bytes.saturating_add(recv_bytes); + } + + /// Update statistics for the packet acked event + pub(crate) fn stat_acked_event(&mut self, acked_pkts: u64, acked_bytes: u64) { + self.stats.acked_count = self.stats.acked_count.saturating_add(acked_pkts); + self.stats.acked_bytes = self.stats.acked_bytes.saturating_add(acked_bytes); + } + + /// Update statistics for the packet loss event + pub(crate) fn stat_lost_event(&mut self, lost_pkts: u64, lost_bytes: u64) { + self.stats.lost_count = self.stats.lost_count.saturating_add(lost_pkts); + self.stats.lost_bytes = self.stats.lost_bytes.saturating_add(lost_bytes); + } + + /// Update statistics for the congestion_window + pub(crate) fn stat_cwnd_updated(&mut self) { + let cwnd = self.congestion.congestion_window(); + if self.stats.init_cwnd == 0 { + self.stats.init_cwnd = cwnd; + self.stats.min_cwnd = cwnd; + self.stats.max_cwnd = cwnd; + } + self.stats.final_cwnd = cwnd; + if self.stats.max_cwnd < cwnd { + self.stats.max_cwnd = cwnd; + } + if self.stats.min_cwnd > cwnd { + self.stats.min_cwnd = cwnd; + } + let bytes_in_flight = self.bytes_in_flight as u64; + if self.stats.max_inflight < bytes_in_flight { + self.stats.max_inflight = bytes_in_flight; + } + } + + /// Update statistics for the congestion window limited event + pub(crate) fn stat_cwnd_limited(&mut self) { + let is_cwnd_limited = !self.can_send(); + let now = Instant::now(); + if let Some(last_cwnd_limited_time) = self.stats.last_cwnd_limited_time { + // Update duration timely, in case it stays in cwnd limited all the time. + let duration = now.saturating_duration_since(last_cwnd_limited_time); + self.stats.cwnd_limited_duration = + self.stats.cwnd_limited_duration.saturating_add(duration); + if is_cwnd_limited { + self.stats.last_cwnd_limited_time = Some(now); + } else { + self.stats.last_cwnd_limited_time = None; + } + } else if is_cwnd_limited { + // A new cwnd limited event + self.stats.cwnd_limited_count = self.stats.cwnd_limited_count.saturating_add(1); + self.stats.last_cwnd_limited_time = Some(now); + } + } + + /// Update with the latest values from recovery. + pub(crate) fn stat_lazy_update(&mut self) { + self.stats.min_rtt = self.rtt.min_rtt(); + self.stats.max_rtt = self.rtt.max_rtt(); + self.stats.srtt = self.rtt.smoothed_rtt(); + self.stats.rttvar = self.rtt.rttvar(); + self.stats.in_slow_start = self.congestion.in_slow_start(); + } + + /// Write a qlog RecoveryMetricsUpdated event if any recovery metric is updated. 
+ pub(crate) fn qlog_recovery_metrics_updated(&mut self, qlog: &mut qlog::QlogWriter) { + let mut updated = false; + + let mut min_rtt = None; + if self.last_metrics.min_rtt != self.rtt.min_rtt() { + self.last_metrics.min_rtt = self.rtt.min_rtt(); + min_rtt = Some(self.last_metrics.min_rtt.as_secs_f32() * 1000.0); + updated = true; + } + + let mut smoothed_rtt = None; + if self.last_metrics.smoothed_rtt != self.rtt.smoothed_rtt() { + self.last_metrics.smoothed_rtt = self.rtt.smoothed_rtt(); + smoothed_rtt = Some(self.last_metrics.smoothed_rtt.as_secs_f32() * 1000.0); + updated = true; + } + + let mut latest_rtt = None; + if self.last_metrics.latest_rtt != self.rtt.latest_rtt() { + self.last_metrics.latest_rtt = self.rtt.latest_rtt(); + latest_rtt = Some(self.last_metrics.latest_rtt.as_secs_f32() * 1000.0); + updated = true; + } + + let mut rtt_variance = None; + if self.last_metrics.rttvar != self.rtt.rttvar() { + self.last_metrics.rttvar = self.rtt.rttvar(); + rtt_variance = Some(self.last_metrics.rttvar.as_secs_f32() * 1000.0); + updated = true; + } + + let mut congestion_window = None; + if self.last_metrics.cwnd != self.congestion.congestion_window() { + self.last_metrics.cwnd = self.congestion.congestion_window(); + congestion_window = Some(self.last_metrics.cwnd); + updated = true; + } + + let mut bytes_in_flight = None; + if self.last_metrics.bytes_in_flight != self.bytes_in_flight as u64 { + self.last_metrics.bytes_in_flight = self.bytes_in_flight as u64; + bytes_in_flight = Some(self.last_metrics.bytes_in_flight); + updated = true; + } + + let mut pacing_rate = None; + if self.last_metrics.pacing_rate != self.congestion.pacing_rate() { + self.last_metrics.pacing_rate = self.congestion.pacing_rate(); + pacing_rate = self.last_metrics.pacing_rate.map(|v| v * 8); // bps + updated = true; + } + + if !updated { + return; + } + + let ev_data = EventData::RecoveryMetricsUpdated { + min_rtt, + smoothed_rtt, + latest_rtt, + rtt_variance, + pto_count: None, + congestion_window, + bytes_in_flight, + ssthresh: None, + packets_in_flight: None, + pacing_rate, + }; + qlog.add_event_data(Instant::now(), ev_data).ok(); + } + + /// Write a qlog RecoveryPacketLost event. + pub(crate) fn qlog_recovery_packet_lost( + &mut self, + qlog: &mut qlog::QlogWriter, + pkt: &SentPacket, + ) { + let ev_data = EventData::RecoveryPacketLost { + header: Some(qlog::events::PacketHeader { + packet_type: pkt.pkt_type.to_qlog(), + packet_number: pkt.pkt_num, + ..qlog::events::PacketHeader::default() + }), + frames: None, + is_mtu_probe_packet: None, + trigger: None, + }; + qlog.add_event_data(Instant::now(), ev_data).ok(); + } +} + +#[derive(Default)] +pub struct PathStats { + /// The number of QUIC packets received. + pub recv_count: u64, + + /// The number of received bytes. + pub recv_bytes: u64, + + /// The number of QUIC packets sent. + pub sent_count: u64, + + /// The number of sent bytes. + pub sent_bytes: u64, + + /// The number of QUIC packets lost. + pub lost_count: u64, + + /// The number of lost bytes. + pub lost_bytes: u64, + + /// Total number of bytes acked. + pub acked_bytes: u64, + + /// Total number of packets acked. + pub acked_count: u64, + + /// Initial congestion window in bytes. + pub init_cwnd: u64, + + /// Final congestion window in bytes. + pub final_cwnd: u64, + + /// Maximum congestion window in bytes. + pub max_cwnd: u64, + + /// Minimum congestion window in bytes. + pub min_cwnd: u64, + + /// Maximum inflight data in bytes. + pub max_inflight: u64, + + /// Total loss events. 
+ pub loss_event_count: u64, + + /// Total congestion window limited events. + pub cwnd_limited_count: u64, + + /// Total duration of congestion windowlimited events. + pub cwnd_limited_duration: Duration, + + /// The time for last congestion window event + last_cwnd_limited_time: Option, + + /* Note: the following fields are lazily updated from Recovery */ + /// Minimum roundtrip time. + pub min_rtt: Duration, + + /// Maximum roundtrip time. + pub max_rtt: Duration, + + /// Smoothed roundtrip time. + pub srtt: Duration, + + /// Roundtrip time variation. + pub rttvar: Duration, + + /// Whether the congestion controller is in slow start status. + pub in_slow_start: bool, +} + +/// Metrics used for emitting qlog RecoveryMetricsUpdated event. +#[derive(Default)] +struct RecoveryMetrics { + /// The minimum RTT observed on the path, ignoring ack delay + min_rtt: Duration, + + /// The smoothed RTT of the path is an exponentially weighted moving average + /// of an endpoint's RTT samples + smoothed_rtt: Duration, + + /// The most recent RTT sample. + latest_rtt: Duration, + + /// The RTT variance estimates the variation in the RTT samples using a + /// mean variation + rttvar: Duration, + + /// Congestion window in bytes. + cwnd: u64, + + /// Total number of bytes in fight. + bytes_in_flight: u64, + + /// Pacing rate in Bps + pacing_rate: Option, } #[cfg(test)] @@ -806,7 +1094,7 @@ mod tests { in_flight: true, has_data: true, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() } } @@ -864,7 +1152,15 @@ mod tests { let mut acked = RangeSet::default(); acked.insert(0..1); acked.insert(2..3); - recovery.on_ack_received(&acked, 0, SpaceId::Handshake, &mut spaces, status, now)?; + recovery.on_ack_received( + &acked, + 0, + SpaceId::Handshake, + &mut spaces, + status, + None, + now, + )?; assert_eq!(spaces.get(space_id).unwrap().sent.len(), 2); assert_eq!(spaces.get(space_id).unwrap().ack_eliciting_in_flight, 1); assert_eq!(recovery.ack_eliciting_in_flight, 1); @@ -872,7 +1168,7 @@ mod tests { // Advance ticks until loss timeout now = recovery.loss_detection_timer().unwrap(); let (lost_pkts, lost_bytes) = - recovery.on_loss_detection_timeout(SpaceId::Handshake, &mut spaces, status, now); + recovery.on_loss_detection_timeout(SpaceId::Handshake, &mut spaces, status, None, now); assert_eq!(lost_pkts, 1); assert_eq!(lost_bytes, 1001); assert_eq!(spaces.get(space_id).unwrap().ack_eliciting_in_flight, 0); @@ -928,16 +1224,30 @@ mod tests { acked.insert(1..4); // Detect packet loss base on reordering threshold - let (lost_pkts, lost_bytes) = - recovery.on_ack_received(&acked, 0, SpaceId::Handshake, &mut spaces, status, now)?; + let (lost_pkts, lost_bytes) = recovery.on_ack_received( + &acked, + 0, + SpaceId::Handshake, + &mut spaces, + status, + None, + now, + )?; assert_eq!(spaces.get(space_id).unwrap().sent.len(), 4); assert_eq!(lost_pkts, 1); assert_eq!(lost_bytes, 1000); // Advance ticks and fake receiving of duplicated ack now += recovery.rtt.smoothed_rtt(); - let (lost_pkts, lost_bytes) = - recovery.on_ack_received(&acked, 0, SpaceId::Handshake, &mut spaces, status, now)?; + let (lost_pkts, lost_bytes) = recovery.on_ack_received( + &acked, + 0, + SpaceId::Handshake, + &mut spaces, + status, + None, + now, + )?; assert_eq!(lost_pkts, 0); assert_eq!(lost_bytes, 0); @@ -982,8 +1292,15 @@ mod tests { now += Duration::from_millis(100); let mut acked = RangeSet::default(); acked.insert(0..1); - let (lost_pkts, lost_bytes) = - recovery.on_ack_received(&acked, 0, 
SpaceId::Handshake, &mut spaces, status, now)?; + let (lost_pkts, lost_bytes) = recovery.on_ack_received( + &acked, + 0, + SpaceId::Handshake, + &mut spaces, + status, + None, + now, + )?; assert_eq!(spaces.get(space_id).unwrap().sent.len(), 1); assert_eq!(lost_pkts, 0); assert_eq!(lost_bytes, 0); @@ -992,7 +1309,7 @@ mod tests { // Advance ticks until pto timeout now = recovery.loss_detection_timer().unwrap(); let (lost_pkts, lost_bytes) = - recovery.on_loss_detection_timeout(SpaceId::Handshake, &mut spaces, status, now); + recovery.on_loss_detection_timeout(SpaceId::Handshake, &mut spaces, status, None, now); assert_eq!(recovery.pto_count, 1); assert_eq!(lost_pkts, 0); assert_eq!(lost_bytes, 0); @@ -1045,7 +1362,15 @@ mod tests { now += Duration::from_millis(100); let mut acked = RangeSet::default(); acked.insert(0..2); - recovery.on_ack_received(&acked, 0, SpaceId::Handshake, &mut spaces, status, now)?; + recovery.on_ack_received( + &acked, + 0, + SpaceId::Handshake, + &mut spaces, + status, + None, + now, + )?; assert_eq!(spaces.get(SpaceId::Handshake).unwrap().sent.len(), 1); assert_eq!( spaces.get(SpaceId::Handshake).unwrap().bytes_in_flight, @@ -1146,7 +1471,7 @@ mod tests { vec![500..950], )); // Fake receiving duplicated ACK. - recovery.on_ack_received(&ack, 0, SpaceId::Data, &mut spaces, status, now)?; + recovery.on_ack_received(&ack, 0, SpaceId::Data, &mut spaces, status, None, now)?; assert!(check_acked_packets( &spaces.get(SpaceId::Data).unwrap().sent, vec![500..950], @@ -1222,8 +1547,15 @@ mod tests { acked.insert(0..2); // Detect packet loss base on reordering threshold - let (lost_pkts, lost_bytes) = - recovery.on_ack_received(&acked, 0, SpaceId::Handshake, &mut spaces, status, now)?; + let (lost_pkts, lost_bytes) = recovery.on_ack_received( + &acked, + 0, + SpaceId::Handshake, + &mut spaces, + status, + None, + now, + )?; assert_eq!(cwnd_before_ack, recovery.congestion.congestion_window()); Ok(()) diff --git a/src/connection/rtt.rs b/src/connection/rtt.rs index 8ceb1766..5716d3d9 100644 --- a/src/connection/rtt.rs +++ b/src/connection/rtt.rs @@ -34,6 +34,9 @@ pub struct RttEstimator { /// The minimum RTT observed on the path, ignoring ack delay. /// It is used by loss detection to reject implausibly small RTT samples. min_rtt: Duration, + + /// The maximum RTT observed on the path, ignoring ack delay. + max_rtt: Duration, } /// An statistical description of the network path's RTT @@ -44,6 +47,7 @@ impl RttEstimator { smoothed_rtt: None, rttvar: initial_rtt / 2, min_rtt: initial_rtt, + max_rtt: initial_rtt, } } @@ -67,6 +71,11 @@ impl RttEstimator { self.rttvar } + /// Return the Maximum RTT observed so far for this estimator. 
+ pub fn max_rtt(&self) -> Duration { + self.max_rtt + } + /// Return the PTO computed as described in RFC 9002 Section 6.2.1 pub fn pto_base(&self) -> Duration { self.smoothed_rtt() + cmp::max(4 * self.rttvar, TIMER_GRANULARITY) @@ -76,6 +85,7 @@ impl RttEstimator { pub fn update(&mut self, ack_delay: Duration, rtt: Duration) { self.latest_rtt = rtt; self.min_rtt = cmp::min(self.min_rtt, self.latest_rtt); + self.max_rtt = cmp::max(self.max_rtt, self.latest_rtt); if let Some(smoothed_rtt) = self.smoothed_rtt { // The endpoint MUST NOT subtract the acknowledgment delay from the @@ -98,6 +108,7 @@ impl RttEstimator { self.smoothed_rtt = Some(self.latest_rtt); self.rttvar = self.latest_rtt / 2; self.min_rtt = self.latest_rtt; + self.max_rtt = self.latest_rtt; } } } @@ -113,6 +124,7 @@ mod tests { let r = RttEstimator::new(initial_rtt); assert_eq!(r.latest_rtt(), initial_rtt); assert_eq!(r.min_rtt(), initial_rtt); + assert_eq!(r.max_rtt(), initial_rtt); assert_eq!(r.rttvar(), initial_rtt / 2); assert_eq!(r.smoothed_rtt(), initial_rtt); assert_eq!(r.pto_base(), initial_rtt * 3); @@ -129,6 +141,7 @@ mod tests { r.update(ack_delay, rtt_sample); assert_eq!(r.latest_rtt(), rtt_sample); assert_eq!(r.min_rtt(), rtt_sample); + assert_eq!(r.max_rtt(), rtt_sample); assert_eq!(r.rttvar(), rtt_sample / 2); assert_eq!(r.smoothed_rtt(), rtt_sample); assert_eq!(r.pto_base(), rtt_sample * 3); @@ -139,6 +152,7 @@ mod tests { r.update(ack_delay, rtt_sample); assert_eq!(r.latest_rtt(), rtt_sample); assert_eq!(r.min_rtt(), time::Duration::from_millis(400)); + assert_eq!(r.max_rtt(), time::Duration::from_millis(700)); assert_eq!(r.rttvar(), time::Duration::from_millis(200)); assert_eq!(r.smoothed_rtt(), time::Duration::from_millis(425)); assert_eq!(r.pto_base(), time::Duration::from_millis(1225)); @@ -149,6 +163,7 @@ mod tests { r.update(ack_delay, rtt_sample); assert_eq!(r.latest_rtt(), rtt_sample); assert_eq!(r.min_rtt(), time::Duration::from_millis(225)); + assert_eq!(r.max_rtt(), time::Duration::from_millis(700)); assert_eq!(r.rttvar(), time::Duration::from_millis(200)); assert_eq!(r.smoothed_rtt(), time::Duration::from_millis(400)); assert_eq!(r.pto_base(), time::Duration::from_millis(1200)); diff --git a/src/connection/space.rs b/src/connection/space.rs index dd6211d6..26a19fef 100644 --- a/src/connection/space.rs +++ b/src/connection/space.rs @@ -20,6 +20,7 @@ use std::time::Instant; use rustc_hash::FxHashMap; use crate::frame; +use crate::packet; use crate::ranges::RangeSet; use crate::tls::Level; use crate::window::SeqNumWindow; @@ -101,6 +102,9 @@ pub struct PacketNumSpace { /// Acknowledged frames. pub acked: Vec, + /// Buffered frames to be sent in multipath mode. + pub buffered: BufferQueue, + /// The time the most recent ack-eliciting packet was sent. pub time_of_last_sent_ack_eliciting_pkt: Option, @@ -123,9 +127,6 @@ pub struct PacketNumSpace { /// Packet number space for application data pub is_data: bool, - - /// Reinjected frames to be sent. 
- pub reinject: ReinjectQueue, } impl PacketNumSpace { @@ -144,6 +145,7 @@ impl PacketNumSpace { sent: VecDeque::new(), lost: Vec::new(), acked: Vec::new(), + buffered: BufferQueue::default(), time_of_last_sent_ack_eliciting_pkt: None, loss_time: None, largest_acked_pkt: std::u64::MAX, @@ -151,7 +153,6 @@ impl PacketNumSpace { bytes_in_flight: 0, ack_eliciting_in_flight: 0, is_data: id != SpaceId::Initial && id != SpaceId::Handshake, - reinject: ReinjectQueue::default(), } } @@ -186,9 +187,9 @@ impl PacketNumSpace { self.loss_probes > 0 } - /// Return whether the space should send a reinjection packet. - pub fn need_send_reinjected_frames(&self) -> bool { - !self.reinject.frames.is_empty() + /// Return whether the space should send a buffered packet. + pub fn need_send_buffered_frames(&self) -> bool { + !self.buffered.is_empty() } } @@ -258,10 +259,10 @@ impl PacketNumSpaceMap { }; } - /// Return whether the connection should send a reinjection packet. - pub fn need_send_reinjected_frames(&self) -> bool { + /// Return whether the connection should send a buffered packet. + pub fn need_send_buffered_frames(&self) -> bool { for space in self.spaces.values() { - if space.need_send_reinjected_frames() { + if space.need_send_buffered_frames() { return true; } } @@ -296,13 +297,14 @@ pub struct RateSamplePacketState { /// packet.lost: The volume of data that was declared lost on transmission. pub lost: u64, - // P.sent_time: The time when the packet was sent. (Use time_sent in SentPacket) - // sent_time: Instant, } /// Metadata of sent packet #[derive(Clone)] pub struct SentPacket { + /// The packet type of the sent packet. + pub pkt_type: packet::PacketType, + /// The packet number of the sent packet. pub pkt_num: u64, @@ -337,8 +339,27 @@ pub struct SentPacket { /// Snapshot of the current delivery information. pub rate_sample_state: RateSamplePacketState, - /// Whether it is a reinjected packet. - pub reinjected: bool, + /// Status about buffered frames written into the packet. + pub buffer_flags: BufferFlags, +} + +impl Default for SentPacket { + fn default() -> Self { + SentPacket { + pkt_type: packet::PacketType::OneRTT, + pkt_num: 0, + frames: vec![], + time_sent: Instant::now(), + time_acked: None, + time_lost: None, + ack_eliciting: false, + in_flight: false, + has_data: false, + sent_size: 0, + rate_sample_state: RateSamplePacketState::default(), + buffer_flags: BufferFlags::default(), + } + } } impl std::fmt::Debug for SentPacket { @@ -363,16 +384,96 @@ pub struct AckedPacket { pub rtt: Duration, } -/// Metadata of packets to be reinjected +#[derive(Debug, Clone, Copy, PartialEq)] +pub enum BufferType { + High = 0, + Mid = 1, + Low = 2, +} + +impl From for BufferType { + fn from(index: usize) -> BufferType { + match index { + 0 => BufferType::High, + 1 => BufferType::Mid, + _ => BufferType::Low, + } + } +} + +/// Metadata of buffered packets to be sent #[derive(Default)] -pub struct ReinjectQueue { - /// The reinjected frames to be sent. - pub frames: VecDeque, +pub struct BufferQueue { + queues: [VecDeque; 3], + count: usize, +} + +impl BufferQueue { + /// Remove the first frame and returns it + pub fn pop_front(&mut self) -> Option<(frame::Frame, BufferType)> { + for (i, queue) in self.queues.iter_mut().enumerate() { + if !queue.is_empty() { + self.count -= 1; + return Some((queue.pop_front().unwrap(), BufferType::from(i))); + } + } + None + } + + /// Prepend a frame to the specified queue. 
+ pub fn push_front(&mut self, frame: frame::Frame, queue_type: BufferType) { + self.count += 1; + self.queues[queue_type as usize].push_front(frame) + } + + /// Append a frame to the back of the queue. + pub fn push_back(&mut self, frame: frame::Frame, queue_type: BufferType) { + self.count += 1; + self.queues[queue_type as usize].push_back(frame) + } + + /// Move all the frames into self. + pub fn append(&mut self, frames: &mut VecDeque, queue_type: BufferType) { + self.count += frames.len(); + self.queues[queue_type as usize].append(frames) + } + + /// Return the number of frames in the queue. + pub fn len(&self) -> usize { + self.count + } + + /// Return true if the queue is empty. + pub fn is_empty(&self) -> bool { + self.count == 0 + } +} + +#[derive(Clone, Default, Debug)] +pub struct BufferFlags { + pub from_high: bool, + pub from_mid: bool, + pub from_low: bool, +} + +impl BufferFlags { + pub fn has_buffered(&self) -> bool { + self.from_high || self.from_mid || self.from_low + } + + pub fn mark(&mut self, queue_type: BufferType) { + match queue_type { + BufferType::High => self.from_high = true, + BufferType::Mid => self.from_mid = true, + BufferType::Low => self.from_low = true, + } + } } #[cfg(test)] mod tests { use super::*; + use crate::frame::Frame; #[test] fn initial_spaces() { @@ -444,11 +545,77 @@ mod tests { has_data: false, sent_size: 240, rate_sample_state: Default::default(), - reinjected: false, + ..SentPacket::default() }; assert_eq!( format!("{:?}", sent_pkt), "pn=9 frames=[PING, PADDINGS len=200] sent_size=240" ); } + + #[test] + fn buffer_queue() { + // initial queue + let mut queue = BufferQueue::default(); + assert_eq!(queue.len(), 0); + assert_eq!(queue.is_empty(), true); + + // push back/push front + let f1 = Frame::MaxStreamData { + stream_id: 4, + max: 10240, + }; + queue.push_back(f1.clone(), BufferType::High); + assert_eq!(queue.len(), 1); + assert_eq!(queue.is_empty(), false); + + let f2 = Frame::MaxStreamData { + stream_id: 8, + max: 24000, + }; + queue.push_front(f2.clone(), BufferType::High); + assert_eq!(queue.len(), 2); + assert_eq!(queue.is_empty(), false); + + let f3 = Frame::Ping; + queue.push_back(f3.clone(), BufferType::Low); + + assert_eq!(queue.pop_front(), Some((f2.clone(), BufferType::High))); + assert_eq!(queue.pop_front(), Some((f1.clone(), BufferType::High))); + assert_eq!(queue.pop_front(), Some((f3.clone(), BufferType::Low))); + assert_eq!(queue.pop_front(), None); + assert_eq!(queue.is_empty(), true); + + // append + let mut fs = VecDeque::new(); + fs.push_back(f1.clone()); + fs.push_back(f2.clone()); + queue.append(&mut fs, BufferType::Mid); + assert_eq!(queue.len(), 2); + assert_eq!(fs.len(), 0); + assert_eq!(queue.pop_front(), Some((f1.clone(), BufferType::Mid))); + assert_eq!(queue.pop_front(), Some((f2.clone(), BufferType::Mid))); + } + + #[test] + fn buffer_flags() { + use BufferType::*; + let cases = [ + (vec![], false), + (vec![High], true), + (vec![Mid], true), + (vec![Low], true), + (vec![Low, High], true), + (vec![Low, Mid], true), + (vec![High, Mid], true), + (vec![High, Mid, Low], true), + ]; + for case in cases { + let mut flags = BufferFlags::default(); + for flag in case.0 { + flags.mark(flag); + } + assert_eq!(flags.has_buffered(), case.1); + } + } } diff --git a/src/connection/stream.rs b/src/connection/stream.rs index 1f0b5003..0c1485d3 100644 --- a/src/connection/stream.rs +++ b/src/connection/stream.rs @@ -3424,7 +3424,7 @@ mod tests { let mut map = StreamMap::new(false, 50, 50, StreamTransportParams::default()); 
map.update_peer_stream_transport_params(peer_tp); - // 1. Set priority on a invalid stream. + // 1. Set priority on an invalid stream. assert_eq!( map.stream_set_priority(1, 1, true), Err(Error::StreamStateError) @@ -5202,7 +5202,7 @@ mod tests { map.on_stream_frame_acked(0, 10, 4); assert!(map.is_closed(0)); - // Receive a ACK frame for a stream which has been closed, do nothing. + // Receive an ACK frame for a stream which has been closed, do nothing. map.on_stream_frame_acked(0, 10, 4); } @@ -5224,7 +5224,7 @@ mod tests { assert!(stream.send.is_complete()); map.on_reset_stream_frame_acked(4); - // Receive a ACK for a RESET_STREAM frame, no effect on the stream receive-side. + // Receive an ACK for a RESET_STREAM frame, no effect on the stream receive-side. // The stream is still not complete because the stream receive-side is not complete. let stream = map.get_mut(4).unwrap(); assert!(!stream.is_complete()); @@ -5247,7 +5247,7 @@ mod tests { map.on_reset_stream_frame_acked(4); assert!(map.is_closed(4)); - // Receive a ACK for a RESET_STREAM frame which has been closed, do nothing. + // Receive an ACK for a RESET_STREAM frame which has been closed, do nothing. map.on_reset_stream_frame_acked(4); } diff --git a/src/endpoint.rs b/src/endpoint.rs index 24b175a6..545a7d72 100644 --- a/src/endpoint.rs +++ b/src/endpoint.rs @@ -1332,7 +1332,7 @@ mod tests { impl TestSocket { /// Create a UdpSocket using random unused port. fn new(reg: &mio::Registry, conf: &CaseConf, trace_id: String) -> Result { - let addr: SocketAddr = "127.8.8.8:0".parse().unwrap(); + let addr: SocketAddr = "127.0.0.1:0".parse().unwrap(); let mut socket = mio::net::UdpSocket::bind(addr)?; const TOKEN: mio::Token = mio::Token(0); diff --git a/src/ffi.rs b/src/ffi.rs index 5cbdfb52..aff4af82 100644 --- a/src/ffi.rs +++ b/src/ffi.rs @@ -1281,7 +1281,7 @@ fn sock_addr_to_c(addr: &SocketAddr, out: &mut sockaddr_storage) -> socklen_t { *out_in = sockaddr_in { sin_family: AF_INET as sa_family_t, sin_addr, - #[cfg(any(target_os = "macos", target_os = "ios",))] + #[cfg(any(target_os = "macos", target_os = "ios", target_os = "freebsd"))] sin_len: sa_len as u8, sin_port, sin_zero: std::mem::zeroed(), @@ -1298,7 +1298,7 @@ fn sock_addr_to_c(addr: &SocketAddr, out: &mut sockaddr_storage) -> socklen_t { *out_in6 = sockaddr_in6 { sin6_family: AF_INET6 as sa_family_t, sin6_addr, - #[cfg(any(target_os = "macos", target_os = "ios",))] + #[cfg(any(target_os = "macos", target_os = "ios", target_os = "freebsd"))] sin6_len: sa_len as u8, sin6_port: sin_port, sin6_flowinfo: addr.flowinfo(), @@ -1309,7 +1309,7 @@ fn sock_addr_to_c(addr: &SocketAddr, out: &mut sockaddr_storage) -> socklen_t { } } -/// Meta information of a incoming packet. +/// Meta information of an incoming packet. #[repr(C)] pub struct PacketInfo<'a> { src: &'a sockaddr, @@ -1328,7 +1328,7 @@ impl<'a> From<&PacketInfo<'a>> for crate::PacketInfo { } } -/// Data and meta information of a outgoing packet. +/// Data and meta information of an outgoing packet. #[repr(C)] pub struct PacketOutSpec { iov: *const iovec, diff --git a/src/h3/connection.rs b/src/h3/connection.rs index 6bdecb1d..18504ce4 100644 --- a/src/h3/connection.rs +++ b/src/h3/connection.rs @@ -4998,7 +4998,7 @@ mod tests { assert_eq!(s.client_poll(), Err(Http3Error::IdError)); } - // Server try to send GOAWAY frame on a uninitialized control stream. + // Server try to send GOAWAY frame on an uninitialized control stream. 
#[test] fn server_send_goaway_on_uninitialized_control_stream() { let mut s = Session::new().unwrap(); diff --git a/src/lib.rs b/src/lib.rs index 459016ac..f48ccfe0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -754,7 +754,7 @@ impl EventQueue { self.0 = Some(VecDeque::new()); } - /// Add a endpoint-faceing event. + /// Add an endpoint-faceing event. fn add(&mut self, e: Event) -> bool { if let Some(events) = &mut self.0 { events.push_back(e); @@ -763,7 +763,7 @@ impl EventQueue { false } - /// Return a endpoint-facing event. + /// Return an endpoint-facing event. fn poll(&mut self) -> Option { if let Some(events) = &mut self.0 { return events.pop_front(); @@ -867,6 +867,15 @@ pub enum Shutdown { Write = 1, } +/// Important events about path +pub enum PathEvent { + /// The path has been validated. + Validated(usize), + + /// The path has been abandoned. + Abandoned(usize), +} + #[cfg(test)] mod tests { use super::*; @@ -875,6 +884,7 @@ mod tests { fn init() { env_logger::builder() .filter_level(log::LevelFilter::Trace) + .format_timestamp_millis() .is_test(true) .init(); } diff --git a/src/multipath_scheduler/multipath_scheduler.rs b/src/multipath_scheduler/multipath_scheduler.rs index 6f2faf8d..1e3e284d 100644 --- a/src/multipath_scheduler/multipath_scheduler.rs +++ b/src/multipath_scheduler/multipath_scheduler.rs @@ -26,6 +26,7 @@ use crate::connection::space::SentPacket; use crate::connection::stream::StreamMap; use crate::Error; use crate::MultipathConfig; +use crate::PathEvent; use crate::Result; /// MultipathScheduler is a packet scheduler that decides the path over which @@ -53,6 +54,9 @@ pub(crate) trait MultipathScheduler { streams: &mut StreamMap, ) { } + + /// Process a path event. + fn on_path_updated(&mut self, paths: &mut PathMap, event: PathEvent) {} } /// Available multipath scheduling algorithms. @@ -106,7 +110,7 @@ pub(crate) fn build_multipath_scheduler(conf: &MultipathConfig) -> Box bool { +pub(crate) fn buffer_required(algor: MultipathAlgorithm) -> bool { match algor { MultipathAlgorithm::MinRtt => false, MultipathAlgorithm::Redundant => true, diff --git a/src/multipath_scheduler/scheduler_redundant.rs b/src/multipath_scheduler/scheduler_redundant.rs index b88000d9..d089cc47 100644 --- a/src/multipath_scheduler/scheduler_redundant.rs +++ b/src/multipath_scheduler/scheduler_redundant.rs @@ -16,6 +16,7 @@ use log::*; use std::time::Instant; use crate::connection::path::PathMap; +use crate::connection::space::BufferType; use crate::connection::space::PacketNumSpaceMap; use crate::connection::space::SentPacket; use crate::connection::stream::StreamMap; @@ -68,7 +69,7 @@ impl MultipathScheduler for RedundantScheduler { spaces: &mut PacketNumSpaceMap, streams: &mut StreamMap, ) { - if packet.reinjected { + if packet.buffer_flags.has_buffered() { return; } @@ -84,7 +85,7 @@ impl MultipathScheduler for RedundantScheduler { for frame in &packet.frames { if let Frame::Stream { .. 
} = frame { debug!("RedundantScheduler: inject {:?} on path {:?}", frame, pid); - space.reinject.frames.push_back(frame.clone()); + space.buffered.push_back(frame.clone(), BufferType::High); } } } diff --git a/src/qlog/events.rs b/src/qlog/events.rs index b2385018..4967847e 100644 --- a/src/qlog/events.rs +++ b/src/qlog/events.rs @@ -248,6 +248,7 @@ pub enum EventData { #[serde(rename = "quic:packet_sent")] QuicPacketSent { header: PacketHeader, + frames: Option>, is_coalesced: Option, retry_token: Option, stateless_reset_token: Option, @@ -262,6 +263,7 @@ pub enum EventData { #[serde(rename = "quic:packet_received")] QuicPacketReceived { header: PacketHeader, + frames: Option>, is_coalesced: Option, retry_token: Option, stateless_reset_token: Option, @@ -854,7 +856,7 @@ pub enum ConnectionState { Closed, } -#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug)] +#[derive(Serialize, Deserialize, Clone, PartialEq, Eq, Debug, Default)] #[serde(rename_all = "snake_case")] pub enum PacketType { Initial, @@ -865,6 +867,7 @@ pub enum PacketType { OneRtt, Retry, VersionNegotiation, + #[default] Unknown, } @@ -877,7 +880,7 @@ pub enum PacketNumberSpace { } #[serde_with::skip_serializing_none] -#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug)] +#[derive(Clone, Serialize, Deserialize, PartialEq, Eq, Debug, Default)] pub struct PacketHeader { pub packet_type: PacketType, pub packet_number: u64, @@ -1659,6 +1662,7 @@ pub mod tests { let pkt_hdr = new_test_pkt_hdr(PacketType::Initial); let event_data = EventData::QuicPacketSent { header: pkt_hdr, + frames: None, is_coalesced: None, retry_token: None, stateless_reset_token: None, diff --git a/src/qlog/qlog.rs b/src/qlog/qlog.rs index 849d42fd..b95f7ef7 100644 --- a/src/qlog/qlog.rs +++ b/src/qlog/qlog.rs @@ -32,7 +32,12 @@ pub const QLOG_VERSION: &str = "0.4"; /// The serialization format for QlogFileSeq is JSON-SEQ /// See RFC 7464: JavaScript Object Notation (JSON) Text Sequences -pub const JSON_TEXT_SEQS: &str = "JSON-SEQ"; +pub const JSON_SEQ_FORMAT: &str = "JSON-SEQ"; + +/// JSON Text Sequences are very similar to JSON, except that JSON objects are +/// serialized as individual records, each prefixed by an ASCII Record Separator +/// (, 0x1E), and each ending with an ASCII Line Feed character (\n, 0x0A). 
+pub const JSON_SEQ_RS: &[u8] = b"\x1e"; /// A qlog file using the QlogFileSeq schema can be serialized to a streamable /// JSON format called JSON Text Sequences (JSON-SEQ) ([RFC7464]) @@ -211,8 +216,8 @@ impl QlogWriter { start_time: std::time::Instant, ) -> Self { let qlog = QlogFileSeq { - qlog_format: crate::qlog::JSON_TEXT_SEQS.to_string(), - qlog_version: crate::qlog::QLOG_VERSION.to_string(), + qlog_format: JSON_SEQ_FORMAT.to_string(), + qlog_version: QLOG_VERSION.to_string(), title, description, trace, @@ -233,7 +238,7 @@ impl QlogWriter { return Err(Error::Done); } - self.writer.as_mut().write_all(b" ")?; + self.writer.as_mut().write_all(JSON_SEQ_RS)?; serde_json::to_writer(self.writer.as_mut(), &self.qlog).map_err(|_| Error::Done)?; self.writer.as_mut().write_all(b"\n")?; self.ready = true; @@ -254,7 +259,7 @@ impl QlogWriter { pub fn add_event(&mut self, event: Event) -> Result<()> { self.check(event.importance())?; - self.writer.as_mut().write_all(b" ")?; + self.writer.as_mut().write_all(JSON_SEQ_RS)?; serde_json::to_writer(self.writer.as_mut(), &event).map_err(|_| Error::Done)?; self.writer.as_mut().write_all(b"\n")?; Ok(()) @@ -365,10 +370,12 @@ mod tests { assert_eq!( log, - r#" {"qlog_format":"JSON-SEQ","qlog_version":"0.4","title":"title","description":"description","trace":{"title":"qlog trace","description":"qlog trace description","vantage_point":{"type":"server"}}} - {"time":0.0,"name":"connectivity:connection_state_updated","data":{"new":"handshake_completed"}} - {"time":0.0,"name":"connectivity:connection_state_updated","data":{"new":"handshake_confirmed"}} -"# + format!( + "\u{1e}{}\n\u{1e}{}\n\u{1e}{}\n", + r#"{"qlog_format":"JSON-SEQ","qlog_version":"0.4","title":"title","description":"description","trace":{"title":"qlog trace","description":"qlog trace description","vantage_point":{"type":"server"}}}"#, + r#"{"time":0.0,"name":"connectivity:connection_state_updated","data":{"new":"handshake_completed"}}"#, + r#"{"time":0.0,"name":"connectivity:connection_state_updated","data":{"new":"handshake_confirmed"}}"#, + ) ); Ok(()) diff --git a/src/token.rs b/src/token.rs index 5aa53f03..5f3cb100 100644 --- a/src/token.rs +++ b/src/token.rs @@ -41,7 +41,7 @@ pub enum AddressTokenType { ResumeToken = 1, } -/// QUIC uses a address token in the Initial packet to provide address validation +/// QUIC uses an address token in the Initial packet to provide address validation /// prior to completing the handshake. #[derive(Debug)] pub struct AddressToken { diff --git a/tools/Cargo.toml b/tools/Cargo.toml index dac337c9..02e3c618 100644 --- a/tools/Cargo.toml +++ b/tools/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "tquic_tools" -version = "0.6.0" +version = "0.7.0" edition = "2021" rust-version = "1.70.0" license = "Apache-2.0" @@ -23,7 +23,7 @@ rand = "0.8.5" statrs = "0.16" jemallocator = { version = "0.5", package = "tikv-jemallocator" } signal-hook = "0.3.17" -tquic = { path = "..", version = "0.6.0"} +tquic = { path = "..", version = "0.7.0"} [lib] crate-type = ["lib"] diff --git a/tools/README.md b/tools/README.md new file mode 100644 index 00000000..5c997241 --- /dev/null +++ b/tools/README.md @@ -0,0 +1,25 @@ +# TQUIC tools + +[TQUIC](https://github.com/Tencent/tquic) is a high-performance, lightweight, and cross-platform library for the [IETF QUIC](https://datatracker.ietf.org/wg/quic/bout/) protocol. + +The crate contains client and server tools based on TQUIC: +- tquic_client: A QUIC and HTTP/3 client. It's also an HTTP/3 benchmarking tool. 
+- tquic_server: A QUIC and HTTP/3 static file server. + + +## Installation + +``` +cargo install tquic_tools +``` + + +## Documentation + +- [English version](https://tquic.net/docs/getting_started/demo/) +- [Chinese version](https://tquic.net/zh/docs/getting_started/demo/) + + +## License + +The project is under the Apache 2.0 license. diff --git a/tools/script/tquic_qvis.sh b/tools/script/tquic_qvis.sh new file mode 100755 index 00000000..309b7f5b --- /dev/null +++ b/tools/script/tquic_qvis.sh @@ -0,0 +1,35 @@ +#!/bin/bash + +# This tool is used to convert qlogs generated by tquic tools from JSON-SEQ +# format to JSON format, and make other changes to be compatible with qvis. +# See https://qvis.quictools.info + +# Check whether jq is installed +if ! command -v jq &> /dev/null +then + echo "Please install jq to use this script." + echo "See https://jqlang.github.io/jq/download/" + exit 1 +fi + +# Check whether a file name is provided +if [ "$#" -ne 1 ]; then + echo "Usage: $0 " + echo "Description: convert qlog from JSON-SEQ to JSON format compatible with qvis" + exit 1 +fi + +# Check whether the file exists +if [ ! -f "$1" ]; then + echo "Error: File '$1' not found." + exit 1 +fi + +# Convert to JSON format +OUT="$1.qvis.json" +sed 's/^\x1e//' $1 | jq -s '.[1:] as $events | .[0] | .trace.events=$events | .traces=[.trace] | del(.trace) | .qlog_format="JSON"' > $OUT + +# Change for backward compatibility +# Note qvis reportedly does not plan to implement qlog 0.4 and will continue +# using 0.3 until a 1.0 RC is specced. +sed -i -e 's/name": "quic:/name": "transport:/' -e 's/"qlog_version": "0.4"/"qlog_version": "0.3"/' $OUT diff --git a/tools/src/bin/tquic_client.rs b/tools/src/bin/tquic_client.rs index b5304c0b..cae6b14a 100644 --- a/tools/src/bin/tquic_client.rs +++ b/tools/src/bin/tquic_client.rs @@ -20,10 +20,12 @@ use std::fs::create_dir_all; use std::fs::File; use std::io::BufWriter; use std::io::Write; +use std::net::IpAddr; use std::net::Ipv4Addr; use std::net::Ipv6Addr; use std::net::SocketAddr; use std::net::ToSocketAddrs; +use std::path::Path; use std::rc::Rc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; @@ -120,6 +122,10 @@ pub struct ClientOpt { #[clap(long, default_value = "INFO", value_name = "STR")] pub log_level: log::LevelFilter, + /// Log file path. If no file is specified, logs will be written to `stderr`. + #[clap(long, value_name = "FILE")] + pub log_file: Option, + /// Override server's address. #[clap(short, long, value_name = "ADDR")] pub connect_to: Option, @@ -137,7 +143,7 @@ pub struct ClientOpt { /// Dump response body into the given directory. /// If the specified directory does not exist, a new directory will be created. #[clap(long, value_name = "DIR")] - pub dump_path: Option, + pub dump_dir: Option, /// File used for session resumption. #[clap(short, long, value_name = "FILE")] @@ -145,7 +151,6 @@ pub struct ClientOpt { /// Enable early data. #[clap(short, long)] - // TODO: support early data. pub enable_early_data: bool, /// Disable stateless reset. @@ -172,9 +177,13 @@ pub struct ClientOpt { #[clap(long, default_value = "MINRTT")] pub multipath_algor: MultipathAlgorithm, - /// Extra local addresses for client. - #[clap(long, value_delimiter = ' ', value_name = "ADDR")] - pub local_addresses: Vec, + /// Optional local IP addresses for client. e.g 192.168.1.10,192.168.2.20 + #[clap(long, value_delimiter = ',', value_name = "ADDR")] + pub local_addresses: Vec, + + /// Set active_connection_id_limit transport parameter. 
Values lower than 2 will be ignored. + #[clap(long, default_value = "2", value_name = "NUM")] + pub active_cid_limit: u64, /// Set max_udp_payload_size transport parameter. #[clap(long, default_value = "65527", value_name = "NUM")] @@ -197,7 +206,7 @@ pub struct ClientOpt { pub initial_rtt: u64, /// Linear factor for calculating the probe timeout. - #[clap(long, default_value = "3", value_name = "NUM")] + #[clap(long, default_value = "10", value_name = "NUM")] pub pto_linear_factor: u64, /// Upper limit of probe timeout in microseconds. @@ -208,9 +217,9 @@ pub struct ClientOpt { #[clap(short, long, value_name = "FILE")] pub keylog_file: Option, - /// Save QUIC qlog into the given file. - #[clap(long, value_name = "FILE")] - pub qlog_file: Option, + /// Save qlog file (.qlog) into the given directory. + #[clap(long, value_name = "DIR")] + pub qlog_dir: Option, /// Length of connection id in bytes. #[clap(long, default_value = "8", value_name = "NUM")] @@ -257,12 +266,12 @@ impl Client { pub fn start(&mut self) { self.start_time = Instant::now(); let mut threads = vec![]; - for i in 0..self.option.threads { + for _ in 0..self.option.threads { let client_opt = self.option.clone(); let client_ctx = self.context.clone(); let terminated = self.terminated.clone(); let thread = thread::spawn(move || { - let mut worker = Worker::new(i, client_opt, client_ctx, terminated).unwrap(); + let mut worker = Worker::new(client_opt, client_ctx, terminated).unwrap(); worker.start().unwrap(); }); threads.push(thread); @@ -412,7 +421,6 @@ struct Worker { impl Worker { /// Create a new single thread client. pub fn new( - index: u32, option: ClientOpt, client_ctx: Arc>, terminated: Arc, @@ -435,6 +443,7 @@ impl Worker { config.set_min_congestion_window(option.min_congestion_window); config.enable_multipath(option.enable_multipath); config.set_multipath_algorithm(option.multipath_algor); + config.set_active_connection_id_limit(option.active_cid_limit); let tls_config = TlsConfig::new_client_config( ApplicationProto::convert_to_vec(&option.alpn), option.enable_early_data, @@ -445,17 +454,36 @@ impl Worker { let registry = poll.registry(); let worker_ctx = Rc::new(RefCell::new(WorkerContext::with_option(&option))); let senders = Rc::new(RefCell::new(FxHashMap::default())); - let handlers = WorkerHandler::new(&option, worker_ctx.clone(), senders.clone()); + // Use unspecified local addr or the given local addr let remote = option.connect_to.unwrap(); - let mut sock = QuicSocket::new_client_socket(remote.is_ipv4(), registry)?; - if index == 0 && !option.local_addresses.is_empty() { - for local in &option.local_addresses { - let _ = sock.add(local, registry); + let local = if !option.local_addresses.is_empty() { + SocketAddr::new(option.local_addresses[0], 0) + } else { + match remote.is_ipv4() { + true => SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 0), + false => SocketAddr::new(IpAddr::V6(Ipv6Addr::UNSPECIFIED), 0), + } + }; + let mut sock = QuicSocket::new(&local, registry)?; + + let mut assigned_addrs = Vec::new(); + assigned_addrs.push(sock.local_addr()); + if let Some(addrs) = option.local_addresses.get(1..) 
{ + for local in addrs { + let addr = sock.add(&SocketAddr::new(*local, 0), registry)?; + assigned_addrs.push(addr); } } let sock = Rc::new(sock); + let handlers = WorkerHandler::new( + &option, + &assigned_addrs, + worker_ctx.clone(), + senders.clone(), + ); + Ok(Worker { option, endpoint: Endpoint::new(Box::new(config), false, Box::new(handlers), sock.clone()), @@ -755,7 +783,7 @@ impl Request { } // TODO: support custom headers. - fn new(method: &str, url: &Url, body: &Option>, dump_path: &Option) -> Self { + fn new(method: &str, url: &Url, body: &Option>, dump_dir: &Option) -> Self { let authority = match url.port() { Some(port) => format!("{}:{}", url.host_str().unwrap(), port), None => url.host_str().unwrap().to_string(), @@ -778,7 +806,7 @@ impl Request { url: url.clone(), line: format!("GET {}\r\n", url.path()), headers, - response_writer: Self::make_response_writer(url, dump_path), + response_writer: Self::make_response_writer(url, dump_dir), start_time: None, } } @@ -875,7 +903,7 @@ impl RequestSender { /// Receive responses. pub fn recv_responses(&mut self, conn: &mut Connection, stream_id: u64) { if self.streams.get_mut(&stream_id).is_none() { - debug!("{} stream {} not exist", conn.trace_id(), stream_id); + debug!("{} stream {} request not exist", conn.trace_id(), stream_id); return; } @@ -891,7 +919,7 @@ impl RequestSender { fn send_request(&mut self, conn: &mut Connection) -> Result<()> { let url = &self.option.urls[self.current_url_idx]; - let mut request = Request::new("GET", url, &None, &self.option.dump_path); + let mut request = Request::new("GET", url, &None, &self.option.dump_dir); debug!( "{} send request {} current index {}", conn.trace_id(), @@ -1046,20 +1074,6 @@ impl RequestSender { worker_ctx.request_done += 1; Self::sample_request_time(request, &mut worker_ctx); self.streams.remove(&stream_id); - - if self.request_done == self.option.max_requests_per_conn { - worker_ctx.concurrent_conns -= 1; - debug!( - "{} all requests finished, close connection", - conn.trace_id() - ); - match conn.close(true, 0x00, b"ok") { - Ok(_) | Err(Error::Done) => (), - Err(e) => panic!("error closing conn: {:?}", e), - } - - return; - } } } } @@ -1113,20 +1127,6 @@ impl RequestSender { let request = self.streams.get_mut(&stream_id).unwrap(); Self::sample_request_time(request, &mut worker_ctx); self.streams.remove(&stream_id); - - if self.request_done == self.option.max_requests_per_conn { - worker_ctx.concurrent_conns -= 1; - debug!( - "{} all requests finished, close connection", - conn.trace_id() - ); - match conn.close(true, 0x00, b"ok") { - Ok(_) | Err(Error::Done) => (), - Err(e) => panic!("error closing conn: {:?}", e), - } - - return; - } } Ok((stream_id, tquic::h3::Http3Event::Reset(e))) => { error!( @@ -1184,13 +1184,14 @@ struct WorkerHandler { /// Remote server. remote: SocketAddr, - /// Extra local addresses. 
+ /// Local address list local_addresses: Vec, } impl WorkerHandler { fn new( option: &ClientOpt, + local_addresses: &[SocketAddr], worker_ctx: Rc>, senders: Rc>>, ) -> Self { @@ -1199,7 +1200,50 @@ impl WorkerHandler { worker_ctx, senders, remote: option.connect_to.unwrap(), - local_addresses: option.local_addresses.clone(), + local_addresses: local_addresses.to_owned(), + } + } + + fn try_new_request_sender(&mut self, conn: &mut Connection) { + let index = conn.index().unwrap(); + let mut senders = self.senders.borrow_mut(); + let sender = senders.get(&index); + if sender.is_some() { + return; + } + + let sender = RequestSender::new(&self.option, conn, self.worker_ctx.clone()); + senders.insert(index, sender); + } + + fn try_close_conn(&mut self, conn: &mut Connection) { + let index = conn.index().unwrap(); + let senders = self.senders.borrow_mut(); + let sender = senders.get(&index); + if let Some(s) = sender { + if s.request_done == s.option.max_requests_per_conn && !conn.is_closing() { + let mut worker_ctx = self.worker_ctx.borrow_mut(); + worker_ctx.concurrent_conns -= 1; + debug!( + "{} all requests finished, close connection", + conn.trace_id() + ); + match conn.close(true, 0x00, b"ok") { + Ok(_) | Err(Error::Done) => (), + Err(e) => panic!("error closing conn: {:?}", e), + } + } + } + } + + fn try_send_request(&mut self, conn: &mut Connection) { + let index = conn.index().unwrap(); + let mut senders = self.senders.borrow_mut(); + let sender = senders.get_mut(&index); + if let Some(s) = sender { + s.send_requests(conn); + } else { + error!("{} sender not exist", conn.trace_id()); } } } @@ -1220,11 +1264,13 @@ impl TransportHandler for WorkerHandler { } } - if let Some(qlog_file) = &self.option.qlog_file { + if let Some(qlog_dir) = &self.option.qlog_dir { + let qlog_file = format!("{}.qlog", conn.trace_id()); + let qlog_file = Path::new(qlog_dir).join(qlog_file); if let Ok(qlog) = std::fs::OpenOptions::new() .create(true) .append(true) - .open(qlog_file) + .open(qlog_file.as_path()) { conn.set_qlog( Box::new(qlog), @@ -1232,9 +1278,13 @@ impl TransportHandler for WorkerHandler { format!("id={}", conn.trace_id()), ); } else { - error!("{} set qlog failed", conn.trace_id()); + error!("{} set qlog {:?} failed", conn.trace_id(), qlog_file); } } + + if conn.is_in_early_data() { + self.try_new_request_sender(conn); + } } fn on_conn_established(&mut self, conn: &mut Connection) { @@ -1250,29 +1300,27 @@ impl TransportHandler for WorkerHandler { } // Try to add additional paths - for local in &self.local_addresses { - match conn.add_path(*local, self.remote) { - Ok(_) => debug!( - "{} add new path {}-{}", - conn.trace_id(), - *local, - self.remote - ), - Err(e) => debug!( - "{} fail to add path {}-{}: {}", - conn.trace_id(), - *local, - self.remote, - e - ), + if let Some(addrs) = self.local_addresses.get(1..) 
{ + for local in addrs { + match conn.add_path(*local, self.remote) { + Ok(_) => debug!( + "{} add new path {}-{}", + conn.trace_id(), + *local, + self.remote + ), + Err(e) => debug!( + "{} fail to add path {}-{}: {}", + conn.trace_id(), + *local, + self.remote, + e + ), + } } } - let mut sender = RequestSender::new(&self.option, conn, self.worker_ctx.clone()); - sender.send_requests(conn); - let mut senders = self.senders.borrow_mut(); - let index = conn.index().unwrap(); - senders.insert(index, sender); + self.try_new_request_sender(conn); } fn on_conn_closed(&mut self, conn: &mut Connection) { @@ -1333,7 +1381,7 @@ impl TransportHandler for WorkerHandler { if let Some(s) = sender { s.recv_responses(conn, stream_id); } else { - error!("{} stream {} not exist", conn.trace_id(), stream_id); + debug!("{} stream {} sender not exist", conn.trace_id(), stream_id); } } @@ -1344,14 +1392,8 @@ impl TransportHandler for WorkerHandler { fn on_stream_closed(&mut self, conn: &mut Connection, stream_id: u64) { debug!("{} stream {} is closed", conn.trace_id(), stream_id); - let index = conn.index().unwrap(); - let mut senders = self.senders.borrow_mut(); - let sender = senders.get_mut(&index); - if let Some(s) = sender { - s.send_requests(conn); - } else { - error!("{} stream {} not exist", conn.trace_id(), stream_id); - } + self.try_send_request(conn); + self.try_close_conn(conn); } fn on_new_token(&mut self, _conn: &mut Connection, _token: Vec) {} @@ -1392,20 +1434,29 @@ fn parse_option() -> std::result::Result { Ok(option) } -fn process_option(option: &mut ClientOpt) { - env_logger::builder().filter_level(option.log_level).init(); +fn process_option(option: &mut ClientOpt) -> Result<()> { + env_logger::builder() + .target(tquic_tools::log_target(&option.log_file)?) + .filter_level(option.log_level) + .format_timestamp_millis() + .init(); + + if let Some(dump_dir) = &option.dump_dir { + if let Err(e) = create_dir_all(dump_dir) { + warn!("create dump directory {} error: {:?}", dump_dir, e); + return Err(Box::new(e)); + } + } - if let Some(dump_path) = &option.dump_path { - if let Err(e) = create_dir_all(dump_path) { - warn!( - "create dump path directory error: {:?}, can't dump response body", - e - ); - option.dump_path = None; + if let Some(qlog_dir) = &option.qlog_dir { + if let Err(e) = create_dir_all(qlog_dir) { + warn!("create qlog directory {} error: {:?}", qlog_dir, e); + return Err(Box::new(e)); } } process_connect_address(option); + Ok(()) } fn main() -> Result<()> { @@ -1416,7 +1467,7 @@ fn main() -> Result<()> { }; // Process client option. - process_option(&mut option); + process_option(&mut option)?; // Create client. let mut client = Client::new(option)?; diff --git a/tools/src/bin/tquic_server.rs b/tools/src/bin/tquic_server.rs index 781a5b3d..f6392e1f 100644 --- a/tools/src/bin/tquic_server.rs +++ b/tools/src/bin/tquic_server.rs @@ -16,16 +16,17 @@ use std::cmp; use std::collections::HashMap; +use std::fs::create_dir_all; use std::fs::File; use std::net::SocketAddr; use std::path; +use std::path::Path; use std::rc::Rc; use std::time::Instant; use bytes::Bytes; use clap::Parser; -use log::debug; -use log::error; +use log::*; use mio::event::Event; use rustc_hash::FxHashMap; @@ -70,6 +71,10 @@ pub struct ServerOpt { #[clap(long, default_value = "INFO")] pub log_level: log::LevelFilter, + /// Log file path. If no file is specified, logs will be written to `stderr`. + #[clap(long, value_name = "FILE")] + pub log_file: Option, + /// Address to listen. 
#[clap(short, long, default_value = "0.0.0.0:4433", value_name = "ADDR")] pub listen: SocketAddr, @@ -114,6 +119,10 @@ pub struct ServerOpt { #[clap(long, default_value = "MINRTT")] pub multipath_algor: MultipathAlgorithm, + /// Set active_connection_id_limit transport parameter. Values lower than 2 will be ignored. + #[clap(long, default_value = "2", value_name = "NUM")] + pub active_cid_limit: u64, + /// Set max_udp_payload_size transport parameter. #[clap(long, default_value = "65527", value_name = "NUM")] pub recv_udp_payload_size: u16, @@ -135,7 +144,7 @@ pub struct ServerOpt { pub initial_rtt: u64, /// Linear factor for calculating the probe timeout. - #[clap(long, default_value = "3", value_name = "NUM")] + #[clap(long, default_value = "10", value_name = "NUM")] pub pto_linear_factor: u64, /// Upper limit of probe timeout in microseconds. @@ -146,9 +155,9 @@ pub struct ServerOpt { #[clap(long, value_name = "FILE")] pub keylog_file: Option, - /// Save QUIC qlog into the given file. - #[clap(long, value_name = "FILE")] - pub qlog_file: Option, + /// Save qlog file (.qlog) into the given directory. + #[clap(long, value_name = "DIR")] + pub qlog_dir: Option, /// Length of connection id in bytes. #[clap(long, default_value = "8", value_name = "NUM")] @@ -196,6 +205,7 @@ impl Server { config.set_min_congestion_window(option.min_congestion_window); config.enable_multipath(option.enable_multipath); config.set_multipath_algorithm(option.multipath_algor); + config.set_active_connection_id_limit(option.active_cid_limit); if let Some(address_token_key) = &option.address_token_key { let address_token_key = convert_address_token_key(address_token_key); @@ -632,8 +642,8 @@ struct ServerHandler { /// SSL key logger keylog: Option, - /// Qlog file - qlog: Option, + /// Qlog directory + qlog_dir: Option, } impl ServerHandler { @@ -648,22 +658,12 @@ impl ServerHandler { None => None, }; - let qlog = match &option.qlog_file { - Some(qlog_file) => Some( - std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(qlog_file)?, - ), - None => None, - }; - Ok(Self { root: option.root.clone(), buf: vec![0; MAX_BUF_SIZE], conns: FxHashMap::default(), keylog, - qlog, + qlog_dir: option.qlog_dir.clone(), }) } @@ -699,13 +699,28 @@ impl TransportHandler for ServerHandler { } } - if let Some(qlog) = &mut self.qlog { - if let Ok(qlog) = qlog.try_clone() { + // The qlog of each server connection is written to a different log file + // in JSON-SEQ format. + // + // Note: The server qlogs can also be written to the same file, with a + // recommended prefix for each line of logs that includes the trace id. + // The qlog of each connection can be then extracted by offline log + // processing. + if let Some(qlog_dir) = &self.qlog_dir { + let qlog_file = format!("{}.qlog", conn.trace_id()); + let qlog_file = Path::new(qlog_dir).join(qlog_file); + if let Ok(qlog) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(qlog_file.as_path()) + { conn.set_qlog( Box::new(qlog), "server qlog".into(), format!("id={}", conn.trace_id()), ); + } else { + error!("{} set qlog {:?} failed", conn.trace_id(), qlog_file); } } } @@ -758,11 +773,26 @@ impl TransportHandler for ServerHandler { fn on_new_token(&mut self, _conn: &mut Connection, _token: Vec) {} } -fn main() -> Result<()> { - let option = ServerOpt::parse(); +fn process_option(option: &mut ServerOpt) -> Result<()> { + env_logger::builder() + .target(tquic_tools::log_target(&option.log_file)?) 
+ .filter_level(option.log_level) + .format_timestamp_millis() + .init(); + + if let Some(qlog_dir) = &option.qlog_dir { + if let Err(e) = create_dir_all(qlog_dir) { + warn!("create qlog directory {} error: {:?}", qlog_dir, e); + return Err(Box::new(e)); + } + } + Ok(()) +} - // Initialize logging. - env_logger::builder().filter_level(option.log_level).init(); +fn main() -> Result<()> { + // Parse and process server option + let mut option = ServerOpt::parse(); + process_option(&mut option)?; // Initialize HTTP file server. let mut server = Server::new(&option)?; diff --git a/tools/src/common.rs b/tools/src/common.rs index 229929d9..5e600f0d 100644 --- a/tools/src/common.rs +++ b/tools/src/common.rs @@ -13,14 +13,12 @@ // limitations under the License. use std::io::ErrorKind; -use std::net::IpAddr; -use std::net::Ipv4Addr; -use std::net::Ipv6Addr; use std::net::SocketAddr; use clap::builder::PossibleValue; use clap::ValueEnum; -use log::debug; +use env_logger::Target; +use log::*; use mio::net::UdpSocket; use mio::Interest; use mio::Registry; @@ -122,21 +120,13 @@ impl QuicSocket { }) } - pub fn new_client_socket(is_ipv4: bool, registry: &Registry) -> Result { - let local = match is_ipv4 { - true => IpAddr::V4(Ipv4Addr::UNSPECIFIED), - false => IpAddr::V6(Ipv6Addr::UNSPECIFIED), - }; - QuicSocket::new(&SocketAddr::new(local, 0), registry) - } - /// Return the local address of the initial socket. pub fn local_addr(&self) -> SocketAddr { self.local_addr } /// Add additional socket binding with given local address. - pub fn add(&mut self, local: &SocketAddr, registry: &Registry) -> Result<()> { + pub fn add(&mut self, local: &SocketAddr, registry: &Registry) -> Result { let socket = UdpSocket::bind(*local)?; let local_addr = socket.local_addr()?; let sid = self.socks.insert(socket); @@ -144,7 +134,7 @@ impl QuicSocket { let socket = self.socks.get_mut(sid).unwrap(); registry.register(socket, Token(sid), Interest::READABLE)?; - Ok(()) + Ok(local_addr) } /// Delete socket binding with given local address. @@ -222,3 +212,19 @@ impl PacketSendHandler for QuicSocket { Ok(count) } } + +/// Get the target for the log output. +pub fn log_target(log_file: &Option) -> Result { + if let Some(log_file) = log_file { + if let Ok(file) = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(log_file) + { + return Ok(Target::Pipe(Box::new(file))); + } + return Err(format!("create log file {:?} failed", log_file).into()); + } + + Ok(Target::Stderr) +} diff --git a/tools/tests/tquic_tools_test.sh b/tools/tests/tquic_tools_test.sh new file mode 100755 index 00000000..ad1098ef --- /dev/null +++ b/tools/tests/tquic_tools_test.sh @@ -0,0 +1,139 @@ +#!/bin/bash + +# Copyright (c) 2024 The TQUIC Authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This simple script contains additional end-to-end test cases for tquic tools. +# When conditions permit, it's plan to implement all of the following test cases +# in the `github.com/tquic-group/quic-interop-runner` repo. 
+
+set -e
+
+BIN_DIR="./"
+TEST_DIR="./test-`date +%Y%m%d%H%M%S`"
+TEST_CASES="multipath_minrtt,multipath_roundrobin,multipath_redundant"
+
+show_help() {
+    echo "Usage: $0 [options]"
+    echo "  -b, Set the directory of tquic_client/tquic_server."
+    echo "  -w, Set the working directory for testing."
+    echo "  -l, List all supported test cases."
+    echo "  -t, Run the specified test cases."
+    echo "  -h, Display this help and exit."
+}
+
+while getopts ":b:w:t:lh" opt; do
+    case $opt in
+        b)
+            BIN_DIR="$OPTARG"
+            ;;
+        w)
+            TEST_DIR="$OPTARG"
+            ;;
+        t)
+            TEST_CASES="$OPTARG"
+            ;;
+        l)
+            echo $TEST_CASES
+            exit 0
+            ;;
+        h)
+            show_help
+            exit 0
+            ;;
+        \?)
+            echo "Invalid option: -$OPTARG" >&2
+            show_help
+            exit 1
+            ;;
+        :)
+            echo "Option -$OPTARG requires an argument." >&2
+            exit 1
+            ;;
+    esac
+done
+
+generate_cert() {
+    local cert_dir="$1/cert"
+    mkdir -p $cert_dir
+    openssl genpkey -algorithm RSA -out $cert_dir/cert.key -pkeyopt rsa_keygen_bits:2048 -quiet
+    openssl req -new -key $cert_dir/cert.key -out $cert_dir/cert.csr -subj "/C=CN/ST=beijing/O=tquic/CN=example.org"
+    openssl x509 -req -in $cert_dir/cert.csr -signkey $cert_dir/cert.key -out $cert_dir/cert.crt
+}
+
+generate_files() {
+    local data_dir="$1/data"
+    mkdir -p $data_dir
+    dd if=/dev/urandom of=$data_dir/1m bs=1M count=1
+    dd if=/dev/urandom of=$data_dir/10m bs=1M count=10
+}
+
+test_multipath() {
+    local test_dir=$1
+    local algor=$2
+    echo "[-] Running multipath test for $algor"
+
+    # prepare environment
+    local cert_dir="$test_dir/cert"
+    local data_dir="$test_dir/data"
+    local dump_dir="$test_dir/dump"
+    local qlog_dir="$test_dir/qlog"
+
+    generate_cert $test_dir
+    generate_files $test_dir
+
+    # start tquic server
+    $BIN_DIR/tquic_server -l 127.0.8.8:8443 --enable-multipath --multipath-algor $algor \
+        --cert $cert_dir/cert.crt --key $cert_dir/cert.key \
+        --root $data_dir --active-cid-limit 8 &
+    server_pid=$!
+    trap "kill $server_pid" RETURN
+
+    # start tquic client
+    mkdir -p $dump_dir
+    $BIN_DIR/tquic_client -c 127.0.8.8:8443 --enable-multipath --multipath-algor $algor \
+        --local-addresses 127.0.0.1,127.0.0.2,127.0.0.3,127.0.0.4 --active-cid-limit 8 \
+        --qlog-dir $qlog_dir --log-file $test_dir/client.log --log-level trace \
+        --dump-dir $dump_dir \
+        https://example.org/10m
+
+    # check files
+    if ! cmp -s $dump_dir/10m $data_dir/10m; then
+        echo "Files not same $dump_dir/10m:$data_dir/10m"
+        exit 1
+    fi
+
+    # check logs
+    grep "recv packet OneRTT" $test_dir/client.log | grep "local=.*" -o | sort | uniq -c
+
+    echo "Test $algor OK"
+}
+
+echo "$TEST_CASES" | sed 's/,/\n/g' | while read -r TEST_CASE; do
+    case $TEST_CASE in
+        multipath_minrtt)
+            test_multipath "$TEST_DIR/minrtt" minrtt
+            ;;
+        multipath_redundant)
+            test_multipath "$TEST_DIR/redundant" redundant
+            ;;
+        multipath_roundrobin)
+            test_multipath "$TEST_DIR/roundrobin" roundrobin
+            ;;
+        *)
+            echo "[x] Unknown test case $TEST_CASE"
+            ;;
+    esac
+done
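
The sketches below illustrate a few of the changes in this patch; they are simplified, hedged examples rather than TQUIC source code.

The RTT estimator now records the maximum RTT observed on a path in addition to the minimum. A minimal sketch of that bookkeeping (the `RttTracker` type and the `main` driver are illustrative, not library API); as in the patch, the [min, max] window only widens as samples arrive:

```
use std::cmp;
use std::time::Duration;

/// Minimal sketch of min/max RTT bookkeeping; illustrative only.
struct RttTracker {
    latest_rtt: Duration,
    min_rtt: Duration,
    max_rtt: Duration,
}

impl RttTracker {
    fn new(initial_rtt: Duration) -> Self {
        Self {
            latest_rtt: initial_rtt,
            min_rtt: initial_rtt,
            max_rtt: initial_rtt,
        }
    }

    /// Record a new RTT sample: min_rtt and max_rtt only ever move outward.
    fn update(&mut self, rtt: Duration) {
        self.latest_rtt = rtt;
        self.min_rtt = cmp::min(self.min_rtt, rtt);
        self.max_rtt = cmp::max(self.max_rtt, rtt);
    }
}

fn main() {
    let mut r = RttTracker::new(Duration::from_millis(500));
    for ms in [555, 400, 700, 225] {
        r.update(Duration::from_millis(ms));
    }
    assert_eq!(r.min_rtt, Duration::from_millis(225));
    assert_eq!(r.max_rtt, Duration::from_millis(700));
    assert_eq!(r.latest_rtt, Duration::from_millis(225));
}
```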
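
The per-space `BufferQueue` replaces the old reinjection queue and drains frames in strict priority order (High, then Mid, then Low); the redundant scheduler, for example, pushes duplicated Stream frames at High priority. A compact sketch of the strict-priority pop, using plain strings in place of QUIC frames:

```
use std::collections::VecDeque;

/// Strict-priority buffer sketch: index 0 = High, 1 = Mid, 2 = Low.
#[derive(Default)]
struct PriorityBuffer {
    queues: [VecDeque<String>; 3],
}

impl PriorityBuffer {
    /// Append an item to the queue of the given priority.
    fn push_back(&mut self, item: String, prio: usize) {
        self.queues[prio].push_back(item);
    }

    /// Pop from the highest-priority non-empty queue first.
    fn pop_front(&mut self) -> Option<(String, usize)> {
        for (i, q) in self.queues.iter_mut().enumerate() {
            if let Some(item) = q.pop_front() {
                return Some((item, i));
            }
        }
        None
    }
}

fn main() {
    let mut buf = PriorityBuffer::default();
    buf.push_back("low frame".to_string(), 2);
    buf.push_back("high frame".to_string(), 0);
    assert_eq!(buf.pop_front(), Some(("high frame".to_string(), 0)));
    assert_eq!(buf.pop_front(), Some(("low frame".to_string(), 2)));
    assert_eq!(buf.pop_front(), None);
}
```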
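
The `MultipathScheduler` trait gains an `on_path_updated` hook driven by the new `PathEvent` enum (`Validated`/`Abandoned`, each carrying a path index). How a scheduler reacts is left to the implementation; the sketch below is a hypothetical reaction that simply maintains a set of usable paths, not the behavior of the built-in MinRtt or Redundant schedulers:

```
use std::collections::HashSet;

/// Mirrors the new PathEvent variants; the payload is a path index.
enum PathEvent {
    Validated(usize),
    Abandoned(usize),
}

/// Hypothetical scheduler state: tracks which paths are currently usable.
struct PathSet {
    usable: HashSet<usize>,
}

impl PathSet {
    fn on_path_updated(&mut self, event: PathEvent) {
        match event {
            PathEvent::Validated(pid) => {
                self.usable.insert(pid);
            }
            PathEvent::Abandoned(pid) => {
                self.usable.remove(&pid);
            }
        }
    }
}

fn main() {
    let mut s = PathSet { usable: HashSet::new() };
    s.on_path_updated(PathEvent::Validated(0));
    s.on_path_updated(PathEvent::Validated(1));
    s.on_path_updated(PathEvent::Abandoned(0));
    assert_eq!(s.usable.len(), 1);
    assert!(s.usable.contains(&1));
}
```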
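
qlog records are now framed per RFC 7464 JSON-SEQ: each record begins with the ASCII Record Separator byte (0x1E) and ends with a Line Feed (0x0A), replacing the space prefix written previously. A self-contained sketch of that framing (the `write_record` helper is illustrative):

```
use std::io::Write;

/// ASCII Record Separator, as required by RFC 7464 for JSON-SEQ records.
const JSON_SEQ_RS: &[u8] = b"\x1e";

/// Write one JSON-SEQ record: RS + JSON text + LF. Illustrative helper.
fn write_record<W: Write>(out: &mut W, json: &str) -> std::io::Result<()> {
    out.write_all(JSON_SEQ_RS)?;
    out.write_all(json.as_bytes())?;
    out.write_all(b"\n")
}

fn main() -> std::io::Result<()> {
    let mut buf = Vec::new();
    write_record(&mut buf, r#"{"qlog_format":"JSON-SEQ","qlog_version":"0.4"}"#)?;
    write_record(&mut buf, r#"{"time":0.0,"name":"connectivity:connection_state_updated"}"#)?;
    assert_eq!(buf[0], 0x1e);
    assert_eq!(*buf.last().unwrap(), b'\n');
    Ok(())
}
```

Writing one record per line this way keeps the file streamable, which is what tquic_qvis.sh relies on when it strips the RS bytes and re-assembles a single JSON document for qvis.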
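
On the tools side, `--log-file` routes env_logger output to a file via `Target::Pipe`, falling back to stderr, and `--qlog-dir` stores one qlog file per connection named `<trace_id>.qlog`. A condensed sketch of both helpers, assuming the `env_logger` and `log` crates; the `qlog_path` function is illustrative:

```
use std::path::{Path, PathBuf};

use env_logger::Target;

/// Route logs to the given file if set, otherwise to stderr.
fn log_target(log_file: &Option<String>) -> Result<Target, String> {
    if let Some(file) = log_file {
        let f = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(file)
            .map_err(|e| format!("create log file {:?} failed: {}", file, e))?;
        return Ok(Target::Pipe(Box::new(f)));
    }
    Ok(Target::Stderr)
}

/// Per-connection qlog file: "<qlog_dir>/<trace_id>.qlog" (illustrative).
fn qlog_path(qlog_dir: &str, trace_id: &str) -> PathBuf {
    Path::new(qlog_dir).join(format!("{}.qlog", trace_id))
}

fn main() {
    env_logger::builder()
        .target(log_target(&None).unwrap())
        .format_timestamp_millis()
        .init();
    log::info!("qlog would go to {:?}", qlog_path("./qlog", "example-conn"));
}
```

In the patch itself the same pattern appears in both tquic_client and tquic_server, and the log/qlog/dump directories are created and validated up front in process_option().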