Skip to content

Commit

Permalink
Transmission pipeline reorg. Recycling buffers at RX
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 8, 2021
1 parent c98c5f1 commit cf026cf
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 509 deletions.
38 changes: 28 additions & 10 deletions zenoh-util/src/collections/buffer_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,30 +33,47 @@ impl RecyclingBufferPool {
RecyclingBufferPool { inner }
}

pub async fn pull(&self) -> RecyclingBuffer {
pub fn alloc(&self, size: usize) -> RecyclingBuffer {
RecyclingBuffer::new(vec![0u8; size], None)
}

pub fn try_take(&self) -> Option<RecyclingBuffer> {
if let Some(buffer) = self.inner.try_pull() {
Some(RecyclingBuffer::new(
buffer,
Some(Arc::downgrade(&self.inner)),
))
} else {
None
}
}

pub async fn take(&self) -> RecyclingBuffer {
let buffer = self.inner.pull().await;
RecyclingBuffer::new(buffer, Arc::downgrade(&self.inner))
RecyclingBuffer::new(buffer, Some(Arc::downgrade(&self.inner)))
}
}

#[derive(Clone)]
pub struct RecyclingBuffer {
pool: Option<Weak<LifoQueue<Vec<u8>>>>,
buffer: Option<Vec<u8>>,
pool: Weak<LifoQueue<Vec<u8>>>,
}

impl RecyclingBuffer {
pub fn new(buffer: Vec<u8>, pool: Weak<LifoQueue<Vec<u8>>>) -> RecyclingBuffer {
pub fn new(buffer: Vec<u8>, pool: Option<Weak<LifoQueue<Vec<u8>>>>) -> RecyclingBuffer {
RecyclingBuffer {
buffer: Some(buffer),
pool,
buffer: Some(buffer),
}
}

pub async fn recycle(mut self) {
if let Some(pool) = self.pool.upgrade() {
let buffer = self.buffer.take().unwrap();
pool.push(buffer).await;
if let Some(pool) = self.pool.take() {
if let Some(pool) = pool.upgrade() {
let buffer = self.buffer.take().unwrap();
pool.push(buffer).await;
}
}
}
}
Expand All @@ -78,8 +95,9 @@ impl DerefMut for RecyclingBuffer {

impl Drop for RecyclingBuffer {
fn drop(&mut self) {
if let Some(buffer) = self.buffer.take() {
if let Some(pool) = self.pool.upgrade() {
if let Some(pool) = self.pool.take() {
if let Some(pool) = pool.upgrade() {
let buffer = self.buffer.take().unwrap();
task::block_on(pool.push(buffer));
}
}
Expand Down
15 changes: 15 additions & 0 deletions zenoh/src/net/protocol/io/arc_slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ use std::io::IoSlice;
use std::ops::{
Deref, Index, Range, RangeFrom, RangeFull, RangeInclusive, RangeTo, RangeToInclusive,
};
use zenoh_util::collections::RecyclingBuffer;

/*************************************/
/* ARC SLICE BUFFER */
/*************************************/
#[derive(Clone)]
pub enum ArcSliceBuffer {
RecyclingBuffer(Arc<RecyclingBuffer>),
OwnedBuffer(Arc<Vec<u8>>),
#[cfg(feature = "zero-copy")]
SharedBuffer(Arc<SharedMemoryBuf>),
Expand All @@ -35,6 +37,7 @@ impl Deref for ArcSliceBuffer {

fn deref(&self) -> &Self::Target {
match self {
Self::RecyclingBuffer(buf) => buf.as_slice(),
Self::OwnedBuffer(buf) => buf.as_slice(),
#[cfg(feature = "zero-copy")]
Self::SharedBuffer(buf) => buf.as_slice(),
Expand Down Expand Up @@ -100,6 +103,18 @@ impl Index<RangeToInclusive<usize>> for ArcSliceBuffer {
}

// From traits
impl From<Arc<RecyclingBuffer>> for ArcSliceBuffer {
fn from(buf: Arc<RecyclingBuffer>) -> ArcSliceBuffer {
ArcSliceBuffer::RecyclingBuffer(buf)
}
}

impl From<RecyclingBuffer> for ArcSliceBuffer {
fn from(buf: RecyclingBuffer) -> ArcSliceBuffer {
ArcSliceBuffer::from(Arc::new(buf))
}
}

impl From<Arc<Vec<u8>>> for ArcSliceBuffer {
fn from(buf: Arc<Vec<u8>>) -> ArcSliceBuffer {
ArcSliceBuffer::OwnedBuffer(buf)
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/protocol/link/udp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,9 +568,9 @@ async fn accept_read_task(listener: Arc<ListenerUdp>, manager: SessionManager) {

log::trace!("Ready to accept UDP connections on: {:?}", src_addr);
// Buffers for deserialization
let buff_pool = RecyclingBufferPool::new(1, UDP_MAX_MTU);
let pool = RecyclingBufferPool::new(1, UDP_MAX_MTU);
loop {
let mut buff = buff_pool.pull().await;
let mut buff = pool.take().await;
// Wait for incoming connections
let (n, dst_addr) = match listener.socket.recv_from(&mut buff).await {
Ok((n, dst_addr)) => (n, dst_addr),
Expand Down
4 changes: 4 additions & 0 deletions zenoh/src/net/protocol/session/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,8 @@ zconfigurable! {

// The default backoff time in nanoseconds to allow the batching to potentially progress
pub static ref QUEUE_PULL_BACKOFF: u64 = 100;

// The default number of buffers allocated at RX side. The size of each buffer is automatically
// selected equal to the link mtu.
pub static ref RX_BUFF_POOL_SIZE: usize = 4;
}
Loading

0 comments on commit cf026cf

Please sign in to comment.