fix(http1): reduce memory used with flatten write strategy
If the write buffer was filled with large bufs from the user, such that
it couldn't be fully written to the transport, the write buffer could
start to grow significantly as it moved its cursor without shifting over
the unwritten bytes.

This will now try to shift over the unwritten bytes if the next buf
wouldn't fit in the already allocated space.
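
The idea can be sketched outside of hyper's internals. The following standalone example is only an illustration of the strategy described above; the `WriteCursor` type and its methods are hypothetical stand-ins for hyper's `Cursor<Vec<u8>>` and `WriteBuf`, not code from this commit. Before growing the allocation for a new chunk, it first drains the prefix that has already been flushed to the transport:

// Hypothetical stand-in type for illustration; the real implementation
// lives in src/proto/h1/io.rs.
struct WriteCursor {
    bytes: Vec<u8>,
    pos: usize, // everything before `pos` has already been written to the transport
}

impl WriteCursor {
    fn buffer(&mut self, chunk: &[u8]) {
        // Only reclaim space when the new chunk would not fit in the
        // capacity that is already allocated.
        if self.pos > 0 && self.bytes.capacity() - self.bytes.len() < chunk.len() {
            // Shift the unwritten bytes to the front instead of letting
            // the Vec keep growing past a dead prefix.
            self.bytes.drain(0..self.pos);
            self.pos = 0;
        }
        self.bytes.extend_from_slice(chunk);
    }

    fn advance(&mut self, n: usize) {
        // Mark `n` more bytes as accepted by the transport.
        self.pos += n;
    }

    fn remaining(&self) -> &[u8] {
        &self.bytes[self.pos..]
    }
}

fn main() {
    let mut buf = WriteCursor { bytes: Vec::with_capacity(32), pos: 0 };
    buf.buffer(b"hello world, ");
    buf.advance(11); // pretend the transport accepted 11 bytes
    buf.buffer(b"it's hyper!"); // fits in spare capacity, no shifting needed
    assert_eq!(buf.remaining(), b", it's hyper!");

    buf.buffer(&[b'X'; 64]); // would exceed capacity, so the flushed prefix is drained first
    assert_eq!(buf.pos, 0);
}

The key point is that reclamation only happens when the next buf would not otherwise fit, so the common case of appending into spare capacity remains a plain copy.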
seanmonstar committed May 27, 2021
1 parent e61b494 commit eb0c646
Showing 1 changed file with 62 additions and 8 deletions.
src/proto/h1/io.rs (62 additions, 8 deletions)
@@ -56,7 +56,12 @@ where
     B: Buf,
 {
     pub(crate) fn new(io: T) -> Buffered<T, B> {
-        let write_buf = WriteBuf::new(&io);
+        let strategy = if io.is_write_vectored() {
+            WriteStrategy::Queue
+        } else {
+            WriteStrategy::Flatten
+        };
+        let write_buf = WriteBuf::new(strategy);
         Buffered {
             flush_pipeline: false,
             io,
@@ -419,6 +424,24 @@ impl<T: AsRef<[u8]>> Cursor<T> {
 }
 
 impl Cursor<Vec<u8>> {
+    /// If we've advanced the position a bit in this cursor, and wish to
+    /// extend the underlying vector, we may wish to unshift the "read" bytes
+    /// off, and move everything else over.
+    fn maybe_unshift(&mut self, additional: usize) {
+        if self.pos == 0 {
+            // nothing to do
+            return;
+        }
+
+        if self.bytes.capacity() - self.bytes.len() >= additional {
+            // there's room!
+            return;
+        }
+
+        self.bytes.drain(0..self.pos);
+        self.pos = 0;
+    }
+
     fn reset(&mut self) {
         self.pos = 0;
         self.bytes.clear();
@@ -463,12 +486,7 @@ pub(super) struct WriteBuf<B> {
 }
 
 impl<B: Buf> WriteBuf<B> {
-    fn new(io: &impl AsyncWrite) -> WriteBuf<B> {
-        let strategy = if io.is_write_vectored() {
-            WriteStrategy::Queue
-        } else {
-            WriteStrategy::Flatten
-        };
+    fn new(strategy: WriteStrategy) -> WriteBuf<B> {
         WriteBuf {
             headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
             max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
@@ -492,6 +510,8 @@ where
         match self.strategy {
             WriteStrategy::Flatten => {
                 let head = self.headers_mut();
+
+                head.maybe_unshift(buf.remaining());
                 //perf: This is a little faster than <Vec as BufMut>>::put,
                 //but accomplishes the same result.
                 loop {
Expand Down Expand Up @@ -804,7 +824,6 @@ mod tests {
let _ = pretty_env_logger::try_init();

let mock = Mock::new()
// Just a single write
.write(b"hello world, it's hyper!")
.build();

@@ -820,6 +839,41 @@
         buffered.flush().await.expect("flush");
     }
 
+    #[test]
+    fn write_buf_flatten_partially_flushed() {
+        let _ = pretty_env_logger::try_init();
+
+        let b = |s: &str| Cursor::new(s.as_bytes().to_vec());
+
+        let mut write_buf = WriteBuf::<Cursor<Vec<u8>>>::new(WriteStrategy::Flatten);
+
+        write_buf.buffer(b("hello "));
+        write_buf.buffer(b("world, "));
+
+        assert_eq!(write_buf.chunk(), b"hello world, ");
+
+        // advance most of the way, but not all
+        write_buf.advance(11);
+
+        assert_eq!(write_buf.chunk(), b", ");
+        assert_eq!(write_buf.headers.pos, 11);
+        assert_eq!(write_buf.headers.bytes.capacity(), INIT_BUFFER_SIZE);
+
+        // there's still room in the headers buffer, so just push on the end
+        write_buf.buffer(b("it's hyper!"));
+
+        assert_eq!(write_buf.chunk(), b", it's hyper!");
+        assert_eq!(write_buf.headers.pos, 11);
+
+        let rem1 = write_buf.remaining();
+        let cap = write_buf.headers.bytes.capacity();
+
+        // but when this would go over capacity, don't copy the old bytes
+        write_buf.buffer(Cursor::new(vec![b'X'; cap]));
+        assert_eq!(write_buf.remaining(), cap + rem1);
+        assert_eq!(write_buf.headers.pos, 0);
+    }
+
     #[tokio::test]
     async fn write_buf_queue_disable_auto() {
         let _ = pretty_env_logger::try_init();
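For reference, the reclamation in `maybe_unshift` relies only on standard `Vec` behavior. This small standalone check (plain `std` usage assumed here, not code from the commit) shows that `drain(0..pos)` shifts the unwritten tail to the front and reuses the existing allocation rather than replacing it:

fn main() {
    let mut bytes: Vec<u8> = Vec::with_capacity(16);
    bytes.extend_from_slice(b"hello world, ");
    let cap_before = bytes.capacity();

    // Pretend the first 11 bytes were already written to the transport.
    let pos = 11;
    bytes.drain(0..pos);

    assert_eq!(&bytes[..], b", ");            // the unwritten tail now starts at index 0
    assert_eq!(bytes.capacity(), cap_before); // the allocation was reused, not replaced
}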
