Skip to content

Commit

Permalink
Fix for a fuzzer-discovered integer underflow of the flow control win…
Browse files Browse the repository at this point in the history
…dow size

Removed the SubAssign, etc. syntactic sugar functions and switched to return Result on over/underflow

Whenever possible, switched to returning a library GoAway protocol
error. Otherwise we check for over/underflow only with `debug_assert!`,
assuming that those code paths do not over/underflow.

Signed-off-by: Michael Rodler <mrodler@amazon.de>
Co-Authored-By: f0rki <m@mrodler.eu>
  • Loading branch information
Michael Rodler and f0rki committed Jun 13, 2023
1 parent 97bc3e3 commit ead98f1
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 65 deletions.
4 changes: 3 additions & 1 deletion src/proto/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ where

/// connection flow control
pub(crate) fn set_target_window_size(&mut self, size: WindowSize) {
self.inner.streams.set_target_connection_window_size(size);
let _res = self.inner.streams.set_target_connection_window_size(size);
// TODO: proper error handling
debug_assert!(_res.is_ok());
}

/// Send a new SETTINGS frame with an updated initial window size.
Expand Down
2 changes: 1 addition & 1 deletion src/proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub type PingPayload = [u8; 8];
pub type WindowSize = u32;

// Constants
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1;
pub const MAX_WINDOW_SIZE: WindowSize = (1 << 31) - 1; // i32::MAX as u32
pub const DEFAULT_REMOTE_RESET_STREAM_MAX: usize = 20;
pub const DEFAULT_RESET_STREAM_MAX: usize = 10;
pub const DEFAULT_RESET_STREAM_SECS: u64 = 30;
Expand Down
78 changes: 45 additions & 33 deletions src/proto/streams/flow_control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ impl FlowControl {
self.window_size > self.available
}

pub fn claim_capacity(&mut self, capacity: WindowSize) {
self.available -= capacity;
pub fn claim_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
self.available.decrease_by(capacity)
}

pub fn assign_capacity(&mut self, capacity: WindowSize) {
self.available += capacity;
pub fn assign_capacity(&mut self, capacity: WindowSize) -> Result<(), Reason> {
self.available.increase_by(capacity)
}

/// If a WINDOW_UPDATE frame should be sent, returns a positive number
Expand Down Expand Up @@ -136,36 +136,38 @@ impl FlowControl {
///
/// This is called after receiving a SETTINGS frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_send_window(&mut self, sz: WindowSize) {
pub fn dec_send_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
// ~~This should not be able to overflow `window_size` from the bottom.~~ wrong. it can.
self.window_size.decrease_by(sz)?;
Ok(())
}

/// Decrement the recv-side window size.
///
/// This is called after receiving a SETTINGS ACK frame with a lower
/// INITIAL_WINDOW_SIZE value.
pub fn dec_recv_window(&mut self, sz: WindowSize) {
pub fn dec_recv_window(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"dec_recv_window; sz={}; window={}, available={}",
sz,
self.window_size,
self.available
);
// This should not be able to overflow `window_size` from the bottom.
self.window_size -= sz;
self.available -= sz;
self.window_size.decrease_by(sz)?;
self.available.decrease_by(sz)?;
Ok(())
}

/// Decrements the window reflecting data has actually been sent. The caller
/// must ensure that the window has capacity.
pub fn send_data(&mut self, sz: WindowSize) {
pub fn send_data(&mut self, sz: WindowSize) -> Result<(), Reason> {
tracing::trace!(
"send_data; sz={}; window={}; available={}",
sz,
Expand All @@ -176,12 +178,15 @@ impl FlowControl {
// If send size is zero it's meaningless to update flow control window
if sz > 0 {
// Ensure that the argument is correct
assert!(self.window_size >= sz as usize);
assert!(self.window_size.0 >= sz as i32);

// Update values
self.window_size -= sz;
self.available -= sz;
// debug_assert!(self.window_size.0 >= sz as i32);
self.window_size.decrease_by(sz)?;
// debug_assert!(self.available.0 >= sz as i32);
self.available.increase_by(sz)?;
}
Ok(())
}
}

Expand All @@ -208,6 +213,32 @@ impl Window {
assert!(self.0 >= 0, "negative Window");
self.0 as WindowSize
}

pub fn decrease_by(&mut self, other: WindowSize) -> Result<(), Reason> {
if let Some(v) = self.0.checked_sub(other as i32) {
self.0 = v;
Ok(())
} else {
Err(Reason::FLOW_CONTROL_ERROR)
}
}

pub fn increase_by(&mut self, other: WindowSize) -> Result<(), Reason> {
if let Some(v) = self.0.checked_add(other as i32) {
self.0 = v;
Ok(())
} else {
Err(Reason::FLOW_CONTROL_ERROR)
}
}

pub fn add(&self, other: WindowSize) -> Result<Self, Reason> {
if let Some(v) = self.0.checked_add(other as i32) {
Ok(Self(v))
} else {
Err(Reason::FLOW_CONTROL_ERROR)
}
}
}

impl PartialEq<usize> for Window {
Expand All @@ -230,25 +261,6 @@ impl PartialOrd<usize> for Window {
}
}

impl ::std::ops::SubAssign<WindowSize> for Window {
fn sub_assign(&mut self, other: WindowSize) {
self.0 -= other as i32;
}
}

impl ::std::ops::Add<WindowSize> for Window {
type Output = Self;
fn add(self, other: WindowSize) -> Self::Output {
Window(self.0 + other as i32)
}
}

impl ::std::ops::AddAssign<WindowSize> for Window {
fn add_assign(&mut self, other: WindowSize) {
self.0 += other as i32;
}
}

impl fmt::Display for Window {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
fmt::Display::fmt(&self.0, f)
Expand Down
32 changes: 24 additions & 8 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ impl Prioritize {
flow.inc_window(config.remote_init_window_sz)
.expect("invalid initial window size");

flow.assign_capacity(config.remote_init_window_sz);
// TODO: proper error handling
let _res = flow.assign_capacity(config.remote_init_window_sz);
debug_assert!(_res.is_ok());

tracing::trace!("Prioritize::new; flow={:?}", flow);

Expand Down Expand Up @@ -253,7 +255,9 @@ impl Prioritize {
if available as usize > capacity {
let diff = available - capacity as WindowSize;

stream.send_flow.claim_capacity(diff);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(diff);
debug_assert!(_res.is_ok());

self.assign_connection_capacity(diff, stream, counts);
}
Expand Down Expand Up @@ -324,7 +328,9 @@ impl Prioritize {
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
let available = stream.send_flow.available().as_size();
if available > 0 {
stream.send_flow.claim_capacity(available);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(available);
debug_assert!(_res.is_ok());
// Re-assign all capacity to the connection
self.assign_connection_capacity(available, stream, counts);
}
Expand All @@ -337,7 +343,9 @@ impl Prioritize {
if stream.requested_send_capacity as usize > stream.buffered_send_data {
let reserved = stream.requested_send_capacity - stream.buffered_send_data as WindowSize;

stream.send_flow.claim_capacity(reserved);
// TODO: proper error handling
let _res = stream.send_flow.claim_capacity(reserved);
debug_assert!(_res.is_ok());
self.assign_connection_capacity(reserved, stream, counts);
}
}
Expand All @@ -363,7 +371,9 @@ impl Prioritize {
let span = tracing::trace_span!("assign_connection_capacity", inc);
let _e = span.enter();

self.flow.assign_capacity(inc);
// TODO: proper error handling
let _res = self.flow.assign_capacity(inc);
debug_assert!(_res.is_ok());

// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
Expand Down Expand Up @@ -443,7 +453,9 @@ impl Prioritize {
stream.assign_capacity(assign, self.max_buffer_size);

// Claim the capacity from the connection
self.flow.claim_capacity(assign);
// TODO: proper error handling
let _res = self.flow.claim_capacity(assign);
debug_assert!(_res.is_ok());
}

tracing::trace!(
Expand Down Expand Up @@ -763,12 +775,16 @@ impl Prioritize {
// Assign the capacity back to the connection that
// was just consumed from the stream in the previous
// line.
self.flow.assign_capacity(len);
// TODO: proper error handling
let _res = self.flow.assign_capacity(len);
debug_assert!(_res.is_ok());
});

let (eos, len) = tracing::trace_span!("updating connection flow")
.in_scope(|| {
self.flow.send_data(len);
// TODO: proper error handling
let _res = self.flow.send_data(len);
debug_assert!(_res.is_ok());

// Wrap the frame's data payload to ensure that the
// correct amount of data gets written.
Expand Down
49 changes: 36 additions & 13 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl Recv {
// settings
flow.inc_window(DEFAULT_INITIAL_WINDOW_SIZE)
.expect("invalid initial remote window size");
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE);
flow.assign_capacity(DEFAULT_INITIAL_WINDOW_SIZE).unwrap();

Recv {
init_window_sz: config.local_init_window_sz,
Expand Down Expand Up @@ -354,7 +354,9 @@ impl Recv {
self.in_flight_data -= capacity;

// Assign capacity to connection
self.flow.assign_capacity(capacity);
// TODO: proper error handling
let _res = self.flow.assign_capacity(capacity);
debug_assert!(_res.is_ok());

if self.flow.unclaimed_capacity().is_some() {
if let Some(task) = task.take() {
Expand Down Expand Up @@ -382,7 +384,9 @@ impl Recv {
stream.in_flight_recv_data -= capacity;

// Assign capacity to stream
stream.recv_flow.assign_capacity(capacity);
// TODO: proper error handling
let _res = stream.recv_flow.assign_capacity(capacity);
debug_assert!(_res.is_ok());

if stream.recv_flow.unclaimed_capacity().is_some() {
// Queue the stream for sending the WINDOW_UPDATE frame.
Expand Down Expand Up @@ -428,7 +432,11 @@ impl Recv {
///
/// The `task` is an optional parked task for the `Connection` that might
/// be blocked on needing more window capacity.
pub fn set_target_connection_window(&mut self, target: WindowSize, task: &mut Option<Waker>) {
pub fn set_target_connection_window(
&mut self,
target: WindowSize,
task: &mut Option<Waker>,
) -> Result<(), Reason> {
tracing::trace!(
"set_target_connection_window; target={}; available={}, reserved={}",
target,
Expand All @@ -441,11 +449,15 @@ impl Recv {
//
// Update the flow controller with the difference between the new
// target and the current target.
let current = (self.flow.available() + self.in_flight_data).checked_size();
let current = self
.flow
.available()
.add(self.in_flight_data)?
.checked_size();
if target > current {
self.flow.assign_capacity(target - current);
self.flow.assign_capacity(target - current)?;
} else {
self.flow.claim_capacity(current - target);
self.flow.claim_capacity(current - target)?;
}

// If changing the target capacity means we gained a bunch of capacity,
Expand All @@ -456,6 +468,7 @@ impl Recv {
task.wake();
}
}
Ok(())
}

pub(crate) fn apply_local_settings(
Expand Down Expand Up @@ -495,9 +508,13 @@ impl Recv {
let dec = old_sz - target;
tracing::trace!("decrementing all windows; dec={}", dec);

store.for_each(|mut stream| {
stream.recv_flow.dec_recv_window(dec);
})
store.try_for_each(|mut stream| {
stream
.recv_flow
.dec_recv_window(dec)
.map_err(proto::Error::library_go_away)?;
Ok::<_, proto::Error>(())
})?;
}
Ordering::Greater => {
// We must increase the (local) window on every open stream.
Expand All @@ -510,7 +527,10 @@ impl Recv {
.recv_flow
.inc_window(inc)
.map_err(proto::Error::library_go_away)?;
stream.recv_flow.assign_capacity(inc);
stream
.recv_flow
.assign_capacity(inc)
.map_err(proto::Error::library_go_away)?;
Ok::<_, proto::Error>(())
})?;
}
Expand Down Expand Up @@ -617,7 +637,10 @@ impl Recv {
}

// Update stream level flow control
stream.recv_flow.send_data(sz);
stream
.recv_flow
.send_data(sz)
.map_err(proto::Error::library_go_away)?;

// Track the data as in-flight
stream.in_flight_recv_data += sz;
Expand Down Expand Up @@ -658,7 +681,7 @@ impl Recv {
}

// Update connection level flow control
self.flow.send_data(sz);
self.flow.send_data(sz).map_err(Error::library_go_away)?;

// Track the data as in-flight
self.in_flight_data += sz;
Expand Down

0 comments on commit ead98f1

Please sign in to comment.