Skip to content

Commit

Permalink
Misc bug fixes related to stream state (#273)
Browse files Browse the repository at this point in the history
This patch includes two new significant debug assertions:

* Assert stream counts are zero when the connection finalizes.
* Assert all stream state has been released when the connection is 
  dropped.

These two assertions were added in an effort to test the fix provided
by #261. In doing so, many related bugs have been discovered and fixed.
The details related to these bugs can be found in #273.
  • Loading branch information
carllerche committed May 9, 2018
1 parent b4383b6 commit cf62b78
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 145 deletions.
15 changes: 13 additions & 2 deletions src/proto/connection.rs
Expand Up @@ -178,7 +178,6 @@ where
}
}


/// Advances the internal state of the connection.
pub fn poll(&mut self) -> Poll<(), proto::Error> {
use codec::RecvError::*;
Expand Down Expand Up @@ -341,7 +340,8 @@ where
},
None => {
trace!("codec closed");
self.streams.recv_eof();
self.streams.recv_eof(false)
.ok().expect("mutex poisoned");
return Ok(Async::Ready(()));
},
}
Expand Down Expand Up @@ -397,3 +397,14 @@ where
self.ping_pong.ping_shutdown();
}
}

impl<T, P, B> Drop for Connection<T, P, B>
where
    P: Peer,
    B: IntoBuf,
{
    fn drop(&mut self) {
        // Tell the stream state machine that the transport is gone so all
        // remaining per-stream state can be released. An `Err` indicates
        // the streams mutex was poisoned by a panicking thread; there is
        // nothing further to clean up in that case, so it is discarded.
        if self.streams.recv_eof(true).is_err() {
            // Poisoned mutex — intentionally ignored during drop.
        }
    }
}
50 changes: 41 additions & 9 deletions src/proto/streams/counts.rs
Expand Up @@ -60,11 +60,13 @@ impl Counts {
/// # Panics
///
Panics on failure as this should have been validated beforehand.
pub fn inc_num_recv_streams(&mut self) {
pub fn inc_num_recv_streams(&mut self, stream: &mut store::Ptr) {
    assert!(self.can_inc_num_recv_streams());
    assert!(!stream.is_counted);

    // Mark the stream as contributing to the concurrency count, then
    // bump the remote-initiated stream total.
    stream.is_counted = true;
    self.num_recv_streams += 1;
}

/// Returns true if the send stream concurrency can be incremented
Expand All @@ -77,11 +79,13 @@ impl Counts {
/// # Panics
///
Panics on failure as this should have been validated beforehand.
pub fn inc_num_send_streams(&mut self) {
pub fn inc_num_send_streams(&mut self, stream: &mut store::Ptr) {
    assert!(self.can_inc_num_send_streams());
    assert!(!stream.is_counted);

    // Mark the stream as contributing to the concurrency count, then
    // bump the locally initiated (send) stream total.
    stream.is_counted = true;
    self.num_send_streams += 1;
}

/// Returns true if the number of pending reset streams can be incremented.
Expand Down Expand Up @@ -110,23 +114,36 @@ impl Counts {
///
/// If the stream state transitions to closed, this function will perform
/// all necessary cleanup.
///
/// TODO: Is this function still needed?
pub fn transition<F, U>(&mut self, mut stream: store::Ptr, f: F) -> U
where
F: FnOnce(&mut Self, &mut store::Ptr) -> U,
{
let is_counted = stream.is_counted();
// TODO: Does this need to be computed before performing the action?
let is_pending_reset = stream.is_pending_reset_expiration();

// Run the action
let ret = f(self, &mut stream);

self.transition_after(stream, is_counted, is_pending_reset);
self.transition_after(stream, is_pending_reset);

ret
}

// TODO: move this to macro?
pub fn transition_after(&mut self, mut stream: store::Ptr, is_counted: bool, is_reset_counted: bool) {
pub fn transition_after(&mut self, mut stream: store::Ptr, is_reset_counted: bool) {
trace!("transition_after; stream={:?}; state={:?}; is_closed={:?}; \
pending_send_empty={:?}; buffered_send_data={}; \
num_recv={}; num_send={}",
stream.id,
stream.state,
stream.is_closed(),
stream.pending_send.is_empty(),
stream.buffered_send_data,
self.num_recv_streams,
self.num_send_streams);

if stream.is_closed() {
if !stream.is_pending_reset_expiration() {
stream.unlink();
Expand All @@ -136,9 +153,10 @@ impl Counts {
}
}

if is_counted {
if stream.is_counted {
trace!("dec_num_streams; stream={:?}", stream.id);
// Decrement the number of active streams.
self.dec_num_streams(stream.id);
self.dec_num_streams(&mut stream);
}
}

Expand All @@ -148,13 +166,17 @@ impl Counts {
}
}

fn dec_num_streams(&mut self, id: StreamId) {
if self.peer.is_local_init(id) {
fn dec_num_streams(&mut self, stream: &mut store::Ptr) {
    assert!(stream.is_counted);

    // Pick the counter matching the stream's initiator: locally
    // initiated streams are tracked in `num_send_streams`, remotely
    // initiated ones in `num_recv_streams`.
    let counter = if self.peer.is_local_init(stream.id) {
        &mut self.num_send_streams
    } else {
        &mut self.num_recv_streams
    };

    assert!(*counter > 0);
    *counter -= 1;

    // The stream no longer contributes to the concurrency count.
    stream.is_counted = false;
}

Expand All @@ -163,3 +185,13 @@ impl Counts {
self.num_reset_streams -= 1;
}
}

impl Drop for Counts {
    fn drop(&mut self) {
        use std::thread;

        // While unwinding from a panic the counts may legitimately be
        // out of sync, so the invariant is only enforced on a clean drop.
        if thread::panicking() {
            return;
        }

        debug_assert!(!self.has_streams());
    }
}
74 changes: 51 additions & 23 deletions src/proto/streams/prioritize.rs
Expand Up @@ -134,6 +134,7 @@ impl Prioritize {
frame: frame::Data<B>,
buffer: &mut Buffer<Frame<B>>,
stream: &mut store::Ptr,
counts: &mut Counts,
task: &mut Option<Task>,
) -> Result<(), UserError>
where
Expand Down Expand Up @@ -176,7 +177,7 @@ impl Prioritize {

if frame.is_end_stream() {
stream.state.send_close();
self.reserve_capacity(0, stream);
self.reserve_capacity(0, stream, counts);
}

trace!(
Expand Down Expand Up @@ -210,7 +211,11 @@ impl Prioritize {
}

/// Request capacity to send data
pub fn reserve_capacity(&mut self, capacity: WindowSize, stream: &mut store::Ptr) {
pub fn reserve_capacity(
&mut self,
capacity: WindowSize,
stream: &mut store::Ptr,
counts: &mut Counts) {
trace!(
"reserve_capacity; stream={:?}; requested={:?}; effective={:?}; curr={:?}",
stream.id,
Expand Down Expand Up @@ -239,7 +244,7 @@ impl Prioritize {

stream.send_flow.claim_capacity(diff);

self.assign_connection_capacity(diff, stream);
self.assign_connection_capacity(diff, stream, counts);
}
} else {
// Update the target requested capacity
Expand Down Expand Up @@ -284,36 +289,49 @@ impl Prioritize {
&mut self,
inc: WindowSize,
store: &mut Store,
counts: &mut Counts,
) -> Result<(), Reason> {
// Update the connection's window
self.flow.inc_window(inc)?;

self.assign_connection_capacity(inc, store);
self.assign_connection_capacity(inc, store, counts);
Ok(())
}

/// Reclaim all capacity assigned to the stream and re-assign it to the
/// connection
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr) {
pub fn reclaim_all_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
    // Take back every byte of send capacity the stream currently
    // holds...
    let reclaimed = stream.send_flow.available().as_size();
    stream.send_flow.claim_capacity(reclaimed);
    // ...and return it to the connection-level window so other streams
    // can use it.
    self.assign_connection_capacity(reclaimed, stream, counts);
}

/// Reclaim just reserved capacity, not buffered capacity, and re-assign
/// it to the connection
pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr) {
pub fn reclaim_reserved_capacity(&mut self, stream: &mut store::Ptr, counts: &mut Counts) {
    // Capacity already backed by buffered data stays with the stream;
    // only the unfulfilled remainder of the request is reclaimed.
    if stream.requested_send_capacity <= stream.buffered_send_data {
        return;
    }

    let unused = stream.requested_send_capacity - stream.buffered_send_data;
    stream.send_flow.claim_capacity(unused);
    self.assign_connection_capacity(unused, stream, counts);
}

pub fn clear_pending_capacity(&mut self, store: &mut Store, counts: &mut Counts) {
    // Drain every stream queued for connection capacity; routing each
    // through `transition` runs the post-action cleanup (count release,
    // drop handling) for streams that have since closed.
    loop {
        let stream = match self.pending_capacity.pop(store) {
            Some(stream) => stream,
            None => break,
        };

        counts.transition(stream, |_, stream| {
            trace!("clear_pending_capacity; stream={:?}", stream.id);
        });
    }
}

pub fn assign_connection_capacity<R>(&mut self, inc: WindowSize, store: &mut R)
pub fn assign_connection_capacity<R>(
&mut self,
inc: WindowSize,
store: &mut R,
counts: &mut Counts)
where
R: Resolve,
{
Expand All @@ -323,15 +341,17 @@ impl Prioritize {

// Assign newly acquired capacity to streams pending capacity.
while self.flow.available() > 0 {
let mut stream = match self.pending_capacity.pop(store) {
let stream = match self.pending_capacity.pop(store) {
Some(stream) => stream,
None => return,
};

// Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill
// the capacity request.
self.try_assign_capacity(&mut stream);
counts.transition(stream, |_, mut stream| {
// Try to assign capacity to the stream. This will also re-queue the
// stream if there isn't enough connection level capacity to fulfill
// the capacity request.
self.try_assign_capacity(&mut stream);
})
}
}

Expand Down Expand Up @@ -595,6 +615,13 @@ impl Prioritize {
}
}

pub fn clear_pending_send(&mut self, store: &mut Store, counts: &mut Counts) {
    // Remove every stream queued to send frames; each popped stream
    // still needs its post-transition bookkeeping so that closed,
    // non-reset-pending streams release their concurrency count.
    loop {
        let stream = match self.pending_send.pop(store) {
            Some(stream) => stream,
            None => break,
        };

        let is_pending_reset = stream.is_pending_reset_expiration();
        counts.transition_after(stream, is_pending_reset);
    }
}

fn pop_frame<B>(
&mut self,
buffer: &mut Buffer<Frame<B>>,
Expand All @@ -613,21 +640,22 @@ impl Prioritize {
trace!("pop_frame; stream={:?}; stream.state={:?}",
stream.id, stream.state);

// It's possible that this stream, besides having data to send,
// is also queued to send a reset, and thus is already in the queue
// to wait for "some time" after a reset.
//
// To be safe, we just always ask the stream.
let is_pending_reset = stream.is_pending_reset_expiration();

// If the stream receives a RESET from the peer, it may have
// had data buffered to be sent, but all the frames are cleared
// in clear_queue(). Instead of doing O(N) traversal through queue
// to remove, lets just ignore peer_reset streams here.
if stream.state.is_peer_reset() {
counts.transition_after(stream, is_pending_reset);
continue;
}

// It's possible that this stream, besides having data to send,
// is also queued to send a reset, and thus is already in the queue
// to wait for "some time" after a reset.
//
// To be safe, we just always ask the stream.
let is_counted = stream.is_counted();
let is_pending_reset = stream.is_pending_reset_expiration();
trace!(" --> stream={:?}; is_pending_reset={:?};",
stream.id, is_pending_reset);

Expand Down Expand Up @@ -754,7 +782,7 @@ impl Prioritize {
self.pending_send.push(&mut stream);
}

counts.transition_after(stream, is_counted, is_pending_reset);
counts.transition_after(stream, is_pending_reset);

return Some(frame);
},
Expand All @@ -770,7 +798,7 @@ impl Prioritize {
if let Some(mut stream) = self.pending_open.pop(store) {
trace!("schedule_pending_open; stream={:?}", stream.id);

counts.inc_num_send_streams();
counts.inc_num_send_streams(&mut stream);
self.pending_send.push(&mut stream);
} else {
return;
Expand Down

0 comments on commit cf62b78

Please sign in to comment.