Skip to content

Commit

Permalink
Minor allocation optimization at RX
Browse files Browse the repository at this point in the history
  • Loading branch information
Mallets committed Mar 9, 2021
1 parent cf026cf commit c799dd3
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 22 deletions.
5 changes: 2 additions & 3 deletions zenoh/src/net/protocol/session/defaults.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@ zconfigurable! {
// The default backoff time in nanoseconds to allow the batching to potentially progress
pub static ref QUEUE_PULL_BACKOFF: u64 = 100;

// The default number of buffers allocated at RX side. The size of each buffer is automatically
// selected equal to the link mtu.
pub static ref RX_BUFF_POOL_SIZE: usize = 4;
// The total size of buffers allocated at RX side
pub static ref RX_BUFF_SIZE: usize = 16_777_216;
}
36 changes: 17 additions & 19 deletions zenoh/src/net/protocol/session/transport/link/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,13 @@ use super::super::super::link::Link;
use super::core::ZInt;
use super::io::{ArcSlice, RBuf};
use super::proto::{SessionMessage, ZenohMessage};
use super::session::defaults::{QUEUE_PRIO_CTRL, RX_BUFF_POOL_SIZE};
use super::session::defaults::{QUEUE_PRIO_CTRL, RX_BUFF_SIZE};
use super::{SeqNumGenerator, SessionTransport};
use async_std::channel::{bounded, Receiver, Sender};
use async_std::prelude::*;
use async_std::sync::{Arc, Mutex};
use async_std::task;
use batch::*;
use std::convert::TryInto;
use std::time::Duration;
use tx::*;
use zenoh_util::collections::RecyclingBufferPool;
Expand Down Expand Up @@ -210,25 +209,17 @@ async fn read_stream(link: SessionTransportLink) -> ZResult<()> {
let lease = Duration::from_millis(link.lease);
// The RBuf to read a message batch onto
let mut rbuf = RBuf::new();

let pool = RecyclingBufferPool::new(*RX_BUFF_POOL_SIZE, link.inner.get_mtu());
// 16 bits for reading the batch length
let mut length = [0u8, 0u8];
// The pool of buffers
let n = 1 + (*RX_BUFF_SIZE / link.inner.get_mtu());
let pool = RecyclingBufferPool::new(n, link.inner.get_mtu());
loop {
// Clear the RBuf
rbuf.clear();
// Retrieve one buffer
let mut buffer = if let Some(buffer) = pool.try_take() {
buffer
} else {
pool.alloc(link.inner.get_mtu())
};

// Async read from the underlying link
let _ = match link
.inner
.read_exact(&mut buffer[0..2])
.timeout(lease)
.await
{
let _ = match link.inner.read_exact(&mut length).timeout(lease).await {
Ok(res) => res?,
Err(_) => {
// Link lease has expired
Expand All @@ -240,9 +231,15 @@ async fn read_stream(link: SessionTransportLink) -> ZResult<()> {
}
};

let length: [u8; 2] = buffer[0..2].try_into().unwrap();
let to_read = u16::from_le_bytes(length) as usize;

// Retrieve one buffer
let mut buffer = if let Some(buffer) = pool.try_take() {
buffer
} else {
pool.alloc(to_read)
};

let _ = match link
.inner
.read_exact(&mut buffer[0..to_read])
Expand Down Expand Up @@ -278,8 +275,9 @@ async fn read_dgram(link: SessionTransportLink) -> ZResult<()> {
let lease = Duration::from_millis(link.lease);
// The RBuf to read a message batch onto
let mut rbuf = RBuf::new();

let pool = RecyclingBufferPool::new(*RX_BUFF_POOL_SIZE, link.inner.get_mtu());
// The pool of buffers
let n = 1 + (*RX_BUFF_SIZE / link.inner.get_mtu());
let pool = RecyclingBufferPool::new(n, link.inner.get_mtu());
loop {
// Clear the rbuf
rbuf.clear();
Expand Down

0 comments on commit c799dd3

Please sign in to comment.