Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update multipath #168

Merged
merged 1 commit into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 64 additions & 49 deletions src/connection/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ use self::ConnectionFlags::*;
use crate::codec;
use crate::codec::Decoder;
use crate::codec::Encoder;
use crate::connection::space::BufferFlags;
use crate::connection::space::BufferType;
use crate::connection::space::RateSamplePacketState;
use crate::error::ConnectionError;
use crate::error::Error;
Expand All @@ -65,6 +67,7 @@ use crate::FourTuple;
use crate::FourTupleIter;
use crate::MultipathConfig;
use crate::PacketInfo;
use crate::PathEvent;
use crate::RecoveryConfig;
use crate::Result;
use crate::Shutdown;
Expand Down Expand Up @@ -846,7 +849,12 @@ impl Connection {
}

Frame::PathResponse { data } => {
self.paths.on_path_resp_received(path_id, data);
if self.paths.on_path_resp_received(path_id, data) {
// Notify the path event to the multipath scheduler
if let Some(ref mut scheduler) = self.multipath_scheduler {
scheduler.on_path_updated(&mut self.paths, PathEvent::Validated(path_id));
}
}
}

frame::Frame::PathAbandon {
Expand Down Expand Up @@ -1581,7 +1589,7 @@ impl Connection {
has_data: write_status.has_data,
frames: write_status.frames,
rate_sample_state: Default::default(),
reinjected: write_status.reinjected,
buffer_flags: write_status.buffer_flags,
};
debug!(
"{} sent packet {:?} {:?} {:?}",
Expand Down Expand Up @@ -1731,8 +1739,8 @@ impl Connection {
// Write a CRYPTO frame
self.try_write_crypto_frame(out, st, pkt_type, path_id)?;

// Write reinjected frames
self.try_write_reinjected_frames(out, st, pkt_type, path_id)?;
// Write buffered frames
self.try_write_buffered_frames(out, st, pkt_type, path_id)?;

// Write STREAM frames
self.try_write_stream_frames(out, st, pkt_type, path_id)?;
Expand Down Expand Up @@ -2228,9 +2236,9 @@ impl Connection {
// Read stream data and write into the packet buffer directly.
let (frame_data_len, fin) = stream.send.read(&mut out[len + frame_hdr_len..])?;

// Retain stream data for reinjection if needed.
// Retain stream data if needed.
let data = if self.flags.contains(EnableMultipath)
&& reinjection_required(self.multipath_conf.multipath_algorithm)
&& buffer_required(self.multipath_conf.multipath_algorithm)
{
let start = len + frame_hdr_len;
Bytes::copy_from_slice(&out[start..start + frame_data_len])
Expand Down Expand Up @@ -2306,8 +2314,8 @@ impl Connection {
Ok(())
}

/// Populate reinjected frame to packet payload buffer.
fn try_write_reinjected_frames(
/// Populate buffered frame to packet payload buffer.
fn try_write_buffered_frames(
&mut self,
out: &mut [u8],
st: &mut FrameWriteStatus,
Expand All @@ -2318,33 +2326,31 @@ impl Connection {
return Ok(());
}

let out = &mut out[st.written..];
let path = self.paths.get(path_id)?;
if pkt_type != PacketType::OneRTT
|| self.is_closing()
|| out.len() <= frame::MAX_STREAM_OVERHEAD
|| out.len() - st.written <= frame::MAX_STREAM_OVERHEAD
|| !path.active()
{
return Ok(());
}

// Get reinjected frames on the path.
// Get buffered frames on the path.
let space = self
.spaces
.get_mut(path.space_id)
.ok_or(Error::InternalError)?;
let frames = &mut space.reinject.frames;
if space.buffered.is_empty() {
return Ok(());
}
debug!(
"{} try to write reinjected frames: path_id={} reinjected_frames={}",
"{} try to write buffered frames: path_id={} frames={}",
self.trace_id,
path_id,
frames.len()
space.buffered.len()
);
if frames.is_empty() {
return Ok(());
}

while let Some(frame) = frames.pop_front() {
while let Some((frame, buffer_type)) = space.buffered.pop_front() {
match frame {
Frame::Stream {
stream_id,
Expand All @@ -2358,73 +2364,82 @@ impl Connection {
_ => continue,
};

// Check acked range and injected the first non-acked subrange
// Check acked range and write the first non-acked subrange
let range = offset..offset + length as u64;
if let Some(r) = stream.send.filter_acked(range) {
let data_len = (r.end - r.start) as usize;
let frame_len = Self::reinjected_stream_frame_to_packet(
let data_len = Self::write_buffered_stream_frame_to_packet(
stream_id,
r.start,
data_len,
fin && data_len == length,
fin && r.end == offset + length as u64,
data.slice((r.start - offset) as usize..(r.end - offset) as usize),
out,
buffer_type,
st,
)?;

// Processing the following subrange.
if r.end < offset + length as u64 {
if r.start + (data_len as u64) < offset + length as u64 {
let frame = Frame::Stream {
stream_id,
offset: r.end,
length: length - (r.end - offset) as usize,
offset: r.start + data_len as u64,
length: length - data_len,
fin,
data: data.slice((r.end - offset) as usize..),
data: data.slice(data_len..),
};
frames.push_front(frame);
space.buffered.push_front(frame, buffer_type);
}

if data_len == 0 {
break;
}
}
}

// Ignore other reinjected frames.
// Ignore other buffered frames.
_ => continue,
}

let out = &mut out[st.written..];
if out.len() <= frame::MAX_STREAM_OVERHEAD {
break;
}
}

Ok(())
}

fn reinjected_stream_frame_to_packet(
fn write_buffered_stream_frame_to_packet(
stream_id: u64,
offset: u64,
length: usize,
fin: bool,
data: Bytes,
mut fin: bool,
mut data: Bytes,
out: &mut [u8],
buffer_type: BufferType,
st: &mut FrameWriteStatus,
) -> Result<usize> {
let len = frame::encode_stream_header(stream_id, offset, length as u64, fin, out)?;
out[len..len + data.len()].copy_from_slice(&data);
let out = &mut out[st.written..];
if out.len() <= frame::MAX_STREAM_OVERHEAD {
return Ok(0);
}

let hdr_len = frame::stream_header_wire_len(stream_id, offset);
let data_len = cmp::min(data.len(), out.len() - hdr_len);
if data_len < data.len() {
data.truncate(data_len);
fin = false;
}

frame::encode_stream_header(stream_id, offset, data_len as u64, fin, out)?;
out[hdr_len..hdr_len + data.len()].copy_from_slice(&data);

let frame_len = len + data.len();
st.written += frame_len;
st.written += hdr_len + data_len;
st.ack_eliciting = true;
st.in_flight = true;
st.has_data = true;
st.reinjected = true;
st.buffer_flags.mark(buffer_type);
st.frames.push(Frame::Stream {
stream_id,
offset,
length: data.len(),
fin,
data,
});
Ok(frame_len)
Ok(data_len)
}

/// Populate a QUIC frame to the give buffer.
Expand Down Expand Up @@ -2623,7 +2638,7 @@ impl Connection {
}
}

// Select a validated path with ACK/PTO/Reinjected packets to send.
// Select a validated path with ACK/PTO/Buffered packets to send.
for (pid, path) in self.paths.iter_mut() {
if !path.active() {
continue;
Expand All @@ -2636,7 +2651,7 @@ impl Connection {
if space.loss_probes > 0 {
return Ok(pid);
}
if space.need_send_reinjected_frames() && path.recovery.can_send() {
if space.need_send_buffered_frames() && path.recovery.can_send() {
return Ok(pid);
}
continue;
Expand Down Expand Up @@ -2732,7 +2747,7 @@ impl Connection {
|| path.need_send_validation_frames()
|| self.cids.need_send_cid_control_frames()
|| self.streams.need_send_stream_frames()
|| self.spaces.need_send_reinjected_frames())
|| self.spaces.need_send_buffered_frames())
{
if !self.is_server && self.tls_session.is_in_early_data() {
return Ok(PacketType::ZeroRTT);
Expand Down Expand Up @@ -3972,8 +3987,8 @@ struct FrameWriteStatus {
/// Whether the congestion window should be ignored.
is_probe: bool,

/// Whether it is a reinjected packet.
reinjected: bool,
/// Status about buffered frames written to the packet.
buffer_flags: BufferFlags,
}

/// Handshake status for loss recovery
Expand Down
59 changes: 31 additions & 28 deletions src/connection/path.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,25 +35,6 @@ pub(crate) const INITIAL_CHAL_TIMEOUT: u64 = 25;

pub(crate) const MAX_PROBING_TIMEOUTS: usize = 8;

/// The states about the path validation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PathState {
/// The path validation failed
Failed,

/// No path validation has been performed.
Unknown,

/// The path is under validation.
Validating,

/// The remote address has been validated, but not the path MTU.
ValidatingMTU,

/// The path has been validated.
Validated,
}

/// A network path on which QUIC packets can be sent.
pub struct Path {
/// The local address.
Expand Down Expand Up @@ -173,9 +154,10 @@ impl Path {
}

/// Handle incoming PATH_RESPONSE data.
pub(super) fn on_path_resp_received(&mut self, data: [u8; 8], multipath: bool) {
/// Return true if the path status changes to `Validated`.
pub(super) fn on_path_resp_received(&mut self, data: [u8; 8], multipath: bool) -> bool {
if self.state == PathState::Validated {
return;
return false;
}

self.verified_peer_address = true;
Expand All @@ -199,11 +181,12 @@ impl Path {
self.promote_to(PathState::Validated);
self.set_active(multipath);
self.sent_chals.clear();
return;
return true;
}

// If the MTU was not validated, probe again.
self.need_send_challenge = true;
false
}

/// Fetch a received challenge data item.
Expand Down Expand Up @@ -318,6 +301,25 @@ impl std::fmt::Debug for Path {
}
}

/// The states about the path validation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
pub enum PathState {
/// The path validation failed
Failed,

/// No path validation has been performed.
Unknown,

/// The path is under validation.
Validating,

/// The remote address has been validated, but not the path MTU.
ValidatingMTU,

/// The path has been validated.
Validated,
}

/// Path manager for a QUIC connection
pub(crate) struct PathMap {
/// The paths of the connection. Each path has a path identifier
Expand Down Expand Up @@ -451,10 +453,11 @@ impl PathMap {
}

/// Process a PATH_RESPONSE frame on the give path
pub fn on_path_resp_received(&mut self, path_id: usize, data: [u8; 8]) {
pub fn on_path_resp_received(&mut self, path_id: usize, data: [u8; 8]) -> bool {
if let Some(path) = self.paths.get_mut(path_id) {
path.on_path_resp_received(data, self.is_multipath);
return path.on_path_resp_received(data, self.is_multipath);
}
false
}

/// Handle the sent event of PATH_CHALLENGE.
Expand Down Expand Up @@ -600,11 +603,11 @@ mod tests {
assert_eq!(path_mgr.get_mut(pid)?.state, PathState::Validating);

// Fake receiving of unmatched PATH_RESPONSE
path_mgr.on_path_resp_received(pid, [0xab; 8]);
assert_eq!(path_mgr.on_path_resp_received(pid, [0xab; 8]), false);
assert_eq!(path_mgr.get_mut(pid)?.state, PathState::ValidatingMTU);

// Fake receiving of PATH_RESPONSE
path_mgr.on_path_resp_received(pid, data);
assert_eq!(path_mgr.on_path_resp_received(pid, data), false);
assert_eq!(path_mgr.get_mut(pid)?.path_chal_initiated(), true);
assert_eq!(path_mgr.get_mut(pid)?.validated(), false);
assert_eq!(path_mgr.get_mut(pid)?.state, PathState::ValidatingMTU);
Expand All @@ -613,14 +616,14 @@ mod tests {
path_mgr.on_path_chal_sent(pid, data, 1300, now)?;

// Fake receiving of PATH_RESPONSE
path_mgr.on_path_resp_received(pid, data);
assert_eq!(path_mgr.on_path_resp_received(pid, data), true);
assert_eq!(path_mgr.get_mut(pid)?.path_chal_initiated(), false);
assert_eq!(path_mgr.get_mut(pid)?.validated(), true);
assert_eq!(path_mgr.get_mut(pid)?.state, PathState::Validated);
assert_eq!(path_mgr.get_mut(pid)?.sent_chals.len(), 0);

// Fake receiving of depulicated PATH_RESPONSE
path_mgr.on_path_resp_received(pid, data);
assert_eq!(path_mgr.on_path_resp_received(pid, data), false);
assert_eq!(path_mgr.get_mut(pid)?.validated(), true);

// Timeout event
Expand Down
Loading
Loading