Skip to content

Commit

Permalink
fix: replace byte pool with bytes to fix memory leak
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Aug 28, 2023
1 parent 303a3b3 commit 43902ee
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 45 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pin-utils = "0.1.0-alpha.4"
futures = "0.3.15"
self_cell = "1.0.1"
stop-token = "0.7"
byte-pool = "0.2.4"
bytes = "1"
once_cell = "1.8.0"
log = "0.4.8"
thiserror = "1.0.9"
Expand Down
75 changes: 35 additions & 40 deletions src/imap_stream.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,18 @@
use std::fmt;
use std::pin::Pin;
use std::sync::Arc;

#[cfg(feature = "runtime-async-std")]
use async_std::io::{Read, Write, WriteExt};
use byte_pool::{Block, BytePool};
use bytes::BytesMut;
use futures::stream::Stream;
use futures::task::{Context, Poll};
use futures::{io, ready};
use nom::Needed;
use once_cell::sync::Lazy;
#[cfg(feature = "runtime-tokio")]
use tokio::io::{AsyncRead as Read, AsyncWrite as Write, AsyncWriteExt};

use crate::types::{Request, ResponseData};

/// The global buffer pool we use for storing incoming data.
pub(crate) static POOL: Lazy<Arc<BytePool>> = Lazy::new(|| Arc::new(BytePool::new()));

/// Wraps a stream, and parses incoming data as imap server messages. Writes outgoing data
/// as imap client messages.
#[derive(Debug)]
Expand Down Expand Up @@ -85,7 +80,7 @@ impl<R: Read + Write + Unpin> ImapStream<R> {
return Ok(None);
}

let block: Block<'static> = self.buffer.take_block();
let block = self.buffer.take_block();
// Be aware, now self.buffer is invalid until block is returned or reset!

let res = ResponseData::try_new_or_recover(block, |buf| {
Expand Down Expand Up @@ -133,7 +128,7 @@ impl<R: Read + Write + Unpin> ImapStream<R> {
/// Abstraction around needed buffer management.
struct Buffer {
/// The buffer itself.
block: Block<'static>,
block: BytesMut,
/// Offset where used bytes range ends.
offset: usize,
}
Expand All @@ -144,7 +139,7 @@ impl Buffer {

fn new() -> Self {
Self {
block: POOL.alloc(Self::BLOCK_SIZE),
block: BytesMut::zeroed(Self::BLOCK_SIZE),
offset: 0,
}
}
Expand All @@ -171,23 +166,23 @@ impl Buffer {
// aka advance()?
fn extend_used(&mut self, num_bytes: usize) {
self.offset += num_bytes;
if self.offset > self.block.size() {
self.offset = self.block.size();
if self.offset > self.block.len() {
self.offset = self.block.len();
}
}

/// Ensure the buffer has free capacity, optionally ensuring minimum buffer size.
fn ensure_capacity(&mut self, required: usize) -> io::Result<()> {
let free_bytes: usize = self.block.size() - self.offset;
let free_bytes: usize = self.block.len() - self.offset;
let min_required_bytes: usize = required;
let extra_bytes_needed: usize = min_required_bytes.saturating_sub(self.block.size());
let extra_bytes_needed: usize = min_required_bytes.saturating_sub(self.block.len());
if free_bytes == 0 || extra_bytes_needed > 0 {
let increase = std::cmp::max(Buffer::BLOCK_SIZE, extra_bytes_needed);
self.grow(increase)?;
}

// Assert that the buffer at least one free byte.
debug_assert!(self.offset < self.block.size());
debug_assert!(self.offset < self.block.len());
Ok(())
}

Expand All @@ -200,9 +195,8 @@ impl Buffer {
///
/// [`BLOCK_SIZE`]: Self::BLOCK_SIZE
/// [`MAX_CAPACITY`]: Self::MAX_CAPACITY
// TODO: This bypasses the byte-pool block re-use. That's bad.
fn grow(&mut self, num_bytes: usize) -> io::Result<()> {
let min_size = self.block.size() + num_bytes;
let min_size = self.block.len() + num_bytes;
let new_size = match min_size % Self::BLOCK_SIZE {
0 => min_size,
n => min_size + (Self::BLOCK_SIZE - n),
Expand All @@ -213,7 +207,7 @@ impl Buffer {
"incoming data too large",
))
} else {
self.block.realloc(new_size);
self.block.resize(new_size, 0);
Ok(())
}
}
Expand All @@ -226,8 +220,8 @@ impl Buffer {
/// [`return_block`]: Self::return_block
/// [`reset_with_data`]: Self::reset_with_data
// TODO: Enforce this with typestate.
fn take_block(&mut self) -> Block<'static> {
std::mem::replace(&mut self.block, POOL.alloc(Self::BLOCK_SIZE))
fn take_block(&mut self) -> BytesMut {
std::mem::replace(&mut self.block, BytesMut::zeroed(Self::BLOCK_SIZE))
}

/// Reset the buffer to be a new allocation with given data copied in.
Expand All @@ -246,13 +240,14 @@ impl Buffer {
0 => min_size + Self::BLOCK_SIZE,
n => min_size + (Self::BLOCK_SIZE - n),
};
self.block = POOL.alloc(new_size);
self.block = BytesMut::zeroed(new_size);
self.block[..data.len()].copy_from_slice(data);

self.offset = data.len();
}

/// Return the block which backs this buffer.
fn return_block(&mut self, block: Block<'static>) {
fn return_block(&mut self, block: BytesMut) {
self.block = block;
}
}
Expand All @@ -261,7 +256,7 @@ impl fmt::Debug for Buffer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Buffer")
.field("used", &self.used())
.field("capacity", &self.block.size())
.field("capacity", &self.block.capacity())
.finish()
}
}
Expand Down Expand Up @@ -327,7 +322,7 @@ mod tests {
let mut buf = Buffer::new();
let slice: &[u8] = buf.free_as_mut_slice();
assert_eq!(slice.len(), Buffer::BLOCK_SIZE);
assert_eq!(slice.len(), buf.block.size());
assert_eq!(slice.len(), buf.block.len());
}

#[test]
Expand Down Expand Up @@ -356,18 +351,18 @@ mod tests {

let slice = &buf.block[..buf.used()];
assert_eq!(slice, b"hello");
assert_eq!(buf.free_as_mut_slice().len(), buf.block.size() - buf.offset);
assert_eq!(buf.free_as_mut_slice().len(), buf.block.len() - buf.offset);
}

#[test]
fn test_buffer_grow() {
let mut buf = Buffer::new();
assert_eq!(buf.block.size(), Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), Buffer::BLOCK_SIZE);
buf.grow(1).unwrap();
assert_eq!(buf.block.size(), 2 * Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), 2 * Buffer::BLOCK_SIZE);

buf.grow(Buffer::BLOCK_SIZE + 1).unwrap();
assert_eq!(buf.block.size(), 4 * Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), 4 * Buffer::BLOCK_SIZE);

let ret = buf.grow(Buffer::MAX_CAPACITY);
assert!(ret.is_err());
Expand All @@ -379,29 +374,29 @@ mod tests {
let mut buf = Buffer::new();
buf.extend_used(Buffer::BLOCK_SIZE - 1);
assert_eq!(buf.free_as_mut_slice().len(), 1);
assert_eq!(buf.block.size(), Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), Buffer::BLOCK_SIZE);

// Still has capacity, no size request.
buf.ensure_capacity(0).unwrap();
assert_eq!(buf.free_as_mut_slice().len(), 1);
assert_eq!(buf.block.size(), Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), Buffer::BLOCK_SIZE);

// No more capacity, initial size.
buf.extend_used(1);
assert_eq!(buf.free_as_mut_slice().len(), 0);
assert_eq!(buf.block.size(), Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), Buffer::BLOCK_SIZE);

// No capacity, no size request.
buf.ensure_capacity(0).unwrap();
assert_eq!(buf.free_as_mut_slice().len(), Buffer::BLOCK_SIZE);
assert_eq!(buf.block.size(), 2 * Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), 2 * Buffer::BLOCK_SIZE);

// Some capacity, size request.
buf.extend_used(5);
assert_eq!(buf.offset, Buffer::BLOCK_SIZE + 5);
buf.ensure_capacity(3 * Buffer::BLOCK_SIZE - 6).unwrap();
assert_eq!(buf.free_as_mut_slice().len(), 2 * Buffer::BLOCK_SIZE - 5);
assert_eq!(buf.block.size(), 3 * Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), 3 * Buffer::BLOCK_SIZE);
}

/// Regression test for a bug in ensure_capacity() caused
Expand Down Expand Up @@ -437,32 +432,32 @@ mod tests {
// This test identifies blocks by their size.
let mut buf = Buffer::new();
buf.grow(1).unwrap();
let block_size = buf.block.size();
let block_size = buf.block.len();

let block = buf.take_block();
assert_eq!(block.size(), block_size);
assert_ne!(buf.block.size(), block_size);
assert_eq!(block.len(), block_size);
assert_ne!(buf.block.len(), block_size);

buf.return_block(block);
assert_eq!(buf.block.size(), block_size);
assert_eq!(buf.block.len(), block_size);
}

#[test]
fn test_buffer_reset_with_data() {
// This test identifies blocks by their size.
let data: [u8; 2 * Buffer::BLOCK_SIZE] = [b'a'; 2 * Buffer::BLOCK_SIZE];
let mut buf = Buffer::new();
let block_size = buf.block.size();
let block_size = buf.block.len();
assert_eq!(block_size, Buffer::BLOCK_SIZE);
buf.reset_with_data(&data);
assert_ne!(buf.block.size(), block_size);
assert_eq!(buf.block.size(), 3 * Buffer::BLOCK_SIZE);
assert_ne!(buf.block.len(), block_size);
assert_eq!(buf.block.len(), 3 * Buffer::BLOCK_SIZE);
assert!(!buf.free_as_mut_slice().is_empty());

let data: [u8; 0] = [];
let mut buf = Buffer::new();
buf.reset_with_data(&data);
assert_eq!(buf.block.size(), Buffer::BLOCK_SIZE);
assert_eq!(buf.block.len(), Buffer::BLOCK_SIZE);
}

#[test]
Expand Down
4 changes: 2 additions & 2 deletions src/parse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ pub(crate) async fn handle_unilateral(
mod tests {
use super::*;
use async_channel::bounded;
use bytes::BytesMut;

fn input_stream(data: &[&str]) -> Vec<io::Result<ResponseData>> {
data.iter()
.map(|line| {
let mut block = crate::imap_stream::POOL.alloc(line.as_bytes().len());
block.copy_from_slice(line.as_bytes());
let block = BytesMut::from(line.as_bytes());
ResponseData::try_new(block, |bytes| -> io::Result<_> {
let (remaining, response) = imap_proto::parser::parse_response(bytes).unwrap();
assert_eq!(remaining.len(), 0);
Expand Down
4 changes: 2 additions & 2 deletions src/types/response_data.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use std::fmt;

use byte_pool::Block;
use bytes::BytesMut;
use imap_proto::{RequestId, Response};
use self_cell::self_cell;

self_cell!(
pub struct ResponseData {
owner: Block<'static>,
owner: BytesMut,

#[covariant]
dependent: Response,
Expand Down

0 comments on commit 43902ee

Please sign in to comment.